significantly faster sync with transactions

This commit is contained in:
zadam 2020-04-04 21:49:57 +02:00
parent ae50c9847d
commit b1bed18331
4 changed files with 31 additions and 28 deletions

View file

@ -126,7 +126,7 @@ async function getChanged(req) {
}; };
if (ret.syncs.length > 0) { if (ret.syncs.length > 0) {
log.info(`Returning ${ret.syncs.length} in ${Date.now() - startTime}ms`); log.info(`Returning ${ret.syncs.length} sync records in ${Date.now() - startTime}ms`);
} }
return ret; return ret;

View file

@ -18,6 +18,9 @@ async function createConnection() {
const dbConnection = new Promise(async (resolve, reject) => { const dbConnection = new Promise(async (resolve, reject) => {
// no need to create new connection now since DB stays the same all the time // no need to create new connection now since DB stays the same all the time
const db = await createConnection(); const db = await createConnection();
db.run('PRAGMA journal_mode = WAL;');
sql.setDbConnection(db); sql.setDbConnection(db);
resolve(); resolve();

View file

@ -145,29 +145,25 @@ async function pullSync(syncContext) {
break; break;
} }
log.info(`Pulled ${rows.length} changes from ${changesUri} in ${Date.now() - startDate}ms`); await sql.transactional(async () => {
for (const {sync, entity} of rows) {
if (!sourceIdService.isLocalSourceId(sync.sourceId)) {
if (appliedPulls === 0 && sync.entity !== 'recent_notes') { // send only for first
ws.syncPullInProgress();
for (const {sync, entity} of rows) { appliedPulls++;
if (!sourceIdService.isLocalSourceId(sync.sourceId)) { }
if (appliedPulls === 0 && sync.entity !== 'recent_notes') { // send only for first
ws.syncPullInProgress();
appliedPulls++; await syncUpdateService.updateEntity(sync, entity, syncContext.sourceId);
} }
const startTime = Date.now(); stats.outstandingPulls = resp.maxSyncId - sync.id;
const updated = await syncUpdateService.updateEntity(sync, entity, syncContext.sourceId);
if (updated) {
log.info(`Updated ${sync.entityName} ${sync.entityId} in ${Date.now() - startTime}ms`);
}
} }
stats.outstandingPulls = resp.maxSyncId - sync.id; await setLastSyncedPull(rows[rows.length - 1].sync.id);
} });
await setLastSyncedPull(rows[rows.length - 1].sync.id); log.info(`Pulled and updated ${rows.length} changes from ${changesUri} in ${Date.now() - startDate}ms`);
} }
if (appliedPulls > 0) { if (appliedPulls > 0) {
@ -257,8 +253,6 @@ async function checkContentHash(syncContext) {
await syncTableService.addEntitySyncsForSector(entityName, entityPrimaryKey, sector); await syncTableService.addEntitySyncsForSector(entityName, entityPrimaryKey, sector);
await syncRequest(syncContext, 'POST', `/api/sync/queue-sector/${entityName}/${sector}`); await syncRequest(syncContext, 'POST', `/api/sync/queue-sector/${entityName}/${sector}`);
log.info(`Added sector ${sector} of ${entityName} to sync queue.`);
} }
return failedChecks.length > 0; return failedChecks.length > 0;

View file

@ -33,19 +33,25 @@ async function addEntitySync(entityName, entityId, sourceId, isSynced) {
} }
async function addEntitySyncsForSector(entityName, entityPrimaryKey, sector) { async function addEntitySyncsForSector(entityName, entityPrimaryKey, sector) {
const entityIds = await sql.getColumn(`SELECT ${entityPrimaryKey} FROM ${entityName} WHERE SUBSTR(${entityPrimaryKey}, 1, 1) = ?`, [sector]); const startTime = Date.now();
for (const entityId of entityIds) { await sql.transactional(async () => {
if (entityName === 'options') { const entityIds = await sql.getColumn(`SELECT ${entityPrimaryKey} FROM ${entityName} WHERE SUBSTR(${entityPrimaryKey}, 1, 1) = ?`, [sector]);
const isSynced = await sql.getValue(`SELECT isSynced FROM options WHERE name = ?`, [entityId]);
if (!isSynced) { for (const entityId of entityIds) {
continue; if (entityName === 'options') {
const isSynced = await sql.getValue(`SELECT isSynced FROM options WHERE name = ?`, [entityId]);
if (!isSynced) {
continue;
}
} }
}
await insertEntitySync(entityName, entityId, 'content-check', true); await insertEntitySync(entityName, entityId, 'content-check', true);
} }
});
log.info(`Added sector ${sector} of ${entityName} to sync queue in ${Date.now() - startTime}ms.`);
} }
async function cleanupSyncRowsForMissingEntities(entityName, entityPrimaryKey) { async function cleanupSyncRowsForMissingEntities(entityName, entityPrimaryKey) {