2017-10-22 09:10:33 +08:00
"use strict" ;
2017-10-26 10:39:21 +08:00
const log = require ( './log' ) ;
const rp = require ( 'request-promise' ) ;
const sql = require ( './sql' ) ;
const migration = require ( './migration' ) ;
2017-10-27 09:16:21 +08:00
const utils = require ( './utils' ) ;
2017-10-27 11:21:31 +08:00
const config = require ( './config' ) ;
const audit _category = require ( './audit_category' ) ;
2017-10-29 10:17:00 +08:00
const crypto = require ( 'crypto' ) ;
2017-10-23 08:22:09 +08:00
2017-10-27 11:21:31 +08:00
// Host of the remote sync server, read from the 'Sync' section of the config.
// When this is falsy the sync timers at the bottom of the file are not started.
const SYNC_SERVER = config.Sync.syncServerHost;

// Set while sync() is running so that an overlapping run is refused.
let syncInProgress = false;
2017-10-29 23:22:41 +08:00
/**
 * Pulls changes from the sync server and applies them to the local database.
 *
 * Requests everything changed since the stored `last_synced_pull` option, then - inside a single
 * transaction - applies the changed tree/audit rows via putChanged(), fetches and applies every
 * changed note, adds a SYNC audit entry when at least one note was pulled and finally stores the
 * server-provided `syncTimestamp` as the new `last_synced_pull`.
 *
 * @param cookieJar - request-promise cookie jar sent with every request
 * @param {string[]} syncLog - passed on to putChanged() and putNote()
 * @throws {Error} when a request to the sync server fails (the inner stack is appended to the message);
 *                 errors raised while applying the changes propagate unchanged
 */
async function pullSync(cookieJar, syncLog) {
    // explicit radix so the stored value is always read as a decimal number
    const lastSyncedPull = parseInt(await sql.getOption('last_synced_pull'), 10);

    let resp;

    try {
        resp = await rp({
            uri: SYNC_SERVER + '/api/sync/changed/' + lastSyncedPull,
            headers: {
                auth: 'sync'
            },
            jar: cookieJar,
            json: true
        });
    }
    catch (e) {
        throw new Error("Can't pull changed, inner exception: " + e.stack);
    }

    // no try/catch around the transaction: a catch that only rethrows adds nothing
    await sql.doInTransaction(async () => {
        await putChanged(resp, syncLog);

        for (const noteId of resp.notes) {
            let note;

            try {
                note = await rp({
                    uri: SYNC_SERVER + "/api/sync/note/" + noteId + "/" + lastSyncedPull,
                    headers: {
                        auth: 'sync'
                    },
                    json: true,
                    jar: cookieJar
                });
            }
            catch (e) {
                throw new Error("Can't pull note " + noteId + ", inner exception: " + e.stack);
            }

            // NOTE(review): putNote is neither defined nor imported in the visible part of this
            // file - confirm where it is supposed to come from, otherwise this line throws.
            await putNote(note, syncLog);
        }

        if (resp.notes.length > 0) {
            await sql.addAudit(audit_category.SYNC);
        }

        await sql.setOption('last_synced_pull', resp.syncTimestamp);
    });
}
2017-10-26 10:39:21 +08:00
2017-10-30 10:22:30 +08:00
/**
 * Sends a single local entity to the sync server with a PUT request.
 *
 * For entities of type 'notes' the rows of the `links` table belonging to the note are attached
 * to the payload next to the entity itself.
 *
 * @param {object} entity - row to send
 * @param {string} entityName - entity type; also the last path segment of the target endpoint
 * @param cookieJar - request-promise cookie jar sent with the request
 * @param syncLog - not used by this function
 * @throws {Error} when loading the links or sending the request fails (inner stack is appended)
 */
async function syncEntity(entity, entityName, cookieJar, syncLog) {
    try {
        const requestBody = entityName === 'notes'
            ? {
                entity: entity,
                links: await sql.getResults('select * from links where note_id = ?', [entity.note_id])
            }
            : { entity: entity };

        const requestOptions = {
            method: 'PUT',
            uri: SYNC_SERVER + '/api/sync/' + entityName,
            body: requestBody,
            json: true,
            timeout: 60 * 1000,
            jar: cookieJar
        };

        await rp(requestOptions);
    }
    catch (err) {
        throw new Error(`Failed sending update for entity ${entityName}, inner exception: ${err.stack}`);
    }
}
2017-10-27 09:16:21 +08:00
2017-10-30 10:22:30 +08:00
/**
 * Sends the given entities to the sync server one after another (each request is awaited before
 * the next one starts) by delegating to syncEntity().
 *
 * @param {Iterable<object>} entities - rows to send
 * @param {string} entityName - entity type shared by all rows
 * @param cookieJar - request-promise cookie jar, passed through
 * @param syncLog - passed through
 */
async function syncEntities(entities, entityName, cookieJar, syncLog) {
    for (const row of entities) {
        // sequential on purpose: the first failure aborts the remaining rows
        await syncEntity(row, entityName, cookieJar, syncLog);
    }
}
/**
 * Pushes local changes to the sync server, oldest modification timestamp first.
 *
 * Each loop iteration looks up the smallest modification timestamp newer than the last pushed one
 * across notes_tree, notes and notes_history, pushes all rows carrying exactly that timestamp and
 * then stores the timestamp in the `last_synced_push` option.
 *
 * @param cookieJar - request-promise cookie jar, passed on to syncEntities()
 * @param syncLog - passed on to syncEntities()
 */
async function pushSync(cookieJar, syncLog) {
    // explicit radix so the stored value is always read as a decimal number
    let lastSyncedPush = parseInt(await sql.getOption('last_synced_push'), 10);

    const syncStarted = utils.nowTimestamp();

    while (true) {
        const oldestUnsyncedDateModified = await sql.getSingleValue(`
            SELECT MIN(date_modified) FROM (
                SELECT MIN(date_modified) AS date_modified FROM notes_tree WHERE date_modified > ?
                UNION
                SELECT MIN(date_modified) AS date_modified FROM notes WHERE date_modified > ?
                UNION
                SELECT MIN(date_modified_to) AS date_modified FROM notes_history WHERE date_modified_to > ?
            )`, [lastSyncedPush, lastSyncedPush, lastSyncedPush]);

        if (oldestUnsyncedDateModified === null) {
            break;
        }

        // A `return` inside the transaction callback only leaves the callback, not this function,
        // so the decision to stop is carried out of the callback through this flag.
        let stopSync = false;

        await sql.doInTransaction(async () => {
            const notesTree = await sql.getResults('SELECT * FROM notes_tree WHERE date_modified = ?', [oldestUnsyncedDateModified]);
            await syncEntities(notesTree, 'notes_tree', cookieJar, syncLog);

            const notes = await sql.getResults('SELECT * FROM notes WHERE date_modified = ?', [oldestUnsyncedDateModified]);
            await syncEntities(notes, 'notes', cookieJar, syncLog);

            const notesHistory = await sql.getResults('SELECT * FROM notes_history WHERE date_modified_to = ?', [oldestUnsyncedDateModified]);
            await syncEntities(notesHistory, 'notes_history', cookieJar, syncLog);

            lastSyncedPush = oldestUnsyncedDateModified;

            // if the sync started in the same second as the last changes then it's possible we synced only parts
            // of this second's changes. In that case we'll leave the last_synced_push as it is and stop the sync
            // so next time we'll re-do this second again, thus guaranteeing all changes have been pushed
            if (lastSyncedPush === syncStarted) {
                stopSync = true;
                return;
            }

            await sql.setOption('last_synced_push', lastSyncedPush);
        });

        if (stopSync) {
            return;
        }
    }
}
2017-10-26 10:39:21 +08:00
2017-10-29 10:17:00 +08:00
/**
 * Logs in to the sync server API.
 *
 * Posts the current timestamp, the application DB version and an HMAC of the timestamp (computed
 * by utils.hmac with the `document_secret` option) to /api/login using a fresh cookie jar.
 *
 * @returns the cookie jar that was used for the login request
 * @throws {Error} when the login request fails (inner stack is appended to the message)
 */
async function login() {
    const now = utils.nowTimestamp();
    const secret = await sql.getOption('document_secret');
    const signature = utils.hmac(secret, now);

    const jar = rp.jar();

    const loginRequest = {
        method: 'POST',
        uri: SYNC_SERVER + '/api/login',
        body: {
            timestamp: now,
            dbVersion: migration.APP_DB_VERSION,
            hash: signature
        },
        json: true,
        timeout: 5 * 1000,
        jar: jar
    };

    try {
        await rp(loginRequest);
    }
    catch (err) {
        throw new Error(`Can't login to API for sync, inner exception: ${err.stack}`);
    }

    return jar;
}
2017-10-27 08:31:31 +08:00
/**
 * Runs one full sync cycle: login, push of local changes, then pull of remote changes.
 *
 * Does nothing besides reporting when another sync is still running or when
 * migration.isDbUpToDate() reports that the database is not up to date. Failures during the cycle
 * are logged through logSync() and not rethrown.
 *
 * @returns {Promise<string[]>} messages collected during this run
 */
async function sync() {
    const messages = [];

    if (syncInProgress) {
        messages.push("Sync already in progress");

        return messages;
    }

    syncInProgress = true;

    try {
        const dbUpToDate = await migration.isDbUpToDate();

        if (dbUpToDate) {
            const session = await login();

            await pushSync(session, messages);

            await pullSync(session, messages);
        }
        else {
            messages.push("DB not up to date");
        }
    }
    catch (err) {
        logSync(`sync failed: ${err.stack}`, messages);
    }
    finally {
        // always release the guard, also after a failure
        syncInProgress = false;
    }

    return messages;
}
/**
 * Reports a sync message: writes it through log.info(), appends it to `syncLog` when one is given
 * and prints it to the console.
 *
 * @param {string} message - text to report
 * @param {string[]} [syncLog] - optional collector of messages
 */
function logSync(message, syncLog) {
    log.info(message);

    const hasCollector = Boolean(syncLog);

    if (hasCollector) {
        syncLog.push(message);
    }

    console.log(message);
}
2017-10-27 09:16:21 +08:00
/**
 * Collects what changed locally at or after the given timestamp.
 *
 * @param since - lower bound (inclusive) compared against `date_modified`
 * @returns {Promise<object>} object with
 *   `syncTimestamp` - value of utils.nowTimestamp() taken before the queries run,
 *   `tree` - changed notes_tree rows,
 *   `notes` - note_id values of changed notes,
 *   `audit_log` - changed audit_log rows whose category is not 'SYNC'
 */
async function getChangedSince(since) {
    // taken first, before any of the queries below is executed
    const syncTimestamp = utils.nowTimestamp();

    const changedTree = await sql.getResults("select * from notes_tree where date_modified >= ?", [since]);

    const changedNoteIds = await sql.getFlattenedResults('note_id', "select note_id from notes where date_modified >= ?", [since]);

    const changedAudit = await sql.getResults("select * from audit_log where category != 'SYNC' and date_modified >= ?", [since]);

    return {
        syncTimestamp: syncTimestamp,
        tree: changedTree,
        notes: changedNoteIds,
        audit_log: changedAudit
    };
}
/**
 * Loads one note together with its images and the part of its history changed at or after `since`.
 *
 * @param noteId - note_id of the note to load
 * @param since - lower bound (inclusive) compared against `date_modified_to` of the history rows
 * @returns {Promise<object>} object with
 *   `detail` - the notes row,
 *   `images` - the note's images rows ordered by note_offset,
 *   `history` - the matching notes_history rows
 */
async function getNoteSince(noteId, since) {
    const noteRow = await sql.getSingleResult("select * from notes where note_id = ?", [noteId]);

    const imageRows = await sql.getResults("select * from images where note_id = ? order by note_offset", [noteId]);

    const historyRows = await sql.getResults("select * from notes_history where note_id = ? and date_modified_to >= ?", [noteId, since]);

    return {
        detail: noteRow,
        images: imageRows,
        history: historyRows
    };
}
2017-10-29 23:22:41 +08:00
/**
 * Applies a set of changed rows: every item of `changed.tree` is inserted into notes_tree
 * (after its `id` property is removed) and every item of `changed.audit_log` into audit_log.
 * When at least one row was present a SYNC audit entry is added at the end.
 *
 * Note that the `id` property is deleted from the passed-in tree items themselves.
 *
 * @param {object} changed - object with `tree` and `audit_log` arrays
 * @param {string[]} syncLog - collector handed to logSync()
 */
async function putChanged(changed, syncLog) {
    for (const treeRow of changed.tree) {
        delete treeRow.id;

        // NOTE(review): the meaning of the third argument (true) is defined by sql.insert - confirm
        await sql.insert("notes_tree", treeRow, true);

        logSync(`Update/sync notes_tree ${treeRow.note_id}`, syncLog);
    }

    for (const auditRow of changed.audit_log) {
        await sql.insert("audit_log", auditRow, true);

        logSync(`Update/sync audit_log for category=${auditRow.category}, noteId=${auditRow.note_id}`, syncLog);
    }

    const somethingApplied = changed.tree.length > 0 || changed.audit_log.length > 0;

    if (somethingApplied) {
        logSync("Added final audit", syncLog);

        await sql.addAudit(audit_category.SYNC);
    }
}
2017-10-30 10:22:30 +08:00
/**
 * Applies a pushed note to the local database.
 *
 * The note is written when no local row exists or when the local `date_modified` is not newer
 * than the incoming one (equal timestamps overwrite). In that case - inside one transaction - the
 * notes row is replaced, the note's links are removed and the incoming links are inserted.
 * Otherwise only a sync conflict is logged.
 *
 * The `lnk_id` property is deleted from the passed-in link objects themselves.
 *
 * @param {object} body - object with `entity` (the notes row) and `links` (array of link rows)
 * @param {string[]} syncLog - collector handed to logSync()
 */
async function updateNote(body, syncLog) {
    const entity = body.entity;

    // the "...OrNull" lookup matches the explicit null check below
    const origNote = await sql.getSingleResultOrNull("select * from notes where note_id = ?", [entity.note_id]);

    if (origNote === null || origNote.date_modified <= entity.date_modified) {
        await sql.doInTransaction(async () => {
            await sql.replace("notes", entity);

            await sql.remove("links", entity.note_id);

            for (const link of body.links) {
                // drop the sender's row id before inserting
                delete link['lnk_id'];

                // the table is called "links" - the same one that is cleared just above
                await sql.insert('links', link);
            }
        });

        logSync("Update/sync note " + entity.note_id, syncLog);
    }
    else {
        logSync("Sync conflict in note " + entity.note_id, syncLog);
    }
}
/**
 * Applies a pushed notes_tree row: the row is replaced when no local row exists for the note or
 * when the local `date_modified` is strictly older than the incoming one; otherwise a sync
 * conflict is logged and nothing is written.
 *
 * @param {object} body - object with `entity` (the notes_tree row)
 * @param {string[]} syncLog - collector handed to logSync()
 */
async function updateNoteTree(body, syncLog) {
    const incoming = body.entity;

    const existing = await sql.getSingleResultOrNull("select * from notes_tree where note_id = ?", [incoming.note_id]);

    const shouldApply = existing === null || existing.date_modified < incoming.date_modified;

    if (!shouldApply) {
        logSync(`Sync conflict in note tree ${incoming.note_id}`, syncLog);

        return;
    }

    await sql.replace('notes_tree', incoming);

    logSync(`Update/sync note tree ${incoming.note_id}`, syncLog);
}
/**
 * Applies a pushed notes_history row, identified by note_id and date_modified_from.
 *
 * When no local row exists or the local `date_modified_to` is strictly older than the incoming
 * one, the local row is deleted and the incoming one (without its `id` property) is inserted;
 * otherwise a sync conflict is logged and nothing is written.
 *
 * The `id` property is deleted from the passed-in entity itself.
 *
 * @param {object} body - object with `entity` (the notes_history row)
 * @param {string[]} syncLog - collector handed to logSync()
 */
async function updateNoteHistory(body, syncLog) {
    const incoming = body.entity;
    const rowKey = [incoming.note_id, incoming.date_modified_from];

    const existing = await sql.getSingleResultOrNull("select * from notes_history where note_id = ? and date_modified_from = ?", [rowKey[0], rowKey[1]]);

    const shouldApply = existing === null || existing.date_modified_to < incoming.date_modified_to;

    if (!shouldApply) {
        logSync(`Sync conflict in note history for ${incoming.note_id}, from=${incoming.date_modified_from}, to=${incoming.date_modified_to}`, syncLog);

        return;
    }

    // NOTE(review): delete and insert are not wrapped in a transaction here - confirm whether
    // the caller provides one.
    await sql.execute("delete from notes_history where note_id = ? and date_modified_from = ?", [rowKey[0], rowKey[1]]);

    delete incoming.id;

    await sql.insert('notes_history', incoming);

    logSync(`Update/sync note history ${incoming.note_id}`, syncLog);
}
2017-10-30 04:14:59 +08:00
// Start the periodic sync only when a sync server is configured.
if (!SYNC_SERVER) {
    log.info("Sync server not configured, sync timer not running.");
}
else {
    log.info("Setting up sync");

    // repeat the sync every 60 seconds
    setInterval(sync, 60000);

    // kick off the initial sync one second after the module is loaded
    setTimeout(sync, 1000);
}
2017-10-27 09:16:21 +08:00
module . exports = {
2017-10-29 23:22:41 +08:00
sync ,
2017-10-27 09:16:21 +08:00
getChangedSince ,
getNoteSince ,
putChanged ,
2017-10-30 10:22:30 +08:00
updateNote ,
updateNoteTree ,
updateNoteHistory
2017-10-27 09:16:21 +08:00
} ;