feature(mobile): Hardening synchronization mechanism + Pull to refresh (#2085)

* fix(mobile): allow syncing duplicate local IDs

* enable to run isar unit tests on CI

* serialize sync operations, add pull to refresh on timeline

---------

Co-authored-by: Fynn Petersen-Frey <zoodyy@users.noreply.github.com>
This commit is contained in:
Fynn Petersen-Frey
2023-03-27 04:35:52 +02:00
committed by GitHub
parent 1a94530935
commit cae37657e9
21 changed files with 653 additions and 249 deletions

View File

@@ -45,14 +45,11 @@ class AssetService {
.filter()
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
.count();
final List<AssetResponseDto>? dtos =
await _getRemoteAssets(hasCache: numOwnedRemoteAssets > 0);
if (dtos == null) {
debugPrint("refreshRemoteAssets fast took ${sw.elapsedMilliseconds}ms");
return false;
}
final bool changes = await _syncService
.syncRemoteAssetsToDb(dtos.map(Asset.remote).toList());
final bool changes = await _syncService.syncRemoteAssetsToDb(
() async => (await _getRemoteAssets(hasCache: numOwnedRemoteAssets > 0))
?.map(Asset.remote)
.toList(),
);
debugPrint("refreshRemoteAssets full took ${sw.elapsedMilliseconds}ms");
return changes;
}

View File

@@ -20,7 +20,7 @@ class ImmichLogger {
static final ImmichLogger _instance = ImmichLogger._internal();
final maxLogEntries = 200;
final Isar _db = Isar.getInstance()!;
final List<LoggerMessage> _msgBuffer = [];
List<LoggerMessage> _msgBuffer = [];
Timer? _timer;
factory ImmichLogger() => _instance;
@@ -41,7 +41,12 @@ class ImmichLogger {
final msgCount = _db.loggerMessages.countSync();
if (msgCount > maxLogEntries) {
final numberOfEntryToBeDeleted = msgCount - maxLogEntries;
_db.loggerMessages.where().limit(numberOfEntryToBeDeleted).deleteAll();
_db.writeTxn(
() => _db.loggerMessages
.where()
.limit(numberOfEntryToBeDeleted)
.deleteAll(),
);
}
}
@@ -63,8 +68,9 @@ class ImmichLogger {
void _flushBufferToDatabase() {
_timer = null;
_db.writeTxnSync(() => _db.loggerMessages.putAllSync(_msgBuffer));
_msgBuffer.clear();
final buffer = _msgBuffer;
_msgBuffer = [];
_db.writeTxn(() => _db.loggerMessages.putAll(buffer));
}
void clearLogs() {
@@ -111,7 +117,7 @@ class ImmichLogger {
void flush() {
if (_timer != null) {
_timer!.cancel();
_flushBufferToDatabase();
_db.writeTxnSync(() => _db.loggerMessages.putAllSync(_msgBuffer));
}
}
}

View File

@@ -1,7 +1,6 @@
import 'dart:async';
import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/shared/models/album.dart';
import 'package:immich_mobile/shared/models/asset.dart';
@@ -61,8 +60,10 @@ class SyncService {
/// Syncs remote assets owned by the logged-in user to the DB
/// Returns `true` if there were any changes
Future<bool> syncRemoteAssetsToDb(List<Asset> remote) =>
_lock.run(() => _syncRemoteAssetsToDb(remote));
Future<bool> syncRemoteAssetsToDb(
FutureOr<List<Asset>?> Function() loadAssets,
) =>
_lock.run(() => _syncRemoteAssetsToDb(loadAssets));
/// Syncs remote albums to the database
/// returns `true` if there were any changes
@@ -97,19 +98,72 @@ class SyncService {
.toList();
}
/// Syncs a new asset to the db. Returns `true` if successful
Future<bool> syncNewAssetToDb(Asset newAsset) =>
_lock.run(() => _syncNewAssetToDb(newAsset));
// private methods:
/// Syncs a new asset to the db. Returns `true` if successful
Future<bool> _syncNewAssetToDb(Asset newAsset) async {
final List<Asset> inDb = await _db.assets
.where()
.localIdDeviceIdEqualTo(newAsset.localId, newAsset.deviceId)
.findAll();
Asset? match;
if (inDb.length == 1) {
// exactly one match: trivial case
match = inDb.first;
} else if (inDb.length > 1) {
// TODO instead of this heuristics: match by checksum once available
for (Asset a in inDb) {
if (a.ownerId == newAsset.ownerId &&
a.fileModifiedAt == newAsset.fileModifiedAt) {
assert(match == null);
match = a;
}
}
if (match == null) {
for (Asset a in inDb) {
if (a.ownerId == newAsset.ownerId) {
assert(match == null);
match = a;
}
}
}
}
if (match != null) {
// unify local/remote assets by replacing the
// local-only asset in the DB with a local&remote asset
newAsset.updateFromDb(match);
}
try {
await _db.writeTxn(() => newAsset.put(_db));
} on IsarError catch (e) {
_log.severe("Failed to put new asset into db: $e");
return false;
}
return true;
}
/// Syncs remote assets to the databas
/// returns `true` if there were any changes
Future<bool> _syncRemoteAssetsToDb(List<Asset> remote) async {
Future<bool> _syncRemoteAssetsToDb(
FutureOr<List<Asset>?> Function() loadAssets,
) async {
final List<Asset>? remote = await loadAssets();
if (remote == null) {
return false;
}
final User user = Store.get(StoreKey.currentUser);
final List<Asset> inDb = await _db.assets
.filter()
.ownerIdEqualTo(user.isarId)
.sortByDeviceId()
.thenByLocalId()
.thenByFileModifiedAt()
.findAll();
remote.sort(Asset.compareByDeviceIdLocalId);
remote.sort(Asset.compareByOwnerDeviceLocalIdModified);
final diff = _diffAssets(remote, inDb, remote: true);
if (diff.first.isEmpty && diff.second.isEmpty && diff.third.isEmpty) {
return false;
@@ -119,7 +173,7 @@ class SyncService {
await _db.writeTxn(() => _db.assets.deleteAll(idsToDelete));
await _upsertAssetsWithExif(diff.first + diff.second);
} on IsarError catch (e) {
debugPrint(e.toString());
_log.severe("Failed to sync remote assets to db: $e");
}
return true;
}
@@ -188,10 +242,15 @@ class SyncService {
if (dto.assetCount != dto.assets.length) {
return false;
}
final assetsInDb =
await album.assets.filter().sortByDeviceId().thenByLocalId().findAll();
final assetsInDb = await album.assets
.filter()
.sortByOwnerId()
.thenByDeviceId()
.thenByLocalId()
.thenByFileModifiedAt()
.findAll();
final List<Asset> assetsOnRemote = dto.getAssets();
assetsOnRemote.sort(Asset.compareByDeviceIdLocalId);
assetsOnRemote.sort(Asset.compareByOwnerDeviceLocalIdModified);
final d = _diffAssets(assetsOnRemote, assetsInDb);
final List<Asset> toAdd = d.first, toUpdate = d.second, toUnlink = d.third;
@@ -237,7 +296,7 @@ class SyncService {
await _db.albums.put(album);
});
} on IsarError catch (e) {
debugPrint(e.toString());
_log.severe("Failed to sync remote album to database $e");
}
if (album.shared || dto.shared) {
@@ -300,7 +359,7 @@ class SyncService {
assert(ok);
_log.info("Removed local album $album from DB");
} catch (e) {
_log.warning("Failed to remove local album $album from DB");
_log.severe("Failed to remove local album $album from DB");
}
}
@@ -331,7 +390,7 @@ class SyncService {
_addAlbumFromDevice(ape, existing, excludedAssets),
onlySecond: (Album a) => _removeAlbumFromDb(a, deleteCandidates),
);
final pair = _handleAssetRemoval(deleteCandidates, existing);
final pair = _handleAssetRemoval(deleteCandidates, existing, remote: false);
if (pair.first.isNotEmpty || pair.second.isNotEmpty) {
await _db.writeTxn(() async {
await _db.assets.deleteAll(pair.first);
@@ -366,7 +425,12 @@ class SyncService {
}
// general case, e.g. some assets have been deleted or there are excluded albums on iOS
final inDb = await album.assets.filter().sortByLocalId().findAll();
final inDb = await album.assets
.filter()
.ownerIdEqualTo(Store.get(StoreKey.currentUser).isarId)
.deviceIdEqualTo(Store.get(StoreKey.deviceIdHash))
.sortByLocalId()
.findAll();
final List<Asset> onDevice =
await ape.getAssets(excludedAssets: excludedAssets);
onDevice.sort(Asset.compareByLocalId);
@@ -401,7 +465,7 @@ class SyncService {
});
_log.info("Synced changes of local album $ape to DB");
} on IsarError catch (e) {
_log.warning("Failed to update synced album $ape in DB: $e");
_log.severe("Failed to update synced album $ape in DB: $e");
}
return true;
@@ -438,7 +502,7 @@ class SyncService {
});
_log.info("Fast synced local album $ape to DB");
} on IsarError catch (e) {
_log.warning("Failed to fast sync local album $ape to DB: $e");
_log.severe("Failed to fast sync local album $ape to DB: $e");
return false;
}
@@ -470,7 +534,7 @@ class SyncService {
await _db.writeTxn(() => _db.albums.store(a));
_log.info("Added a new local album to DB: $ape");
} on IsarError catch (e) {
_log.warning("Failed to add new local album $ape to DB: $e");
_log.severe("Failed to add new local album $ape to DB: $e");
}
}
@@ -487,15 +551,19 @@ class SyncService {
assets,
(q, Asset e) => q.localIdDeviceIdEqualTo(e.localId, e.deviceId),
)
.sortByDeviceId()
.sortByOwnerId()
.thenByDeviceId()
.thenByLocalId()
.thenByFileModifiedAt()
.findAll();
assets.sort(Asset.compareByDeviceIdLocalId);
assets.sort(Asset.compareByOwnerDeviceLocalIdModified);
final List<Asset> existing = [], toUpsert = [];
diffSortedListsSync(
inDb,
assets,
compare: Asset.compareByDeviceIdLocalId,
// do not compare by modified date because for some assets dates differ on
// client and server, thus never reaching "both" case below
compare: Asset.compareByOwnerDeviceLocalId,
both: (Asset a, Asset b) {
if ((a.isLocal || !b.isLocal) &&
(a.isRemote || !b.isRemote) &&
@@ -541,7 +609,7 @@ Triple<List<Asset>, List<Asset>, List<Asset>> _diffAssets(
List<Asset> assets,
List<Asset> inDb, {
bool? remote,
int Function(Asset, Asset) compare = Asset.compareByDeviceIdLocalId,
int Function(Asset, Asset) compare = Asset.compareByOwnerDeviceLocalId,
}) {
final List<Asset> toAdd = [];
final List<Asset> toUpdate = [];
@@ -582,15 +650,20 @@ Triple<List<Asset>, List<Asset>, List<Asset>> _diffAssets(
/// returns a tuple (toDelete toUpdate) when assets are to be deleted
Pair<List<int>, List<Asset>> _handleAssetRemoval(
List<Asset> deleteCandidates,
List<Asset> existing,
) {
List<Asset> existing, {
bool? remote,
}) {
if (deleteCandidates.isEmpty) {
return const Pair([], []);
}
deleteCandidates.sort(Asset.compareById);
existing.sort(Asset.compareById);
final triple =
_diffAssets(existing, deleteCandidates, compare: Asset.compareById);
final triple = _diffAssets(
existing,
deleteCandidates,
compare: Asset.compareById,
remote: remote,
);
return Pair(triple.third.map((e) => e.id).toList(), triple.second);
}