feat(server): dynamic job concurrency (#2622)

* feat(server): dynamic job concurrency

* styling and add setting info to top of the job list

* regenerate api

* remove DETECT_OBJECT job

---------

Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
This commit is contained in:
Jason Rasmussen
2023-06-01 06:32:51 -04:00
committed by GitHub
parent 656dc08406
commit 2493dfaba3
48 changed files with 1454 additions and 490 deletions

View File

@@ -1,14 +1,14 @@
export enum QueueName {
THUMBNAIL_GENERATION = 'thumbnail-generation-queue',
METADATA_EXTRACTION = 'metadata-extraction-queue',
VIDEO_CONVERSION = 'video-conversion-queue',
OBJECT_TAGGING = 'object-tagging-queue',
RECOGNIZE_FACES = 'recognize-faces-queue',
CLIP_ENCODING = 'clip-encoding-queue',
BACKGROUND_TASK = 'background-task-queue',
STORAGE_TEMPLATE_MIGRATION = 'storage-template-migration-queue',
SEARCH = 'search-queue',
SIDECAR = 'sidecar-queue',
THUMBNAIL_GENERATION = 'thumbnailGeneration',
METADATA_EXTRACTION = 'metadataExtraction',
VIDEO_CONVERSION = 'videoConversion',
OBJECT_TAGGING = 'objectTagging',
RECOGNIZE_FACES = 'recognizeFaces',
CLIP_ENCODING = 'clipEncoding',
BACKGROUND_TASK = 'backgroundTask',
STORAGE_TEMPLATE_MIGRATION = 'storageTemplateMigration',
SEARCH = 'search',
SIDECAR = 'sidecar',
}
export enum JobCommand {
@@ -135,17 +135,3 @@ export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
[JobName.SIDECAR_DISCOVERY]: QueueName.SIDECAR,
[JobName.SIDECAR_SYNC]: QueueName.SIDECAR,
};
// max concurrency for each queue (total concurrency across all jobs)
export const QUEUE_TO_CONCURRENCY: Record<QueueName, number> = {
[QueueName.BACKGROUND_TASK]: 5,
[QueueName.CLIP_ENCODING]: 2,
[QueueName.METADATA_EXTRACTION]: 5,
[QueueName.OBJECT_TAGGING]: 2,
[QueueName.RECOGNIZE_FACES]: 2,
[QueueName.SEARCH]: 5,
[QueueName.SIDECAR]: 5,
[QueueName.STORAGE_TEMPLATE_MIGRATION]: 5,
[QueueName.THUMBNAIL_GENERATION]: 5,
[QueueName.VIDEO_CONVERSION]: 1,
};

View File

@@ -33,13 +33,13 @@ export type JobItem =
| { name: JobName.GENERATE_WEBP_THUMBNAIL; data: IEntityJob }
// User Deletion
| { name: JobName.USER_DELETE_CHECK }
| { name: JobName.USER_DELETE_CHECK; data?: IBaseJob }
| { name: JobName.USER_DELETION; data: IEntityJob }
// Storage Template
| { name: JobName.STORAGE_TEMPLATE_MIGRATION }
| { name: JobName.STORAGE_TEMPLATE_MIGRATION; data?: IBaseJob }
| { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE; data: IEntityJob }
| { name: JobName.SYSTEM_CONFIG_CHANGE }
| { name: JobName.SYSTEM_CONFIG_CHANGE; data?: IBaseJob }
// Metadata Extraction
| { name: JobName.QUEUE_METADATA_EXTRACTION; data: IBaseJob }
@@ -67,22 +67,26 @@ export type JobItem =
| { name: JobName.DELETE_FILES; data: IDeleteFilesJob }
// Asset Deletion
| { name: JobName.PERSON_CLEANUP }
| { name: JobName.PERSON_CLEANUP; data?: IBaseJob }
// Search
| { name: JobName.SEARCH_INDEX_ASSETS }
| { name: JobName.SEARCH_INDEX_ASSETS; data?: IBaseJob }
| { name: JobName.SEARCH_INDEX_ASSET; data: IBulkEntityJob }
| { name: JobName.SEARCH_INDEX_FACES }
| { name: JobName.SEARCH_INDEX_FACES; data?: IBaseJob }
| { name: JobName.SEARCH_INDEX_FACE; data: IAssetFaceJob }
| { name: JobName.SEARCH_INDEX_ALBUMS }
| { name: JobName.SEARCH_INDEX_ALBUMS; data?: IBaseJob }
| { name: JobName.SEARCH_INDEX_ALBUM; data: IBulkEntityJob }
| { name: JobName.SEARCH_REMOVE_ASSET; data: IBulkEntityJob }
| { name: JobName.SEARCH_REMOVE_ALBUM; data: IBulkEntityJob }
| { name: JobName.SEARCH_REMOVE_FACE; data: IAssetFaceJob };
export type JobHandler<T = any> = (data: T) => boolean | Promise<boolean>;
export const IJobRepository = 'IJobRepository';
export interface IJobRepository {
addHandler(queueName: QueueName, concurrency: number, handler: (job: JobItem) => Promise<void>): void;
setConcurrency(queueName: QueueName, concurrency: number): void;
queue(item: JobItem): Promise<void>;
pause(name: QueueName): Promise<void>;
resume(name: QueueName): Promise<void>;

View File

@@ -1,20 +1,28 @@
import { BadRequestException } from '@nestjs/common';
import { newAssetRepositoryMock, newCommunicationRepositoryMock, newJobRepositoryMock } from '../../test';
import {
newAssetRepositoryMock,
newCommunicationRepositoryMock,
newJobRepositoryMock,
newSystemConfigRepositoryMock,
} from '../../test';
import { IAssetRepository } from '../asset';
import { ICommunicationRepository } from '../communication';
import { IJobRepository, JobCommand, JobName, JobService, QueueName } from '../job';
import { IJobRepository, JobCommand, JobHandler, JobName, JobService, QueueName } from '../job';
import { ISystemConfigRepository } from '../system-config';
describe(JobService.name, () => {
let sut: JobService;
let assetMock: jest.Mocked<IAssetRepository>;
let configMock: jest.Mocked<ISystemConfigRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let jobMock: jest.Mocked<IJobRepository>;
beforeEach(async () => {
assetMock = newAssetRepositoryMock();
configMock = newSystemConfigRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
jobMock = newJobRepositoryMock();
sut = new JobService(assetMock, communicationMock, jobMock);
sut = new JobService(assetMock, communicationMock, jobMock, configMock);
});
it('should work', () => {
@@ -64,16 +72,16 @@ describe(JobService.name, () => {
};
await expect(sut.getAllJobsStatus()).resolves.toEqual({
'background-task-queue': expectedJobStatus,
'clip-encoding-queue': expectedJobStatus,
'metadata-extraction-queue': expectedJobStatus,
'object-tagging-queue': expectedJobStatus,
'search-queue': expectedJobStatus,
'storage-template-migration-queue': expectedJobStatus,
'thumbnail-generation-queue': expectedJobStatus,
'video-conversion-queue': expectedJobStatus,
'recognize-faces-queue': expectedJobStatus,
'sidecar-queue': expectedJobStatus,
[QueueName.BACKGROUND_TASK]: expectedJobStatus,
[QueueName.CLIP_ENCODING]: expectedJobStatus,
[QueueName.METADATA_EXTRACTION]: expectedJobStatus,
[QueueName.OBJECT_TAGGING]: expectedJobStatus,
[QueueName.SEARCH]: expectedJobStatus,
[QueueName.STORAGE_TEMPLATE_MIGRATION]: expectedJobStatus,
[QueueName.THUMBNAIL_GENERATION]: expectedJobStatus,
[QueueName.VIDEO_CONVERSION]: expectedJobStatus,
[QueueName.RECOGNIZE_FACES]: expectedJobStatus,
[QueueName.SIDECAR]: expectedJobStatus,
});
});
});
@@ -147,6 +155,14 @@ describe(JobService.name, () => {
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force: false } });
});
it('should handle a start sidecar command', async () => {
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.SIDECAR, { command: JobCommand.START, force: false });
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_SIDECAR, data: { force: false } });
});
it('should handle a start thumbnail generation command', async () => {
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
@@ -155,6 +171,14 @@ describe(JobService.name, () => {
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } });
});
it('should handle a start recognize faces command', async () => {
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.RECOGNIZE_FACES, { command: JobCommand.START, force: false });
expect(jobMock.queue).toHaveBeenCalledWith({ name: JobName.QUEUE_RECOGNIZE_FACES, data: { force: false } });
});
it('should throw a bad request when an invalid queue is used', async () => {
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
@@ -165,4 +189,19 @@ describe(JobService.name, () => {
expect(jobMock.queue).not.toHaveBeenCalled();
});
});
describe('registerHandlers', () => {
it('should register a handler for each queue', async () => {
const mock = jest.fn();
const handlers = Object.values(JobName).reduce((map, jobName) => ({ ...map, [jobName]: mock }), {}) as Record<
JobName,
JobHandler
>;
await sut.registerHandlers(handlers);
expect(configMock.load).toHaveBeenCalled();
expect(jobMock.addHandler).toHaveBeenCalledTimes(Object.keys(QueueName).length);
});
});
});

View File

@@ -2,20 +2,26 @@ import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common'
import { IAssetRepository, mapAsset } from '../asset';
import { CommunicationEvent, ICommunicationRepository } from '../communication';
import { assertMachineLearningEnabled } from '../domain.constant';
import { ISystemConfigRepository } from '../system-config';
import { SystemConfigCore } from '../system-config/system-config.core';
import { JobCommandDto } from './dto';
import { JobCommand, JobName, QueueName } from './job.constants';
import { IJobRepository, JobItem } from './job.repository';
import { IJobRepository, JobHandler, JobItem } from './job.repository';
import { AllJobStatusResponseDto, JobStatusDto } from './response-dto';
@Injectable()
export class JobService {
private logger = new Logger(JobService.name);
private configCore: SystemConfigCore;
constructor(
@Inject(IAssetRepository) private assetRepository: IAssetRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
) {}
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
) {
this.configCore = new SystemConfigCore(configRepository);
}
handleCommand(queueName: QueueName, dto: JobCommandDto): Promise<void> {
this.logger.debug(`Handling command: queue=${queueName},force=${dto.force}`);
@@ -90,6 +96,36 @@ export class JobService {
}
}
async registerHandlers(jobHandlers: Record<JobName, JobHandler>) {
const config = await this.configCore.getConfig();
for (const queueName of Object.values(QueueName)) {
const concurrency = config.job[queueName].concurrency;
this.logger.debug(`Registering ${queueName} with a concurrency of ${concurrency}`);
this.jobRepository.addHandler(queueName, concurrency, async (item: JobItem): Promise<void> => {
const { name, data } = item;
try {
const handler = jobHandlers[name];
const success = await handler(data);
if (success) {
await this.onDone(item);
}
} catch (error: Error | any) {
this.logger.error(`Unable to run job handler: ${error}`, error?.stack, data);
}
});
}
this.configCore.config$.subscribe((config) => {
this.logger.log(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
const concurrency = config.job[queueName].concurrency;
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
this.jobRepository.setConcurrency(queueName, concurrency);
}
});
}
async handleNightlyJobs() {
await this.jobRepository.queue({ name: JobName.USER_DELETE_CHECK });
await this.jobRepository.queue({ name: JobName.PERSON_CLEANUP });

View File

@@ -0,0 +1,73 @@
import { ApiProperty } from '@nestjs/swagger';
import { Type } from 'class-transformer';
import { IsInt, IsObject, IsPositive, ValidateNested } from 'class-validator';
import { QueueName } from '../../job';
export class JobSettingsDto {
@IsInt()
@IsPositive()
@ApiProperty({ type: 'integer' })
concurrency!: number;
}
export class SystemConfigJobDto implements Record<QueueName, JobSettingsDto> {
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.THUMBNAIL_GENERATION]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.METADATA_EXTRACTION]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.VIDEO_CONVERSION]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.OBJECT_TAGGING]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.CLIP_ENCODING]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.STORAGE_TEMPLATE_MIGRATION]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.BACKGROUND_TASK]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.SEARCH]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.RECOGNIZE_FACES]!: JobSettingsDto;
@ApiProperty({ type: JobSettingsDto })
@ValidateNested()
@IsObject()
@Type(() => JobSettingsDto)
[QueueName.SIDECAR]!: JobSettingsDto;
}

View File

@@ -1,6 +1,7 @@
import { SystemConfig } from '@app/infra/entities';
import { Type } from 'class-transformer';
import { IsObject, ValidateNested } from 'class-validator';
import { SystemConfigJobDto } from './system-config-job.dto';
import { SystemConfigFFmpegDto } from './system-config-ffmpeg.dto';
import { SystemConfigOAuthDto } from './system-config-oauth.dto';
import { SystemConfigPasswordLoginDto } from './system-config-password-login.dto';
@@ -26,6 +27,11 @@ export class SystemConfigDto {
@ValidateNested()
@IsObject()
storageTemplate!: SystemConfigStorageTemplateDto;
@Type(() => SystemConfigJobDto)
@ValidateNested()
@IsObject()
job!: SystemConfigJobDto;
}
export function mapConfig(config: SystemConfig): SystemConfigDto {

View File

@@ -1,13 +1,20 @@
import { SystemConfig, SystemConfigEntity, SystemConfigKey, TranscodePreset } from '@app/infra/entities';
import {
SystemConfig,
SystemConfigEntity,
SystemConfigKey,
SystemConfigValue,
TranscodePreset,
} from '@app/infra/entities';
import { BadRequestException, Injectable, Logger } from '@nestjs/common';
import * as _ from 'lodash';
import { Subject } from 'rxjs';
import { DeepPartial } from 'typeorm';
import { QueueName } from '../job/job.constants';
import { ISystemConfigRepository } from './system-config.repository';
export type SystemConfigValidator = (config: SystemConfig) => void | Promise<void>;
const defaults: SystemConfig = Object.freeze({
const defaults = Object.freeze<SystemConfig>({
ffmpeg: {
crf: 23,
threads: 0,
@@ -19,6 +26,18 @@ const defaults: SystemConfig = Object.freeze({
twoPass: false,
transcode: TranscodePreset.REQUIRED,
},
job: {
[QueueName.BACKGROUND_TASK]: { concurrency: 5 },
[QueueName.CLIP_ENCODING]: { concurrency: 2 },
[QueueName.METADATA_EXTRACTION]: { concurrency: 5 },
[QueueName.OBJECT_TAGGING]: { concurrency: 2 },
[QueueName.RECOGNIZE_FACES]: { concurrency: 2 },
[QueueName.SEARCH]: { concurrency: 5 },
[QueueName.SIDECAR]: { concurrency: 5 },
[QueueName.STORAGE_TEMPLATE_MIGRATION]: { concurrency: 5 },
[QueueName.THUMBNAIL_GENERATION]: { concurrency: 5 },
[QueueName.VIDEO_CONVERSION]: { concurrency: 1 },
},
oauth: {
enabled: false,
issuerUrl: '',
@@ -85,7 +104,7 @@ export class SystemConfigCore {
for (const key of Object.values(SystemConfigKey)) {
// get via dot notation
const item = { key, value: _.get(config, key) };
const item = { key, value: _.get(config, key) as SystemConfigValue };
const defaultValue = _.get(defaults, key);
const isMissing = !_.has(config, key);

View File

@@ -1,7 +1,7 @@
import { SystemConfigEntity, SystemConfigKey, TranscodePreset } from '@app/infra/entities';
import { SystemConfig, SystemConfigEntity, SystemConfigKey, TranscodePreset } from '@app/infra/entities';
import { BadRequestException } from '@nestjs/common';
import { newJobRepositoryMock, newSystemConfigRepositoryMock, systemConfigStub } from '../../test';
import { IJobRepository, JobName } from '../job';
import { IJobRepository, JobName, QueueName } from '../job';
import { SystemConfigValidator } from './system-config.core';
import { ISystemConfigRepository } from './system-config.repository';
import { SystemConfigService } from './system-config.service';
@@ -11,7 +11,19 @@ const updates: SystemConfigEntity[] = [
{ key: SystemConfigKey.OAUTH_AUTO_LAUNCH, value: true },
];
const updatedConfig = Object.freeze({
const updatedConfig = Object.freeze<SystemConfig>({
job: {
[QueueName.BACKGROUND_TASK]: { concurrency: 5 },
[QueueName.CLIP_ENCODING]: { concurrency: 2 },
[QueueName.METADATA_EXTRACTION]: { concurrency: 5 },
[QueueName.OBJECT_TAGGING]: { concurrency: 2 },
[QueueName.RECOGNIZE_FACES]: { concurrency: 2 },
[QueueName.SEARCH]: { concurrency: 5 },
[QueueName.SIDECAR]: { concurrency: 5 },
[QueueName.STORAGE_TEMPLATE_MIGRATION]: { concurrency: 5 },
[QueueName.THUMBNAIL_GENERATION]: { concurrency: 5 },
[QueueName.VIDEO_CONVERSION]: { concurrency: 1 },
},
ffmpeg: {
crf: 30,
threads: 0,

View File

@@ -23,6 +23,7 @@ import {
AuthUserDto,
ExifResponseDto,
mapUser,
QueueName,
SearchResult,
SharedLinkResponseDto,
TagResponseDto,
@@ -531,6 +532,18 @@ export const systemConfigStub = {
twoPass: false,
transcode: TranscodePreset.REQUIRED,
},
job: {
[QueueName.BACKGROUND_TASK]: { concurrency: 5 },
[QueueName.CLIP_ENCODING]: { concurrency: 2 },
[QueueName.METADATA_EXTRACTION]: { concurrency: 5 },
[QueueName.OBJECT_TAGGING]: { concurrency: 2 },
[QueueName.RECOGNIZE_FACES]: { concurrency: 2 },
[QueueName.SEARCH]: { concurrency: 5 },
[QueueName.SIDECAR]: { concurrency: 5 },
[QueueName.STORAGE_TEMPLATE_MIGRATION]: { concurrency: 5 },
[QueueName.THUMBNAIL_GENERATION]: { concurrency: 5 },
[QueueName.VIDEO_CONVERSION]: { concurrency: 1 },
},
oauth: {
autoLaunch: false,
autoRegister: true,

View File

@@ -2,6 +2,8 @@ import { IJobRepository } from '../src';
export const newJobRepositoryMock = (): jest.Mocked<IJobRepository> => {
return {
addHandler: jest.fn(),
setConcurrency: jest.fn(),
empty: jest.fn(),
pause: jest.fn(),
resume: jest.fn(),

View File

@@ -1,7 +1,8 @@
import { Column, Entity, PrimaryColumn } from 'typeorm';
import { QueueName } from '../../../domain/src';
@Entity('system_config')
export class SystemConfigEntity<T = string | boolean | number> {
export class SystemConfigEntity<T = SystemConfigValue> {
@PrimaryColumn()
key!: SystemConfigKey;
@@ -9,7 +10,7 @@ export class SystemConfigEntity<T = string | boolean | number> {
value!: T;
}
export type SystemConfigValue = any;
export type SystemConfigValue = string | number | boolean;
// dot notation matches path in `SystemConfig`
export enum SystemConfigKey {
@@ -22,6 +23,18 @@ export enum SystemConfigKey {
FFMPEG_MAX_BITRATE = 'ffmpeg.maxBitrate',
FFMPEG_TWO_PASS = 'ffmpeg.twoPass',
FFMPEG_TRANSCODE = 'ffmpeg.transcode',
JOB_THUMBNAIL_GENERATION_CONCURRENCY = 'job.thumbnailGeneration.concurrency',
JOB_METADATA_EXTRACTION_CONCURRENCY = 'job.metadataExtraction.concurrency',
JOB_VIDEO_CONVERSION_CONCURRENCY = 'job.videoConversion.concurrency',
JOB_OBJECT_TAGGING_CONCURRENCY = 'job.objectTagging.concurrency',
JOB_RECOGNIZE_FACES_CONCURRENCY = 'job.recognizeFaces.concurrency',
JOB_CLIP_ENCODING_CONCURRENCY = 'job.clipEncoding.concurrency',
JOB_BACKGROUND_TASK_CONCURRENCY = 'job.backgroundTask.concurrency',
JOB_STORAGE_TEMPLATE_MIGRATION_CONCURRENCY = 'job.storageTemplateMigration.concurrency',
JOB_SEARCH_CONCURRENCY = 'job.search.concurrency',
JOB_SIDECAR_CONCURRENCY = 'job.sidecar.concurrency',
OAUTH_ENABLED = 'oauth.enabled',
OAUTH_ISSUER_URL = 'oauth.issuerUrl',
OAUTH_CLIENT_ID = 'oauth.clientId',
@@ -32,7 +45,9 @@ export enum SystemConfigKey {
OAUTH_AUTO_REGISTER = 'oauth.autoRegister',
OAUTH_MOBILE_OVERRIDE_ENABLED = 'oauth.mobileOverrideEnabled',
OAUTH_MOBILE_REDIRECT_URI = 'oauth.mobileRedirectUri',
PASSWORD_LOGIN_ENABLED = 'passwordLogin.enabled',
STORAGE_TEMPLATE = 'storageTemplate.template',
}
@@ -55,6 +70,7 @@ export interface SystemConfig {
twoPass: boolean;
transcode: TranscodePreset;
};
job: Record<QueueName, { concurrency: number }>;
oauth: {
enabled: boolean;
issuerUrl: string;

View File

@@ -1,5 +1,6 @@
import { QueueName } from '@app/domain';
import { BullModuleOptions } from '@nestjs/bull';
import { RegisterQueueOptions } from '@nestjs/bullmq';
import { QueueOptions } from 'bullmq';
import { RedisOptions } from 'ioredis';
import { InitOptions } from 'local-reverse-geocoder';
import { ConfigurationOptions } from 'typesense/lib/Typesense/Configuration';
@@ -26,9 +27,9 @@ function parseRedisConfig(): RedisOptions {
export const redisConfig: RedisOptions = parseRedisConfig();
export const bullConfig: BullModuleOptions = {
export const bullConfig: QueueOptions = {
prefix: 'immich_bull',
redis: redisConfig,
connection: redisConfig,
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
@@ -36,7 +37,7 @@ export const bullConfig: BullModuleOptions = {
},
};
export const bullQueues: BullModuleOptions[] = Object.values(QueueName).map((name) => ({ name }));
export const bullQueues: RegisterQueueOptions[] = Object.values(QueueName).map((name) => ({ name }));
function parseTypeSenseConfig(): ConfigurationOptions {
const typesenseURL = process.env.TYPESENSE_URL;

View File

@@ -21,7 +21,7 @@ import {
IUserRepository,
IUserTokenRepository,
} from '@app/domain';
import { BullModule } from '@nestjs/bull';
import { BullModule } from '@nestjs/bullmq';
import { Global, Module, Provider } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';

View File

@@ -1,13 +1,33 @@
import { IJobRepository, JobCounts, JobItem, JobName, JOBS_TO_QUEUE, QueueName, QueueStatus } from '@app/domain';
import { getQueueToken } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { getQueueToken } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { JobOptions, Queue, type JobCounts as BullJobCounts } from 'bull';
import { Job, JobsOptions, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
import { bullConfig } from '../infra.config';
@Injectable()
export class JobRepository implements IJobRepository {
private workers: Partial<Record<QueueName, Worker>> = {};
private logger = new Logger(JobRepository.name);
constructor(private moduleRef: ModuleRef) {}
addHandler(queueName: QueueName, concurrency: number, handler: (item: JobItem) => Promise<void>) {
const workerHandler: Processor = async (job: Job) => handler(job as JobItem);
const workerOptions: WorkerOptions = { ...bullConfig, concurrency };
this.workers[queueName] = new Worker(queueName, workerHandler, workerOptions);
}
setConcurrency(queueName: QueueName, concurrency: number) {
const worker = this.workers[queueName];
if (!worker) {
this.logger.warn(`Unable to set queue concurrency, worker not found: '${queueName}'`);
return;
}
worker.concurrency = concurrency;
}
async getQueueStatus(name: QueueName): Promise<QueueStatus> {
const queue = this.getQueue(name);
@@ -26,13 +46,18 @@ export class JobRepository implements IJobRepository {
}
empty(name: QueueName) {
return this.getQueue(name).empty();
return this.getQueue(name).drain();
}
getJobCounts(name: QueueName): Promise<JobCounts> {
// Typecast needed because the `paused` key is missing from Bull's
// type definition. Can be removed once fixed upstream.
return this.getQueue(name).getJobCounts() as Promise<BullJobCounts & { paused: number }>;
return this.getQueue(name).getJobCounts(
'active',
'completed',
'failed',
'delayed',
'waiting',
'paused',
) as unknown as Promise<JobCounts>;
}
async queue(item: JobItem): Promise<void> {
@@ -43,7 +68,7 @@ export class JobRepository implements IJobRepository {
await this.getQueue(JOBS_TO_QUEUE[jobName]).add(jobName, jobData, jobOptions);
}
private getJobOptions(item: JobItem): JobOptions | null {
private getJobOptions(item: JobItem): JobsOptions | null {
switch (item.name) {
case JobName.GENERATE_FACE_THUMBNAIL:
return { priority: 1 };

View File

@@ -9,7 +9,7 @@ export class SystemConfigRepository implements ISystemConfigRepository {
private repository: Repository<SystemConfigEntity>,
) {}
load(): Promise<SystemConfigEntity<string | boolean | number>[]> {
load(): Promise<SystemConfigEntity[]> {
return this.repository.find();
}