mirror of
https://github.com/KevinMidboe/immich.git
synced 2025-10-29 17:40:28 +00:00
feat(server)!: search via typesense (#1778)
* build: add typesense to docker * feat(server): typesense search * feat(web): search * fix(web): show api error response message * chore: search tests * chore: regenerate open api * fix: disable typesense on e2e * fix: number properties for open api (dart) * fix: e2e test * fix: change lat/lng from floats to typesense geopoint * dev: Add smartInfo relation to findAssetById to be able to query against it --------- Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
This commit is contained in:
@@ -11,4 +11,13 @@ export class AlbumRepository implements IAlbumRepository {
|
||||
async deleteAll(userId: string): Promise<void> {
|
||||
await this.repository.delete({ ownerId: userId });
|
||||
}
|
||||
|
||||
getAll(): Promise<AlbumEntity[]> {
|
||||
return this.repository.find();
|
||||
}
|
||||
|
||||
async save(album: Partial<AlbumEntity>) {
|
||||
const { id } = await this.repository.save(album);
|
||||
return this.repository.findOneOrFail({ where: { id } });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { IAssetRepository } from '@app/domain';
|
||||
import { AssetSearchOptions, IAssetRepository } from '@app/domain';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Not, Repository } from 'typeorm';
|
||||
@@ -12,13 +12,32 @@ export class AssetRepository implements IAssetRepository {
|
||||
await this.repository.delete({ ownerId });
|
||||
}
|
||||
|
||||
async getAll(): Promise<AssetEntity[]> {
|
||||
return this.repository.find({ relations: { exifInfo: true } });
|
||||
getAll(options?: AssetSearchOptions | undefined): Promise<AssetEntity[]> {
|
||||
options = options || {};
|
||||
|
||||
return this.repository.find({
|
||||
where: {
|
||||
isVisible: options.isVisible,
|
||||
},
|
||||
relations: {
|
||||
exifInfo: true,
|
||||
smartInfo: true,
|
||||
tags: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async save(asset: Partial<AssetEntity>): Promise<AssetEntity> {
|
||||
const { id } = await this.repository.save(asset);
|
||||
return this.repository.findOneOrFail({ where: { id } });
|
||||
return this.repository.findOneOrFail({
|
||||
where: { id },
|
||||
relations: {
|
||||
exifInfo: true,
|
||||
owner: true,
|
||||
smartInfo: true,
|
||||
tags: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
findLivePhotoMatch(livePhotoCID: string, otherAssetId: string, type: AssetType): Promise<AssetEntity | null> {
|
||||
|
||||
@@ -8,6 +8,7 @@ import {
|
||||
IKeyRepository,
|
||||
IMachineLearningRepository,
|
||||
IMediaRepository,
|
||||
ISearchRepository,
|
||||
ISharedLinkRepository,
|
||||
ISmartInfoRepository,
|
||||
IStorageRepository,
|
||||
@@ -45,6 +46,7 @@ import {
|
||||
import { JobRepository } from './job';
|
||||
import { MachineLearningRepository } from './machine-learning';
|
||||
import { MediaRepository } from './media';
|
||||
import { TypesenseRepository } from './search';
|
||||
import { FilesystemProvider } from './storage';
|
||||
|
||||
const providers: Provider[] = [
|
||||
@@ -52,12 +54,12 @@ const providers: Provider[] = [
|
||||
{ provide: IAssetRepository, useClass: AssetRepository },
|
||||
{ provide: ICommunicationRepository, useClass: CommunicationRepository },
|
||||
{ provide: ICryptoRepository, useClass: CryptoRepository },
|
||||
{ provide: ICryptoRepository, useClass: CryptoRepository },
|
||||
{ provide: IDeviceInfoRepository, useClass: DeviceInfoRepository },
|
||||
{ provide: IKeyRepository, useClass: APIKeyRepository },
|
||||
{ provide: IJobRepository, useClass: JobRepository },
|
||||
{ provide: IMachineLearningRepository, useClass: MachineLearningRepository },
|
||||
{ provide: IMediaRepository, useClass: MediaRepository },
|
||||
{ provide: ISearchRepository, useClass: TypesenseRepository },
|
||||
{ provide: ISharedLinkRepository, useClass: SharedLinkRepository },
|
||||
{ provide: ISmartInfoRepository, useClass: SmartInfoRepository },
|
||||
{ provide: IStorageRepository, useClass: FilesystemProvider },
|
||||
|
||||
@@ -13,6 +13,7 @@ export class JobRepository implements IJobRepository {
|
||||
@InjectQueue(QueueName.STORAGE_TEMPLATE_MIGRATION) private storageTemplateMigration: Queue,
|
||||
@InjectQueue(QueueName.THUMBNAIL_GENERATION) private thumbnail: Queue,
|
||||
@InjectQueue(QueueName.VIDEO_CONVERSION) private videoTranscode: Queue<IAssetJob>,
|
||||
@InjectQueue(QueueName.SEARCH) private searchIndex: Queue,
|
||||
) {}
|
||||
|
||||
async isActive(name: QueueName): Promise<boolean> {
|
||||
@@ -70,6 +71,18 @@ export class JobRepository implements IJobRepository {
|
||||
await this.videoTranscode.add(item.name, item.data);
|
||||
break;
|
||||
|
||||
case JobName.SEARCH_INDEX_ASSETS:
|
||||
case JobName.SEARCH_INDEX_ALBUMS:
|
||||
await this.searchIndex.add(item.name);
|
||||
break;
|
||||
|
||||
case JobName.SEARCH_INDEX_ASSET:
|
||||
case JobName.SEARCH_INDEX_ALBUM:
|
||||
case JobName.SEARCH_REMOVE_ALBUM:
|
||||
case JobName.SEARCH_REMOVE_ASSET:
|
||||
await this.searchIndex.add(item.name, item.data);
|
||||
break;
|
||||
|
||||
default:
|
||||
// TODO inject remaining queues and map job to queue
|
||||
this.logger.error('Invalid job', item);
|
||||
|
||||
1
server/libs/infra/src/search/index.ts
Normal file
1
server/libs/infra/src/search/index.ts
Normal file
@@ -0,0 +1 @@
|
||||
export * from './typesense.repository';
|
||||
13
server/libs/infra/src/search/schemas/album.schema.ts
Normal file
13
server/libs/infra/src/search/schemas/album.schema.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { CollectionCreateSchema } from 'typesense/lib/Typesense/Collections';
|
||||
|
||||
export const albumSchemaVersion = 1;
|
||||
export const albumSchema: CollectionCreateSchema = {
|
||||
name: `albums-v${albumSchemaVersion}`,
|
||||
fields: [
|
||||
{ name: 'ownerId', type: 'string', facet: false },
|
||||
{ name: 'albumName', type: 'string', facet: false, sort: true },
|
||||
{ name: 'createdAt', type: 'string', facet: false, sort: true },
|
||||
{ name: 'updatedAt', type: 'string', facet: false, sort: true },
|
||||
],
|
||||
default_sorting_field: 'createdAt',
|
||||
};
|
||||
37
server/libs/infra/src/search/schemas/asset.schema.ts
Normal file
37
server/libs/infra/src/search/schemas/asset.schema.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import { CollectionCreateSchema } from 'typesense/lib/Typesense/Collections';
|
||||
|
||||
export const assetSchemaVersion = 1;
|
||||
export const assetSchema: CollectionCreateSchema = {
|
||||
name: `assets-v${assetSchemaVersion}`,
|
||||
fields: [
|
||||
// asset
|
||||
{ name: 'ownerId', type: 'string', facet: false },
|
||||
{ name: 'type', type: 'string', facet: true },
|
||||
{ name: 'originalPath', type: 'string', facet: false },
|
||||
{ name: 'createdAt', type: 'string', facet: false, sort: true },
|
||||
{ name: 'updatedAt', type: 'string', facet: false, sort: true },
|
||||
{ name: 'fileCreatedAt', type: 'string', facet: false, sort: true },
|
||||
{ name: 'fileModifiedAt', type: 'string', facet: false, sort: true },
|
||||
{ name: 'isFavorite', type: 'bool', facet: true },
|
||||
// { name: 'checksum', type: 'string', facet: true },
|
||||
// { name: 'tags', type: 'string[]', facet: true, optional: true },
|
||||
|
||||
// exif
|
||||
{ name: 'exifInfo.city', type: 'string', facet: true, optional: true },
|
||||
{ name: 'exifInfo.country', type: 'string', facet: true, optional: true },
|
||||
{ name: 'exifInfo.state', type: 'string', facet: true, optional: true },
|
||||
{ name: 'exifInfo.description', type: 'string', facet: false, optional: true },
|
||||
{ name: 'exifInfo.imageName', type: 'string', facet: false, optional: true },
|
||||
{ name: 'geo', type: 'geopoint', facet: false, optional: true },
|
||||
{ name: 'exifInfo.make', type: 'string', facet: true, optional: true },
|
||||
{ name: 'exifInfo.model', type: 'string', facet: true, optional: true },
|
||||
{ name: 'exifInfo.orientation', type: 'string', optional: true },
|
||||
|
||||
// smart info
|
||||
{ name: 'smartInfo.objects', type: 'string[]', facet: true, optional: true },
|
||||
{ name: 'smartInfo.tags', type: 'string[]', facet: true, optional: true },
|
||||
],
|
||||
token_separators: ['.'],
|
||||
enable_nested_fields: true,
|
||||
default_sorting_field: 'fileCreatedAt',
|
||||
};
|
||||
325
server/libs/infra/src/search/typesense.repository.ts
Normal file
325
server/libs/infra/src/search/typesense.repository.ts
Normal file
@@ -0,0 +1,325 @@
|
||||
import {
|
||||
ISearchRepository,
|
||||
SearchCollection,
|
||||
SearchCollectionIndexStatus,
|
||||
SearchFilter,
|
||||
SearchResult,
|
||||
} from '@app/domain';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import _, { Dictionary } from 'lodash';
|
||||
import { Client } from 'typesense';
|
||||
import { CollectionCreateSchema } from 'typesense/lib/Typesense/Collections';
|
||||
import { DocumentSchema, SearchResponse } from 'typesense/lib/Typesense/Documents';
|
||||
import { AlbumEntity, AssetEntity } from '../db';
|
||||
import { albumSchema } from './schemas/album.schema';
|
||||
import { assetSchema } from './schemas/asset.schema';
|
||||
|
||||
interface GeoAssetEntity extends AssetEntity {
|
||||
geo?: [number, number];
|
||||
}
|
||||
|
||||
function removeNil<T extends Dictionary<any>>(item: T): Partial<T> {
|
||||
_.forOwn(item, (value, key) => {
|
||||
if (_.isNil(value) || (_.isObject(value) && !_.isDate(value) && _.isEmpty(removeNil(value)))) {
|
||||
delete item[key];
|
||||
}
|
||||
});
|
||||
|
||||
return item;
|
||||
}
|
||||
|
||||
const schemaMap: Record<SearchCollection, CollectionCreateSchema> = {
|
||||
[SearchCollection.ASSETS]: assetSchema,
|
||||
[SearchCollection.ALBUMS]: albumSchema,
|
||||
};
|
||||
|
||||
const schemas = Object.entries(schemaMap) as [SearchCollection, CollectionCreateSchema][];
|
||||
|
||||
interface SearchUpdateQueue<T = any> {
|
||||
upsert: T[];
|
||||
delete: string[];
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class TypesenseRepository implements ISearchRepository {
|
||||
private logger = new Logger(TypesenseRepository.name);
|
||||
private queue: Record<SearchCollection, SearchUpdateQueue> = {
|
||||
[SearchCollection.ASSETS]: {
|
||||
upsert: [],
|
||||
delete: [],
|
||||
},
|
||||
[SearchCollection.ALBUMS]: {
|
||||
upsert: [],
|
||||
delete: [],
|
||||
},
|
||||
};
|
||||
|
||||
private _client: Client | null = null;
|
||||
private get client(): Client {
|
||||
if (!this._client) {
|
||||
throw new Error('Typesense client not available (no apiKey was provided)');
|
||||
}
|
||||
return this._client;
|
||||
}
|
||||
|
||||
constructor() {
|
||||
const apiKey = process.env.TYPESENSE_API_KEY;
|
||||
if (!apiKey) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._client = new Client({
|
||||
nodes: [
|
||||
{
|
||||
host: process.env.TYPESENSE_HOST || 'typesense',
|
||||
port: Number(process.env.TYPESENSE_PORT) || 8108,
|
||||
protocol: process.env.TYPESENSE_PROTOCOL || 'http',
|
||||
},
|
||||
],
|
||||
apiKey,
|
||||
numRetries: 3,
|
||||
connectionTimeoutSeconds: 10,
|
||||
});
|
||||
|
||||
setInterval(() => this.flush(), 5_000);
|
||||
}
|
||||
|
||||
async setup(): Promise<void> {
|
||||
// upsert collections
|
||||
for (const [collectionName, schema] of schemas) {
|
||||
const collection = await this.client
|
||||
.collections(schema.name)
|
||||
.retrieve()
|
||||
.catch(() => null);
|
||||
if (!collection) {
|
||||
this.logger.log(`Creating schema: ${collectionName}/${schema.name}`);
|
||||
await this.client.collections().create(schema);
|
||||
} else {
|
||||
this.logger.log(`Schema up to date: ${collectionName}/${schema.name}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async checkMigrationStatus(): Promise<SearchCollectionIndexStatus> {
|
||||
const migrationMap: SearchCollectionIndexStatus = {
|
||||
[SearchCollection.ASSETS]: false,
|
||||
[SearchCollection.ALBUMS]: false,
|
||||
};
|
||||
|
||||
// check if alias is using the current schema
|
||||
const { aliases } = await this.client.aliases().retrieve();
|
||||
this.logger.log(`Alias mapping: ${JSON.stringify(aliases)}`);
|
||||
|
||||
for (const [aliasName, schema] of schemas) {
|
||||
const match = aliases.find((alias) => alias.name === aliasName);
|
||||
if (!match || match.collection_name !== schema.name) {
|
||||
migrationMap[aliasName] = true;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`Collections needing migration: ${JSON.stringify(migrationMap)}`);
|
||||
|
||||
return migrationMap;
|
||||
}
|
||||
|
||||
async index(collection: SearchCollection, item: AssetEntity | AlbumEntity, immediate?: boolean): Promise<void> {
|
||||
const schema = schemaMap[collection];
|
||||
|
||||
if (collection === SearchCollection.ASSETS) {
|
||||
item = this.patchAsset(item as AssetEntity);
|
||||
}
|
||||
|
||||
if (immediate) {
|
||||
await this.client.collections(schema.name).documents().upsert(item);
|
||||
return;
|
||||
}
|
||||
|
||||
this.queue[collection].upsert.push(item);
|
||||
}
|
||||
|
||||
async delete(collection: SearchCollection, id: string, immediate?: boolean): Promise<void> {
|
||||
const schema = schemaMap[collection];
|
||||
|
||||
if (immediate) {
|
||||
await this.client.collections(schema.name).documents().delete(id);
|
||||
return;
|
||||
}
|
||||
|
||||
this.queue[collection].delete.push(id);
|
||||
}
|
||||
|
||||
async import(collection: SearchCollection, items: AssetEntity[] | AlbumEntity[], done: boolean): Promise<void> {
|
||||
try {
|
||||
const schema = schemaMap[collection];
|
||||
const _items = items.map((item) => {
|
||||
if (collection === SearchCollection.ASSETS) {
|
||||
item = this.patchAsset(item as AssetEntity);
|
||||
}
|
||||
// null values are invalid for typesense documents
|
||||
return removeNil(item);
|
||||
});
|
||||
if (_items.length > 0) {
|
||||
await this.client
|
||||
.collections(schema.name)
|
||||
.documents()
|
||||
.import(_items, { action: 'upsert', dirty_values: 'coerce_or_drop' });
|
||||
}
|
||||
if (done) {
|
||||
await this.updateAlias(collection);
|
||||
}
|
||||
} catch (error: any) {
|
||||
this.handleError(error);
|
||||
}
|
||||
}
|
||||
|
||||
search(collection: SearchCollection.ASSETS, query: string, filter: SearchFilter): Promise<SearchResult<AssetEntity>>;
|
||||
search(collection: SearchCollection.ALBUMS, query: string, filter: SearchFilter): Promise<SearchResult<AlbumEntity>>;
|
||||
async search(collection: SearchCollection, query: string, filters: SearchFilter) {
|
||||
const alias = await this.client.aliases(collection).retrieve();
|
||||
|
||||
const { userId } = filters;
|
||||
|
||||
const _filters = [`ownerId:${userId}`];
|
||||
|
||||
if (filters.id) {
|
||||
_filters.push(`id:=${filters.id}`);
|
||||
}
|
||||
if (collection === SearchCollection.ASSETS) {
|
||||
for (const item of schemaMap[collection].fields || []) {
|
||||
let value = filters[item.name as keyof SearchFilter];
|
||||
if (Array.isArray(value)) {
|
||||
value = `[${value.join(',')}]`;
|
||||
}
|
||||
if (item.facet && value !== undefined) {
|
||||
_filters.push(`${item.name}:${value}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.debug(`Searching query='${query}', filters='${JSON.stringify(_filters)}'`);
|
||||
|
||||
const results = await this.client
|
||||
.collections<AssetEntity>(alias.collection_name)
|
||||
.documents()
|
||||
.search({
|
||||
q: query,
|
||||
query_by: [
|
||||
'exifInfo.imageName',
|
||||
'exifInfo.country',
|
||||
'exifInfo.state',
|
||||
'exifInfo.city',
|
||||
'exifInfo.description',
|
||||
'smartInfo.tags',
|
||||
'smartInfo.objects',
|
||||
].join(','),
|
||||
filter_by: _filters.join(' && '),
|
||||
per_page: 250,
|
||||
facet_by: (assetSchema.fields || [])
|
||||
.filter((field) => field.facet)
|
||||
.map((field) => field.name)
|
||||
.join(','),
|
||||
});
|
||||
|
||||
return this.asResponse(results);
|
||||
}
|
||||
|
||||
if (collection === SearchCollection.ALBUMS) {
|
||||
const results = await this.client
|
||||
.collections<AlbumEntity>(alias.collection_name)
|
||||
.documents()
|
||||
.search({
|
||||
q: query,
|
||||
query_by: 'albumName',
|
||||
filter_by: _filters.join(','),
|
||||
});
|
||||
|
||||
return this.asResponse(results);
|
||||
}
|
||||
|
||||
throw new Error(`Invalid collection: ${collection}`);
|
||||
}
|
||||
|
||||
private asResponse<T extends DocumentSchema>(results: SearchResponse<T>): SearchResult<T> {
|
||||
return {
|
||||
page: results.page,
|
||||
total: results.found,
|
||||
count: results.out_of,
|
||||
items: (results.hits || []).map((hit) => hit.document),
|
||||
facets: (results.facet_counts || []).map((facet) => ({
|
||||
counts: facet.counts.map((item) => ({ count: item.count, value: item.value })),
|
||||
fieldName: facet.field_name as string,
|
||||
})),
|
||||
};
|
||||
}
|
||||
|
||||
private async flush() {
|
||||
for (const [collection, schema] of schemas) {
|
||||
if (this.queue[collection].upsert.length > 0) {
|
||||
try {
|
||||
const items = this.queue[collection].upsert.map((item) => removeNil(item));
|
||||
this.logger.debug(`Flushing ${items.length} ${collection} upserts to typesense`);
|
||||
await this.client
|
||||
.collections(schema.name)
|
||||
.documents()
|
||||
.import(items, { action: 'upsert', dirty_values: 'coerce_or_drop' });
|
||||
this.queue[collection].upsert = [];
|
||||
} catch (error) {
|
||||
this.handleError(error);
|
||||
}
|
||||
}
|
||||
|
||||
if (this.queue[collection].delete.length > 0) {
|
||||
try {
|
||||
const items = this.queue[collection].delete;
|
||||
this.logger.debug(`Flushing ${items.length} ${collection} deletes to typesense`);
|
||||
await this.client
|
||||
.collections(schema.name)
|
||||
.documents()
|
||||
.delete({ filter_by: `id: [${items.join(',')}]` });
|
||||
this.queue[collection].delete = [];
|
||||
} catch (error) {
|
||||
this.handleError(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private handleError(error: any): never {
|
||||
this.logger.error('Unable to index documents');
|
||||
const results = error.importResults || [];
|
||||
for (const result of results) {
|
||||
try {
|
||||
result.document = JSON.parse(result.document);
|
||||
} catch {}
|
||||
}
|
||||
this.logger.verbose(JSON.stringify(results, null, 2));
|
||||
throw error;
|
||||
}
|
||||
|
||||
private async updateAlias(collection: SearchCollection) {
|
||||
const schema = schemaMap[collection];
|
||||
const alias = await this.client
|
||||
.aliases(collection)
|
||||
.retrieve()
|
||||
.catch(() => null);
|
||||
|
||||
// update alias to current collection
|
||||
this.logger.log(`Using new schema: ${alias?.collection_name || '(unset)'} => ${schema.name}`);
|
||||
await this.client.aliases().upsert(collection, { collection_name: schema.name });
|
||||
|
||||
// delete previous collection
|
||||
if (alias && alias.collection_name !== schema.name) {
|
||||
this.logger.log(`Deleting old schema: ${alias.collection_name}`);
|
||||
await this.client.collections(alias.collection_name).delete();
|
||||
}
|
||||
}
|
||||
|
||||
private patchAsset(asset: AssetEntity): GeoAssetEntity {
|
||||
const lat = asset.exifInfo?.latitude;
|
||||
const lng = asset.exifInfo?.longitude;
|
||||
if (lat && lng && lat !== 0 && lng !== 0) {
|
||||
return { ...asset, geo: [lat, lng] };
|
||||
}
|
||||
|
||||
return asset;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user