feat(mobile): efficient asset sync (#3945)

* feat(mobile): efficient asset sync
This commit is contained in:
Fynn Petersen-Frey
2023-09-10 14:51:18 +02:00
committed by GitHub
parent 4b11e925d9
commit 5d1011b482
15 changed files with 379 additions and 257 deletions

View File

@@ -20,6 +20,7 @@ class ApiService {
late ServerInfoApi serverInfoApi;
late PartnerApi partnerApi;
late PersonApi personApi;
late AuditApi auditApi;
ApiService() {
final endpoint = Store.tryGet(StoreKey.serverEndpoint);
@@ -43,6 +44,7 @@ class ApiService {
searchApi = SearchApi(_apiClient);
partnerApi = PartnerApi(_apiClient);
personApi = PersonApi(_apiClient);
auditApi = AuditApi(_apiClient);
}
Future<String> resolveAndSetEndpoint(String serverUrl) async {

View File

@@ -3,7 +3,6 @@ import 'dart:async';
import 'package:flutter/material.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/shared/models/asset.dart';
import 'package:immich_mobile/shared/models/etag.dart';
import 'package:immich_mobile/shared/models/exif_info.dart';
import 'package:immich_mobile/shared/models/store.dart';
import 'package:immich_mobile/shared/models/user.dart';
@@ -11,7 +10,6 @@ import 'package:immich_mobile/shared/providers/api.provider.dart';
import 'package:immich_mobile/shared/providers/db.provider.dart';
import 'package:immich_mobile/shared/services/api.service.dart';
import 'package:immich_mobile/shared/services/sync.service.dart';
import 'package:immich_mobile/utils/openapi_extensions.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
@@ -39,37 +37,34 @@ class AssetService {
/// Checks the server for updated assets and updates the local database if
/// required. Returns `true` if there were any changes.
Future<bool> refreshRemoteAssets([User? user]) async {
user ??= Store.get(StoreKey.currentUser);
user ??= Store.get<User>(StoreKey.currentUser);
final Stopwatch sw = Stopwatch()..start();
final int numOwnedRemoteAssets = await _db.assets
.where()
.remoteIdIsNotNull()
.filter()
.ownerIdEqualTo(user!.isarId)
.count();
final bool changes = await _syncService.syncRemoteAssetsToDb(
user,
() async => (await _getRemoteAssets(
hasCache: numOwnedRemoteAssets > 0,
user: user!,
)),
_getRemoteAssetChanges,
_getRemoteAssets,
);
debugPrint("refreshRemoteAssets full took ${sw.elapsedMilliseconds}ms");
return changes;
}
/// Returns `(null, null)` if changes are invalid -> requires full sync
Future<(List<Asset>? toUpsert, List<String>? toDelete)>
_getRemoteAssetChanges(User user, DateTime since) async {
final deleted = await _apiService.auditApi
.getAuditDeletes(EntityType.ASSET, since, userId: user.id);
if (deleted == null || deleted.needsFullSync) return (null, null);
final assetDto = await _apiService.assetApi
.getAllAssets(userId: user.id, updatedAfter: since);
if (assetDto == null) return (null, null);
return (assetDto.map(Asset.remote).toList(), deleted.ids);
}
/// Returns `null` if the server state did not change, else list of assets
Future<List<Asset>?> _getRemoteAssets({
required bool hasCache,
required User user,
}) async {
Future<List<Asset>?> _getRemoteAssets(User user) async {
try {
final etag = hasCache ? _db.eTags.getByIdSync(user.id)?.value : null;
final (List<AssetResponseDto>? assets, String? newETag) =
await _apiService.assetApi.getAllAssetsWithETag(
eTag: etag,
userId: user.id,
);
final List<AssetResponseDto>? assets =
await _apiService.assetApi.getAllAssets(userId: user.id);
if (assets == null) {
return null;
} else if (assets.isNotEmpty && assets.first.ownerId != user.id) {
@@ -77,8 +72,6 @@ class AssetService {
" The server returned assets for user ${assets.first.ownerId}"
" while requesting assets of user ${user.id}");
return null;
} else if (newETag != etag) {
_db.writeTxn(() => _db.eTags.put(ETag(id: user.id, value: newETag)));
}
return assets.map(Asset.remote).toList();
} catch (error, stack) {

View File

@@ -69,9 +69,17 @@ class SyncService {
/// Returns `true` if there were any changes
Future<bool> syncRemoteAssetsToDb(
User user,
FutureOr<List<Asset>?> Function() loadAssets,
Future<(List<Asset>? toUpsert, List<String>? toDelete)> Function(
User user,
DateTime since,
) getChangedAssets,
FutureOr<List<Asset>?> Function(User user) loadAssets,
) =>
_lock.run(() => _syncRemoteAssetsToDb(user, loadAssets));
_lock.run(
() async =>
await _syncRemoteAssetChanges(user, getChangedAssets) ??
await _syncRemoteAssetsFull(user, loadAssets),
);
/// Syncs remote albums to the database
/// returns `true` if there were any changes
@@ -130,13 +138,59 @@ class SyncService {
return true;
}
/// Syncs remote assets to the databas
/// returns `true` if there were any changes
Future<bool> _syncRemoteAssetsToDb(
/// Efficiently syncs assets via changes. Returns `null` when a full sync is required.
Future<bool?> _syncRemoteAssetChanges(
User user,
FutureOr<List<Asset>?> Function() loadAssets,
Future<(List<Asset>? toUpsert, List<String>? toDelete)> Function(
User user,
DateTime since,
) getChangedAssets,
) async {
final List<Asset>? remote = await loadAssets();
final DateTime? since = _db.eTags.getByIdSync(user.id)?.time?.toUtc();
if (since == null) return null;
final DateTime now = DateTime.now();
final (toUpsert, toDelete) = await getChangedAssets(user, since);
if (toUpsert == null || toDelete == null) return null;
try {
if (toDelete.isNotEmpty) {
await _handleRemoteAssetRemoval(toDelete);
}
if (toUpsert.isNotEmpty) {
final (_, updated) = await _linkWithExistingFromDb(toUpsert);
await upsertAssetsWithExif(updated);
}
if (toUpsert.isNotEmpty || toDelete.isNotEmpty) {
await _updateUserAssetsETag(user, now);
return true;
}
return false;
} on IsarError catch (e) {
_log.severe("Failed to sync remote assets to db: $e");
}
return null;
}
/// Deletes remote-only assets, updates merged assets to be local-only
Future<void> _handleRemoteAssetRemoval(List<String> idsToDelete) {
return _db.writeTxn(() async {
await _db.assets.remote(idsToDelete).filter().localIdIsNull().deleteAll();
final onlyLocal = await _db.assets.remote(idsToDelete).findAll();
if (onlyLocal.isNotEmpty) {
for (final Asset a in onlyLocal) {
a.remoteId = null;
}
await _db.assets.putAll(onlyLocal);
}
});
}
/// Syncs assets by loading and comparing all assets from the server.
Future<bool> _syncRemoteAssetsFull(
User user,
FutureOr<List<Asset>?> Function(User user) loadAssets,
) async {
final DateTime now = DateTime.now();
final List<Asset>? remote = await loadAssets(user);
if (remote == null) {
return false;
}
@@ -150,6 +204,7 @@ class SyncService {
remote.sort(Asset.compareByChecksum);
final (toAdd, toUpdate, toRemove) = _diffAssets(remote, inDb, remote: true);
if (toAdd.isEmpty && toUpdate.isEmpty && toRemove.isEmpty) {
await _updateUserAssetsETag(user, now);
return false;
}
final idsToDelete = toRemove.map((e) => e.id).toList();
@@ -159,9 +214,13 @@ class SyncService {
} on IsarError catch (e) {
_log.severe("Failed to sync remote assets to db: $e");
}
await _updateUserAssetsETag(user, now);
return true;
}
Future<void> _updateUserAssetsETag(User user, DateTime time) =>
_db.writeTxn(() => _db.eTags.put(ETag(id: user.id, time: time)));
/// Syncs remote albums to the database
/// returns `true` if there were any changes
Future<bool> _syncRemoteAlbumsToDb(
@@ -450,6 +509,14 @@ class SyncService {
_log.fine(
"Only excluded assets in local album ${ape.name} changed. Stopping sync.",
);
if (assetCountOnDevice !=
_db.eTags.getByIdSync(ape.eTagKeyAssetCount)?.assetCount) {
await _db.writeTxn(
() => _db.eTags.put(
ETag(id: ape.eTagKeyAssetCount, assetCount: assetCountOnDevice),
),
);
}
return false;
}
_log.fine(
@@ -477,7 +544,7 @@ class SyncService {
album.thumbnail.value ??= await album.assets.filter().findFirst();
await album.thumbnail.save();
await _db.eTags.put(
ETag(id: ape.eTagKeyAssetCount, value: assetCountOnDevice.toString()),
ETag(id: ape.eTagKeyAssetCount, assetCount: assetCountOnDevice),
);
});
_log.info("Synced changes of local album ${ape.name} to DB");
@@ -496,7 +563,7 @@ class SyncService {
}
final int totalOnDevice = await ape.assetCountAsync;
final int lastKnownTotal =
(await _db.eTags.getById(ape.eTagKeyAssetCount))?.value?.toInt() ?? 0;
(await _db.eTags.getById(ape.eTagKeyAssetCount))?.assetCount ?? 0;
final AssetPathEntity? modified = totalOnDevice > lastKnownTotal
? await ape.fetchPathProperties(
filterOptionGroup: FilterOptionGroup(
@@ -523,9 +590,8 @@ class SyncService {
await _db.assets.putAll(updated);
await album.assets.update(link: existingInDb + updated);
await _db.albums.put(album);
await _db.eTags.put(
ETag(id: ape.eTagKeyAssetCount, value: totalOnDevice.toString()),
);
await _db.eTags
.put(ETag(id: ape.eTagKeyAssetCount, assetCount: totalOnDevice));
});
_log.info("Fast synced local album ${ape.name} to DB");
} on IsarError catch (e) {
@@ -667,7 +733,7 @@ class SyncService {
a.lastModified == null ||
!a.lastModified!.isAtSameMomentAs(b.modifiedAt) ||
await a.assetCountAsync !=
(await _db.eTags.getById(a.eTagKeyAssetCount))?.value?.toInt();
(await _db.eTags.getById(a.eTagKeyAssetCount))?.assetCount;
}
}