fix incorrect processing of sync rows, closes #1019

This commit is contained in:
zadam 2020-05-14 13:08:06 +02:00
parent 37da0adb8a
commit 108afe8896
2 changed files with 54 additions and 46 deletions

View file

@ -4,7 +4,7 @@ export default class LoadResults {
this.noteIdToSourceId = {}; this.noteIdToSourceId = {};
this.sourceIdToNoteIds = {}; this.sourceIdToNoteIds = {};
this.branches = []; this.branches = [];
this.attributes = []; this.attributes = [];
@ -103,10 +103,10 @@ export default class LoadResults {
/** /**
* @return {boolean} true if there are changes which could affect the attributes (including inherited ones) * @return {boolean} true if there are changes which could affect the attributes (including inherited ones)
* notably changes in note itself should not have any effect on attributes
*/ */
hasAttributeRelatedChanges() { hasAttributeRelatedChanges() {
return Object.keys(this.noteIdToSourceId).length === 0 return this.branches.length === 0
&& this.branches.length === 0
&& this.attributes.length === 0; && this.attributes.length === 0;
} }
@ -119,4 +119,4 @@ export default class LoadResults {
&& this.contentNoteIdToSourceId.length === 0 && this.contentNoteIdToSourceId.length === 0
&& this.options.length === 0; && this.options.length === 0;
} }
} }

View file

@ -39,7 +39,18 @@ let consumeQueuePromise = null;
// most sync events are sent twice - once immediately after finishing the transaction and once during the scheduled ping // most sync events are sent twice - once immediately after finishing the transaction and once during the scheduled ping
// but we want to process only once // but we want to process only once
const receivedSyncIds = new Set(); const processedSyncIds = new Set();
function logRows(syncRows) {
const filteredRows = syncRows.filter(row =>
!processedSyncIds.has(row.id)
&& row.entityName !== 'recent_notes'
&& (row.entityName !== 'options' || row.entityId !== 'openTabs'));
if (filteredRows.length > 0) {
console.debug(utils.now(), "Sync data: ", filteredRows);
}
}
async function handleMessage(event) { async function handleMessage(event) {
const message = JSON.parse(event.data); const message = JSON.parse(event.data);
@ -55,20 +66,9 @@ async function handleMessage(event) {
$outstandingSyncsCount.html(message.outstandingSyncs); $outstandingSyncsCount.html(message.outstandingSyncs);
if (syncRows.length > 0) { if (syncRows.length > 0) {
const filteredRows = syncRows.filter(row => logRows(syncRows);
!receivedSyncIds.has(row.id)
&& row.entityName !== 'recent_notes'
&& (row.entityName !== 'options' || row.entityId !== 'openTabs'));
if (filteredRows.length > 0) { syncDataQueue.push(...syncRows);
console.debug(utils.now(), "Sync data: ", filteredRows);
}
for (const row of filteredRows) {
receivedSyncIds.add(row.id);
}
syncDataQueue.push(...filteredRows);
// we set lastAcceptedSyncId even before sync processing and send ping so that backend can start sending more updates // we set lastAcceptedSyncId even before sync processing and send ping so that backend can start sending more updates
lastAcceptedSyncId = Math.max(lastAcceptedSyncId, syncRows[syncRows.length - 1].id); lastAcceptedSyncId = Math.max(lastAcceptedSyncId, syncRows[syncRows.length - 1].id);
@ -142,13 +142,21 @@ async function runSafely(syncHandler, syncData) {
} }
} }
/**
* TODO: we should rethink the fact that each sync row is sent twice (once at the end of transaction, once periodically)
* and we keep both lastProcessedSyncId and processedSyncIds
* it even seems incorrect that when transaction sync rows are received, we incorrectly increase lastProcessedSyncId
* and then some syncs might be lost (or are *all* sync rows sent from transactions?)
*/
async function consumeSyncData() { async function consumeSyncData() {
if (syncDataQueue.length > 0) { if (syncDataQueue.length > 0) {
const allSyncData = syncDataQueue; const allSyncRows = syncDataQueue;
syncDataQueue = []; syncDataQueue = [];
const nonProcessedSyncRows = allSyncRows.filter(sync => !processedSyncIds.has(sync.id));
try { try {
await processSyncRows(allSyncData); await processSyncRows(nonProcessedSyncRows);
} }
catch (e) { catch (e) {
logError(`Encountered error ${e.message}: ${e.stack}, reloading frontend.`); logError(`Encountered error ${e.message}: ${e.stack}, reloading frontend.`);
@ -157,7 +165,7 @@ async function consumeSyncData() {
utils.reloadApp(); utils.reloadApp();
} }
lastProcessedSyncId = Math.max(lastProcessedSyncId, allSyncData[allSyncData.length - 1].id); lastProcessedSyncId = Math.max(lastProcessedSyncId, allSyncRows[allSyncRows.length - 1].id);
} }
checkSyncIdListeners(); checkSyncIdListeners();
@ -221,18 +229,18 @@ subscribeToMessages(message => {
async function processSyncRows(syncRows) { async function processSyncRows(syncRows) {
const missingNoteIds = []; const missingNoteIds = [];
syncRows.forEach(({entityName, entity}) => { for (const {id, entityName, entity} of syncRows) {
if (entityName === 'branches' && !(entity.parentNoteId in treeCache.notes)) { if (entityName === 'branches' && !(entity.parentNoteId in treeCache.notes)) {
missingNoteIds.push(entity.parentNoteId); missingNoteIds.push(entity.parentNoteId);
} }
else if (entityName === 'attributes' else if (entityName === 'attributes'
&& entity.type === 'relation' && entity.type === 'relation'
&& entity.name === 'template' && entity.name === 'template'
&& !(entity.noteId in treeCache.notes)) { && !(entity.noteId in treeCache.notes)) {
missingNoteIds.push(entity.value); missingNoteIds.push(entity.value);
} }
}); }
if (missingNoteIds.length > 0) { if (missingNoteIds.length > 0) {
await treeCache.reloadNotes(missingNoteIds); await treeCache.reloadNotes(missingNoteIds);
@ -240,16 +248,16 @@ async function processSyncRows(syncRows) {
const loadResults = new LoadResults(treeCache); const loadResults = new LoadResults(treeCache);
syncRows.filter(sync => sync.entityName === 'notes').forEach(sync => { for (const sync of syncRows.filter(sync => sync.entityName === 'notes')) {
const note = treeCache.notes[sync.entityId]; const note = treeCache.notes[sync.entityId];
if (note) { if (note) {
note.update(sync.entity); note.update(sync.entity);
loadResults.addNote(sync.entityId, sync.sourceId); loadResults.addNote(sync.entityId, sync.sourceId);
} }
}); }
syncRows.filter(sync => sync.entityName === 'branches').forEach(sync => { for (const sync of syncRows.filter(sync => sync.entityName === 'branches')) {
let branch = treeCache.branches[sync.entityId]; let branch = treeCache.branches[sync.entityId];
const childNote = treeCache.notes[sync.entity.noteId]; const childNote = treeCache.notes[sync.entity.noteId];
const parentNote = treeCache.notes[sync.entity.parentNoteId]; const parentNote = treeCache.notes[sync.entity.parentNoteId];
@ -295,9 +303,9 @@ async function processSyncRows(syncRows) {
} }
} }
} }
}); }
syncRows.filter(sync => sync.entityName === 'note_reordering').forEach(sync => { for (const sync of syncRows.filter(sync => sync.entityName === 'note_reordering')) {
for (const branchId in sync.positions) { for (const branchId in sync.positions) {
const branch = treeCache.branches[branchId]; const branch = treeCache.branches[branchId];
@ -307,10 +315,10 @@ async function processSyncRows(syncRows) {
} }
loadResults.addNoteReordering(sync.entityId, sync.sourceId); loadResults.addNoteReordering(sync.entityId, sync.sourceId);
}); }
// missing reloading the relation target note // missing reloading the relation target note
syncRows.filter(sync => sync.entityName === 'attributes').forEach(sync => { for (const sync of syncRows.filter(sync => sync.entityName === 'attributes')) {
let attribute = treeCache.attributes[sync.entityId]; let attribute = treeCache.attributes[sync.entityId];
const sourceNote = treeCache.notes[sync.entity.noteId]; const sourceNote = treeCache.notes[sync.entity.noteId];
const targetNote = sync.entity.type === 'relation' && treeCache.notes[sync.entity.value]; const targetNote = sync.entity.type === 'relation' && treeCache.notes[sync.entity.value];
@ -346,27 +354,27 @@ async function processSyncRows(syncRows) {
} }
} }
} }
}); }
syncRows.filter(sync => sync.entityName === 'note_contents').forEach(sync => { for (const sync of syncRows.filter(sync => sync.entityName === 'note_contents')) {
delete treeCache.noteComplementPromises[sync.entityId]; delete treeCache.noteComplementPromises[sync.entityId];
loadResults.addNoteContent(sync.entityId, sync.sourceId); loadResults.addNoteContent(sync.entityId, sync.sourceId);
}); }
syncRows.filter(sync => sync.entityName === 'note_revisions').forEach(sync => { for (const sync of syncRows.filter(sync => sync.entityName === 'note_revisions')) {
loadResults.addNoteRevision(sync.entityId, sync.noteId, sync.sourceId); loadResults.addNoteRevision(sync.entityId, sync.noteId, sync.sourceId);
}); }
syncRows.filter(sync => sync.entityName === 'options').forEach(sync => { for (const sync of syncRows.filter(sync => sync.entityName === 'options')) {
if (sync.entity.name === 'openTabs') { if (sync.entity.name === 'openTabs') {
return; // only noise continue; // only noise
} }
options.set(sync.entity.name, sync.entity.value); options.set(sync.entity.name, sync.entity.value);
loadResults.addOption(sync.entity.name); loadResults.addOption(sync.entity.name);
}); }
if (!loadResults.isEmpty()) { if (!loadResults.isEmpty()) {
if (loadResults.hasAttributeRelatedChanges()) { if (loadResults.hasAttributeRelatedChanges()) {