sync fixes

This commit is contained in:
zadam 2022-01-09 21:25:15 +01:00
parent 96f4230bc1
commit 2d2641dbd7
12 changed files with 90 additions and 64 deletions

View file

@ -0,0 +1,24 @@
CREATE TABLE IF NOT EXISTS "mig_entity_changes" (
`id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
`entityName` TEXT NOT NULL,
`entityId` TEXT NOT NULL,
`hash` TEXT NOT NULL,
`isErased` INT NOT NULL,
`changeId` TEXT NOT NULL,
`componentId` TEXT NOT NULL,
`instanceId` TEXT NOT NULL,
`isSynced` INTEGER NOT NULL,
`utcDateChanged` TEXT NOT NULL
);
INSERT INTO mig_entity_changes (id, entityName, entityId, hash, isErased, changeId, componentId, instanceId, isSynced, utcDateChanged)
SELECT id, entityName, entityId, hash, isErased, changeId, '', '', isSynced, utcDateChanged FROM entity_changes;
DROP TABLE entity_changes;
ALTER TABLE mig_entity_changes RENAME TO entity_changes;
CREATE UNIQUE INDEX `IDX_entityChanges_entityName_entityId` ON "entity_changes" (
`entityName`,
`entityId`
);

View file

@ -0,0 +1 @@
CREATE INDEX `IDX_entity_changes_changeId` ON `entity_changes` (`changeId`);

View file

@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS "entity_changes" (
`isErased` INT NOT NULL,
`changeId` TEXT NOT NULL,
`componentId` TEXT NOT NULL,
`memberId` TEXT NOT NULL,
`instanceId` TEXT NOT NULL,
`isSynced` INTEGER NOT NULL,
`utcDateChanged` TEXT NOT NULL
);
@ -98,6 +98,7 @@ CREATE INDEX `IDX_note_revisions_utcDateCreated` ON `note_revisions` (`utcDateCr
CREATE INDEX `IDX_note_revisions_utcDateLastEdited` ON `note_revisions` (`utcDateLastEdited`);
CREATE INDEX `IDX_note_revisions_dateCreated` ON `note_revisions` (`dateCreated`);
CREATE INDEX `IDX_note_revisions_dateLastEdited` ON `note_revisions` (`dateLastEdited`);
CREATE INDEX `IDX_entity_changes_changeId` ON `entity_changes` (`changeId`);
CREATE INDEX IDX_attributes_name_value
on attributes (name, value);
CREATE INDEX IDX_attributes_noteId_index

View file

@ -23,8 +23,6 @@ function SetupModel() {
this.syncProxy = ko.observable();
this.password = ko.observable();
this.instanceType = utils.isElectron() ? "desktop" : "server";
this.setupTypeSelected = () => !!this.setupType();
this.selectSetupType = () => {

View file

@ -3,7 +3,7 @@
const options = require('../../services/options');
const utils = require('../../services/utils');
const dateUtils = require('../../services/date_utils');
const memberId = require('../../services/member_id');
const instanceId = require('../../services/member_id');
const passwordEncryptionService = require('../../services/password_encryption');
const protectedSessionService = require('../../services/protected_session');
const appInfo = require('../../services/app_info');
@ -47,7 +47,7 @@ function loginSync(req) {
req.session.loggedIn = true;
return {
memberId: memberId,
instanceId: instanceId,
maxEntityChangeId: sql.getValue("SELECT COALESCE(MAX(id), 0) FROM entity_changes WHERE isSynced = 1")
};
}

View file

@ -124,7 +124,7 @@ function getChanged(req) {
const startTime = Date.now();
let lastEntityChangeId = parseInt(req.query.lastEntityChangeId);
const clientMemberId = req.query.memberId;
const clientinstanceId = req.query.instanceId;
let filteredEntityChanges = [];
while (filteredEntityChanges.length === 0) {
@ -140,19 +140,28 @@ function getChanged(req) {
break;
}
filteredEntityChanges = entityChanges.filter(ec => ec.memberId !== clientMemberId);
filteredEntityChanges = entityChanges.filter(ec => ec.instanceId !== clientinstanceId);
if (filteredEntityChanges.length === 0) {
lastEntityChangeId = entityChanges[entityChanges.length - 1].id;
}
}
const entityChangesRecords = syncService.getEntityChangesRecords(filteredEntityChanges);
const entityChangeRecords = syncService.getEntityChangeRecords(filteredEntityChanges);
if (entityChangesRecords.length > 0) {
lastEntityChangeId = entityChangesRecords[entityChangesRecords.length - 1].entityChange.id;
if (entityChangeRecords.length > 0) {
lastEntityChangeId = entityChangeRecords[entityChangeRecords.length - 1].entityChange.id;
}
const ret = {
entityChanges: entityChangesRecords,
maxEntityChangeId: sql.getValue('SELECT COALESCE(MAX(id), 0) FROM entity_changes WHERE isSynced = 1'),
lastEntityChangeId
entityChanges: entityChangeRecords,
lastEntityChangeId,
outstandingPullCount: sql.getValue(`
SELECT COUNT(id)
FROM entity_changes
WHERE isSynced = 1
AND instanceId != ?
AND id > ?`, [clientinstanceId, lastEntityChangeId])
};
if (ret.entityChanges.length > 0) {
@ -197,10 +206,10 @@ function update(req) {
}
}
const {entities, memberId} = body;
const {entities, instanceId} = body;
for (const {entityChange, entity} of entities) {
syncUpdateService.updateEntity(entityChange, entity, memberId);
syncUpdateService.updateEntity(entityChange, entity, instanceId);
}
}

View file

@ -4,7 +4,7 @@ const build = require('./build');
const packageJson = require('../../package');
const {TRILIUM_DATA_DIR} = require('./data_dir');
const APP_DB_VERSION = 191;
const APP_DB_VERSION = 192;
const SYNC_VERSION = 25;
const CLIPPER_PROTOCOL_VERSION = "1.0";

View file

@ -3,13 +3,13 @@ const dateUtils = require('./date_utils');
const log = require('./log');
const cls = require('./cls');
const utils = require('./utils');
const memberId = require('./member_id');
const instanceId = require('./member_id');
const becca = require("../becca/becca");
let maxEntityChangeId = 0;
function addEntityChangeWithMemberId(origEntityChange, memberId) {
const ec = {...origEntityChange, memberId};
function addEntityChangeWithinstanceId(origEntityChange, instanceId) {
const ec = {...origEntityChange, instanceId};
return addEntityChange(ec);
}
@ -24,7 +24,7 @@ function addEntityChange(origEntityChange) {
}
ec.componentId = ec.componentId || cls.getComponentId() || "";
ec.memberId = ec.memberId || memberId;
ec.instanceId = ec.instanceId || instanceId;
ec.isSynced = ec.isSynced ? 1 : 0;
ec.isErased = ec.isErased ? 1 : 0;
ec.id = sql.replace("entity_changes", ec);
@ -43,7 +43,7 @@ function addNoteReorderingEntityChange(parentNoteId, componentId) {
utcDateChanged: dateUtils.utcNowDateTime(),
isSynced: true,
componentId,
memberId: memberId
instanceId: instanceId
});
const eventService = require('./events');
@ -146,7 +146,7 @@ module.exports = {
addNoteReorderingEntityChange,
moveEntityChangeToTop,
addEntityChange,
addEntityChangeWithMemberId,
addEntityChangeWithinstanceId,
fillAllEntityChanges,
addEntityChangesForSector,
getMaxEntityChangeId: () => maxEntityChangeId

View file

@ -1,5 +1,5 @@
const utils = require('./utils');
const memberId = utils.randomString(12);
const instanceId = utils.randomString(12);
module.exports = memberId;
module.exports = instanceId;

View file

@ -4,7 +4,7 @@ const log = require('./log');
const sql = require('./sql');
const optionService = require('./options');
const utils = require('./utils');
const memberId = require('./member_id');
const instanceId = require('./member_id');
const dateUtils = require('./date_utils');
const syncUpdateService = require('./sync_update');
const contentHashService = require('./content_hash');
@ -107,11 +107,11 @@ async function doLogin() {
hash: hash
});
if (resp.memberId === memberId) {
throw new Error(`Sync server has member ID ${resp.memberId} which is also local. This usually happens when the sync client is (mis)configured to sync with itself (URL points back to client) instead of the correct sync server.`);
if (resp.instanceId === instanceId) {
throw new Error(`Sync server has member ID ${resp.instanceId} which is also local. This usually happens when the sync client is (mis)configured to sync with itself (URL points back to client) instead of the correct sync server.`);
}
syncContext.memberId = resp.memberId;
syncContext.instanceId = resp.instanceId;
const lastSyncedPull = getLastSyncedPull();
@ -131,22 +131,21 @@ async function pullChanges(syncContext) {
while (true) {
const lastSyncedPull = getLastSyncedPull();
const changesUri = `/api/sync/changed?memberId=${memberId}&lastEntityChangeId=${lastSyncedPull}`;
const changesUri = `/api/sync/changed?instanceId=${instanceId}&lastEntityChangeId=${lastSyncedPull}`;
const startDate = Date.now();
const resp = await syncRequest(syncContext, 'GET', changesUri);
const {entityChanges, lastEntityChangeId} = resp;
outstandingPullCount = resp.outstandingPullCount;
const pulledDate = Date.now();
outstandingPullCount = Math.max(0, resp.maxEntityChangeId - lastSyncedPull);
const {entityChanges, lastEntityChangeId} = resp;
sql.transactional(() => {
for (const {entityChange, entity} of entityChanges) {
const changeAppliedAlready = !entityChange.changeId
|| !!sql.getValue("SELECT id FROM entity_changes WHERE changeId = ?", [entityChange.changeId]);
const changeAppliedAlready = entityChange.changeId
&& !!sql.getValue("SELECT id FROM entity_changes WHERE changeId = ?", [entityChange.changeId]);
if (!changeAppliedAlready) {
if (!atLeastOnePullApplied) { // send only for first
@ -155,10 +154,8 @@ async function pullChanges(syncContext) {
atLeastOnePullApplied = true;
}
syncUpdateService.updateEntity(entityChange, entity, syncContext.memberId);
syncUpdateService.updateEntity(entityChange, entity, syncContext.instanceId);
}
outstandingPullCount = Math.max(0, resp.maxEntityChangeId - entityChange.id);
}
if (lastSyncedPull !== lastEntityChangeId) {
@ -191,7 +188,7 @@ async function pushChanges(syncContext) {
}
const filteredEntityChanges = entityChanges.filter(entityChange => {
if (entityChange.memberId === syncContext.memberId) {
if (entityChange.instanceId === syncContext.instanceId) {
// this may set lastSyncedPush beyond what's actually sent (because of size limit)
// so this is applied to the database only if there's no actual update
lastSyncedPush = entityChange.id;
@ -211,12 +208,12 @@ async function pushChanges(syncContext) {
continue;
}
const entityChangesRecords = getEntityChangesRecords(filteredEntityChanges);
const entityChangesRecords = getEntityChangeRecords(filteredEntityChanges);
const startDate = new Date();
await syncRequest(syncContext, 'PUT', '/api/sync/update', {
entities: entityChangesRecords,
memberId
instanceId
});
ws.syncPushInProgress();
@ -253,6 +250,11 @@ async function checkContentHash(syncContext) {
const failedChecks = contentHashService.checkContentHashes(resp.entityHashes);
process.exit(0);
throw new Error("AAAA");
return;
if (failedChecks.length > 0) {
// before requeuing sectors make sure the entity changes are correct
const consistencyChecks = require("./consistency_checks");
@ -332,7 +334,7 @@ function getEntityChangeRow(entityName, entityId) {
}
}
function getEntityChangesRecords(entityChanges) {
function getEntityChangeRecords(entityChanges) {
const records = [];
let length = 0;
@ -345,13 +347,6 @@ function getEntityChangesRecords(entityChanges) {
const entity = getEntityChangeRow(entityChange.entityName, entityChange.entityId);
if (entityChange.entityName === 'options' && !entity.isSynced) {
// if non-synced entities should count towards "lastSyncedPush"
records.push({entityChange});
continue;
}
const record = { entityChange, entity };
records.push(record);
@ -423,7 +418,7 @@ require("../becca/becca_loader").beccaLoaded.then(() => {
module.exports = {
sync,
login,
getEntityChangesRecords,
getEntityChangeRecords,
getOutstandingPullCount,
getMaxEntityChangeId
};

View file

@ -4,12 +4,12 @@ const entityChangesService = require('./entity_changes');
const eventService = require('./events');
const entityConstructor = require("../becca/entity_constructor");
function updateEntity(entityChange, entityRow, memberId) {
function updateEntity(entityChange, entityRow, instanceId) {
// can be undefined for options with isSynced=false
if (!entityRow) {
if (entityChange.isSynced) {
if (entityChange.isErased) {
eraseEntity(entityChange, memberId);
eraseEntity(entityChange, instanceId);
}
else {
log.info(`Encountered synced non-erased entity change without entity: ${JSON.stringify(entityChange)}`);
@ -23,8 +23,8 @@ function updateEntity(entityChange, entityRow, memberId) {
}
const updated = entityChange.entityName === 'note_reordering'
? updateNoteReordering(entityChange, entityRow, memberId)
: updateNormalEntity(entityChange, entityRow, memberId);
? updateNoteReordering(entityChange, entityRow, instanceId)
: updateNormalEntity(entityChange, entityRow, instanceId);
if (updated) {
if (entityRow.isDeleted) {
@ -42,7 +42,7 @@ function updateEntity(entityChange, entityRow, memberId) {
}
}
function updateNormalEntity(remoteEntityChange, entity, memberId) {
function updateNormalEntity(remoteEntityChange, entity, instanceId) {
const localEntityChange = sql.getRow(`
SELECT utcDateChanged, hash, isErased
FROM entity_changes
@ -54,7 +54,7 @@ function updateNormalEntity(remoteEntityChange, entity, memberId) {
sql.execute(`DELETE FROM ${remoteEntityChange.entityName} WHERE ${primaryKey} = ?`, remoteEntityChange.entityId);
entityChangesService.addEntityChangeWithMemberId(remoteEntityChange, memberId);
entityChangesService.addEntityChangeWithinstanceId(remoteEntityChange, instanceId);
});
return true;
@ -71,7 +71,7 @@ function updateNormalEntity(remoteEntityChange, entity, memberId) {
sql.transactional(() => {
sql.replace(remoteEntityChange.entityName, entity);
entityChangesService.addEntityChangeWithMemberId(remoteEntityChange, memberId);
entityChangesService.addEntityChangeWithinstanceId(remoteEntityChange, instanceId);
});
return true;
@ -80,13 +80,13 @@ function updateNormalEntity(remoteEntityChange, entity, memberId) {
return false;
}
function updateNoteReordering(entityChange, entity, memberId) {
function updateNoteReordering(entityChange, entity, instanceId) {
sql.transactional(() => {
for (const key in entity) {
sql.execute("UPDATE branches SET notePosition = ? WHERE branchId = ?", [entity[key], key]);
}
entityChangesService.addEntityChangeWithMemberId(entityChange, memberId);
entityChangesService.addEntityChangeWithinstanceId(entityChange, instanceId);
});
return true;
@ -105,7 +105,7 @@ function handleContent(content) {
return content;
}
function eraseEntity(entityChange, memberId) {
function eraseEntity(entityChange, instanceId) {
const {entityName, entityId} = entityChange;
if (!["notes", "note_contents", "branches", "attributes", "note_revisions", "note_revision_contents"].includes(entityName)) {
@ -119,7 +119,7 @@ function eraseEntity(entityChange, memberId) {
eventService.emit(eventService.ENTITY_DELETE_SYNCED, { entityName, entityId });
entityChangesService.addEntityChangeWithMemberId(entityChange, memberId);
entityChangesService.addEntityChangeWithinstanceId(entityChange, instanceId);
}
module.exports = {

View file

@ -135,9 +135,7 @@
<div class="alert alert-success">Sync has been correctly set up. It will take some time for the initial sync to finish. Once it's done, you'll be redirected to the login page.</div>
<div data-bind="if: instanceType == 'desktop'">
Outstanding sync items: <strong id="outstanding-syncs">N/A</strong>
</div>
<div>Outstanding sync items: <strong id="outstanding-syncs">N/A</strong></div>
</div>
</div>
</div>