From 8630b3685dc572f053db399a1aba223c26130abd Mon Sep 17 00:00:00 2001 From: azivner Date: Sun, 29 Oct 2017 22:22:30 -0400 Subject: [PATCH] incremental push sync --- routes/api/sync.js | 16 +++- services/sql.js | 19 ++++- services/sync.js | 178 ++++++++++++++++++++++++++++----------------- 3 files changed, 142 insertions(+), 71 deletions(-) diff --git a/routes/api/sync.js b/routes/api/sync.js index dbef955c6..672a57393 100644 --- a/routes/api/sync.js +++ b/routes/api/sync.js @@ -35,8 +35,20 @@ router.get('/note/:noteId/:since', auth.checkApiAuth, async (req, res, next) => res.send(await sync.getNoteSince(noteId, since)); }); -router.put('/note', auth.checkApiAuth, async (req, res, next) => { - await sync.putNote(req.body); +router.put('/notes', auth.checkApiAuth, async (req, res, next) => { + await sync.updateNote(req.body); + + res.send({}); +}); + +router.put('/notes_tree', auth.checkApiAuth, async (req, res, next) => { + await sync.updateNoteTree(req.body); + + res.send({}); +}); + +router.put('/notes_history', auth.checkApiAuth, async (req, res, next) => { + await sync.updateNoteHistory(req.body); res.send({}); }); diff --git a/services/sql.js b/services/sql.js index 3ba03ffea..03dbfc358 100644 --- a/services/sql.js +++ b/services/sql.js @@ -21,6 +21,10 @@ async function insert(table_name, rec, replace = false) { return res.lastID; } +async function replace(table_name, rec) { + return await insert(table_name, rec, true); +} + async function beginTransaction() { return await db.run("BEGIN"); } @@ -56,7 +60,17 @@ async function getSingleResult(query, params = []) { async function getSingleResultOrNull(query, params = []) { const all = await db.all(query, ...params); - return all ? all[0] : null; + return all.length > 0 ? all[0] : null; +} + +async function getSingleValue(query, params = []) { + const row = await getSingleResultOrNull(query, params); + + if (!row) { + return null; + } + + return row[Object.keys(row)[0]]; } async function getResults(query, params = []) { @@ -126,7 +140,10 @@ async function doInTransaction(func) { module.exports = { insert, + replace, + getSingleValue, getSingleResult, + getSingleResultOrNull, getResults, getFlattenedResults, execute, diff --git a/services/sync.js b/services/sync.js index bbb2aea46..d09ef543d 100644 --- a/services/sync.js +++ b/services/sync.js @@ -69,57 +69,69 @@ async function pullSync(cookieJar, syncLog) { } } +async function syncEntity(entity, entityName, cookieJar, syncLog) { + try { + const payload = { + entity: entity + }; + + if (entityName === 'notes') { + payload.links = await sql.getResults('select * from links where note_id = ?', [entity.note_id]); + } + + await rp({ + method: 'PUT', + uri: SYNC_SERVER + '/api/sync/' + entityName, + body: payload, + json: true, + timeout: 60 * 1000, + jar: cookieJar + }); + } + catch (e) { + throw new Error("Failed sending update for entity " + entityName + ", inner exception: " + e.stack); + } +} + +async function syncEntities(entities, entityName, cookieJar, syncLog) { + for (const entity of entities) { + await syncEntity(entity, entityName, cookieJar, syncLog); + } +} + async function pushSync(cookieJar, syncLog) { - const lastSyncedPush = parseInt(await sql.getOption('last_synced_push')); + let lastSyncedPush = parseInt(await sql.getOption('last_synced_push')); const syncStarted = utils.nowTimestamp(); - const changed = await getChangedSince(lastSyncedPush); + while (true) { + const oldestUnsyncedDateModified = await sql.getSingleValue(` + SELECT MIN(date_modified) FROM ( + SELECT MIN(date_modified) AS date_modified FROM notes_tree WHERE date_modified > ? + UNION + SELECT MIN(date_modified) AS date_modified FROM notes WHERE date_modified > ? + UNION + SELECT MIN(date_modified_to) AS date_modified FROM notes_history WHERE date_modified_to > ? + )`, [lastSyncedPush, lastSyncedPush, lastSyncedPush]); - if (changed.tree.length > 0 || changed.audit_log.length > 0) { - logSync("Sending " + changed.tree.length + " tree changes and " + changed.audit_log.length + " audit changes", syncLog); + if (oldestUnsyncedDateModified === null) { + break; + } - try { - await rp({ - method: 'PUT', - uri: SYNC_SERVER + '/api/sync/changed', - headers: { - auth: 'sync' - }, - body: changed, - json: true, - timeout: 300 * 1000, // this can take long time - jar: cookieJar - }); - } - catch (e) { - throw new Error("Can't send tree changes and audit, inner exception: " + e.stack); - } + await sql.doInTransaction(async () => { + const notesTree = await sql.getResults('SELECT * FROM notes_tree WHERE date_modified = ?', [oldestUnsyncedDateModified]); + await syncEntities(notesTree, 'notes_tree', cookieJar, syncLog); + + const notes = await sql.getResults('SELECT * FROM notes WHERE date_modified = ?', [oldestUnsyncedDateModified]); + await syncEntities(notes, 'notes', cookieJar, syncLog); + + const notesHistory = await sql.getResults('SELECT * FROM notes_history WHERE date_modified_to = ?', [oldestUnsyncedDateModified]); + await syncEntities(notesHistory, 'notes_history', cookieJar, syncLog); + + lastSyncedPush = oldestUnsyncedDateModified; + + await sql.setOption('last_synced_push', lastSyncedPush); + }); } - - for (const noteId of changed.notes) { - logSync("Sending note " + noteId, syncLog); - - const note = await getNoteSince(noteId); - - try { - await rp({ - method: 'PUT', - uri: SYNC_SERVER + '/api/sync/note', - headers: { - auth: 'sync' - }, - body: note, - json: true, - timeout: 60 * 1000, - jar: cookieJar - }); - } - catch (e) { - throw new Error("Can't send note update, inner exception: " + e.stack); - } - } - - await sql.setOption('last_synced_push', syncStarted); } async function login() { @@ -173,7 +185,7 @@ async function sync() { await pushSync(cookieJar, syncLog); - await pullSync(cookieJar, syncLog); + //await pullSync(cookieJar, syncLog); } catch (e) { logSync("sync failed: " + e.stack, syncLog); @@ -234,34 +246,62 @@ async function putChanged(changed, syncLog) { } } -async function putNote(note, syncLog) { - const origNote = await sql.getSingleResult("select * from notes where note_id = ?", [note.detail.note_id]); +async function updateNote(body, syncLog) { + const entity = body.entity; - try { - if (origNote !== null && origNote.date_modified >= note.detail.date_modified) { - // version we have in DB is actually newer than the one we're getting from sync - // so we'll leave the current state as it is. The synced version should be stored in the history - } - else { - await sql.insert("notes", note.detail, true); - } + const origNote = await sql.getSingleResult("select * from notes where note_id = ?", [entity.note_id]); - await sql.remove("images", note.detail.note_id); + if (origNote === null || origNote.date_modified <= entity.date_modified) { + await sql.doInTransaction(async () => { + await sql.replace("notes", entity); - for (const image of note.images) { - await sql.insert("images", image); - } + await sql.remove("links", entity.note_id); - for (const history of note.history) { - delete history['id']; + for (const link of body.links) { + delete link['lnk_id']; - await sql.insert("notes_history", history, true); - } + await sql.insert('link', link); + } + }); - logSync("Update/sync note " + note.detail.note_id, syncLog); + logSync("Update/sync note " + entity.note_id, syncLog); } - catch (e) { - throw new Error("Update note " + note.detail.note_id + " failed, inner exception: " + e.stack); + else { + logSync("Sync conflict in note " + entity.note_id, syncLog); + } +} + +async function updateNoteTree(body, syncLog) { + const entity = body.entity; + + const orig = await sql.getSingleResultOrNull("select * from notes_tree where note_id = ?", [entity.note_id]); + + if (orig === null || orig.date_modified < entity.date_modified) { + await sql.replace('notes_tree', entity); + + logSync("Update/sync note tree " + entity.note_id, syncLog); + } + else { + logSync("Sync conflict in note tree " + entity.note_id, syncLog); + } +} + +async function updateNoteHistory(body, syncLog) { + const entity = body.entity; + + const orig = await sql.getSingleResultOrNull("select * from notes_history where note_id = ? and date_modified_from = ?", [entity.note_id, entity.date_modified_from]); + + if (orig === null || orig.date_modified_to < entity.date_modified_to) { + await sql.execute("delete from notes_history where note_id = ? and date_modified_from = ?", [entity.note_id, entity.date_modified_from]); + + delete entity['id']; + + await sql.insert('notes_history', entity); + + logSync("Update/sync note history " + entity.note_id, syncLog); + } + else { + logSync("Sync conflict in note history for " + entity.note_id + ", from=" + entity.date_modified_from + ", to=" + entity.date_modified_to, syncLog); } } @@ -282,5 +322,7 @@ module.exports = { getChangedSince, getNoteSince, putChanged, - putNote + updateNote, + updateNoteTree, + updateNoteHistory }; \ No newline at end of file