mirror of
https://github.com/KevinMidboe/immich.git
synced 2025-12-08 20:29:05 +00:00
refactor(server)*: tsconfigs (#2689)
* refactor(server): tsconfigs * chore: dummy commit * fix: start.sh * chore: restore original entry scripts
This commit is contained in:
2
server/src/domain/job/dto/index.ts
Normal file
2
server/src/domain/job/dto/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * from './job-command.dto';
|
||||
export * from './job-id.dto';
|
||||
14
server/src/domain/job/dto/job-command.dto.ts
Normal file
14
server/src/domain/job/dto/job-command.dto.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import { ApiProperty } from '@nestjs/swagger';
|
||||
import { IsBoolean, IsEnum, IsNotEmpty, IsOptional } from 'class-validator';
|
||||
import { JobCommand } from '../job.constants';
|
||||
|
||||
export class JobCommandDto {
|
||||
@IsNotEmpty()
|
||||
@IsEnum(JobCommand)
|
||||
@ApiProperty({ type: 'string', enum: JobCommand, enumName: 'JobCommand' })
|
||||
command!: JobCommand;
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean()
|
||||
force!: boolean;
|
||||
}
|
||||
10
server/src/domain/job/dto/job-id.dto.ts
Normal file
10
server/src/domain/job/dto/job-id.dto.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { ApiProperty } from '@nestjs/swagger';
|
||||
import { IsEnum, IsNotEmpty } from 'class-validator';
|
||||
import { QueueName } from '../job.constants';
|
||||
|
||||
export class JobIdDto {
|
||||
@IsNotEmpty()
|
||||
@IsEnum(QueueName)
|
||||
@ApiProperty({ type: String, enum: QueueName, enumName: 'JobName' })
|
||||
jobId!: QueueName;
|
||||
}
|
||||
6
server/src/domain/job/index.ts
Normal file
6
server/src/domain/job/index.ts
Normal file
@@ -0,0 +1,6 @@
|
||||
export * from './dto';
|
||||
export * from './job.constants';
|
||||
export * from './job.interface';
|
||||
export * from './job.repository';
|
||||
export * from './job.service';
|
||||
export * from './response-dto';
|
||||
137
server/src/domain/job/job.constants.ts
Normal file
137
server/src/domain/job/job.constants.ts
Normal file
@@ -0,0 +1,137 @@
|
||||
export enum QueueName {
|
||||
THUMBNAIL_GENERATION = 'thumbnailGeneration',
|
||||
METADATA_EXTRACTION = 'metadataExtraction',
|
||||
VIDEO_CONVERSION = 'videoConversion',
|
||||
OBJECT_TAGGING = 'objectTagging',
|
||||
RECOGNIZE_FACES = 'recognizeFaces',
|
||||
CLIP_ENCODING = 'clipEncoding',
|
||||
BACKGROUND_TASK = 'backgroundTask',
|
||||
STORAGE_TEMPLATE_MIGRATION = 'storageTemplateMigration',
|
||||
SEARCH = 'search',
|
||||
SIDECAR = 'sidecar',
|
||||
}
|
||||
|
||||
export enum JobCommand {
|
||||
START = 'start',
|
||||
PAUSE = 'pause',
|
||||
RESUME = 'resume',
|
||||
EMPTY = 'empty',
|
||||
}
|
||||
|
||||
export enum JobName {
|
||||
// conversion
|
||||
QUEUE_VIDEO_CONVERSION = 'queue-video-conversion',
|
||||
VIDEO_CONVERSION = 'video-conversion',
|
||||
|
||||
// thumbnails
|
||||
QUEUE_GENERATE_THUMBNAILS = 'queue-generate-thumbnails',
|
||||
GENERATE_JPEG_THUMBNAIL = 'generate-jpeg-thumbnail',
|
||||
GENERATE_WEBP_THUMBNAIL = 'generate-webp-thumbnail',
|
||||
|
||||
// metadata
|
||||
QUEUE_METADATA_EXTRACTION = 'queue-metadata-extraction',
|
||||
METADATA_EXTRACTION = 'metadata-extraction',
|
||||
|
||||
// user deletion
|
||||
USER_DELETION = 'user-deletion',
|
||||
USER_DELETE_CHECK = 'user-delete-check',
|
||||
|
||||
// storage template
|
||||
STORAGE_TEMPLATE_MIGRATION = 'storage-template-migration',
|
||||
STORAGE_TEMPLATE_MIGRATION_SINGLE = 'storage-template-migration-single',
|
||||
SYSTEM_CONFIG_CHANGE = 'system-config-change',
|
||||
|
||||
// object tagging
|
||||
QUEUE_OBJECT_TAGGING = 'queue-object-tagging',
|
||||
CLASSIFY_IMAGE = 'classify-image',
|
||||
|
||||
// facial recognition
|
||||
QUEUE_RECOGNIZE_FACES = 'queue-recognize-faces',
|
||||
RECOGNIZE_FACES = 'recognize-faces',
|
||||
GENERATE_FACE_THUMBNAIL = 'generate-face-thumbnail',
|
||||
PERSON_CLEANUP = 'person-cleanup',
|
||||
|
||||
// cleanup
|
||||
DELETE_FILES = 'delete-files',
|
||||
|
||||
// search
|
||||
SEARCH_INDEX_ASSETS = 'search-index-assets',
|
||||
SEARCH_INDEX_ASSET = 'search-index-asset',
|
||||
SEARCH_INDEX_FACE = 'search-index-face',
|
||||
SEARCH_INDEX_FACES = 'search-index-faces',
|
||||
SEARCH_INDEX_ALBUMS = 'search-index-albums',
|
||||
SEARCH_INDEX_ALBUM = 'search-index-album',
|
||||
SEARCH_REMOVE_ALBUM = 'search-remove-album',
|
||||
SEARCH_REMOVE_ASSET = 'search-remove-asset',
|
||||
SEARCH_REMOVE_FACE = 'search-remove-face',
|
||||
|
||||
// clip
|
||||
QUEUE_ENCODE_CLIP = 'queue-clip-encode',
|
||||
ENCODE_CLIP = 'clip-encode',
|
||||
|
||||
// XMP sidecars
|
||||
QUEUE_SIDECAR = 'queue-sidecar',
|
||||
SIDECAR_DISCOVERY = 'sidecar-discovery',
|
||||
SIDECAR_SYNC = 'sidecar-sync',
|
||||
}
|
||||
|
||||
export const JOBS_ASSET_PAGINATION_SIZE = 1000;
|
||||
|
||||
export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
|
||||
// misc
|
||||
[JobName.USER_DELETE_CHECK]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.USER_DELETION]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.DELETE_FILES]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.PERSON_CLEANUP]: QueueName.BACKGROUND_TASK,
|
||||
|
||||
// conversion
|
||||
[JobName.QUEUE_VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,
|
||||
[JobName.VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,
|
||||
|
||||
// thumbnails
|
||||
[JobName.QUEUE_GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION,
|
||||
[JobName.GENERATE_JPEG_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION,
|
||||
[JobName.GENERATE_WEBP_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION,
|
||||
|
||||
// metadata
|
||||
[JobName.QUEUE_METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
|
||||
[JobName.METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
|
||||
|
||||
// storage template
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION]: QueueName.STORAGE_TEMPLATE_MIGRATION,
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: QueueName.STORAGE_TEMPLATE_MIGRATION,
|
||||
[JobName.SYSTEM_CONFIG_CHANGE]: QueueName.STORAGE_TEMPLATE_MIGRATION,
|
||||
|
||||
// object tagging
|
||||
[JobName.QUEUE_OBJECT_TAGGING]: QueueName.OBJECT_TAGGING,
|
||||
[JobName.CLASSIFY_IMAGE]: QueueName.OBJECT_TAGGING,
|
||||
|
||||
// facial recognition
|
||||
[JobName.QUEUE_RECOGNIZE_FACES]: QueueName.RECOGNIZE_FACES,
|
||||
[JobName.RECOGNIZE_FACES]: QueueName.RECOGNIZE_FACES,
|
||||
[JobName.GENERATE_FACE_THUMBNAIL]: QueueName.RECOGNIZE_FACES,
|
||||
|
||||
// clip
|
||||
[JobName.QUEUE_ENCODE_CLIP]: QueueName.CLIP_ENCODING,
|
||||
[JobName.ENCODE_CLIP]: QueueName.CLIP_ENCODING,
|
||||
|
||||
// search - albums
|
||||
[JobName.SEARCH_INDEX_ALBUMS]: QueueName.SEARCH,
|
||||
[JobName.SEARCH_INDEX_ALBUM]: QueueName.SEARCH,
|
||||
[JobName.SEARCH_REMOVE_ALBUM]: QueueName.SEARCH,
|
||||
|
||||
// search - assets
|
||||
[JobName.SEARCH_INDEX_ASSETS]: QueueName.SEARCH,
|
||||
[JobName.SEARCH_INDEX_ASSET]: QueueName.SEARCH,
|
||||
[JobName.SEARCH_REMOVE_ASSET]: QueueName.SEARCH,
|
||||
|
||||
// search - faces
|
||||
[JobName.SEARCH_INDEX_FACES]: QueueName.SEARCH,
|
||||
[JobName.SEARCH_INDEX_FACE]: QueueName.SEARCH,
|
||||
[JobName.SEARCH_REMOVE_FACE]: QueueName.SEARCH,
|
||||
|
||||
// XMP sidecars
|
||||
[JobName.QUEUE_SIDECAR]: QueueName.SIDECAR,
|
||||
[JobName.SIDECAR_DISCOVERY]: QueueName.SIDECAR,
|
||||
[JobName.SIDECAR_SYNC]: QueueName.SIDECAR,
|
||||
};
|
||||
31
server/src/domain/job/job.interface.ts
Normal file
31
server/src/domain/job/job.interface.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { BoundingBox } from '../smart-info';
|
||||
|
||||
export interface IBaseJob {
|
||||
force?: boolean;
|
||||
}
|
||||
|
||||
export interface IAssetFaceJob extends IBaseJob {
|
||||
assetId: string;
|
||||
personId: string;
|
||||
}
|
||||
|
||||
export interface IFaceThumbnailJob extends IAssetFaceJob {
|
||||
imageWidth: number;
|
||||
imageHeight: number;
|
||||
boundingBox: BoundingBox;
|
||||
assetId: string;
|
||||
personId: string;
|
||||
}
|
||||
|
||||
export interface IEntityJob extends IBaseJob {
|
||||
id: string;
|
||||
source?: 'upload';
|
||||
}
|
||||
|
||||
export interface IBulkEntityJob extends IBaseJob {
|
||||
ids: string[];
|
||||
}
|
||||
|
||||
export interface IDeleteFilesJob extends IBaseJob {
|
||||
files: Array<string | null | undefined>;
|
||||
}
|
||||
96
server/src/domain/job/job.repository.ts
Normal file
96
server/src/domain/job/job.repository.ts
Normal file
@@ -0,0 +1,96 @@
|
||||
import { JobName, QueueName } from './job.constants';
|
||||
import {
|
||||
IAssetFaceJob,
|
||||
IBaseJob,
|
||||
IBulkEntityJob,
|
||||
IDeleteFilesJob,
|
||||
IEntityJob,
|
||||
IFaceThumbnailJob,
|
||||
} from './job.interface';
|
||||
|
||||
export interface JobCounts {
|
||||
active: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
delayed: number;
|
||||
waiting: number;
|
||||
paused: number;
|
||||
}
|
||||
|
||||
export interface QueueStatus {
|
||||
isActive: boolean;
|
||||
isPaused: boolean;
|
||||
}
|
||||
|
||||
export type JobItem =
|
||||
// Transcoding
|
||||
| { name: JobName.QUEUE_VIDEO_CONVERSION; data: IBaseJob }
|
||||
| { name: JobName.VIDEO_CONVERSION; data: IEntityJob }
|
||||
|
||||
// Thumbnails
|
||||
| { name: JobName.QUEUE_GENERATE_THUMBNAILS; data: IBaseJob }
|
||||
| { name: JobName.GENERATE_JPEG_THUMBNAIL; data: IEntityJob }
|
||||
| { name: JobName.GENERATE_WEBP_THUMBNAIL; data: IEntityJob }
|
||||
|
||||
// User Deletion
|
||||
| { name: JobName.USER_DELETE_CHECK; data?: IBaseJob }
|
||||
| { name: JobName.USER_DELETION; data: IEntityJob }
|
||||
|
||||
// Storage Template
|
||||
| { name: JobName.STORAGE_TEMPLATE_MIGRATION; data?: IBaseJob }
|
||||
| { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE; data: IEntityJob }
|
||||
| { name: JobName.SYSTEM_CONFIG_CHANGE; data?: IBaseJob }
|
||||
|
||||
// Metadata Extraction
|
||||
| { name: JobName.QUEUE_METADATA_EXTRACTION; data: IBaseJob }
|
||||
| { name: JobName.METADATA_EXTRACTION; data: IEntityJob }
|
||||
|
||||
// Sidecar Scanning
|
||||
| { name: JobName.QUEUE_SIDECAR; data: IBaseJob }
|
||||
| { name: JobName.SIDECAR_DISCOVERY; data: IEntityJob }
|
||||
| { name: JobName.SIDECAR_SYNC; data: IEntityJob }
|
||||
|
||||
// Object Tagging
|
||||
| { name: JobName.QUEUE_OBJECT_TAGGING; data: IBaseJob }
|
||||
| { name: JobName.CLASSIFY_IMAGE; data: IEntityJob }
|
||||
|
||||
// Recognize Faces
|
||||
| { name: JobName.QUEUE_RECOGNIZE_FACES; data: IBaseJob }
|
||||
| { name: JobName.RECOGNIZE_FACES; data: IEntityJob }
|
||||
| { name: JobName.GENERATE_FACE_THUMBNAIL; data: IFaceThumbnailJob }
|
||||
|
||||
// Clip Embedding
|
||||
| { name: JobName.QUEUE_ENCODE_CLIP; data: IBaseJob }
|
||||
| { name: JobName.ENCODE_CLIP; data: IEntityJob }
|
||||
|
||||
// Filesystem
|
||||
| { name: JobName.DELETE_FILES; data: IDeleteFilesJob }
|
||||
|
||||
// Asset Deletion
|
||||
| { name: JobName.PERSON_CLEANUP; data?: IBaseJob }
|
||||
|
||||
// Search
|
||||
| { name: JobName.SEARCH_INDEX_ASSETS; data?: IBaseJob }
|
||||
| { name: JobName.SEARCH_INDEX_ASSET; data: IBulkEntityJob }
|
||||
| { name: JobName.SEARCH_INDEX_FACES; data?: IBaseJob }
|
||||
| { name: JobName.SEARCH_INDEX_FACE; data: IAssetFaceJob }
|
||||
| { name: JobName.SEARCH_INDEX_ALBUMS; data?: IBaseJob }
|
||||
| { name: JobName.SEARCH_INDEX_ALBUM; data: IBulkEntityJob }
|
||||
| { name: JobName.SEARCH_REMOVE_ASSET; data: IBulkEntityJob }
|
||||
| { name: JobName.SEARCH_REMOVE_ALBUM; data: IBulkEntityJob }
|
||||
| { name: JobName.SEARCH_REMOVE_FACE; data: IAssetFaceJob };
|
||||
|
||||
export type JobHandler<T = any> = (data: T) => boolean | Promise<boolean>;
|
||||
|
||||
export const IJobRepository = 'IJobRepository';
|
||||
|
||||
export interface IJobRepository {
|
||||
addHandler(queueName: QueueName, concurrency: number, handler: (job: JobItem) => Promise<void>): void;
|
||||
setConcurrency(queueName: QueueName, concurrency: number): void;
|
||||
queue(item: JobItem): Promise<void>;
|
||||
pause(name: QueueName): Promise<void>;
|
||||
resume(name: QueueName): Promise<void>;
|
||||
empty(name: QueueName): Promise<void>;
|
||||
getQueueStatus(name: QueueName): Promise<QueueStatus>;
|
||||
getJobCounts(name: QueueName): Promise<JobCounts>;
|
||||
}
|
||||
303
server/src/domain/job/job.service.spec.ts
Normal file
303
server/src/domain/job/job.service.spec.ts
Normal file
@@ -0,0 +1,303 @@
|
||||
import { SystemConfig } from '@app/infra/entities';
|
||||
import { BadRequestException } from '@nestjs/common';
|
||||
import {
|
||||
asyncTick,
|
||||
newAssetRepositoryMock,
|
||||
newCommunicationRepositoryMock,
|
||||
newJobRepositoryMock,
|
||||
newSystemConfigRepositoryMock,
|
||||
} from '@test';
|
||||
import { IAssetRepository } from '../asset';
|
||||
import { ICommunicationRepository } from '../communication';
|
||||
import { IJobRepository, JobCommand, JobHandler, JobItem, JobName, JobService, QueueName } from '.';
|
||||
import { ISystemConfigRepository } from '../system-config';
|
||||
import { SystemConfigCore } from '../system-config/system-config.core';
|
||||
|
||||
const makeMockHandlers = (success: boolean) => {
|
||||
const mock = jest.fn().mockResolvedValue(success);
|
||||
return Object.values(JobName).reduce((map, jobName) => ({ ...map, [jobName]: mock }), {}) as Record<
|
||||
JobName,
|
||||
JobHandler
|
||||
>;
|
||||
};
|
||||
|
||||
describe(JobService.name, () => {
|
||||
let sut: JobService;
|
||||
let assetMock: jest.Mocked<IAssetRepository>;
|
||||
let configMock: jest.Mocked<ISystemConfigRepository>;
|
||||
let communicationMock: jest.Mocked<ICommunicationRepository>;
|
||||
let jobMock: jest.Mocked<IJobRepository>;
|
||||
|
||||
beforeEach(async () => {
|
||||
assetMock = newAssetRepositoryMock();
|
||||
configMock = newSystemConfigRepositoryMock();
|
||||
communicationMock = newCommunicationRepositoryMock();
|
||||
jobMock = newJobRepositoryMock();
|
||||
sut = new JobService(assetMock, communicationMock, jobMock, configMock);
|
||||
});
|
||||
|
||||
it('should work', () => {
|
||||
expect(sut).toBeDefined();
|
||||
});
|
||||
|
||||
describe('handleNightlyJobs', () => {
|
||||
it('should run the scheduled jobs', async () => {
|
||||
await sut.handleNightlyJobs();
|
||||
|
||||
expect(jobMock.queue.mock.calls).toEqual([
|
||||
[{ name: JobName.USER_DELETE_CHECK }],
|
||||
[{ name: JobName.PERSON_CLEANUP }],
|
||||
[{ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } }],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getAllJobStatus', () => {
|
||||
it('should get all job statuses', async () => {
|
||||
jobMock.getJobCounts.mockResolvedValue({
|
||||
active: 1,
|
||||
completed: 1,
|
||||
failed: 1,
|
||||
delayed: 1,
|
||||
waiting: 1,
|
||||
paused: 1,
|
||||
});
|
||||
jobMock.getQueueStatus.mockResolvedValue({
|
||||
isActive: true,
|
||||
isPaused: true,
|
||||
});
|
||||
|
||||
const expectedJobStatus = {
|
||||
jobCounts: {
|
||||
active: 1,
|
||||
completed: 1,
|
||||
delayed: 1,
|
||||
failed: 1,
|
||||
waiting: 1,
|
||||
paused: 1,
|
||||
},
|
||||
queueStatus: {
|
||||
isActive: true,
|
||||
isPaused: true,
|
||||
},
|
||||
};
|
||||
|
||||
await expect(sut.getAllJobsStatus()).resolves.toEqual({
|
||||
[QueueName.BACKGROUND_TASK]: expectedJobStatus,
|
||||
[QueueName.CLIP_ENCODING]: expectedJobStatus,
|
||||
[QueueName.METADATA_EXTRACTION]: expectedJobStatus,
|
||||
[QueueName.OBJECT_TAGGING]: expectedJobStatus,
|
||||
[QueueName.SEARCH]: expectedJobStatus,
|
||||
[QueueName.STORAGE_TEMPLATE_MIGRATION]: expectedJobStatus,
|
||||
[QueueName.THUMBNAIL_GENERATION]: expectedJobStatus,
|
||||
[QueueName.VIDEO_CONVERSION]: expectedJobStatus,
|
||||
[QueueName.RECOGNIZE_FACES]: expectedJobStatus,
|
||||
[QueueName.SIDECAR]: expectedJobStatus,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('handleCommand', () => {
|
||||
it('should handle a pause command', async () => {
|
||||
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.PAUSE, force: false });
|
||||
|
||||
expect(jobMock.pause).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION);
|
||||
});
|
||||
|
||||
it('should handle a resume command', async () => {
|
||||
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.RESUME, force: false });
|
||||
|
||||
expect(jobMock.resume).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION);
|
||||
});
|
||||
|
||||
it('should handle an empty command', async () => {
|
||||
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.EMPTY, force: false });
|
||||
|
||||
expect(jobMock.empty).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION);
|
||||
});
|
||||
|
||||
it('should not start a job that is already running', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false });
|
||||
|
||||
await expect(
|
||||
sut.handleCommand(QueueName.VIDEO_CONVERSION, { command: JobCommand.START, force: false }),
|
||||
).rejects.toBeInstanceOf(BadRequestException);
|
||||
|
||||
expect(jobMock.queue).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should handle a start video conversion command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.VIDEO_CONVERSION, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_VIDEO_CONVERSION, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start storage template migration command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.STORAGE_TEMPLATE_MIGRATION, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.STORAGE_TEMPLATE_MIGRATION });
|
||||
});
|
||||
|
||||
it('should handle a start object tagging command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.OBJECT_TAGGING, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_OBJECT_TAGGING, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start clip encoding command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.CLIP_ENCODING, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_ENCODE_CLIP, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start metadata extraction command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.METADATA_EXTRACTION, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start sidecar command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.SIDECAR, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_SIDECAR, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start thumbnail generation command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.THUMBNAIL_GENERATION, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should handle a start recognize faces command', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.handleCommand(QueueName.RECOGNIZE_FACES, { command: JobCommand.START, force: false });
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_RECOGNIZE_FACES, data: { force: false } });
|
||||
});
|
||||
|
||||
it('should throw a bad request when an invalid queue is used', async () => {
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await expect(
|
||||
sut.handleCommand(QueueName.BACKGROUND_TASK, { command: JobCommand.START, force: false }),
|
||||
).rejects.toBeInstanceOf(BadRequestException);
|
||||
|
||||
expect(jobMock.queue).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('registerHandlers', () => {
|
||||
it('should register a handler for each queue', async () => {
|
||||
await sut.registerHandlers(makeMockHandlers(true));
|
||||
expect(configMock.load).toHaveBeenCalled();
|
||||
expect(jobMock.addHandler).toHaveBeenCalledTimes(Object.keys(QueueName).length);
|
||||
});
|
||||
|
||||
it('should subscribe to config changes', async () => {
|
||||
await sut.registerHandlers(makeMockHandlers(false));
|
||||
|
||||
const configCore = new SystemConfigCore(newSystemConfigRepositoryMock());
|
||||
configCore.config$.next({
|
||||
job: {
|
||||
[QueueName.BACKGROUND_TASK]: { concurrency: 10 },
|
||||
[QueueName.CLIP_ENCODING]: { concurrency: 10 },
|
||||
[QueueName.METADATA_EXTRACTION]: { concurrency: 10 },
|
||||
[QueueName.OBJECT_TAGGING]: { concurrency: 10 },
|
||||
[QueueName.RECOGNIZE_FACES]: { concurrency: 10 },
|
||||
[QueueName.SEARCH]: { concurrency: 10 },
|
||||
[QueueName.SIDECAR]: { concurrency: 10 },
|
||||
[QueueName.STORAGE_TEMPLATE_MIGRATION]: { concurrency: 10 },
|
||||
[QueueName.THUMBNAIL_GENERATION]: { concurrency: 10 },
|
||||
[QueueName.VIDEO_CONVERSION]: { concurrency: 10 },
|
||||
},
|
||||
} as SystemConfig);
|
||||
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.BACKGROUND_TASK, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.CLIP_ENCODING, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.OBJECT_TAGGING, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.RECOGNIZE_FACES, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.SIDECAR, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.STORAGE_TEMPLATE_MIGRATION, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.THUMBNAIL_GENERATION, 10);
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.VIDEO_CONVERSION, 10);
|
||||
});
|
||||
|
||||
const tests: Array<{ item: JobItem; jobs: JobName[] }> = [
|
||||
{
|
||||
item: { name: JobName.SIDECAR_SYNC, data: { id: 'asset-1' } },
|
||||
jobs: [JobName.METADATA_EXTRACTION],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.SIDECAR_DISCOVERY, data: { id: 'asset-1' } },
|
||||
jobs: [JobName.METADATA_EXTRACTION],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.METADATA_EXTRACTION, data: { id: 'asset-1' } },
|
||||
jobs: [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, JobName.SEARCH_INDEX_ASSET],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1', source: 'upload' } },
|
||||
jobs: [JobName.GENERATE_JPEG_THUMBNAIL],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1' } },
|
||||
jobs: [],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-1' } },
|
||||
jobs: [JobName.GENERATE_WEBP_THUMBNAIL, JobName.CLASSIFY_IMAGE, JobName.ENCODE_CLIP, JobName.RECOGNIZE_FACES],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.CLASSIFY_IMAGE, data: { id: 'asset-1' } },
|
||||
jobs: [JobName.SEARCH_INDEX_ASSET],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.ENCODE_CLIP, data: { id: 'asset-1' } },
|
||||
jobs: [JobName.SEARCH_INDEX_ASSET],
|
||||
},
|
||||
{
|
||||
item: { name: JobName.RECOGNIZE_FACES, data: { id: 'asset-1' } },
|
||||
jobs: [JobName.SEARCH_INDEX_ASSET],
|
||||
},
|
||||
];
|
||||
|
||||
for (const { item, jobs } of tests) {
|
||||
it(`should queue ${jobs.length} jobs when a ${item.name} job finishes successfully`, async () => {
|
||||
assetMock.getByIds.mockResolvedValue([]);
|
||||
|
||||
await sut.registerHandlers(makeMockHandlers(true));
|
||||
await jobMock.addHandler.mock.calls[0][2](item);
|
||||
await asyncTick(3);
|
||||
|
||||
expect(jobMock.queue).toHaveBeenCalledTimes(jobs.length);
|
||||
for (const jobName of jobs) {
|
||||
expect(jobMock.queue).toHaveBeenCalledWith({ name: jobName, data: expect.anything() });
|
||||
}
|
||||
});
|
||||
|
||||
it(`should not queue any jobs when ${item.name} finishes with 'false'`, async () => {
|
||||
await sut.registerHandlers(makeMockHandlers(false));
|
||||
await jobMock.addHandler.mock.calls[0][2](item);
|
||||
await asyncTick(3);
|
||||
|
||||
expect(jobMock.queue).not.toHaveBeenCalled();
|
||||
});
|
||||
}
|
||||
});
|
||||
});
|
||||
179
server/src/domain/job/job.service.ts
Normal file
179
server/src/domain/job/job.service.ts
Normal file
@@ -0,0 +1,179 @@
|
||||
import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common';
|
||||
import { IAssetRepository, mapAsset } from '../asset';
|
||||
import { CommunicationEvent, ICommunicationRepository } from '../communication';
|
||||
import { assertMachineLearningEnabled } from '../domain.constant';
|
||||
import { ISystemConfigRepository } from '../system-config';
|
||||
import { SystemConfigCore } from '../system-config/system-config.core';
|
||||
import { JobCommandDto } from './dto';
|
||||
import { JobCommand, JobName, QueueName } from './job.constants';
|
||||
import { IJobRepository, JobHandler, JobItem } from './job.repository';
|
||||
import { AllJobStatusResponseDto, JobStatusDto } from './response-dto';
|
||||
|
||||
@Injectable()
|
||||
export class JobService {
|
||||
private logger = new Logger(JobService.name);
|
||||
private configCore: SystemConfigCore;
|
||||
|
||||
constructor(
|
||||
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
|
||||
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
|
||||
@Inject(IJobRepository) private jobRepository: IJobRepository,
|
||||
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
|
||||
) {
|
||||
this.configCore = new SystemConfigCore(configRepository);
|
||||
}
|
||||
|
||||
handleCommand(queueName: QueueName, dto: JobCommandDto): Promise<void> {
|
||||
this.logger.debug(`Handling command: queue=${queueName},force=${dto.force}`);
|
||||
|
||||
switch (dto.command) {
|
||||
case JobCommand.START:
|
||||
return this.start(queueName, dto);
|
||||
|
||||
case JobCommand.PAUSE:
|
||||
return this.jobRepository.pause(queueName);
|
||||
|
||||
case JobCommand.RESUME:
|
||||
return this.jobRepository.resume(queueName);
|
||||
|
||||
case JobCommand.EMPTY:
|
||||
return this.jobRepository.empty(queueName);
|
||||
}
|
||||
}
|
||||
|
||||
async getJobStatus(queueName: QueueName): Promise<JobStatusDto> {
|
||||
const [jobCounts, queueStatus] = await Promise.all([
|
||||
this.jobRepository.getJobCounts(queueName),
|
||||
this.jobRepository.getQueueStatus(queueName),
|
||||
]);
|
||||
|
||||
return { jobCounts, queueStatus };
|
||||
}
|
||||
|
||||
async getAllJobsStatus(): Promise<AllJobStatusResponseDto> {
|
||||
const response = new AllJobStatusResponseDto();
|
||||
for (const queueName of Object.values(QueueName)) {
|
||||
response[queueName] = await this.getJobStatus(queueName);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
private async start(name: QueueName, { force }: JobCommandDto): Promise<void> {
|
||||
const { isActive } = await this.jobRepository.getQueueStatus(name);
|
||||
if (isActive) {
|
||||
throw new BadRequestException(`Job is already running`);
|
||||
}
|
||||
|
||||
switch (name) {
|
||||
case QueueName.VIDEO_CONVERSION:
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_VIDEO_CONVERSION, data: { force } });
|
||||
|
||||
case QueueName.STORAGE_TEMPLATE_MIGRATION:
|
||||
return this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION });
|
||||
|
||||
case QueueName.OBJECT_TAGGING:
|
||||
assertMachineLearningEnabled();
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_OBJECT_TAGGING, data: { force } });
|
||||
|
||||
case QueueName.CLIP_ENCODING:
|
||||
assertMachineLearningEnabled();
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_ENCODE_CLIP, data: { force } });
|
||||
|
||||
case QueueName.METADATA_EXTRACTION:
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force } });
|
||||
|
||||
case QueueName.SIDECAR:
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_SIDECAR, data: { force } });
|
||||
|
||||
case QueueName.THUMBNAIL_GENERATION:
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force } });
|
||||
|
||||
case QueueName.RECOGNIZE_FACES:
|
||||
return this.jobRepository.queue({ name: JobName.QUEUE_RECOGNIZE_FACES, data: { force } });
|
||||
|
||||
default:
|
||||
throw new BadRequestException(`Invalid job name: ${name}`);
|
||||
}
|
||||
}
|
||||
|
||||
async registerHandlers(jobHandlers: Record<JobName, JobHandler>) {
|
||||
const config = await this.configCore.getConfig();
|
||||
for (const queueName of Object.values(QueueName)) {
|
||||
const concurrency = config.job[queueName].concurrency;
|
||||
this.logger.debug(`Registering ${queueName} with a concurrency of ${concurrency}`);
|
||||
this.jobRepository.addHandler(queueName, concurrency, async (item: JobItem): Promise<void> => {
|
||||
const { name, data } = item;
|
||||
|
||||
try {
|
||||
const handler = jobHandlers[name];
|
||||
const success = await handler(data);
|
||||
if (success) {
|
||||
await this.onDone(item);
|
||||
}
|
||||
} catch (error: Error | any) {
|
||||
this.logger.error(`Unable to run job handler: ${error}`, error?.stack, data);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
this.configCore.config$.subscribe((config) => {
|
||||
this.logger.log(`Updating queue concurrency settings`);
|
||||
for (const queueName of Object.values(QueueName)) {
|
||||
const concurrency = config.job[queueName].concurrency;
|
||||
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
|
||||
this.jobRepository.setConcurrency(queueName, concurrency);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async handleNightlyJobs() {
|
||||
await this.jobRepository.queue({ name: JobName.USER_DELETE_CHECK });
|
||||
await this.jobRepository.queue({ name: JobName.PERSON_CLEANUP });
|
||||
await this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } });
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue follow up jobs
|
||||
*/
|
||||
private async onDone(item: JobItem) {
|
||||
switch (item.name) {
|
||||
case JobName.SIDECAR_SYNC:
|
||||
case JobName.SIDECAR_DISCOVERY:
|
||||
await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: item.data });
|
||||
break;
|
||||
|
||||
case JobName.METADATA_EXTRACTION:
|
||||
await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: item.data });
|
||||
break;
|
||||
|
||||
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE:
|
||||
if (item.data.source === 'upload') {
|
||||
await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: item.data });
|
||||
}
|
||||
break;
|
||||
|
||||
case JobName.GENERATE_JPEG_THUMBNAIL: {
|
||||
await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: item.data });
|
||||
await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: item.data });
|
||||
await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: item.data });
|
||||
await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: item.data });
|
||||
|
||||
const [asset] = await this.assetRepository.getByIds([item.data.id]);
|
||||
if (asset) {
|
||||
this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// In addition to the above jobs, all of these should queue `SEARCH_INDEX_ASSET`
|
||||
switch (item.name) {
|
||||
case JobName.CLASSIFY_IMAGE:
|
||||
case JobName.ENCODE_CLIP:
|
||||
case JobName.RECOGNIZE_FACES:
|
||||
case JobName.METADATA_EXTRACTION:
|
||||
await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_ASSET, data: { ids: [item.data.id] } });
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
import { ApiProperty } from '@nestjs/swagger';
|
||||
import { QueueName } from '../job.constants';
|
||||
|
||||
export class JobCountsDto {
|
||||
@ApiProperty({ type: 'integer' })
|
||||
active!: number;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
completed!: number;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
failed!: number;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
delayed!: number;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
waiting!: number;
|
||||
@ApiProperty({ type: 'integer' })
|
||||
paused!: number;
|
||||
}
|
||||
|
||||
export class QueueStatusDto {
|
||||
isActive!: boolean;
|
||||
isPaused!: boolean;
|
||||
}
|
||||
|
||||
export class JobStatusDto {
|
||||
@ApiProperty({ type: JobCountsDto })
|
||||
jobCounts!: JobCountsDto;
|
||||
|
||||
@ApiProperty({ type: QueueStatusDto })
|
||||
queueStatus!: QueueStatusDto;
|
||||
}
|
||||
|
||||
export class AllJobStatusResponseDto implements Record<QueueName, JobStatusDto> {
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.THUMBNAIL_GENERATION]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.METADATA_EXTRACTION]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.VIDEO_CONVERSION]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.OBJECT_TAGGING]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.CLIP_ENCODING]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.STORAGE_TEMPLATE_MIGRATION]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.BACKGROUND_TASK]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.SEARCH]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.RECOGNIZE_FACES]!: JobStatusDto;
|
||||
|
||||
@ApiProperty({ type: JobStatusDto })
|
||||
[QueueName.SIDECAR]!: JobStatusDto;
|
||||
}
|
||||
1
server/src/domain/job/response-dto/index.ts
Normal file
1
server/src/domain/job/response-dto/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export * from './all-job-status-response.dto';
|
||||
Reference in New Issue
Block a user