feat(web): enable websocket (#3765)

* send store event to page

* fix format

* add new asset to existing bucket

* format

* debouncing

* format

* load bucket

* feedback

* feat: listen to deletes and auto-subscribe on all asset grid pages

* feat: auto refresh on person thumbnail

* chore: skip upload event for now

* fix: person thumbnail event

* fix merge

* update handleAssetDeletion with websocket communication

* update info box on mount

* fix test

* fix test

* feat: event for trash asset

---------

Co-authored-by: Jason Rasmussen <jrasm91@gmail.com>
This commit is contained in:
Alex
2023-10-06 15:48:11 -05:00
committed by GitHub
parent 4dffae3f39
commit 36b21948bf
16 changed files with 279 additions and 136 deletions

View File

@@ -7,6 +7,7 @@ import {
faceStub,
newAccessRepositoryMock,
newAssetRepositoryMock,
newCommunicationRepositoryMock,
newCryptoRepositoryMock,
newJobRepositoryMock,
newStorageRepositoryMock,
@@ -14,6 +15,7 @@ import {
} from '@test';
import { when } from 'jest-when';
import { Readable } from 'stream';
import { ICommunicationRepository } from '../communication';
import { ICryptoRepository } from '../crypto';
import { IJobRepository, JobItem, JobName } from '../job';
import { IStorageRepository } from '../storage';
@@ -153,6 +155,7 @@ describe(AssetService.name, () => {
let cryptoMock: jest.Mocked<ICryptoRepository>;
let jobMock: jest.Mocked<IJobRepository>;
let storageMock: jest.Mocked<IStorageRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let configMock: jest.Mocked<ISystemConfigRepository>;
it('should work', () => {
@@ -162,11 +165,12 @@ describe(AssetService.name, () => {
beforeEach(async () => {
accessMock = newAccessRepositoryMock();
assetMock = newAssetRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
cryptoMock = newCryptoRepositoryMock();
jobMock = newJobRepositoryMock();
storageMock = newStorageRepositoryMock();
configMock = newSystemConfigRepositoryMock();
sut = new AssetService(accessMock, assetMock, cryptoMock, jobMock, configMock, storageMock);
sut = new AssetService(accessMock, assetMock, cryptoMock, jobMock, configMock, storageMock, communicationMock);
when(assetMock.getById)
.calledWith(assetStub.livePhotoStillAsset.id)

View File

@@ -6,6 +6,7 @@ import { extname } from 'path';
import sanitize from 'sanitize-filename';
import { AccessCore, IAccessRepository, Permission } from '../access';
import { AuthUserDto } from '../auth';
import { CommunicationEvent, ICommunicationRepository } from '../communication';
import { ICryptoRepository } from '../crypto';
import { mimeTypes } from '../domain.constant';
import { HumanReadableSize, usePagination } from '../domain.util';
@@ -72,6 +73,7 @@ export class AssetService {
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
) {
this.access = new AccessCore(accessRepository);
this.storageCore = new StorageCore(storageRepository);
@@ -362,6 +364,7 @@ export class AssetService {
await this.assetRepository.remove(asset);
await this.jobRepository.queue({ name: JobName.SEARCH_REMOVE_ASSET, data: { ids: [asset.id] } });
this.communicationRepository.send(CommunicationEvent.ASSET_DELETE, asset.ownerId, id);
// TODO refactor this to use cascades
if (asset.livePhotoVideoId) {
@@ -392,6 +395,7 @@ export class AssetService {
} else {
await this.assetRepository.softDeleteAll(ids);
await this.jobRepository.queue({ name: JobName.SEARCH_REMOVE_ASSET, data: { ids } });
this.communicationRepository.send(CommunicationEvent.ASSET_TRASH, authUser.id, ids);
}
}

View File

@@ -2,6 +2,10 @@ export const ICommunicationRepository = 'ICommunicationRepository';
export enum CommunicationEvent {
UPLOAD_SUCCESS = 'on_upload_success',
ASSET_DELETE = 'on_asset_delete',
ASSET_TRASH = 'on_asset_trash',
PERSON_THUMBNAIL = 'on_person_thumbnail',
SERVER_VERSION = 'on_server_version',
CONFIG_UPDATE = 'on_config_update',
}

View File

@@ -6,10 +6,12 @@ import {
newAssetRepositoryMock,
newCommunicationRepositoryMock,
newJobRepositoryMock,
newPersonRepositoryMock,
newSystemConfigRepositoryMock,
} from '@test';
import { IAssetRepository } from '../asset';
import { ICommunicationRepository } from '../communication';
import { IPersonRepository } from '../person';
import { ISystemConfigRepository } from '../system-config';
import { SystemConfigCore } from '../system-config/system-config.core';
import { JobCommand, JobName, QueueName } from './job.constants';
@@ -30,13 +32,15 @@ describe(JobService.name, () => {
let configMock: jest.Mocked<ISystemConfigRepository>;
let communicationMock: jest.Mocked<ICommunicationRepository>;
let jobMock: jest.Mocked<IJobRepository>;
let personMock: jest.Mocked<IPersonRepository>;
beforeEach(async () => {
assetMock = newAssetRepositoryMock();
configMock = newSystemConfigRepositoryMock();
communicationMock = newCommunicationRepositoryMock();
jobMock = newJobRepositoryMock();
sut = new JobService(assetMock, communicationMock, jobMock, configMock);
personMock = newPersonRepositoryMock();
sut = new JobService(assetMock, communicationMock, jobMock, configMock, personMock);
});
it('should work', () => {

View File

@@ -2,6 +2,7 @@ import { AssetType } from '@app/infra/entities';
import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common';
import { IAssetRepository, mapAsset } from '../asset';
import { CommunicationEvent, ICommunicationRepository } from '../communication';
import { IPersonRepository } from '../person';
import { FeatureFlag, ISystemConfigRepository } from '../system-config';
import { SystemConfigCore } from '../system-config/system-config.core';
import { JobCommand, JobName, QueueName } from './job.constants';
@@ -18,6 +19,7 @@ export class JobService {
@Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
@Inject(IPersonRepository) private personRepository: IPersonRepository,
) {
this.configCore = new SystemConfigCore(configRepository);
}
@@ -172,15 +174,20 @@ export class JobService {
}
break;
case JobName.GENERATE_PERSON_THUMBNAIL:
const { id } = item.data;
const person = await this.personRepository.getById(id);
if (person) {
this.communicationRepository.send(CommunicationEvent.PERSON_THUMBNAIL, person.ownerId, id);
}
break;
case JobName.GENERATE_JPEG_THUMBNAIL: {
await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: item.data });
await this.jobRepository.queue({ name: JobName.GENERATE_THUMBHASH_THUMBNAIL, data: item.data });
await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: item.data });
await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: item.data });
await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: item.data });
if (item.data.source !== 'upload') {
break;
}
const [asset] = await this.assetRepository.getByIds([item.data.id]);
if (asset) {
@@ -189,10 +196,20 @@ export class JobService {
} else if (asset.livePhotoVideoId) {
await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.livePhotoVideoId } });
}
this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset));
}
break;
}
case JobName.GENERATE_WEBP_THUMBNAIL: {
if (item.data.source !== 'upload') {
break;
}
const [asset] = await this.assetRepository.getByIds([item.data.id]);
if (asset) {
this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset));
}
}
}
// In addition to the above jobs, all of these should queue `SEARCH_INDEX_ASSET`

View File

@@ -1,34 +0,0 @@
import { AuthService } from '@app/domain';
import { Logger } from '@nestjs/common';
import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@WebSocketGateway({ cors: true })
export class CommunicationGateway implements OnGatewayConnection, OnGatewayDisconnect {
private logger = new Logger(CommunicationGateway.name);
constructor(private authService: AuthService) {}
@WebSocketServer() server!: Server;
handleDisconnect(client: Socket) {
client.leave(client.nsp.name);
this.logger.log(`Client ${client.id} disconnected from Websocket`);
}
async handleConnection(client: Socket) {
try {
this.logger.log(`New websocket connection: ${client.id}`);
const user = await this.authService.validate(client.request.headers, {});
if (user) {
client.join(user.id);
} else {
client.emit('error', 'unauthorized');
client.disconnect();
}
} catch (e) {
client.emit('error', 'unauthorized');
client.disconnect();
}
}
}

View File

@@ -27,7 +27,6 @@ import { BullModule } from '@nestjs/bullmq';
import { Global, Module, Provider } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { CommunicationGateway } from './communication.gateway';
import { databaseConfig } from './database.config';
import { databaseEntities } from './entities';
import { bullConfig, bullQueues } from './infra.config';
@@ -90,7 +89,7 @@ const providers: Provider[] = [
BullModule.forRoot(bullConfig),
BullModule.registerQueue(...bullQueues),
],
providers: [...providers, CommunicationGateway],
providers: [...providers],
exports: [...providers, BullModule],
})
export class InfraModule {}

View File

@@ -1,16 +1,43 @@
import { CommunicationEvent } from '@app/domain';
import { Injectable } from '@nestjs/common';
import { CommunicationGateway } from '../communication.gateway';
import { AuthService, CommunicationEvent, ICommunicationRepository, serverVersion } from '@app/domain';
import { Logger } from '@nestjs/common';
import { OnGatewayConnection, OnGatewayDisconnect, WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@Injectable()
export class CommunicationRepository {
constructor(private ws: CommunicationGateway) {}
@WebSocketGateway({ cors: true })
export class CommunicationRepository implements OnGatewayConnection, OnGatewayDisconnect, ICommunicationRepository {
private logger = new Logger(CommunicationRepository.name);
constructor(private authService: AuthService) {}
@WebSocketServer() server!: Server;
async handleConnection(client: Socket) {
try {
this.logger.log(`New websocket connection: ${client.id}`);
const user = await this.authService.validate(client.request.headers, {});
if (user) {
client.join(user.id);
this.send(CommunicationEvent.SERVER_VERSION, user.id, serverVersion);
} else {
client.emit('error', 'unauthorized');
client.disconnect();
}
} catch (e) {
client.emit('error', 'unauthorized');
client.disconnect();
}
}
handleDisconnect(client: Socket) {
client.leave(client.nsp.name);
this.logger.log(`Client ${client.id} disconnected from Websocket`);
}
send(event: CommunicationEvent, userId: string, data: any) {
this.ws.server.to(userId).emit(event, JSON.stringify(data));
this.server.to(userId).emit(event, JSON.stringify(data));
}
broadcast(event: CommunicationEvent, data: any) {
this.ws.server.emit(event, data);
this.server.emit(event, data);
}
}