refactor(server): job repository (#1382)

* refactor(server): job repository

* refactor: job repository

* chore: generate open-api

* fix: job panel

* Remove incorrect subtitle

Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
This commit is contained in:
Jason Rasmussen
2023-01-21 23:13:36 -05:00
committed by GitHub
parent f4c90426a5
commit 4cfac47674
34 changed files with 418 additions and 1124 deletions

View File

@@ -295,7 +295,7 @@ export class AssetController {
deleteAssetList.filter((a) => a.id == res.id && res.status == DeleteAssetStatusEnum.SUCCESS);
});
await this.backgroundTaskService.deleteFileOnDisk(deleteAssetList);
await this.backgroundTaskService.deleteFileOnDisk(deleteAssetList as any[]);
return result;
}

View File

@@ -9,11 +9,11 @@ import { TimeGroupEnum } from './dto/get-asset-count-by-time-bucket.dto';
import { AssetCountByUserIdResponseDto } from './response-dto/asset-count-by-user-id-response.dto';
import { DownloadService } from '../../modules/download/download.service';
import { BackgroundTaskService } from '../../modules/background-task/background-task.service';
import { IAssetUploadedJob, IVideoTranscodeJob } from '@app/domain';
import { Queue } from 'bull';
import { IAlbumRepository } from '../album/album-repository';
import { StorageService } from '@app/storage';
import { ISharedLinkRepository } from '../share/shared-link.repository';
import { IJobRepository } from '@app/domain';
import { newJobRepositoryMock } from '@app/domain/../test';
describe('AssetService', () => {
let sui: AssetService;
@@ -22,10 +22,9 @@ describe('AssetService', () => {
let albumRepositoryMock: jest.Mocked<IAlbumRepository>;
let downloadServiceMock: jest.Mocked<Partial<DownloadService>>;
let backgroundTaskServiceMock: jest.Mocked<BackgroundTaskService>;
let assetUploadedQueueMock: jest.Mocked<Queue<IAssetUploadedJob>>;
let videoConversionQueueMock: jest.Mocked<Queue<IVideoTranscodeJob>>;
let storageSeriveMock: jest.Mocked<StorageService>;
let sharedLinkRepositoryMock: jest.Mocked<ISharedLinkRepository>;
let jobMock: jest.Mocked<IJobRepository>;
const authUser: AuthUserDto = Object.freeze({
id: 'user_id_1',
email: 'auth@test.com',
@@ -148,16 +147,17 @@ describe('AssetService', () => {
getByIdAndUserId: jest.fn(),
};
jobMock = newJobRepositoryMock();
sui = new AssetService(
assetRepositoryMock,
albumRepositoryMock,
a,
backgroundTaskServiceMock,
assetUploadedQueueMock,
videoConversionQueueMock,
downloadServiceMock as DownloadService,
storageSeriveMock,
sharedLinkRepositoryMock,
jobMock,
);
});

View File

@@ -43,9 +43,7 @@ import { CheckExistingAssetsResponseDto } from './response-dto/check-existing-as
import { UpdateAssetDto } from './dto/update-asset.dto';
import { AssetFileUploadResponseDto } from './response-dto/asset-file-upload-response.dto';
import { BackgroundTaskService } from '../../modules/background-task/background-task.service';
import { IAssetUploadedJob, IVideoTranscodeJob, JobName, QueueName } from '@app/domain';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { IJobRepository, JobName } from '@app/domain';
import { DownloadService } from '../../modules/download/download.service';
import { DownloadDto } from './dto/download-library.dto';
import { IAlbumRepository } from '../album/album-repository';
@@ -66,24 +64,14 @@ export class AssetService {
constructor(
@Inject(IAssetRepository) private _assetRepository: IAssetRepository,
@Inject(IAlbumRepository) private _albumRepository: IAlbumRepository,
@InjectRepository(AssetEntity)
private assetRepository: Repository<AssetEntity>,
private backgroundTaskService: BackgroundTaskService,
@InjectQueue(QueueName.ASSET_UPLOADED)
private assetUploadedQueue: Queue<IAssetUploadedJob>,
@InjectQueue(QueueName.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
private downloadService: DownloadService,
private storageService: StorageService,
@Inject(ISharedLinkRepository) sharedLinkRepository: ISharedLinkRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
) {
this.shareCore = new ShareCore(sharedLinkRepository);
}
@@ -122,7 +110,7 @@ export class AssetService {
await this.storageService.moveAsset(livePhotoAssetEntity, originalAssetData.originalname);
await this.videoConversionQueue.add(JobName.VIDEO_CONVERSION, { asset: livePhotoAssetEntity });
await this.jobRepository.add({ name: JobName.VIDEO_CONVERSION, data: { asset: livePhotoAssetEntity } });
}
const assetEntity = await this.createUserAsset(
@@ -146,11 +134,10 @@ export class AssetService {
const movedAsset = await this.storageService.moveAsset(assetEntity, originalAssetData.originalname);
await this.assetUploadedQueue.add(
JobName.ASSET_UPLOADED,
{ asset: movedAsset, fileName: originalAssetData.originalname },
{ jobId: movedAsset.id },
);
await this.jobRepository.add({
name: JobName.ASSET_UPLOADED,
data: { asset: movedAsset, fileName: originalAssetData.originalname },
});
return new AssetFileUploadResponseDto(movedAsset.id);
} catch (err) {

View File

@@ -1,11 +1,9 @@
import { Controller, Get, Body, ValidationPipe, Put, Param } from '@nestjs/common';
import { JobService } from './job.service';
import { Body, Controller, Get, Param, Put, ValidationPipe } from '@nestjs/common';
import { ApiBearerAuth, ApiTags } from '@nestjs/swagger';
import { Authenticated } from '../../decorators/authenticated.decorator';
import { AllJobStatusResponseDto } from './response-dto/all-job-status-response.dto';
import { GetJobDto } from './dto/get-job.dto';
import { JobStatusResponseDto } from './response-dto/job-status-response.dto';
import { JobService } from './job.service';
import { JobCommandDto } from './dto/job-command.dto';
@Authenticated({ admin: true })
@@ -20,21 +18,16 @@ export class JobController {
return this.jobService.getAllJobsStatus();
}
@Get('/:jobId')
getJobStatus(@Param(ValidationPipe) params: GetJobDto): Promise<JobStatusResponseDto> {
return this.jobService.getJobStatus(params);
}
@Put('/:jobId')
async sendJobCommand(
@Param(ValidationPipe) params: GetJobDto,
@Body(ValidationPipe) body: JobCommandDto,
): Promise<number> {
if (body.command === 'start') {
return await this.jobService.startJob(params);
return await this.jobService.start(params.jobId);
}
if (body.command === 'stop') {
return await this.jobService.stopJob(params);
return await this.jobService.stop(params.jobId);
}
return 0;
}

View File

@@ -1,217 +1,118 @@
import {
IMachineLearningJob,
IMetadataExtractionJob,
IThumbnailGenerationJob,
IVideoTranscodeJob,
JobName,
QueueName,
} from '@app/domain';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { JobName, IJobRepository, QueueName } from '@app/domain';
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { AllJobStatusResponseDto } from './response-dto/all-job-status-response.dto';
import { IAssetRepository } from '../asset/asset-repository';
import { AssetType } from '@app/infra';
import { GetJobDto, JobId } from './dto/get-job.dto';
import { JobStatusResponseDto } from './response-dto/job-status-response.dto';
import { StorageService } from '@app/storage';
import { JobId } from './dto/get-job.dto';
import { MACHINE_LEARNING_ENABLED } from '@app/common';
const jobIds = Object.values(JobId) as JobId[];
@Injectable()
export class JobService {
constructor(
@InjectQueue(QueueName.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue<IThumbnailGenerationJob>,
@InjectQueue(QueueName.METADATA_EXTRACTION)
private metadataExtractionQueue: Queue<IMetadataExtractionJob>,
@InjectQueue(QueueName.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
@InjectQueue(QueueName.MACHINE_LEARNING)
private machineLearningQueue: Queue<IMachineLearningJob>,
@InjectQueue(QueueName.CONFIG)
private configQueue: Queue,
@Inject(IAssetRepository)
private _assetRepository: IAssetRepository,
private storageService: StorageService,
@Inject(IAssetRepository) private _assetRepository: IAssetRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
) {
this.thumbnailGeneratorQueue.empty();
this.metadataExtractionQueue.empty();
this.videoConversionQueue.empty();
this.configQueue.empty();
for (const jobId of jobIds) {
this.jobRepository.empty(this.asQueueName(jobId));
}
}
async startJob(jobDto: GetJobDto): Promise<number> {
switch (jobDto.jobId) {
case JobId.THUMBNAIL_GENERATION:
return this.runThumbnailGenerationJob();
case JobId.METADATA_EXTRACTION:
return this.runMetadataExtractionJob();
case JobId.VIDEO_CONVERSION:
return this.runVideoConversionJob();
case JobId.MACHINE_LEARNING:
return this.runMachineLearningPipeline();
case JobId.STORAGE_TEMPLATE_MIGRATION:
return this.runStorageMigration();
default:
throw new BadRequestException('Invalid job id');
}
start(jobId: JobId): Promise<number> {
return this.run(this.asQueueName(jobId));
}
async stop(jobId: JobId): Promise<number> {
await this.jobRepository.empty(this.asQueueName(jobId));
return 0;
}
async getAllJobsStatus(): Promise<AllJobStatusResponseDto> {
const thumbnailGeneratorJobCount = await this.thumbnailGeneratorQueue.getJobCounts();
const metadataExtractionJobCount = await this.metadataExtractionQueue.getJobCounts();
const videoConversionJobCount = await this.videoConversionQueue.getJobCounts();
const machineLearningJobCount = await this.machineLearningQueue.getJobCounts();
const storageMigrationJobCount = await this.configQueue.getJobCounts();
const response = new AllJobStatusResponseDto();
response.isThumbnailGenerationActive = Boolean(thumbnailGeneratorJobCount.waiting);
response.thumbnailGenerationQueueCount = thumbnailGeneratorJobCount;
response.isMetadataExtractionActive = Boolean(metadataExtractionJobCount.waiting);
response.metadataExtractionQueueCount = metadataExtractionJobCount;
response.isVideoConversionActive = Boolean(videoConversionJobCount.waiting);
response.videoConversionQueueCount = videoConversionJobCount;
response.isMachineLearningActive = Boolean(machineLearningJobCount.waiting);
response.machineLearningQueueCount = machineLearningJobCount;
response.isStorageMigrationActive = Boolean(storageMigrationJobCount.active);
response.storageMigrationQueueCount = storageMigrationJobCount;
for (const jobId of jobIds) {
response[jobId] = await this.jobRepository.getJobCounts(this.asQueueName(jobId));
}
return response;
}
async getJobStatus(query: GetJobDto): Promise<JobStatusResponseDto> {
const response = new JobStatusResponseDto();
if (query.jobId === JobId.THUMBNAIL_GENERATION) {
response.isActive = Boolean((await this.thumbnailGeneratorQueue.getJobCounts()).waiting);
response.queueCount = await this.thumbnailGeneratorQueue.getJobCounts();
private async run(name: QueueName): Promise<number> {
const isActive = await this.jobRepository.isActive(name);
if (isActive) {
throw new BadRequestException(`Job is already running`);
}
if (query.jobId === JobId.METADATA_EXTRACTION) {
response.isActive = Boolean((await this.metadataExtractionQueue.getJobCounts()).waiting);
response.queueCount = await this.metadataExtractionQueue.getJobCounts();
}
switch (name) {
case QueueName.VIDEO_CONVERSION: {
const assets = await this._assetRepository.getAssetWithNoEncodedVideo();
for (const asset of assets) {
await this.jobRepository.add({ name: JobName.VIDEO_CONVERSION, data: { asset } });
}
if (query.jobId === JobId.VIDEO_CONVERSION) {
response.isActive = Boolean((await this.videoConversionQueue.getJobCounts()).waiting);
response.queueCount = await this.videoConversionQueue.getJobCounts();
}
if (query.jobId === JobId.STORAGE_TEMPLATE_MIGRATION) {
response.isActive = Boolean((await this.configQueue.getJobCounts()).waiting);
response.queueCount = await this.configQueue.getJobCounts();
}
return response;
}
async stopJob(query: GetJobDto): Promise<number> {
switch (query.jobId) {
case JobId.THUMBNAIL_GENERATION:
this.thumbnailGeneratorQueue.empty();
return 0;
case JobId.METADATA_EXTRACTION:
this.metadataExtractionQueue.empty();
return 0;
case JobId.VIDEO_CONVERSION:
this.videoConversionQueue.empty();
return 0;
case JobId.MACHINE_LEARNING:
this.machineLearningQueue.empty();
return 0;
case JobId.STORAGE_TEMPLATE_MIGRATION:
this.configQueue.empty();
return 0;
default:
throw new BadRequestException('Invalid job id');
}
}
private async runThumbnailGenerationJob(): Promise<number> {
const jobCount = await this.thumbnailGeneratorQueue.getJobCounts();
if (jobCount.waiting > 0) {
throw new BadRequestException('Thumbnail generation job is already running');
}
const assetsWithNoThumbnail = await this._assetRepository.getAssetWithNoThumbnail();
for (const asset of assetsWithNoThumbnail) {
await this.thumbnailGeneratorQueue.add(JobName.GENERATE_JPEG_THUMBNAIL, { asset });
}
return assetsWithNoThumbnail.length;
}
private async runMetadataExtractionJob(): Promise<number> {
const jobCount = await this.metadataExtractionQueue.getJobCounts();
if (jobCount.waiting > 0) {
throw new BadRequestException('Metadata extraction job is already running');
}
const assetsWithNoExif = await this._assetRepository.getAssetWithNoEXIF();
for (const asset of assetsWithNoExif) {
if (asset.type === AssetType.VIDEO) {
await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName: asset.id });
} else {
await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName: asset.id });
return assets.length;
}
case QueueName.CONFIG:
await this.jobRepository.add({ name: JobName.TEMPLATE_MIGRATION });
return 1;
case QueueName.MACHINE_LEARNING: {
if (!MACHINE_LEARNING_ENABLED) {
throw new BadRequestException('Machine learning is not enabled.');
}
const assets = await this._assetRepository.getAssetWithNoSmartInfo();
for (const asset of assets) {
await this.jobRepository.add({ name: JobName.IMAGE_TAGGING, data: { asset } });
await this.jobRepository.add({ name: JobName.OBJECT_DETECTION, data: { asset } });
}
return assets.length;
}
case QueueName.METADATA_EXTRACTION: {
const assets = await this._assetRepository.getAssetWithNoEXIF();
for (const asset of assets) {
if (asset.type === AssetType.VIDEO) {
await this.jobRepository.add({ name: JobName.EXTRACT_VIDEO_METADATA, data: { asset, fileName: asset.id } });
} else {
await this.jobRepository.add({ name: JobName.EXIF_EXTRACTION, data: { asset, fileName: asset.id } });
}
}
return assets.length;
}
case QueueName.THUMBNAIL_GENERATION: {
const assets = await this._assetRepository.getAssetWithNoThumbnail();
for (const asset of assets) {
await this.jobRepository.add({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: { asset } });
}
return assets.length;
}
default:
return 0;
}
return assetsWithNoExif.length;
}
private async runMachineLearningPipeline(): Promise<number> {
if (!MACHINE_LEARNING_ENABLED) {
throw new BadRequestException('Machine learning is not enabled.');
private asQueueName(jobId: JobId) {
switch (jobId) {
case JobId.THUMBNAIL_GENERATION:
return QueueName.THUMBNAIL_GENERATION;
case JobId.METADATA_EXTRACTION:
return QueueName.METADATA_EXTRACTION;
case JobId.VIDEO_CONVERSION:
return QueueName.VIDEO_CONVERSION;
case JobId.STORAGE_TEMPLATE_MIGRATION:
return QueueName.CONFIG;
case JobId.MACHINE_LEARNING:
return QueueName.MACHINE_LEARNING;
default:
throw new BadRequestException(`Invalid job id: ${jobId}`);
}
const jobCount = await this.machineLearningQueue.getJobCounts();
if (jobCount.waiting > 0) {
throw new BadRequestException('Metadata extraction job is already running');
}
const assetWithNoSmartInfo = await this._assetRepository.getAssetWithNoSmartInfo();
for (const asset of assetWithNoSmartInfo) {
await this.machineLearningQueue.add(JobName.IMAGE_TAGGING, { asset });
await this.machineLearningQueue.add(JobName.OBJECT_DETECTION, { asset });
}
return assetWithNoSmartInfo.length;
}
private async runVideoConversionJob(): Promise<number> {
const jobCount = await this.videoConversionQueue.getJobCounts();
if (jobCount.waiting > 0) {
throw new BadRequestException('Video conversion job is already running');
}
const assetsWithNoConvertedVideo = await this._assetRepository.getAssetWithNoEncodedVideo();
for (const asset of assetsWithNoConvertedVideo) {
await this.videoConversionQueue.add(JobName.VIDEO_CONVERSION, { asset });
}
return assetsWithNoConvertedVideo.length;
}
async runStorageMigration() {
const jobCount = await this.configQueue.getJobCounts();
if (jobCount.active > 0) {
throw new BadRequestException('Storage migration job is already running');
}
await this.configQueue.add(JobName.TEMPLATE_MIGRATION, {});
return 1;
}
}

View File

@@ -1,4 +1,5 @@
import { ApiProperty } from '@nestjs/swagger';
import { JobId } from '../dto/get-job.dto';
export class JobCounts {
@ApiProperty({ type: 'integer' })
@@ -12,35 +13,20 @@ export class JobCounts {
@ApiProperty({ type: 'integer' })
waiting!: number;
}
export class AllJobStatusResponseDto {
isThumbnailGenerationActive!: boolean;
isMetadataExtractionActive!: boolean;
isVideoConversionActive!: boolean;
isMachineLearningActive!: boolean;
isStorageMigrationActive!: boolean;
@ApiProperty({ type: JobCounts })
[JobId.THUMBNAIL_GENERATION]!: JobCounts;
@ApiProperty({
type: JobCounts,
})
thumbnailGenerationQueueCount!: JobCounts;
@ApiProperty({ type: JobCounts })
[JobId.METADATA_EXTRACTION]!: JobCounts;
@ApiProperty({
type: JobCounts,
})
metadataExtractionQueueCount!: JobCounts;
@ApiProperty({ type: JobCounts })
[JobId.VIDEO_CONVERSION]!: JobCounts;
@ApiProperty({
type: JobCounts,
})
videoConversionQueueCount!: JobCounts;
@ApiProperty({ type: JobCounts })
[JobId.MACHINE_LEARNING]!: JobCounts;
@ApiProperty({
type: JobCounts,
})
machineLearningQueueCount!: JobCounts;
@ApiProperty({
type: JobCounts,
})
storageMigrationQueueCount!: JobCounts;
@ApiProperty({ type: JobCounts })
[JobId.STORAGE_TEMPLATE_MIGRATION]!: JobCounts;
}

View File

@@ -1,6 +0,0 @@
import Bull from 'bull';
export class JobStatusResponseDto {
isActive!: boolean;
queueCount!: Bull.JobCounts;
}

View File

@@ -1,12 +1,9 @@
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { QueueName } from '@app/domain';
import { BackgroundTaskProcessor } from './background-task.processor';
import { BackgroundTaskService } from './background-task.service';
@Module({
imports: [BullModule.registerQueue({ name: QueueName.BACKGROUND_TASK })],
providers: [BackgroundTaskService, BackgroundTaskProcessor],
exports: [BackgroundTaskService, BullModule],
exports: [BackgroundTaskService],
})
export class BackgroundTaskModule {}

View File

@@ -2,12 +2,12 @@ import { assetUtils } from '@app/common/utils';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { JobName, QueueName } from '@app/domain';
import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto';
import { AssetEntity } from '@app/infra';
@Processor(QueueName.BACKGROUND_TASK)
export class BackgroundTaskProcessor {
@Process(JobName.DELETE_FILE_ON_DISK)
async deleteFileOnDisk(job: Job<{ assets: AssetResponseDto[] }>) {
async deleteFileOnDisk(job: Job<{ assets: AssetEntity[] }>) {
const { assets } = job.data;
for (const asset of assets) {

View File

@@ -1,17 +1,12 @@
import { InjectQueue } from '@nestjs/bull/dist/decorators';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { JobName, QueueName } from '@app/domain';
import { AssetResponseDto } from '../../api-v1/asset/response-dto/asset-response.dto';
import { IJobRepository, JobName } from '@app/domain';
import { AssetEntity } from '@app/infra';
import { Inject, Injectable } from '@nestjs/common';
@Injectable()
export class BackgroundTaskService {
constructor(
@InjectQueue(QueueName.BACKGROUND_TASK)
private backgroundTaskQueue: Queue,
) {}
constructor(@Inject(IJobRepository) private jobRepository: IJobRepository) {}
async deleteFileOnDisk(assets: AssetResponseDto[]) {
await this.backgroundTaskQueue.add(JobName.DELETE_FILE_ON_DISK, { assets });
async deleteFileOnDisk(assets: AssetEntity[]) {
await this.jobRepository.add({ name: JobName.DELETE_FILE_ON_DISK, data: { assets } });
}
}

View File

@@ -1,14 +1,11 @@
import { Injectable, Logger } from '@nestjs/common';
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { IsNull, Not, Repository } from 'typeorm';
import { AssetEntity, AssetType, ExifEntity, UserEntity } from '@app/infra';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { IMetadataExtractionJob, IVideoTranscodeJob, QueueName, JobName } from '@app/domain';
import { ConfigService } from '@nestjs/config';
import { IUserDeletionJob } from '@app/domain';
import { userUtils } from '@app/common';
import { IJobRepository, JobName } from '@app/domain';
@Injectable()
export class ScheduleTasksService {
@@ -22,17 +19,7 @@ export class ScheduleTasksService {
@InjectRepository(ExifEntity)
private exifRepository: Repository<ExifEntity>,
@InjectQueue(QueueName.THUMBNAIL_GENERATION)
private thumbnailGeneratorQueue: Queue,
@InjectQueue(QueueName.VIDEO_CONVERSION)
private videoConversionQueue: Queue<IVideoTranscodeJob>,
@InjectQueue(QueueName.METADATA_EXTRACTION)
private metadataExtractionQueue: Queue<IMetadataExtractionJob>,
@InjectQueue(QueueName.USER_DELETION)
private userDeletionQueue: Queue<IUserDeletionJob>,
@Inject(IJobRepository) private jobRepository: IJobRepository,
private configService: ConfigService,
) {}
@@ -51,7 +38,7 @@ export class ScheduleTasksService {
}
for (const asset of assets) {
await this.thumbnailGeneratorQueue.add(JobName.GENERATE_WEBP_THUMBNAIL, { asset: asset });
await this.jobRepository.add({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: { asset } });
}
}
@@ -69,7 +56,7 @@ export class ScheduleTasksService {
});
for (const asset of assets) {
await this.videoConversionQueue.add(JobName.VIDEO_CONVERSION, { asset });
await this.jobRepository.add({ name: JobName.VIDEO_CONVERSION, data: { asset } });
}
}
@@ -87,11 +74,11 @@ export class ScheduleTasksService {
});
for (const exif of exifInfo) {
await this.metadataExtractionQueue.add(
JobName.REVERSE_GEOCODING,
await this.jobRepository.add({
name: JobName.REVERSE_GEOCODING,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
{ exifId: exif.id, latitude: exif.latitude!, longitude: exif.longitude! },
);
data: { exifId: exif.id, latitude: exif.latitude!, longitude: exif.longitude! },
});
}
}
}
@@ -106,9 +93,9 @@ export class ScheduleTasksService {
for (const asset of exifAssets) {
if (asset.type === AssetType.VIDEO) {
await this.metadataExtractionQueue.add(JobName.EXTRACT_VIDEO_METADATA, { asset, fileName: asset.id });
await this.jobRepository.add({ name: JobName.EXTRACT_VIDEO_METADATA, data: { asset, fileName: asset.id } });
} else {
await this.metadataExtractionQueue.add(JobName.EXIF_EXTRACTION, { asset, fileName: asset.id });
await this.jobRepository.add({ name: JobName.EXIF_EXTRACTION, data: { asset, fileName: asset.id } });
}
}
}
@@ -118,7 +105,7 @@ export class ScheduleTasksService {
const usersToDelete = await this.userRepository.find({ withDeleted: true, where: { deletedAt: Not(IsNull()) } });
for (const user of usersToDelete) {
if (userUtils.isReadyForDeletion(user)) {
await this.userDeletionQueue.add(JobName.USER_DELETION, { user });
await this.jobRepository.add({ name: JobName.USER_DELETION, data: { user } });
}
}
}

View File

@@ -1,17 +1,16 @@
import { QueueName } from '@app/domain';
import { InjectQueue } from '@nestjs/bull';
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Queue } from 'bull';
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { IJobRepository, JobName } from '@app/domain';
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(() => resolve(), ms));
@Injectable()
export class MicroservicesService implements OnModuleInit {
constructor(
@InjectQueue(QueueName.CHECKSUM_GENERATION)
private generateChecksumQueue: Queue,
) {}
constructor(@Inject(IJobRepository) private jobRepository: IJobRepository) {}
async onModuleInit() {
// wait for migration
await this.generateChecksumQueue.add({}, { delay: 10000 });
await sleep(10_000);
await this.jobRepository.add({ name: JobName.CHECKSUM_GENERATION });
}
}

View File

@@ -1,5 +1,5 @@
import { AssetEntity } from '@app/infra';
import { QueueName } from '@app/domain';
import { JobName, QueueName } from '@app/domain';
import { Process, Processor } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
@@ -15,7 +15,7 @@ export class GenerateChecksumProcessor {
private assetRepository: Repository<AssetEntity>,
) {}
@Process()
@Process(JobName.CHECKSUM_GENERATION)
async generateChecksum() {
const pageSize = 200;
let hasNext = true;

View File

@@ -2721,40 +2721,6 @@
}
},
"/jobs/{jobId}": {
"get": {
"operationId": "getJobStatus",
"description": "",
"parameters": [
{
"name": "jobId",
"required": true,
"in": "path",
"schema": {
"$ref": "#/components/schemas/JobId"
}
}
],
"responses": {
"200": {
"description": "",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/JobStatusResponseDto"
}
}
}
}
},
"tags": [
"Job"
],
"security": [
{
"bearer": []
}
]
},
"put": {
"operationId": "sendJobCommand",
"description": "",
@@ -4569,48 +4535,28 @@
"AllJobStatusResponseDto": {
"type": "object",
"properties": {
"thumbnailGenerationQueueCount": {
"thumbnail-generation": {
"$ref": "#/components/schemas/JobCounts"
},
"metadataExtractionQueueCount": {
"metadata-extraction": {
"$ref": "#/components/schemas/JobCounts"
},
"videoConversionQueueCount": {
"video-conversion": {
"$ref": "#/components/schemas/JobCounts"
},
"machineLearningQueueCount": {
"machine-learning": {
"$ref": "#/components/schemas/JobCounts"
},
"storageMigrationQueueCount": {
"storage-template-migration": {
"$ref": "#/components/schemas/JobCounts"
},
"isThumbnailGenerationActive": {
"type": "boolean"
},
"isMetadataExtractionActive": {
"type": "boolean"
},
"isVideoConversionActive": {
"type": "boolean"
},
"isMachineLearningActive": {
"type": "boolean"
},
"isStorageMigrationActive": {
"type": "boolean"
}
},
"required": [
"thumbnailGenerationQueueCount",
"metadataExtractionQueueCount",
"videoConversionQueueCount",
"machineLearningQueueCount",
"storageMigrationQueueCount",
"isThumbnailGenerationActive",
"isMetadataExtractionActive",
"isVideoConversionActive",
"isMachineLearningActive",
"isStorageMigrationActive"
"thumbnail-generation",
"metadata-extraction",
"video-conversion",
"machine-learning",
"storage-template-migration"
]
},
"JobId": {
@@ -4623,21 +4569,6 @@
"storage-template-migration"
]
},
"JobStatusResponseDto": {
"type": "object",
"properties": {
"isActive": {
"type": "boolean"
},
"queueCount": {
"type": "object"
}
},
"required": [
"isActive",
"queueCount"
]
},
"JobCommand": {
"type": "string",
"enum": [

View File

@@ -24,4 +24,5 @@ export enum JobName {
OBJECT_DETECTION = 'detect-object',
IMAGE_TAGGING = 'tag-image',
DELETE_FILE_ON_DISK = 'delete-file-on-disk',
CHECKSUM_GENERATION = 'checksum-generation',
}

View File

@@ -6,10 +6,19 @@ import {
IVideoConversionProcessor,
IReverseGeocodingProcessor,
IUserDeletionJob,
IVideoLengthExtractionProcessor,
JpegGeneratorProcessor,
WebpGeneratorProcessor,
} from './interfaces';
import { JobName } from './job.constants';
import { JobName, QueueName } from './job.constants';
export interface JobCounts {
active: number;
completed: number;
failed: number;
delayed: number;
waiting: number;
}
export type JobItem =
| { name: JobName.ASSET_UPLOADED; data: IAssetUploadedJob }
@@ -21,6 +30,8 @@ export type JobItem =
| { name: JobName.USER_DELETION; data: IUserDeletionJob }
| { name: JobName.TEMPLATE_MIGRATION }
| { name: JobName.CONFIG_CHANGE }
| { name: JobName.CHECKSUM_GENERATION }
| { name: JobName.EXTRACT_VIDEO_METADATA; data: IVideoLengthExtractionProcessor }
| { name: JobName.OBJECT_DETECTION; data: IMachineLearningJob }
| { name: JobName.IMAGE_TAGGING; data: IMachineLearningJob }
| { name: JobName.DELETE_FILE_ON_DISK; data: IDeleteFileOnDiskJob };
@@ -28,5 +39,8 @@ export type JobItem =
export const IJobRepository = 'IJobRepository';
export interface IJobRepository {
empty(name: QueueName): Promise<void>;
add(item: JobItem): Promise<void>;
isActive(name: QueueName): Promise<boolean>;
getJobCounts(name: QueueName): Promise<JobCounts>;
}

View File

@@ -2,6 +2,9 @@ import { IJobRepository } from '../src';
export const newJobRepositoryMock = (): jest.Mocked<IJobRepository> => {
return {
empty: jest.fn(),
add: jest.fn().mockImplementation(() => Promise.resolve()),
isActive: jest.fn(),
getJobCounts: jest.fn(),
};
};

View File

@@ -1,21 +1,110 @@
import { IJobRepository, JobItem, JobName, QueueName } from '@app/domain';
import {
IAssetUploadedJob,
IJobRepository,
IMachineLearningJob,
IMetadataExtractionJob,
IUserDeletionJob,
IVideoTranscodeJob,
JobCounts,
JobItem,
JobName,
QueueName,
} from '@app/domain';
import { InjectQueue } from '@nestjs/bull';
import { Logger } from '@nestjs/common';
import { BadRequestException, Logger } from '@nestjs/common';
import { Queue } from 'bull';
export class JobRepository implements IJobRepository {
private logger = new Logger(JobRepository.name);
constructor(@InjectQueue(QueueName.CONFIG) private configQueue: Queue) {}
constructor(
@InjectQueue(QueueName.ASSET_UPLOADED) private assetUploaded: Queue<IAssetUploadedJob>,
@InjectQueue(QueueName.BACKGROUND_TASK) private backgroundTask: Queue,
@InjectQueue(QueueName.CHECKSUM_GENERATION) private generateChecksum: Queue,
@InjectQueue(QueueName.MACHINE_LEARNING) private machineLearning: Queue<IMachineLearningJob>,
@InjectQueue(QueueName.METADATA_EXTRACTION) private metadataExtraction: Queue<IMetadataExtractionJob>,
@InjectQueue(QueueName.CONFIG) private storageMigration: Queue,
@InjectQueue(QueueName.THUMBNAIL_GENERATION) private thumbnail: Queue,
@InjectQueue(QueueName.USER_DELETION) private userDeletion: Queue<IUserDeletionJob>,
@InjectQueue(QueueName.VIDEO_CONVERSION) private videoTranscode: Queue<IVideoTranscodeJob>,
) {}
async isActive(name: QueueName): Promise<boolean> {
const counts = await this.getJobCounts(name);
return !!counts.active;
}
empty(name: QueueName) {
return this.getQueue(name).empty();
}
getJobCounts(name: QueueName): Promise<JobCounts> {
return this.getQueue(name).getJobCounts();
}
async add(item: JobItem): Promise<void> {
switch (item.name) {
case JobName.CONFIG_CHANGE:
await this.configQueue.add(JobName.CONFIG_CHANGE, {});
case JobName.ASSET_UPLOADED:
await this.assetUploaded.add(item.name, item.data, { jobId: item.data.asset.id });
break;
case JobName.DELETE_FILE_ON_DISK:
await this.backgroundTask.add(item.name, item.data);
break;
case JobName.CHECKSUM_GENERATION:
await this.generateChecksum.add(item.name, {});
break;
case JobName.OBJECT_DETECTION:
case JobName.IMAGE_TAGGING:
await this.machineLearning.add(item.name, item.data);
break;
case JobName.EXIF_EXTRACTION:
case JobName.EXTRACT_VIDEO_METADATA:
case JobName.REVERSE_GEOCODING:
await this.metadataExtraction.add(item.name, item.data);
break;
case JobName.TEMPLATE_MIGRATION:
case JobName.CONFIG_CHANGE:
await this.storageMigration.add(item.name, {});
break;
case JobName.GENERATE_JPEG_THUMBNAIL:
case JobName.GENERATE_WEBP_THUMBNAIL:
await this.thumbnail.add(item.name, item.data);
break;
case JobName.USER_DELETION:
await this.userDeletion.add(item.name, item.data);
break;
case JobName.VIDEO_CONVERSION:
await this.videoTranscode.add(item.name, item.data);
break;
default:
// TODO inject remaining queues and map job to queue
this.logger.error('Invalid job', item);
}
}
private getQueue(name: QueueName) {
switch (name) {
case QueueName.THUMBNAIL_GENERATION:
return this.thumbnail;
case QueueName.METADATA_EXTRACTION:
return this.metadataExtraction;
case QueueName.VIDEO_CONVERSION:
return this.videoTranscode;
case QueueName.CONFIG:
return this.storageMigration;
case QueueName.MACHINE_LEARNING:
return this.machineLearning;
default:
throw new BadRequestException('Invalid job name');
}
}
}