From f117ae491574fb820e37cd00d6f72aec82341346 Mon Sep 17 00:00:00 2001 From: zadam Date: Sun, 21 Jun 2020 13:44:47 +0200 Subject: [PATCH] fix sending sync rows via WebSocket after transaction is committed --- src/services/cls.js | 10 +++++++--- src/services/sql.js | 20 +++----------------- src/services/ws.js | 17 ++++++----------- 3 files changed, 16 insertions(+), 31 deletions(-) diff --git a/src/services/cls.js b/src/services/cls.js index ba026dfae..32af8ff6c 100644 --- a/src/services/cls.js +++ b/src/services/cls.js @@ -33,8 +33,12 @@ function isEntityEventsDisabled() { return !!namespace.get('disableEntityEvents'); } -function getSyncRows() { - return namespace.get('syncRows') || []; +function getAndClearSyncRows() { + const syncRows = namespace.get('syncRows') || []; + + namespace.set('syncRows', []); + + return syncRows; } function addSyncRow(syncRow) { @@ -68,7 +72,7 @@ module.exports = { disableEntityEvents, isEntityEventsDisabled, reset, - getSyncRows, + getAndClearSyncRows, addSyncRow, getEntityFromCache, setEntityToCache diff --git a/src/services/sql.js b/src/services/sql.js index 1b997ab7c..7529890f0 100644 --- a/src/services/sql.js +++ b/src/services/sql.js @@ -1,7 +1,6 @@ "use strict"; const log = require('./log'); -const cls = require('./cls'); const Database = require('better-sqlite3'); const dataDir = require('./data_dir'); @@ -74,21 +73,6 @@ function stmt(sql) { return statementCache[sql]; } -function beginTransaction() { - // DEFERRED means that the transaction does not actually start until the database is first accessed. - // Internally, the BEGIN DEFERRED statement merely sets a flag on the database connection that turns off - // the automatic commit that would normally occur when the last statement finishes. - return stmt("BEGIN DEFERRED").run(); -} - -function commit() { - return stmt("COMMIT").run(); -} - -function rollback() { - return stmt("ROLLBACK").run(); -} - function getRow(query, params = []) { return wrap(query, s => s.get(params)); } @@ -213,7 +197,9 @@ function wrap(query, func) { function transactional(func) { const ret = dbConnection.transaction(func).deferred(); - require('./ws.js').sendPingToAllClients(); + if (!dbConnection.inTransaction) { // i.e. transaction was really committed (and not just savepoint released) + require('./ws.js').sendTransactionSyncsToAllClients(); + } return ret; } diff --git a/src/services/ws.js b/src/services/ws.js index 1e1d5a446..83a22a319 100644 --- a/src/services/ws.js +++ b/src/services/ws.js @@ -7,7 +7,6 @@ const syncMutexService = require('./sync_mutex'); const protectedSessionService = require('./protected_session'); let webSocketServer; -let lastAcceptedSyncIds = {}; function init(httpServer, sessionParser) { webSocketServer = new WebSocket.Server({ @@ -28,8 +27,6 @@ function init(httpServer, sessionParser) { webSocketServer.on('connection', (ws, req) => { ws.id = utils.randomString(10); - lastAcceptedSyncIds[ws.id] = 0; - console.log(`websocket client connected`); ws.on('message', async messageJson => { @@ -39,8 +36,6 @@ function init(httpServer, sessionParser) { log.info('JS Error: ' + message.error + '\r\nStack: ' + message.stack); } else if (message.type === 'ping') { - lastAcceptedSyncIds[ws.id] = message.lastSyncId; - await syncMutexService.doExclusively(() => sendPing(ws)); } else { @@ -97,9 +92,7 @@ function fillInAdditionalProperties(sync) { } } -function sendPing(client) { - const syncRows = cls.getSyncRows(); - +function sendPing(client, syncRows = []) { for (const sync of syncRows) { try { fillInAdditionalProperties(sync); @@ -119,10 +112,12 @@ function sendPing(client) { }); } -function sendPingToAllClients() { +function sendTransactionSyncsToAllClients() { if (webSocketServer) { + const syncRows = cls.getAndClearSyncRows(); + webSocketServer.clients.forEach(function each(client) { - sendPing(client); + sendPing(client, syncRows); }); } } @@ -140,5 +135,5 @@ module.exports = { sendMessageToAllClients, syncPullInProgress, syncPullFinished, - sendPingToAllClients + sendTransactionSyncsToAllClients };