refactor(server): jobs and processors (#1787)

* refactor: jobs and processors

* refactor: storage migration processor

* fix: tests

* fix: code warning

* chore: ignore coverage from infra

* fix: sync move asset logic between job core and asset core

* refactor: move error handling inside of catch

* refactor(server): job core into dedicated service calls

* refactor: smart info

* fix: tests

* chore: smart info tests

* refactor: use asset repository

* refactor: thumbnail processor

* chore: coverage reqs
This commit is contained in:
Jason Rasmussen
2023-02-25 09:12:03 -05:00
committed by GitHub
parent 71d8567f18
commit 6c7679714b
108 changed files with 1645 additions and 1072 deletions

View File

@@ -0,0 +1,34 @@
import { Logger } from '@nestjs/common';
import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { AuthService } from '@app/domain';
@WebSocketGateway({ cors: true })
export class CommunicationGateway implements OnGatewayConnection, OnGatewayDisconnect {
private logger = new Logger(CommunicationGateway.name);
constructor(private authService: AuthService) {}
@WebSocketServer() server!: Server;
handleDisconnect(client: Socket) {
client.leave(client.nsp.name);
this.logger.log(`Client ${client.id} disconnected from Websocket`);
}
async handleConnection(client: Socket) {
try {
this.logger.log(`New websocket connection: ${client.id}`);
const user = await this.authService.validate(client.request.headers, {});
if (user) {
client.join(user.id);
} else {
client.emit('error', 'unauthorized');
client.disconnect();
}
} catch (e) {
client.emit('error', 'unauthorized');
client.disconnect();
}
}
}

View File

@@ -0,0 +1,12 @@
import { CommunicationEvent } from '@app/domain';
import { Injectable } from '@nestjs/common';
import { CommunicationGateway } from './communication.gateway';
@Injectable()
export class CommunicationRepository {
constructor(private ws: CommunicationGateway) {}
send(event: CommunicationEvent, userId: string, data: any) {
this.ws.server.to(userId).emit(event, JSON.stringify(data));
}
}

View File

@@ -0,0 +1,2 @@
export * from './communication.gateway';
export * from './communication.repository';

View File

@@ -0,0 +1,14 @@
import { IAlbumRepository } from '@app/domain';
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { AlbumEntity } from '../entities';
@Injectable()
export class AlbumRepository implements IAlbumRepository {
constructor(@InjectRepository(AlbumEntity) private repository: Repository<AlbumEntity>) {}
async deleteAll(userId: string): Promise<void> {
await this.repository.delete({ ownerId: userId });
}
}

View File

@@ -21,6 +21,10 @@ export class APIKeyRepository implements IKeyRepository {
await this.repository.delete({ userId, id });
}
async deleteAll(userId: string): Promise<void> {
await this.repository.delete({ userId });
}
getKey(hashedToken: string): Promise<APIKeyEntity | null> {
return this.repository.findOne({
select: {

View File

@@ -0,0 +1,38 @@
import { IAssetRepository } from '@app/domain';
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Not, Repository } from 'typeorm';
import { AssetEntity, AssetType } from '../entities';
@Injectable()
export class AssetRepository implements IAssetRepository {
constructor(@InjectRepository(AssetEntity) private repository: Repository<AssetEntity>) {}
async deleteAll(ownerId: string): Promise<void> {
await this.repository.delete({ ownerId });
}
async getAll(): Promise<AssetEntity[]> {
return this.repository.find({ relations: { exifInfo: true } });
}
async save(asset: Partial<AssetEntity>): Promise<AssetEntity> {
const { id } = await this.repository.save(asset);
return this.repository.findOneOrFail({ where: { id } });
}
findLivePhotoMatch(livePhotoCID: string, otherAssetId: string, type: AssetType): Promise<AssetEntity | null> {
return this.repository.findOne({
where: {
id: Not(otherAssetId),
type,
exifInfo: {
livePhotoCID,
},
},
relations: {
exifInfo: true,
},
});
}
}

View File

@@ -1,6 +1,9 @@
export * from './album.repository';
export * from './api-key.repository';
export * from './asset.repository';
export * from './device-info.repository';
export * from './shared-link.repository';
export * from './smart-info.repository';
export * from './system-config.repository';
export * from './user-token.repository';
export * from './user.repository';

View File

@@ -0,0 +1,14 @@
import { ISmartInfoRepository } from '@app/domain';
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { SmartInfoEntity } from '../entities';
@Injectable()
export class SmartInfoRepository implements ISmartInfoRepository {
constructor(@InjectRepository(SmartInfoEntity) private repository: Repository<SmartInfoEntity>) {}
async upsert(info: Partial<SmartInfoEntity>): Promise<void> {
await this.repository.upsert(info, { conflictPaths: ['assetId'] });
}
}

View File

@@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { UserTokenEntity } from '@app/infra/db/entities/user-token.entity';
import { UserTokenEntity } from '../entities/user-token.entity';
import { IUserTokenRepository } from '@app/domain/user-token';
@Injectable()
@@ -22,4 +22,8 @@ export class UserTokenRepository implements IUserTokenRepository {
async delete(id: string): Promise<void> {
await this.userTokenRepository.delete(id);
}
async deleteAll(userId: string): Promise<void> {
await this.userTokenRepository.delete({ user: { id: userId } });
}
}

View File

@@ -2,7 +2,7 @@ import { UserEntity } from '../entities';
import { IUserRepository, UserListFilter } from '@app/domain';
import { Injectable, InternalServerErrorException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Not, Repository } from 'typeorm';
import { IsNull, Not, Repository } from 'typeorm';
@Injectable()
export class UserRepository implements IUserRepository {
@@ -33,6 +33,10 @@ export class UserRepository implements IUserRepository {
return this.userRepository.findOne({ where: { oauthId } });
}
async getDeletedUsers(): Promise<UserEntity[]> {
return this.userRepository.find({ withDeleted: true, where: { deletedAt: Not(IsNull()) } });
}
async getList({ excludeId }: UserListFilter = {}): Promise<UserEntity[]> {
if (!excludeId) {
return this.userRepository.find(); // TODO: this should also be ordered the same as below
@@ -61,8 +65,12 @@ export class UserRepository implements IUserRepository {
return updatedUser;
}
async delete(user: UserEntity): Promise<UserEntity> {
return this.userRepository.softRemove(user);
async delete(user: UserEntity, hard?: boolean): Promise<UserEntity> {
if (hard) {
return this.userRepository.remove(user);
} else {
return this.userRepository.softRemove(user);
}
}
async restore(user: UserEntity): Promise<UserEntity> {

View File

@@ -1,43 +1,65 @@
import {
IAlbumRepository,
IAssetRepository,
ICommunicationRepository,
ICryptoRepository,
IDeviceInfoRepository,
IJobRepository,
IKeyRepository,
IMachineLearningRepository,
IMediaRepository,
ISharedLinkRepository,
ISmartInfoRepository,
IStorageRepository,
ISystemConfigRepository,
IUserRepository,
IUserTokenRepository,
QueueName,
} from '@app/domain';
import { IUserTokenRepository } from '@app/domain/user-token';
import { UserTokenRepository } from '@app/infra/db/repository/user-token.repository';
import { BullModule } from '@nestjs/bull';
import { Global, Module, Provider } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { CryptoRepository } from './auth/crypto.repository';
import { CommunicationGateway, CommunicationRepository } from './communication';
import {
AlbumEntity,
AlbumRepository,
APIKeyEntity,
APIKeyRepository,
AssetEntity,
AssetRepository,
databaseConfig,
DeviceInfoEntity,
DeviceInfoRepository,
SharedLinkEntity,
SharedLinkRepository,
SmartInfoEntity,
SmartInfoRepository,
SystemConfigEntity,
SystemConfigRepository,
UserEntity,
UserRepository,
UserTokenEntity,
UserTokenRepository,
} from './db';
import { JobRepository } from './job';
import { MachineLearningRepository } from './machine-learning';
import { MediaRepository } from './media';
import { FilesystemProvider } from './storage';
const providers: Provider[] = [
{ provide: IAlbumRepository, useClass: AlbumRepository },
{ provide: IAssetRepository, useClass: AssetRepository },
{ provide: ICommunicationRepository, useClass: CommunicationRepository },
{ provide: ICryptoRepository, useClass: CryptoRepository },
{ provide: ICryptoRepository, useClass: CryptoRepository },
{ provide: IDeviceInfoRepository, useClass: DeviceInfoRepository },
{ provide: IKeyRepository, useClass: APIKeyRepository },
{ provide: IJobRepository, useClass: JobRepository },
{ provide: IMachineLearningRepository, useClass: MachineLearningRepository },
{ provide: IMediaRepository, useClass: MediaRepository },
{ provide: ISharedLinkRepository, useClass: SharedLinkRepository },
{ provide: ISmartInfoRepository, useClass: SmartInfoRepository },
{ provide: IStorageRepository, useClass: FilesystemProvider },
{ provide: ISystemConfigRepository, useClass: SystemConfigRepository },
{ provide: IUserRepository, useClass: UserRepository },
@@ -49,10 +71,13 @@ const providers: Provider[] = [
imports: [
TypeOrmModule.forRoot(databaseConfig),
TypeOrmModule.forFeature([
AssetEntity,
AlbumEntity,
APIKeyEntity,
DeviceInfoEntity,
UserEntity,
SharedLinkEntity,
SmartInfoEntity,
SystemConfigEntity,
UserTokenEntity,
]),
@@ -73,18 +98,9 @@ const providers: Provider[] = [
},
}),
}),
BullModule.registerQueue(
{ name: QueueName.USER_DELETION },
{ name: QueueName.THUMBNAIL_GENERATION },
{ name: QueueName.ASSET_UPLOADED },
{ name: QueueName.METADATA_EXTRACTION },
{ name: QueueName.VIDEO_CONVERSION },
{ name: QueueName.MACHINE_LEARNING },
{ name: QueueName.CONFIG },
{ name: QueueName.BACKGROUND_TASK },
),
BullModule.registerQueue(...Object.values(QueueName).map((name) => ({ name }))),
],
providers: [...providers],
providers: [...providers, CommunicationGateway],
exports: [...providers, BullModule],
})
export class InfraModule {}

View File

@@ -1,15 +1,4 @@
import {
IAssetUploadedJob,
IJobRepository,
IMachineLearningJob,
IMetadataExtractionJob,
IUserDeletionJob,
IVideoTranscodeJob,
JobCounts,
JobItem,
JobName,
QueueName,
} from '@app/domain';
import { IAssetJob, IJobRepository, IMetadataExtractionJob, JobCounts, JobItem, JobName, QueueName } from '@app/domain';
import { InjectQueue } from '@nestjs/bull';
import { BadRequestException, Logger } from '@nestjs/common';
import { Queue } from 'bull';
@@ -18,14 +7,12 @@ export class JobRepository implements IJobRepository {
private logger = new Logger(JobRepository.name);
constructor(
@InjectQueue(QueueName.ASSET_UPLOADED) private assetUploaded: Queue<IAssetUploadedJob>,
@InjectQueue(QueueName.BACKGROUND_TASK) private backgroundTask: Queue,
@InjectQueue(QueueName.MACHINE_LEARNING) private machineLearning: Queue<IMachineLearningJob>,
@InjectQueue(QueueName.MACHINE_LEARNING) private machineLearning: Queue<IAssetJob>,
@InjectQueue(QueueName.METADATA_EXTRACTION) private metadataExtraction: Queue<IMetadataExtractionJob>,
@InjectQueue(QueueName.CONFIG) private storageMigration: Queue,
@InjectQueue(QueueName.STORAGE_TEMPLATE_MIGRATION) private storageTemplateMigration: Queue,
@InjectQueue(QueueName.THUMBNAIL_GENERATION) private thumbnail: Queue,
@InjectQueue(QueueName.USER_DELETION) private userDeletion: Queue<IUserDeletionJob>,
@InjectQueue(QueueName.VIDEO_CONVERSION) private videoTranscode: Queue<IVideoTranscodeJob>,
@InjectQueue(QueueName.VIDEO_CONVERSION) private videoTranscode: Queue<IAssetJob>,
) {}
async isActive(name: QueueName): Promise<boolean> {
@@ -41,13 +28,13 @@ export class JobRepository implements IJobRepository {
return this.getQueue(name).getJobCounts();
}
async add(item: JobItem): Promise<void> {
async queue(item: JobItem): Promise<void> {
switch (item.name) {
case JobName.ASSET_UPLOADED:
await this.assetUploaded.add(item.name, item.data, { jobId: item.data.asset.id });
await this.backgroundTask.add(item.name, item.data, { jobId: item.data.asset.id });
break;
case JobName.DELETE_FILE_ON_DISK:
case JobName.DELETE_FILES:
await this.backgroundTask.add(item.name, item.data);
break;
@@ -62,18 +49,21 @@ export class JobRepository implements IJobRepository {
await this.metadataExtraction.add(item.name, item.data);
break;
case JobName.TEMPLATE_MIGRATION:
case JobName.CONFIG_CHANGE:
await this.storageMigration.add(item.name, {});
break;
case JobName.GENERATE_JPEG_THUMBNAIL:
case JobName.GENERATE_WEBP_THUMBNAIL:
await this.thumbnail.add(item.name, item.data);
break;
case JobName.USER_DELETION:
await this.userDeletion.add(item.name, item.data);
await this.backgroundTask.add(item.name, item.data);
break;
case JobName.STORAGE_TEMPLATE_MIGRATION:
await this.storageTemplateMigration.add(item.name);
break;
case JobName.SYSTEM_CONFIG_CHANGE:
await this.backgroundTask.add(item.name, {});
break;
case JobName.VIDEO_CONVERSION:
@@ -88,14 +78,14 @@ export class JobRepository implements IJobRepository {
private getQueue(name: QueueName) {
switch (name) {
case QueueName.STORAGE_TEMPLATE_MIGRATION:
return this.storageTemplateMigration;
case QueueName.THUMBNAIL_GENERATION:
return this.thumbnail;
case QueueName.METADATA_EXTRACTION:
return this.metadataExtraction;
case QueueName.VIDEO_CONVERSION:
return this.videoTranscode;
case QueueName.CONFIG:
return this.storageMigration;
case QueueName.MACHINE_LEARNING:
return this.machineLearning;
default:

View File

@@ -0,0 +1 @@
export * from './machine-learning.repository';

View File

@@ -0,0 +1,17 @@
import { MACHINE_LEARNING_URL } from '@app/common';
import { IMachineLearningRepository, MachineLearningInput } from '@app/domain';
import { Injectable } from '@nestjs/common';
import axios from 'axios';
const client = axios.create({ baseURL: MACHINE_LEARNING_URL });
@Injectable()
export class MachineLearningRepository implements IMachineLearningRepository {
tagImage(input: MachineLearningInput): Promise<string[]> {
return client.post<string[]>('/image-classifier/tag-image', input).then((res) => res.data);
}
detectObjects(input: MachineLearningInput): Promise<string[]> {
return client.post<string[]>('/object-detection/detect-object', input).then((res) => res.data);
}
}

View File

@@ -0,0 +1 @@
export * from './media.repository';

View File

@@ -0,0 +1,37 @@
import { IMediaRepository, ResizeOptions } from '@app/domain';
import { exiftool } from 'exiftool-vendored';
import ffmpeg from 'fluent-ffmpeg';
import sharp from 'sharp';
export class MediaRepository implements IMediaRepository {
extractThumbnailFromExif(input: string, output: string): Promise<void> {
return exiftool.extractThumbnail(input, output);
}
async resize(input: string, output: string, options: ResizeOptions): Promise<void> {
switch (options.format) {
case 'webp':
await sharp(input, { failOnError: false }).resize(250).webp().rotate().toFile(output);
return;
case 'jpeg':
await sharp(input, { failOnError: false })
.resize(options.size, options.size, { fit: 'outside', withoutEnlargement: true })
.jpeg()
.rotate()
.toFile(output);
return;
}
}
extractVideoThumbnail(input: string, output: string) {
return new Promise<void>((resolve, reject) => {
ffmpeg(input)
.outputOptions(['-ss 00:00:00.000', '-frames:v 1'])
.output(output)
.on('error', reject)
.on('end', resolve)
.run();
});
}
}

View File

@@ -1,13 +1,15 @@
import { ImmichReadStream, IStorageRepository } from '@app/domain';
import { constants, createReadStream, stat } from 'fs';
import { constants, createReadStream, existsSync, mkdirSync } from 'fs';
import fs from 'fs/promises';
import { promisify } from 'util';
import mv from 'mv';
import { promisify } from 'node:util';
import path from 'path';
const fileInfo = promisify(stat);
const moveFile = promisify<string, string, mv.Options>(mv);
export class FilesystemProvider implements IStorageRepository {
async createReadStream(filepath: string, mimeType: string): Promise<ImmichReadStream> {
const { size } = await fileInfo(filepath);
const { size } = await fs.stat(filepath);
await fs.access(filepath, constants.R_OK | constants.W_OK);
return {
stream: createReadStream(filepath),
@@ -15,4 +17,53 @@ export class FilesystemProvider implements IStorageRepository {
type: mimeType,
};
}
async moveFile(source: string, destination: string): Promise<void> {
await moveFile(source, destination, { mkdirp: true, clobber: false });
}
async checkFileExists(filepath: string): Promise<boolean> {
try {
await fs.access(filepath, constants.F_OK);
return true;
} catch (_) {
return false;
}
}
async unlink(file: string) {
await fs.unlink(file);
}
async unlinkDir(folder: string, options: { recursive?: boolean; force?: boolean }) {
await fs.rm(folder, options);
}
async removeEmptyDirs(directory: string) {
this._removeEmptyDirs(directory, false);
}
private async _removeEmptyDirs(directory: string, self: boolean) {
// lstat does not follow symlinks (in contrast to stat)
const stats = await fs.lstat(directory);
if (!stats.isDirectory()) {
return;
}
const files = await fs.readdir(directory);
await Promise.all(files.map((file) => this._removeEmptyDirs(path.join(directory, file), true)));
if (self) {
const updated = await fs.readdir(directory);
if (updated.length === 0) {
await fs.rmdir(directory);
}
}
}
mkdirSync(filepath: string): void {
if (!existsSync(filepath)) {
mkdirSync(filepath, { recursive: true });
}
}
}