mirror of
https://github.com/KevinMidboe/immich.git
synced 2026-01-10 03:05:50 +00:00
chore(server) Add job for storage migration (#1117)
This commit is contained in:
@@ -15,6 +15,7 @@ import { CheckExistingAssetsResponseDto } from './response-dto/check-existing-as
|
||||
import { In } from 'typeorm/find-options/operator/In';
|
||||
import { UpdateAssetDto } from './dto/update-asset.dto';
|
||||
import { ITagRepository, TAG_REPOSITORY } from '../tag/tag.repository';
|
||||
import { IsNull } from 'typeorm';
|
||||
|
||||
export interface IAssetRepository {
|
||||
create(
|
||||
@@ -69,14 +70,14 @@ export class AssetRepository implements IAssetRepository {
|
||||
}
|
||||
|
||||
async getAssetWithNoThumbnail(): Promise<AssetEntity[]> {
|
||||
return await this.assetRepository
|
||||
.createQueryBuilder('asset')
|
||||
.where('asset.resizePath IS NULL')
|
||||
.andWhere('asset.isVisible = true')
|
||||
.orWhere('asset.resizePath = :resizePath', { resizePath: '' })
|
||||
.orWhere('asset.webpPath IS NULL')
|
||||
.orWhere('asset.webpPath = :webpPath', { webpPath: '' })
|
||||
.getMany();
|
||||
return await this.assetRepository.find({
|
||||
where: [
|
||||
{ resizePath: IsNull(), isVisible: true },
|
||||
{ resizePath: '', isVisible: true },
|
||||
{ webpPath: IsNull(), isVisible: true },
|
||||
{ webpPath: '', isVisible: true },
|
||||
],
|
||||
});
|
||||
}
|
||||
|
||||
async getAssetWithNoEXIF(): Promise<AssetEntity[]> {
|
||||
|
||||
@@ -7,13 +7,13 @@ import { BullModule } from '@nestjs/bull';
|
||||
import { BackgroundTaskModule } from '../../modules/background-task/background-task.module';
|
||||
import { BackgroundTaskService } from '../../modules/background-task/background-task.service';
|
||||
import { CommunicationModule } from '../communication/communication.module';
|
||||
import { QueueNameEnum } from '@app/job/constants/queue-name.constant';
|
||||
import { AssetRepository, ASSET_REPOSITORY } from './asset-repository';
|
||||
import { DownloadModule } from '../../modules/download/download.module';
|
||||
import { TagModule } from '../tag/tag.module';
|
||||
import { AlbumModule } from '../album/album.module';
|
||||
import { UserModule } from '../user/user.module';
|
||||
import { StorageModule } from '@app/storage';
|
||||
import { immichSharedQueues } from '@app/job/constants/bull-queue-registration.constant';
|
||||
|
||||
const ASSET_REPOSITORY_PROVIDER = {
|
||||
provide: ASSET_REPOSITORY,
|
||||
@@ -31,22 +31,7 @@ const ASSET_REPOSITORY_PROVIDER = {
|
||||
TagModule,
|
||||
StorageModule,
|
||||
forwardRef(() => AlbumModule),
|
||||
BullModule.registerQueue({
|
||||
name: QueueNameEnum.ASSET_UPLOADED,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
BullModule.registerQueue({
|
||||
name: QueueNameEnum.VIDEO_CONVERSION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
BullModule.registerQueue(...immichSharedQueues),
|
||||
],
|
||||
controllers: [AssetController],
|
||||
providers: [AssetService, BackgroundTaskService, ASSET_REPOSITORY_PROVIDER],
|
||||
|
||||
@@ -6,6 +6,7 @@ export enum JobId {
|
||||
METADATA_EXTRACTION = 'metadata-extraction',
|
||||
VIDEO_CONVERSION = 'video-conversion',
|
||||
MACHINE_LEARNING = 'machine-learning',
|
||||
STORAGE_TEMPLATE_MIGRATION = 'storage-template-migration',
|
||||
}
|
||||
|
||||
export class GetJobDto {
|
||||
|
||||
@@ -6,13 +6,15 @@ import { ImmichJwtModule } from '../../modules/immich-jwt/immich-jwt.module';
|
||||
import { JwtModule } from '@nestjs/jwt';
|
||||
import { jwtConfig } from '../../config/jwt.config';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { BullModule } from '@nestjs/bull';
|
||||
import { QueueNameEnum } from '@app/job';
|
||||
import { ExifEntity } from '@app/database/entities/exif.entity';
|
||||
import { TagModule } from '../tag/tag.module';
|
||||
import { AssetModule } from '../asset/asset.module';
|
||||
import { UserModule } from '../user/user.module';
|
||||
|
||||
import { StorageModule } from '@app/storage';
|
||||
import { BullModule } from '@nestjs/bull';
|
||||
import { immichSharedQueues } from '@app/job/constants/bull-queue-registration.constant';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
TypeOrmModule.forFeature([ExifEntity]),
|
||||
@@ -21,56 +23,8 @@ import { UserModule } from '../user/user.module';
|
||||
AssetModule,
|
||||
UserModule,
|
||||
JwtModule.register(jwtConfig),
|
||||
BullModule.registerQueue(
|
||||
{
|
||||
name: QueueNameEnum.THUMBNAIL_GENERATION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.ASSET_UPLOADED,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.METADATA_EXTRACTION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.VIDEO_CONVERSION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.CHECKSUM_GENERATION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.MACHINE_LEARNING,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
),
|
||||
StorageModule,
|
||||
BullModule.registerQueue(...immichSharedQueues),
|
||||
],
|
||||
controllers: [JobController],
|
||||
providers: [JobService, ImmichJwtService],
|
||||
|
||||
@@ -6,6 +6,7 @@ import {
|
||||
IVideoTranscodeJob,
|
||||
MachineLearningJobNameEnum,
|
||||
QueueNameEnum,
|
||||
templateMigrationProcessorName,
|
||||
videoMetadataExtractionProcessorName,
|
||||
} from '@app/job';
|
||||
import { InjectQueue } from '@nestjs/bull';
|
||||
@@ -18,6 +19,7 @@ import { AssetType } from '@app/database/entities/asset.entity';
|
||||
import { GetJobDto, JobId } from './dto/get-job.dto';
|
||||
import { JobStatusResponseDto } from './response-dto/job-status-response.dto';
|
||||
import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interface';
|
||||
import { StorageService } from '@app/storage';
|
||||
|
||||
@Injectable()
|
||||
export class JobService {
|
||||
@@ -34,12 +36,18 @@ export class JobService {
|
||||
@InjectQueue(QueueNameEnum.MACHINE_LEARNING)
|
||||
private machineLearningQueue: Queue<IMachineLearningJob>,
|
||||
|
||||
@InjectQueue(QueueNameEnum.STORAGE_MIGRATION)
|
||||
private storageMigrationQueue: Queue,
|
||||
|
||||
@Inject(ASSET_REPOSITORY)
|
||||
private _assetRepository: IAssetRepository,
|
||||
|
||||
private storageService: StorageService,
|
||||
) {
|
||||
this.thumbnailGeneratorQueue.empty();
|
||||
this.metadataExtractionQueue.empty();
|
||||
this.videoConversionQueue.empty();
|
||||
this.storageMigrationQueue.empty();
|
||||
}
|
||||
|
||||
async startJob(jobDto: GetJobDto): Promise<number> {
|
||||
@@ -52,6 +60,8 @@ export class JobService {
|
||||
return 0;
|
||||
case JobId.MACHINE_LEARNING:
|
||||
return this.runMachineLearningPipeline();
|
||||
case JobId.STORAGE_TEMPLATE_MIGRATION:
|
||||
return this.runStorageMigration();
|
||||
default:
|
||||
throw new BadRequestException('Invalid job id');
|
||||
}
|
||||
@@ -62,6 +72,7 @@ export class JobService {
|
||||
const metadataExtractionJobCount = await this.metadataExtractionQueue.getJobCounts();
|
||||
const videoConversionJobCount = await this.videoConversionQueue.getJobCounts();
|
||||
const machineLearningJobCount = await this.machineLearningQueue.getJobCounts();
|
||||
const storageMigrationJobCount = await this.storageMigrationQueue.getJobCounts();
|
||||
|
||||
const response = new AllJobStatusResponseDto();
|
||||
response.isThumbnailGenerationActive = Boolean(thumbnailGeneratorJobCount.waiting);
|
||||
@@ -73,6 +84,9 @@ export class JobService {
|
||||
response.isMachineLearningActive = Boolean(machineLearningJobCount.waiting);
|
||||
response.machineLearningQueueCount = machineLearningJobCount;
|
||||
|
||||
response.isStorageMigrationActive = Boolean(storageMigrationJobCount.active);
|
||||
response.storageMigrationQueueCount = storageMigrationJobCount;
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
@@ -93,6 +107,11 @@ export class JobService {
|
||||
response.queueCount = await this.videoConversionQueue.getJobCounts();
|
||||
}
|
||||
|
||||
if (query.jobId === JobId.STORAGE_TEMPLATE_MIGRATION) {
|
||||
response.isActive = Boolean((await this.storageMigrationQueue.getJobCounts()).waiting);
|
||||
response.queueCount = await this.storageMigrationQueue.getJobCounts();
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
@@ -110,6 +129,9 @@ export class JobService {
|
||||
case JobId.MACHINE_LEARNING:
|
||||
this.machineLearningQueue.empty();
|
||||
return 0;
|
||||
case JobId.STORAGE_TEMPLATE_MIGRATION:
|
||||
this.storageMigrationQueue.empty();
|
||||
return 0;
|
||||
default:
|
||||
throw new BadRequestException('Invalid job id');
|
||||
}
|
||||
@@ -177,4 +199,16 @@ export class JobService {
|
||||
|
||||
return assetWithNoSmartInfo.length;
|
||||
}
|
||||
|
||||
async runStorageMigration() {
|
||||
const jobCount = await this.storageMigrationQueue.getJobCounts();
|
||||
|
||||
if (jobCount.active > 0) {
|
||||
throw new BadRequestException('Storage migration job is already running');
|
||||
}
|
||||
|
||||
await this.storageMigrationQueue.add(templateMigrationProcessorName, {}, { jobId: randomUUID() });
|
||||
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ export class AllJobStatusResponseDto {
|
||||
isMetadataExtractionActive!: boolean;
|
||||
isVideoConversionActive!: boolean;
|
||||
isMachineLearningActive!: boolean;
|
||||
isStorageMigrationActive!: boolean;
|
||||
|
||||
@ApiProperty({
|
||||
type: JobCounts,
|
||||
@@ -37,4 +38,9 @@ export class AllJobStatusResponseDto {
|
||||
type: JobCounts,
|
||||
})
|
||||
machineLearningQueueCount!: JobCounts;
|
||||
|
||||
@ApiProperty({
|
||||
type: JobCounts,
|
||||
})
|
||||
storageMigrationQueueCount!: JobCounts;
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import { SystemConfigEntity } from '@app/database/entities/system-config.entity';
|
||||
import { immichSharedQueues } from '@app/job/constants/bull-queue-registration.constant';
|
||||
import { BullModule } from '@nestjs/bull';
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { ImmichConfigModule } from 'libs/immich-config/src';
|
||||
@@ -7,7 +9,12 @@ import { SystemConfigController } from './system-config.controller';
|
||||
import { SystemConfigService } from './system-config.service';
|
||||
|
||||
@Module({
|
||||
imports: [ImmichJwtModule, ImmichConfigModule, TypeOrmModule.forFeature([SystemConfigEntity])],
|
||||
imports: [
|
||||
ImmichJwtModule,
|
||||
ImmichConfigModule,
|
||||
TypeOrmModule.forFeature([SystemConfigEntity]),
|
||||
BullModule.registerQueue(...immichSharedQueues),
|
||||
],
|
||||
controllers: [SystemConfigController],
|
||||
providers: [SystemConfigService],
|
||||
})
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { QueueNameEnum, updateTemplateProcessorName } from '@app/job';
|
||||
import {
|
||||
supportedDayTokens,
|
||||
supportedHourTokens,
|
||||
@@ -7,14 +8,21 @@ import {
|
||||
supportedSecondTokens,
|
||||
supportedYearTokens,
|
||||
} from '@app/storage/constants/supported-datetime-template';
|
||||
import { InjectQueue } from '@nestjs/bull';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Queue } from 'bull';
|
||||
import { randomUUID } from 'crypto';
|
||||
import { ImmichConfigService } from 'libs/immich-config/src';
|
||||
import { mapConfig, SystemConfigDto } from './dto/system-config.dto';
|
||||
import { SystemConfigTemplateStorageOptionDto } from './response-dto/system-config-template-storage-option.dto';
|
||||
|
||||
@Injectable()
|
||||
export class SystemConfigService {
|
||||
constructor(private immichConfigService: ImmichConfigService) {}
|
||||
constructor(
|
||||
private immichConfigService: ImmichConfigService,
|
||||
@InjectQueue(QueueNameEnum.STORAGE_MIGRATION)
|
||||
private storageMigrationQueue: Queue,
|
||||
) {}
|
||||
|
||||
public async getConfig(): Promise<SystemConfigDto> {
|
||||
const config = await this.immichConfigService.getConfig();
|
||||
@@ -28,6 +36,7 @@ export class SystemConfigService {
|
||||
|
||||
public async updateConfig(dto: SystemConfigDto): Promise<SystemConfigDto> {
|
||||
const config = await this.immichConfigService.updateConfig(dto);
|
||||
this.storageMigrationQueue.add(updateTemplateProcessorName, {}, { jobId: randomUUID() });
|
||||
return mapConfig(config);
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ import {
|
||||
UnauthorizedException,
|
||||
} from '@nestjs/common';
|
||||
import { Response as Res } from 'express';
|
||||
import { createReadStream } from 'fs';
|
||||
import { constants, createReadStream } from 'fs';
|
||||
import { AuthUserDto } from '../../decorators/auth-user.decorator';
|
||||
import { CreateUserDto } from './dto/create-user.dto';
|
||||
import { UpdateUserDto } from './dto/update-user.dto';
|
||||
@@ -22,6 +22,7 @@ import {
|
||||
import { mapUserCountResponse, UserCountResponseDto } from './response-dto/user-count-response.dto';
|
||||
import { mapUser, UserResponseDto } from './response-dto/user-response.dto';
|
||||
import { IUserRepository, USER_REPOSITORY } from './user-repository';
|
||||
import fs from 'fs/promises';
|
||||
|
||||
@Injectable()
|
||||
export class UserService {
|
||||
@@ -196,6 +197,8 @@ export class UserService {
|
||||
throw new NotFoundException('User does not have a profile image');
|
||||
}
|
||||
|
||||
await fs.access(user.profileImagePath, constants.R_OK | constants.W_OK);
|
||||
|
||||
res.set({
|
||||
'Content-Type': 'image/jpeg',
|
||||
});
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { immichAppConfig } from '@app/common/config';
|
||||
import { immichAppConfig, immichBullAsyncConfig } from '@app/common/config';
|
||||
import { MiddlewareConsumer, Module, NestModule } from '@nestjs/common';
|
||||
import { UserModule } from './api-v1/user/user.module';
|
||||
import { AssetModule } from './api-v1/asset/asset.module';
|
||||
@@ -36,18 +36,7 @@ import { TagModule } from './api-v1/tag/tag.module';
|
||||
|
||||
DeviceInfoModule,
|
||||
|
||||
BullModule.forRootAsync({
|
||||
useFactory: async () => ({
|
||||
prefix: 'immich_bull',
|
||||
redis: {
|
||||
host: process.env.REDIS_HOSTNAME || 'immich_redis',
|
||||
port: parseInt(process.env.REDIS_PORT || '6379'),
|
||||
db: parseInt(process.env.REDIS_DBINDEX || '0'),
|
||||
password: process.env.REDIS_PASSWORD || undefined,
|
||||
path: process.env.REDIS_SOCKET || undefined,
|
||||
},
|
||||
}),
|
||||
}),
|
||||
BullModule.forRootAsync(immichBullAsyncConfig),
|
||||
|
||||
ServerInfoModule,
|
||||
|
||||
|
||||
@@ -11,11 +11,6 @@ import { BackgroundTaskService } from './background-task.service';
|
||||
imports: [
|
||||
BullModule.registerQueue({
|
||||
name: 'background-task',
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
TypeOrmModule.forFeature([AssetEntity, ExifEntity, SmartInfoEntity]),
|
||||
],
|
||||
|
||||
@@ -3,46 +3,14 @@ import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { AssetEntity } from '@app/database/entities/asset.entity';
|
||||
import { ScheduleTasksService } from './schedule-tasks.service';
|
||||
import { QueueNameEnum } from '@app/job/constants/queue-name.constant';
|
||||
import { ExifEntity } from '@app/database/entities/exif.entity';
|
||||
import { UserEntity } from '@app/database/entities/user.entity';
|
||||
import { immichSharedQueues } from '@app/job/constants/bull-queue-registration.constant';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
TypeOrmModule.forFeature([AssetEntity, ExifEntity, UserEntity]),
|
||||
BullModule.registerQueue({
|
||||
name: QueueNameEnum.USER_DELETION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
BullModule.registerQueue({
|
||||
name: QueueNameEnum.VIDEO_CONVERSION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
BullModule.registerQueue({
|
||||
name: QueueNameEnum.THUMBNAIL_GENERATION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
|
||||
BullModule.registerQueue({
|
||||
name: QueueNameEnum.METADATA_EXTRACTION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
BullModule.registerQueue(...immichSharedQueues),
|
||||
],
|
||||
providers: [ScheduleTasksService],
|
||||
})
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import { immichAppConfig } from '@app/common/config';
|
||||
import { immichAppConfig, immichBullAsyncConfig } from '@app/common/config';
|
||||
import { DatabaseModule } from '@app/database';
|
||||
import { AssetEntity } from '@app/database/entities/asset.entity';
|
||||
import { ExifEntity } from '@app/database/entities/exif.entity';
|
||||
import { SmartInfoEntity } from '@app/database/entities/smart-info.entity';
|
||||
import { UserEntity } from '@app/database/entities/user.entity';
|
||||
import { QueueNameEnum } from '@app/job/constants/queue-name.constant';
|
||||
import { StorageModule } from '@app/storage';
|
||||
import { BullModule } from '@nestjs/bull';
|
||||
import { Module } from '@nestjs/common';
|
||||
import { ConfigModule } from '@nestjs/config';
|
||||
@@ -16,9 +16,11 @@ import { AssetUploadedProcessor } from './processors/asset-uploaded.processor';
|
||||
import { GenerateChecksumProcessor } from './processors/generate-checksum.processor';
|
||||
import { MachineLearningProcessor } from './processors/machine-learning.processor';
|
||||
import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor';
|
||||
import { StorageMigrationProcessor } from './processors/storage-migration.processor';
|
||||
import { ThumbnailGeneratorProcessor } from './processors/thumbnail.processor';
|
||||
import { UserDeletionProcessor } from './processors/user-deletion.processor';
|
||||
import { VideoTranscodeProcessor } from './processors/video-transcode.processor';
|
||||
import { immichSharedQueues } from '@app/job/constants/bull-queue-registration.constant';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@@ -26,76 +28,9 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
|
||||
DatabaseModule,
|
||||
ImmichConfigModule,
|
||||
TypeOrmModule.forFeature([UserEntity, ExifEntity, AssetEntity, SmartInfoEntity]),
|
||||
BullModule.forRootAsync({
|
||||
useFactory: async () => ({
|
||||
prefix: 'immich_bull',
|
||||
redis: {
|
||||
host: process.env.REDIS_HOSTNAME || 'immich_redis',
|
||||
port: parseInt(process.env.REDIS_PORT || '6379'),
|
||||
db: parseInt(process.env.REDIS_DBINDEX || '0'),
|
||||
password: process.env.REDIS_PASSWORD || undefined,
|
||||
path: process.env.REDIS_SOCKET || undefined,
|
||||
},
|
||||
}),
|
||||
}),
|
||||
BullModule.registerQueue(
|
||||
{
|
||||
name: QueueNameEnum.USER_DELETION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.THUMBNAIL_GENERATION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.ASSET_UPLOADED,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.METADATA_EXTRACTION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.VIDEO_CONVERSION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.CHECKSUM_GENERATION,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: QueueNameEnum.MACHINE_LEARNING,
|
||||
defaultJobOptions: {
|
||||
attempts: 3,
|
||||
removeOnComplete: true,
|
||||
removeOnFail: false,
|
||||
},
|
||||
},
|
||||
),
|
||||
StorageModule,
|
||||
BullModule.forRootAsync(immichBullAsyncConfig),
|
||||
BullModule.registerQueue(...immichSharedQueues),
|
||||
CommunicationModule,
|
||||
],
|
||||
controllers: [],
|
||||
@@ -108,7 +43,8 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor'
|
||||
GenerateChecksumProcessor,
|
||||
MachineLearningProcessor,
|
||||
UserDeletionProcessor,
|
||||
StorageMigrationProcessor,
|
||||
],
|
||||
exports: [],
|
||||
exports: [BullModule],
|
||||
})
|
||||
export class MicroservicesModule {}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
import { APP_UPLOAD_LOCATION } from '@app/common';
|
||||
import { AssetEntity } from '@app/database/entities/asset.entity';
|
||||
import { ImmichConfigService } from '@app/immich-config';
|
||||
import { QueueNameEnum, templateMigrationProcessorName, updateTemplateProcessorName } from '@app/job';
|
||||
import { StorageService } from '@app/storage';
|
||||
import { Process, Processor } from '@nestjs/bull';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
@Processor(QueueNameEnum.STORAGE_MIGRATION)
|
||||
export class StorageMigrationProcessor {
|
||||
readonly logger: Logger = new Logger(StorageMigrationProcessor.name);
|
||||
|
||||
constructor(
|
||||
private storageService: StorageService,
|
||||
private immichConfigService: ImmichConfigService,
|
||||
|
||||
@InjectRepository(AssetEntity)
|
||||
private assetRepository: Repository<AssetEntity>,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Migration process when a new user set a new storage template.
|
||||
* @param job
|
||||
*/
|
||||
@Process({ name: templateMigrationProcessorName, concurrency: 100 })
|
||||
async templateMigration() {
|
||||
console.time('migrating-time');
|
||||
const assets = await this.assetRepository.find({
|
||||
relations: ['exifInfo'],
|
||||
});
|
||||
|
||||
const livePhotoMap: Record<string, AssetEntity> = {};
|
||||
|
||||
for (const asset of assets) {
|
||||
if (asset.livePhotoVideoId) {
|
||||
livePhotoMap[asset.livePhotoVideoId] = asset;
|
||||
}
|
||||
}
|
||||
|
||||
for (const asset of assets) {
|
||||
const livePhotoParentAsset = livePhotoMap[asset.id];
|
||||
const filename = asset.exifInfo?.imageName || livePhotoParentAsset?.exifInfo?.imageName || asset.id;
|
||||
await this.storageService.moveAsset(asset, filename);
|
||||
}
|
||||
|
||||
await this.storageService.removeEmptyDirectories(APP_UPLOAD_LOCATION);
|
||||
console.timeEnd('migrating-time');
|
||||
}
|
||||
|
||||
/**
|
||||
* Update config when a new storage template is set.
|
||||
* This is to ensure the synchronization between processes.
|
||||
* @param job
|
||||
*/
|
||||
@Process({ name: updateTemplateProcessorName, concurrency: 1 })
|
||||
async updateTemplate() {
|
||||
await this.immichConfigService.refreshConfig();
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,4 @@
|
||||
import { APP_UPLOAD_LOCATION } from '@app/common';
|
||||
import { ImmichLogLevel } from '@app/common/constants/log-level.constant';
|
||||
import { AssetEntity, AssetType } from '@app/database/entities/asset.entity';
|
||||
import {
|
||||
WebpGeneratorProcessor,
|
||||
@@ -11,7 +10,6 @@ import {
|
||||
} from '@app/job';
|
||||
import { InjectQueue, Process, Processor } from '@nestjs/bull';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { mapAsset } from 'apps/immich/src/api-v1/asset/response-dto/asset-response.dto';
|
||||
import { Job, Queue } from 'bull';
|
||||
@@ -27,7 +25,7 @@ import { IMachineLearningJob } from '@app/job/interfaces/machine-learning.interf
|
||||
|
||||
@Processor(QueueNameEnum.THUMBNAIL_GENERATION)
|
||||
export class ThumbnailGeneratorProcessor {
|
||||
private logLevel: ImmichLogLevel;
|
||||
readonly logger: Logger = new Logger(ThumbnailGeneratorProcessor.name);
|
||||
|
||||
constructor(
|
||||
@InjectRepository(AssetEntity)
|
||||
@@ -40,12 +38,7 @@ export class ThumbnailGeneratorProcessor {
|
||||
|
||||
@InjectQueue(QueueNameEnum.MACHINE_LEARNING)
|
||||
private machineLearningQueue: Queue<IMachineLearningJob>,
|
||||
|
||||
private configService: ConfigService,
|
||||
) {
|
||||
this.logLevel = this.configService.get('LOG_LEVEL') || ImmichLogLevel.SIMPLE;
|
||||
// TODO - Add observable paterrn to listen to the config change
|
||||
}
|
||||
) {}
|
||||
|
||||
@Process({ name: generateJPEGThumbnailProcessorName, concurrency: 3 })
|
||||
async generateJPEGThumbnail(job: Job<JpegGeneratorProcessor>) {
|
||||
@@ -70,12 +63,8 @@ export class ThumbnailGeneratorProcessor {
|
||||
.rotate()
|
||||
.toFile(jpegThumbnailPath);
|
||||
await this.assetRepository.update({ id: asset.id }, { resizePath: jpegThumbnailPath });
|
||||
} catch (error) {
|
||||
Logger.error('Failed to generate jpeg thumbnail for asset: ' + asset.id);
|
||||
|
||||
if (this.logLevel == ImmichLogLevel.VERBOSE) {
|
||||
console.trace('Failed to generate jpeg thumbnail for asset', error);
|
||||
}
|
||||
} catch (error: any) {
|
||||
this.logger.error('Failed to generate jpeg thumbnail for asset: ' + asset.id, error.stack);
|
||||
}
|
||||
|
||||
// Update resize path to send to generate webp queue
|
||||
@@ -140,12 +129,8 @@ export class ThumbnailGeneratorProcessor {
|
||||
try {
|
||||
await sharp(asset.resizePath, { failOnError: false }).resize(250).webp().rotate().toFile(webpPath);
|
||||
await this.assetRepository.update({ id: asset.id }, { webpPath: webpPath });
|
||||
} catch (error) {
|
||||
Logger.error('Failed to generate webp thumbnail for asset: ' + asset.id);
|
||||
|
||||
if (this.logLevel == ImmichLogLevel.VERBOSE) {
|
||||
console.trace('Failed to generate webp thumbnail for asset', error);
|
||||
}
|
||||
} catch (error: any) {
|
||||
this.logger.error('Failed to generate webp thumbnail for asset: ' + asset.id, error.stack);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user