feat(server/web) Add manual job trigger mechanism to the web (#767)

This commit is contained in:
Alex
2022-10-06 11:25:54 -05:00
committed by GitHub
parent 854c214bc0
commit 7587f858ae
75 changed files with 3052 additions and 238 deletions

View File

@@ -4,13 +4,7 @@ import { AssetEntity } from '@app/database/entities/asset.entity';
import { ExifEntity } from '@app/database/entities/exif.entity';
import { SmartInfoEntity } from '@app/database/entities/smart-info.entity';
import { UserEntity } from '@app/database/entities/user.entity';
import {
assetUploadedQueueName,
generateChecksumQueueName,
metadataExtractionQueueName,
thumbnailGeneratorQueueName,
videoConversionQueueName,
} from '@app/job/constants/queue-name.constant';
import { QueueNameEnum } from '@app/job/constants/queue-name.constant';
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
@@ -19,6 +13,7 @@ import { CommunicationModule } from '../../immich/src/api-v1/communication/commu
import { MicroservicesService } from './microservices.service';
import { AssetUploadedProcessor } from './processors/asset-uploaded.processor';
import { GenerateChecksumProcessor } from './processors/generate-checksum.processor';
import { MachineLearningProcessor } from './processors/machine-learning.processor';
import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';
import { ThumbnailGeneratorProcessor } from './processors/thumbnail.processor';
import { VideoTranscodeProcessor } from './processors/video-transcode.processor';
@@ -42,7 +37,7 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
}),
BullModule.registerQueue(
{
name: thumbnailGeneratorQueueName,
name: QueueNameEnum.THUMBNAIL_GENERATION,
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
@@ -50,7 +45,7 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
},
},
{
name: assetUploadedQueueName,
name: QueueNameEnum.ASSET_UPLOADED,
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
@@ -58,7 +53,7 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
},
},
{
name: metadataExtractionQueueName,
name: QueueNameEnum.METADATA_EXTRACTION,
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
@@ -66,7 +61,7 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
},
},
{
name: videoConversionQueueName,
name: QueueNameEnum.VIDEO_CONVERSION,
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
@@ -74,7 +69,15 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
},
},
{
name: generateChecksumQueueName,
name: QueueNameEnum.CHECKSUM_GENERATION,
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
removeOnFail: false,
},
},
{
name: QueueNameEnum.MACHINE_LEARNING,
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
@@ -92,6 +95,7 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
MetadataExtractionProcessor,
VideoTranscodeProcessor,
GenerateChecksumProcessor,
MachineLearningProcessor,
ConfigService,
],
exports: [],

View File

@@ -1,4 +1,4 @@
import { generateChecksumQueueName } from '@app/job';
import { QueueNameEnum } from '@app/job';
import { InjectQueue } from '@nestjs/bull';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Queue } from 'bull';
@@ -6,14 +6,18 @@ import { randomUUID } from 'node:crypto';
@Injectable()
export class MicroservicesService implements OnModuleInit {
constructor (
@InjectQueue(generateChecksumQueueName)
constructor(
@InjectQueue(QueueNameEnum.CHECKSUM_GENERATION)
private generateChecksumQueue: Queue,
) {}
async onModuleInit() {
await this.generateChecksumQueue.add({}, {
jobId: randomUUID(), delay: 10000 // wait for migration
});
await this.generateChecksumQueue.add(
{},
{
jobId: randomUUID(),
delay: 10000, // wait for migration
},
);
}
}

View File

@@ -4,30 +4,27 @@ import {
IMetadataExtractionJob,
IThumbnailGenerationJob,
IVideoTranscodeJob,
assetUploadedQueueName,
metadataExtractionQueueName,
thumbnailGeneratorQueueName,
videoConversionQueueName,
assetUploadedProcessorName,
exifExtractionProcessorName,
generateJPEGThumbnailProcessorName,
mp4ConversionProcessorName,
videoMetadataExtractionProcessorName,
QueueNameEnum,
} from '@app/job';
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Job, Queue } from 'bull';
import { randomUUID } from 'crypto';
@Processor(assetUploadedQueueName)
@Processor(QueueNameEnum.ASSET_UPLOADED)
export class AssetUploadedProcessor {
constructor(
@InjectQueue(thumbnailGeneratorQueueName)
@InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue<IThumbnailGenerationJob>,
@InjectQueue(metadataExtractionQueueName)
@InjectQueue(QueueNameEnum.METADATA_EXTRACTION)
private metadataExtractionQueue: Queue<IMetadataExtractionJob>,
@InjectQueue(videoConversionQueueName)
@InjectQueue(QueueNameEnum.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
) {}

View File

@@ -1,5 +1,5 @@
import { AssetEntity } from '@app/database/entities/asset.entity';
import { generateChecksumQueueName } from '@app/job';
import { QueueNameEnum } from '@app/job';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
@@ -8,7 +8,7 @@ import fs from 'node:fs';
import { FindOptionsWhere, IsNull, MoreThan, QueryFailedError, Repository } from 'typeorm';
// TODO: just temporary task to generate previous uploaded assets.
@Processor(generateChecksumQueueName)
@Processor(QueueNameEnum.CHECKSUM_GENERATION)
export class GenerateChecksumProcessor {
constructor(
@InjectRepository(AssetEntity)
@@ -33,7 +33,7 @@ export class GenerateChecksumProcessor {
const assets = await this.assetRepository.find({
where: whereStat,
take: pageSize,
order: { id: 'ASC' }
order: { id: 'ASC' },
});
if (!assets?.length) {

View File

@@ -0,0 +1,60 @@
import { AssetEntity } from '@app/database/entities/asset.entity';
import { SmartInfoEntity } from '@app/database/entities/smart-info.entity';
import { MachineLearningJobNameEnum, QueueNameEnum } from '@app/job';
import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interface';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import axios from 'axios';
import { Job } from 'bull';
import { Repository } from 'typeorm';
@Processor(QueueNameEnum.MACHINE_LEARNING)
export class MachineLearningProcessor {
constructor(
@InjectRepository(SmartInfoEntity)
private smartInfoRepository: Repository<SmartInfoEntity>,
) {}
@Process({ name: MachineLearningJobNameEnum.IMAGE_TAGGING, concurrency: 2 })
async tagImage(job: Job<IMachineLearningJob>) {
const { asset } = job.data;
const res = await axios.post('http://immich-machine-learning:3003/image-classifier/tag-image', {
thumbnailPath: asset.resizePath,
});
if (res.status == 201 && res.data.length > 0) {
const smartInfo = new SmartInfoEntity();
smartInfo.assetId = asset.id;
smartInfo.tags = [...res.data];
await this.smartInfoRepository.upsert(smartInfo, {
conflictPaths: ['assetId'],
});
}
}
@Process({ name: MachineLearningJobNameEnum.OBJECT_DETECTION, concurrency: 2 })
async detectObject(job: Job<IMachineLearningJob>) {
try {
const { asset }: { asset: AssetEntity } = job.data;
const res = await axios.post('http://immich-machine-learning:3003/object-detection/detect-object', {
thumbnailPath: asset.resizePath,
});
if (res.status == 201 && res.data.length > 0) {
const smartInfo = new SmartInfoEntity();
smartInfo.assetId = asset.id;
smartInfo.objects = [...res.data];
await this.smartInfoRepository.upsert(smartInfo, {
conflictPaths: ['assetId'],
});
}
} catch (error) {
Logger.error(`Failed to trigger object detection pipe line ${String(error)}`);
}
}
}

View File

@@ -1,23 +1,19 @@
import { ImmichLogLevel } from '@app/common/constants/log-level.constant';
import { AssetEntity } from '@app/database/entities/asset.entity';
import { ExifEntity } from '@app/database/entities/exif.entity';
import { SmartInfoEntity } from '@app/database/entities/smart-info.entity';
import {
IExifExtractionProcessor,
IVideoLengthExtractionProcessor,
exifExtractionProcessorName,
imageTaggingProcessorName,
objectDetectionProcessorName,
videoMetadataExtractionProcessorName,
metadataExtractionQueueName,
reverseGeocodingProcessorName,
IReverseGeocodingProcessor,
QueueNameEnum,
} from '@app/job';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { InjectRepository } from '@nestjs/typeorm';
import axios from 'axios';
import { Job } from 'bull';
import exifr from 'exifr';
import ffmpeg from 'fluent-ffmpeg';
@@ -79,7 +75,7 @@ export interface GeoData {
distance: number;
}
@Processor(metadataExtractionQueueName)
@Processor(QueueNameEnum.METADATA_EXTRACTION)
export class MetadataExtractionProcessor {
private isGeocodeInitialized = false;
private logLevel: ImmichLogLevel;
@@ -91,9 +87,6 @@ export class MetadataExtractionProcessor {
@InjectRepository(ExifEntity)
private exifRepository: Repository<ExifEntity>,
@InjectRepository(SmartInfoEntity)
private smartInfoRepository: Repository<SmartInfoEntity>,
private configService: ConfigService,
) {
if (!configService.get('DISABLE_REVERSE_GEOCODING')) {
@@ -109,7 +102,8 @@ export class MetadataExtractionProcessor {
alternateNames: false,
},
countries: [],
dumpDirectory: configService.get('REVERSE_GEOCODING_DUMP_DIRECTORY') || (process.cwd() + '/.reverse-geocoding-dump/'),
dumpDirectory:
configService.get('REVERSE_GEOCODING_DUMP_DIRECTORY') || process.cwd() + '/.reverse-geocoding-dump/',
}).then(() => {
this.isGeocodeInitialized = true;
Logger.log('Reverse Geocoding Initialised');
@@ -273,48 +267,6 @@ export class MetadataExtractionProcessor {
}
}
@Process({ name: imageTaggingProcessorName, concurrency: 2 })
async tagImage(job: Job) {
const { asset }: { asset: AssetEntity } = job.data;
const res = await axios.post('http://immich-machine-learning:3003/image-classifier/tag-image', {
thumbnailPath: asset.resizePath,
});
if (res.status == 201 && res.data.length > 0) {
const smartInfo = new SmartInfoEntity();
smartInfo.assetId = asset.id;
smartInfo.tags = [...res.data];
await this.smartInfoRepository.upsert(smartInfo, {
conflictPaths: ['assetId'],
});
}
}
@Process({ name: objectDetectionProcessorName, concurrency: 2 })
async detectObject(job: Job) {
try {
const { asset }: { asset: AssetEntity } = job.data;
const res = await axios.post('http://immich-machine-learning:3003/object-detection/detect-object', {
thumbnailPath: asset.resizePath,
});
if (res.status == 201 && res.data.length > 0) {
const smartInfo = new SmartInfoEntity();
smartInfo.assetId = asset.id;
smartInfo.objects = [...res.data];
await this.smartInfoRepository.upsert(smartInfo, {
conflictPaths: ['assetId'],
});
}
} catch (error) {
Logger.error(`Failed to trigger object detection pipe line ${String(error)}`);
}
}
@Process({ name: videoMetadataExtractionProcessorName, concurrency: 2 })
async extractVideoMetadata(job: Job<IVideoLengthExtractionProcessor>) {
const { asset, fileName } = job.data;

View File

@@ -5,11 +5,9 @@ import {
WebpGeneratorProcessor,
generateJPEGThumbnailProcessorName,
generateWEBPThumbnailProcessorName,
imageTaggingProcessorName,
objectDetectionProcessorName,
metadataExtractionQueueName,
thumbnailGeneratorQueueName,
JpegGeneratorProcessor,
QueueNameEnum,
MachineLearningJobNameEnum,
} from '@app/job';
import { InjectQueue, Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
@@ -25,8 +23,9 @@ import sharp from 'sharp';
import { Repository } from 'typeorm/repository/Repository';
import { join } from 'path';
import { CommunicationGateway } from 'apps/immich/src/api-v1/communication/communication.gateway';
import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interface';
@Processor(thumbnailGeneratorQueueName)
@Processor(QueueNameEnum.THUMBNAIL_GENERATION)
export class ThumbnailGeneratorProcessor {
private logLevel: ImmichLogLevel;
@@ -34,13 +33,13 @@ export class ThumbnailGeneratorProcessor {
@InjectRepository(AssetEntity)
private assetRepository: Repository<AssetEntity>,
@InjectQueue(thumbnailGeneratorQueueName)
@InjectQueue(QueueNameEnum.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue,
private wsCommunicationGateway: CommunicationGateway,
@InjectQueue(metadataExtractionQueueName)
private metadataExtractionQueue: Queue,
@InjectQueue(QueueNameEnum.MACHINE_LEARNING)
private machineLearningQueue: Queue<IMachineLearningJob>,
private configService: ConfigService,
) {
@@ -80,8 +79,12 @@ export class ThumbnailGeneratorProcessor {
asset.resizePath = jpegThumbnailPath;
await this.thumbnailGeneratorQueue.add(generateWEBPThumbnailProcessorName, { asset }, { jobId: randomUUID() });
await this.metadataExtractionQueue.add(imageTaggingProcessorName, { asset }, { jobId: randomUUID() });
await this.metadataExtractionQueue.add(objectDetectionProcessorName, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(
MachineLearningJobNameEnum.OBJECT_DETECTION,
{ asset },
{ jobId: randomUUID() },
);
this.wsCommunicationGateway.server.to(asset.userId).emit('on_upload_success', JSON.stringify(mapAsset(asset)));
}
@@ -110,8 +113,12 @@ export class ThumbnailGeneratorProcessor {
asset.resizePath = jpegThumbnailPath;
await this.thumbnailGeneratorQueue.add(generateWEBPThumbnailProcessorName, { asset }, { jobId: randomUUID() });
await this.metadataExtractionQueue.add(imageTaggingProcessorName, { asset }, { jobId: randomUUID() });
await this.metadataExtractionQueue.add(objectDetectionProcessorName, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(MachineLearningJobNameEnum.IMAGE_TAGGING, { asset }, { jobId: randomUUID() });
await this.machineLearningQueue.add(
MachineLearningJobNameEnum.OBJECT_DETECTION,
{ asset },
{ jobId: randomUUID() },
);
this.wsCommunicationGateway.server.to(asset.userId).emit('on_upload_success', JSON.stringify(mapAsset(asset)));
}

View File

@@ -1,7 +1,7 @@
import { APP_UPLOAD_LOCATION } from '@app/common/constants';
import { AssetEntity } from '@app/database/entities/asset.entity';
import { QueueNameEnum } from '@app/job';
import { mp4ConversionProcessorName } from '@app/job/constants/job-name.constant';
import { videoConversionQueueName } from '@app/job/constants/queue-name.constant';
import { IMp4ConversionProcessor } from '@app/job/interfaces/video-transcode.interface';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
@@ -11,7 +11,7 @@ import ffmpeg from 'fluent-ffmpeg';
import { existsSync, mkdirSync } from 'fs';
import { Repository } from 'typeorm';
@Processor(videoConversionQueueName)
@Processor(QueueNameEnum.VIDEO_CONVERSION)
export class VideoTranscodeProcessor {
constructor(
@InjectRepository(AssetEntity)