From 12cbd383bf85a356d5203b177e88dfc2d274afde Mon Sep 17 00:00:00 2001 From: Juan Tejada Date: Mon, 6 Mar 2017 22:40:07 -0800 Subject: [PATCH] [client-app] (deltas P6) Split local and cloud deltas Summary: This commit splits apart the `AccountDeltaConnection` class, which was in charge of listening to both cloud /and/ local deltas by way of an artificial interface, `DeltaStreamingInMemoryConnection`. Splitting this into 2 modules with separate responsibilities will hopefully make this code easier to reason about and reduce some cruft and unnecessary indirection. Specifically, this commit makes it so: - `DeltaConnectionManager` is only in charge of starting and ending `DeltaStreamingConnection`s, which are solely in charge of listening to deltas from the cloud api - `LocalSyncDeltaEmitter` no longer unnecessarily emits events for the `deltas` package to listen to but rather directly processes and saves those deltas from the K2 db to edgehill.db - `LocalSyncDeltaEmitter` is also in charge of keeping track of the latest received cursor, under its own JSONBlob key in edgehill.db. This migrates localSync cursors saved under the old key. - `LocalSyncDeltaEmitter` is now instantiated and managed from within the `SyncProcessManager` as opposed to the `SyncWorker`. Apart from removing extra state from the `SyncWorker`, this removes dependencies on the client-app environment from the sync-worker. - `DeltaStreamingInMemoryConnection` and `AccountDeltaConnection` are now gone (Sorry for the big diff! This one was a little hard to split up without landing something broken) Depends on D4121 Test Plan: manual + unit tests planned in upcoming diff Reviewers: halla, mark, evan, spang Reviewed By: evan Differential Revision: https://phab.nylas.com/D4122 --- .../deltas/lib/account-delta-connection.es6 | 142 ------------------ .../deltas/lib/delta-connection-store.es6 | 32 ++-- .../deltas/lib/delta-streaming-connection.es6 | 73 +++++---- .../delta-streaming-in-memory-connection.es6 | 52 ------- packages/client-app/src/nylas-env.es6 | 2 +- .../local-sync-delta-emitter.es6 | 86 ++++++++--- .../local-sync-worker/sync-process-manager.js | 32 ++-- .../src/local-sync-worker/sync-worker.es6 | 3 +- 8 files changed, 141 insertions(+), 281 deletions(-) delete mode 100644 packages/client-app/internal_packages/deltas/lib/account-delta-connection.es6 delete mode 100644 packages/client-app/internal_packages/deltas/lib/delta-streaming-in-memory-connection.es6 diff --git a/packages/client-app/internal_packages/deltas/lib/account-delta-connection.es6 b/packages/client-app/internal_packages/deltas/lib/account-delta-connection.es6 deleted file mode 100644 index 89768e6c0..000000000 --- a/packages/client-app/internal_packages/deltas/lib/account-delta-connection.es6 +++ /dev/null @@ -1,142 +0,0 @@ -import _ from 'underscore'; -import { - Actions, - Account, - APIError, - N1CloudAPI, - DatabaseStore, - NylasLongConnection, -} from 'nylas-exports'; -import DeltaStreamingConnection from './delta-streaming-connection'; -import DeltaStreamingInMemoryConnection from './delta-streaming-in-memory-connection'; -import DeltaProcessor from './delta-processor' -import ContactRankingsCache from './contact-rankings-cache'; - -const MAX_RETRY_DELAY = 10 * 60 * 1000; // 15 minutes -const BASE_RETRY_DELAY = 1000; - -/** This manages the syncing of N1 assets. We create one - * AccountDeltaConnection per email account. We save the state of the - * AccountDeltaConnection in the database. - * - * The `state` takes the following schema: - * this._state = { - * "deltaCursors": { - * n1Cloud: 523, - * localSync: 1108, - * } - * "deltaStatus": { - * n1Cloud: "closed", - * localSync: "connecting", - * } - * } - * - * It can be null to indicate - */ -export default class AccountDeltaConnection { - - - constructor(account) { - // TODO This class is in the process of being ripped apart, and replaced by - // DeltaStreamingConnection, and will disappear in - // the next diff, but for the purposes of making this diff smaller, I - // haven't removed it yet. - this._n1CloudConn = new DeltaStreamingConnection(account) - - this._state = { deltaCursors: {}, deltaStatus: {} } - this.retryDelay = BASE_RETRY_DELAY; - this._writeStateDebounced = _.debounce(this._writeState, 100) - this._account = account; - this._unlisten = Actions.retryDeltaConnection.listen(() => this.refresh()); - this._deltaStreams = this._setupDeltaStreams(account); - this._refreshingCaches = [new ContactRankingsCache(account.id)]; - NylasEnv.onBeforeUnload = (readyToUnload) => { - this._writeState().finally(readyToUnload) - } - NylasEnv.localSyncEmitter.on("refreshLocalDeltas", (accountId) => { - if (accountId !== account.id) return; - this._deltaStreams.localSync.end() - this._deltaStreams.localSync.start() - }) - } - - loadStateFromDatabase() { - return DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`).then(json => { - if (!json) return; - this._state = json; - if (!this._state.deltaCursors) this._state.deltaCursors = {} - if (!this._state.deltaStatus) this._state.deltaStatus = {} - }); - } - - account() { - return this._account; - } - - refresh() { - this.cleanup(); - this._unlisten = Actions.retryDeltaConnection.listen(() => this.refresh()); - // Cleanup defaults to an "ENDED" socket. We need to indicate it's - // merely closed and can be re-opened again immediately. - _.map(this._deltaStreams, s => s.setStatus(NylasLongConnection.Status.Closed)) - return this.start(); - } - - start = () => { - try { - this._n1CloudConn.start() - this._refreshingCaches.map(c => c.start()); - _.map(this._deltaStreams, s => s.start()) - } catch (err) { - this._onError(err) - } - } - - cleanup() { - this._unlisten(); - _.map(this._deltaStreams, s => s.end()) - this._refreshingCaches.map(c => c.end()); - } - - _setupDeltaStreams = (account) => { - const localSync = new DeltaStreamingInMemoryConnection(account.id, this._deltaStreamOpts("localSync")); - return {localSync}; - } - - _deltaStreamOpts = (streamName) => { - return { - getCursor: () => this._state.deltaCursors[streamName], - setCursor: (val) => { - this._state.deltaCursors[streamName] = val; - this._writeStateDebounced(); - this.retryDelay = BASE_RETRY_DELAY; - }, - onDeltas: DeltaProcessor.process.bind(DeltaProcessor), - onStatusChanged: (status) => { - this._state.deltaStatus[streamName] = status; - this._writeStateDebounced(); - }, - onError: this._onError, - } - } - - _onError = (err = {}) => { - if (err instanceof APIError && err.statusCode === 401) { - Actions.updateAccount(this._account.id, { - syncState: Account.SYNC_STATE_AUTH_FAILED, - syncError: err.toJSON(), - }) - } - NylasEnv.reportError(`Error connecting to delta stream: ${err.message}`) - this.cleanup() - setTimeout(() => this.refresh(), this.retryDelay); - this.retryDelay = Math.min(MAX_RETRY_DELAY, this.retryDelay * 1.2) - return - } - - _writeState() { - return DatabaseStore.inTransaction(t => { - return t.persistJSONBlob(`NylasSyncWorker:${this._account.id}`, this._state); - }); - } -} diff --git a/packages/client-app/internal_packages/deltas/lib/delta-connection-store.es6 b/packages/client-app/internal_packages/deltas/lib/delta-connection-store.es6 index 290e4a8d1..07f4a6f8f 100644 --- a/packages/client-app/internal_packages/deltas/lib/delta-connection-store.es6 +++ b/packages/client-app/internal_packages/deltas/lib/delta-connection-store.es6 @@ -1,11 +1,11 @@ import _ from 'underscore'; import {AccountStore} from 'nylas-exports' -import AccountDeltaConnection from './account-delta-connection'; +import DeltaStreamingConnection from './delta-streaming-connection'; class DeltaConnectionStore { constructor() { - this._accountConnections = []; + this._connections = []; this._unsubscribe = () => {} } @@ -18,10 +18,6 @@ class DeltaConnectionStore { this._unsubscribe() } - _existingConnectionsForAccount(account) { - return _.find(this._accountConnections, c => c.account().id === account.id); - } - async _ensureConnections() { if (NylasEnv.inSpecMode()) { return; } @@ -33,25 +29,27 @@ class DeltaConnectionStore { this._isBuildingDeltaConnections = true; try { - const originalConnections = this._accountConnections; - const currentConnections = [] + const currentConnections = this._connections; + const nextConnections = [] for (const account of AccountStore.accounts()) { - const existingDeltaConnection = this._existingConnectionsForAccount(account) - if (existingDeltaConnection) { - currentConnections.push(existingDeltaConnection); + const existingConnection = ( + currentConnections + .find(conn => conn.account().id === account.id) + ) + if (existingConnection) { + nextConnections.push(existingConnection); continue } - const newDeltaConnection = new AccountDeltaConnection(account); - await newDeltaConnection.loadStateFromDatabase() - newDeltaConnection.start() - currentConnections.push(newDeltaConnection); + const newDeltaConnection = new DeltaStreamingConnection(account); + await newDeltaConnection.start() + nextConnections.push(newDeltaConnection); } - const oldDeltaConnections = _.difference(originalConnections, currentConnections); + const oldDeltaConnections = _.difference(currentConnections, nextConnections); for (const deltaConnection of oldDeltaConnections) { deltaConnection.end() } - this._accountConnections = currentConnections; + this._connections = nextConnections; } finally { this._isBuildingDeltaConnections = false; } diff --git a/packages/client-app/internal_packages/deltas/lib/delta-streaming-connection.es6 b/packages/client-app/internal_packages/deltas/lib/delta-streaming-connection.es6 index f98263920..92a6bccaf 100644 --- a/packages/client-app/internal_packages/deltas/lib/delta-streaming-connection.es6 +++ b/packages/client-app/internal_packages/deltas/lib/delta-streaming-connection.es6 @@ -18,7 +18,7 @@ const BASE_RETRY_DELAY = 1000; class DeltaStreamingConnection { constructor(account) { this._account = account - this._state = {cursor: null, status: null} + this._state = null this._longConnection = null this._writeStateDebounced = _.debounce(this._writeState, 100) this._unsubscribers = [] @@ -33,8 +33,15 @@ class DeltaStreamingConnection { } } - start() { + account() { + return this._account + } + + async start() { try { + if (!this._state) { + this._state = await this._loadState() + } const {cursor = 0} = this._state this._longConnection = new NylasLongConnection({ api: N1CloudAPI, @@ -70,28 +77,11 @@ class DeltaStreamingConnection { } end() { + this._state = null this._disposeListeners() this._longConnection.end() } - async loadStateFromDatabase() { - let json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`) - - if (!json) { - // Migrate from old storage key - const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`) - if (!oldState) { return; } - const {deltaCursors = {}, deltaStatus = {}} = oldState - json = { - cursor: deltaCursors.n1Cloud || null, - status: deltaStatus.n1Cloud || null, - } - } - - if (!json) { return } - this._state = json; - } - _setupListeners() { this._unsubscribers = [ Actions.retryDeltaConnection.listen(this.restart, this), @@ -104,17 +94,6 @@ class DeltaStreamingConnection { this._unsubscribers = [] } - _writeState() { - return DatabaseStore.inTransaction(t => - t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state) - ); - } - - _setCursor = (cursor) => { - this._state.cursor = cursor; - this._writeStateDebounced(); - } - _onOnlineStatusChanged = () => { if (OnlineStatusStore.isOnline()) { this.restart() @@ -175,6 +154,38 @@ class DeltaStreamingConnection { setTimeout(() => this.restart(), this._backoffScheduler.nextDelay()); } + + _setCursor = (cursor) => { + this._state.cursor = cursor; + this._writeStateDebounced(); + } + + async _loadState() { + const json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`) + if (json) { + return json + } + + // Migrate from old storage key + const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`) + if (!oldState) { + return {cursor: null, status: null}; + } + + const {deltaCursors = {}, deltaStatus = {}} = oldState + return { + cursor: deltaCursors.n1Cloud, + status: deltaStatus.n1Cloud, + } + } + + async _writeState() { + if (!this._state) { return } + await DatabaseStore.inTransaction(t => + t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state) + ); + } + } export default DeltaStreamingConnection diff --git a/packages/client-app/internal_packages/deltas/lib/delta-streaming-in-memory-connection.es6 b/packages/client-app/internal_packages/deltas/lib/delta-streaming-in-memory-connection.es6 deleted file mode 100644 index 9fd03e668..000000000 --- a/packages/client-app/internal_packages/deltas/lib/delta-streaming-in-memory-connection.es6 +++ /dev/null @@ -1,52 +0,0 @@ -/** - * This implements the same interface as the DeltaStreamingConnection - */ -class DeltaStreamingInMemoryConnection { - constructor(accountId, opts) { - this._accountId = accountId - this._getCursor = opts.getCursor - this._setCursor = opts.setCursor - this._onDeltas = opts.onDeltas - this._onStatusChanged = opts.onStatusChanged - this._status = "none" - } - - onDeltas = (allDeltas = []) => { - const deltas = allDeltas.filter((d) => d.accountId === this._accountId); - this._onDeltas(deltas, {source: "localSync"}); - const last = deltas[deltas.length - 1] - if (last) this._setCursor(last.cursor); - } - - get accountId() { - return this._accountId; - } - - get status() { - return this._status; - } - - setStatus(status) { - this._status = status - this._onStatusChanged(status) - } - - start() { - this._disp = NylasEnv.localSyncEmitter.on("localSyncDeltas", this.onDeltas); - NylasEnv.localSyncEmitter.emit("startDeltasFor", { - cursor: this._getCursor() || 0, - accountId: this._accountId, - }) - this.setStatus("connected") - } - - end() { - if (this._disp && this._disp.dispose) this._disp.dispose() - NylasEnv.localSyncEmitter.emit("endDeltasFor", { - accountId: this._accountId, - }) - this.setStatus("ended") - } -} - -export default DeltaStreamingInMemoryConnection diff --git a/packages/client-app/src/nylas-env.es6 b/packages/client-app/src/nylas-env.es6 index f02aa78e8..16987d2dd 100644 --- a/packages/client-app/src/nylas-env.es6 +++ b/packages/client-app/src/nylas-env.es6 @@ -226,7 +226,7 @@ export default class NylasEnvConstructor { this.timer = remote.getGlobal('application').timer; - this.localSyncEmitter = new Emitter(); + this.globalWindowEmitter = new Emitter(); if (!this.inSpecMode()) { this.actionBridge = new ActionBridge(ipcRenderer); diff --git a/packages/client-sync/src/local-sync-worker/local-sync-delta-emitter.es6 b/packages/client-sync/src/local-sync-worker/local-sync-delta-emitter.es6 index ffd4ec8c5..a0b7f7799 100644 --- a/packages/client-sync/src/local-sync-worker/local-sync-delta-emitter.es6 +++ b/packages/client-sync/src/local-sync-worker/local-sync-delta-emitter.es6 @@ -1,35 +1,73 @@ -const TransactionConnector = require('../shared/transaction-connector') -const {DeltaStreamBuilder} = require('isomorphic-core') +import _ from 'underscore' +import {DeltaStreamBuilder} from 'isomorphic-core' +import {DatabaseStore, DeltaProcessor} from 'nylas-exports' +import TransactionConnector from '../shared/transaction-connector' + export default class LocalSyncDeltaEmitter { - constructor(db, accountId) { + constructor(account, db) { this._db = db; - this._accountId = accountId; - NylasEnv.localSyncEmitter.on("startDeltasFor", this._startDeltasFor) - NylasEnv.localSyncEmitter.on("endDeltasFor", this._endDeltasFor) - /** - * The local-sync/sync-worker starts up asynchronously. We need to - * notify N1 client that there are more deltas it should be looking - * for. - */ - NylasEnv.localSyncEmitter.emit("refreshLocalDeltas", accountId) + this._state = null + this._account = account; + this._disposable = {dispose: () => {}} + this._writeStateDebounced = _.debounce(this._writeState, 100) } - _startDeltasFor = ({accountId, cursor}) => { - if (accountId !== this._accountId) return; - if (this._disp && this._disp.dispose) this._disp.dispose() - this._disp = DeltaStreamBuilder.buildDeltaObservable({ + async activate() { + if (this._disposable && this._disposable.dispose) { + this._disposable.dispose() + } + if (!this._state) { + this._state = await this._loadState() + } + const {cursor = 0} = this._state + this._disposable = DeltaStreamBuilder.buildDeltaObservable({ + cursor, db: this._db, - cursor: cursor, - accountId: accountId, - deltasSource: TransactionConnector.getObservableForAccountId(accountId), - }).subscribe((deltas) => { - NylasEnv.localSyncEmitter.emit("localSyncDeltas", deltas) + accountId: this._account.id, + deltasSource: TransactionConnector.getObservableForAccountId(this._account.id), + }) + .subscribe((deltas) => { + this._onDeltasReceived(deltas) }) } - _endDeltasFor = ({accountId}) => { - if (accountId !== this._accountId) return; - if (this._disp && this._disp.dispose) this._disp.dispose() + deactivate() { + this._state = null + if (this._disposable && this._disposable.dispose) { + this._disposable.dispose() + } + } + + _onDeltasReceived(deltas = []) { + const last = deltas[deltas.length - 1] + if (last) { + this._state.cursor = last.cursor; + this._writeStateDebounced(); + } + DeltaProcessor.process(deltas, {source: "localSync"}) + } + + async _loadState() { + const json = await DatabaseStore.findJSONBlob(`LocalSyncStatus:${this._account.id}`) + if (json) { + return json + } + + // Migrate from old storage key + const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`) + if (!oldState) { + return {} + } + + const {deltaCursors = {}} = oldState + return {cursor: deltaCursors.localSync} + } + + async _writeState() { + if (!this._state) { return } + await DatabaseStore.inTransaction(t => + t.persistJSONBlob(`LocalSyncStatus:${this._account.id}`, this._state) + ); } } diff --git a/packages/client-sync/src/local-sync-worker/sync-process-manager.js b/packages/client-sync/src/local-sync-worker/sync-process-manager.js index f934558c4..6a2e774f8 100644 --- a/packages/client-sync/src/local-sync-worker/sync-process-manager.js +++ b/packages/client-sync/src/local-sync-worker/sync-process-manager.js @@ -3,27 +3,28 @@ const fs = require('fs') const {remote} = require('electron') const {Actions, OnlineStatusStore} = require('nylas-exports') const SyncWorker = require('./sync-worker'); +const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default const LocalDatabaseConnector = require('../shared/local-database-connector') class SyncProcessManager { constructor() { - this._workers = {}; - this._accounts = [] this._exiting = false; this._resettingEmailCache = false + this._workers = {}; + this._localSyncDeltaEmitters = new Map() + OnlineStatusStore.listen(this._onOnlineStatusChanged, this) + Actions.resetEmailCache.listen(this._resetEmailCache, this) + Actions.debugSync.listen(this._onDebugSync, this) Actions.wakeLocalSyncWorkerForAccount.listen((accountId) => this.wakeWorkerForAccount(accountId, {interrupt: true}) ); - Actions.resetEmailCache.listen(this._resetEmailCache, this) - Actions.debugSync.listen(this._onDebugSync, this) - OnlineStatusStore.listen(this._onOnlineStatusChanged, this) } _onOnlineStatusChanged() { if (OnlineStatusStore.isOnline()) { - this._accounts.forEach(({id}) => { + Object.keys(this._workers).forEach((id) => { this.wakeWorkerForAccount(id, {reason: 'Came back online'}) }) } @@ -96,13 +97,16 @@ class SyncProcessManager { const logger = global.Logger.forAccount(account) try { - const db = await LocalDatabaseConnector.forAccount(account.id); if (this._workers[account.id]) { - logger.warn(`SyncProcessManager: Worker for account already exists`) + logger.warn(`SyncProcessManager.addWorkerForAccount: Worker for account already exists - skipping`) return } - this._accounts.push(account) + const db = await LocalDatabaseConnector.forAccount(account.id); this._workers[account.id] = new SyncWorker(account, db, this); + + const localSyncDeltaEmitter = new LocalSyncDeltaEmitter(account, db) + await localSyncDeltaEmitter.activate() + this._localSyncDeltaEmitters.set(account.id, localSyncDeltaEmitter) logger.log(`SyncProcessManager: Claiming Account Succeeded`) } catch (err) { logger.error(`SyncProcessManager: Claiming Account Failed`, err) @@ -114,9 +118,13 @@ class SyncProcessManager { await this._workers[accountId].cleanup(); this._workers[accountId] = null; } + + if (this._localSyncDeltaEmitters.has(accountId)) { + this._localSyncDeltaEmitters.get(accountId).deactivate(); + this._localSyncDeltaEmitters.delete(accountId) + } } } -window.syncProcessManager = new SyncProcessManager(); -window.dbs = window.syncProcessManager.dbs.bind(window.syncProcessManager) -module.exports = window.syncProcessManager +window.$n.SyncProcessManager = new SyncProcessManager(); +module.exports = window.$n.SyncProcessManager diff --git a/packages/client-sync/src/local-sync-worker/sync-worker.es6 b/packages/client-sync/src/local-sync-worker/sync-worker.es6 index 082af9846..c6ef70a00 100644 --- a/packages/client-sync/src/local-sync-worker/sync-worker.es6 +++ b/packages/client-sync/src/local-sync-worker/sync-worker.es6 @@ -19,7 +19,7 @@ import { import Interruptible from '../shared/interruptible' import SyncTaskFactory from './sync-task-factory'; import SyncbackTaskRunner from './syncback-task-runner' -import LocalSyncDeltaEmitter from './local-sync-delta-emitter' + const {SYNC_STATE_RUNNING, SYNC_STATE_AUTH_FAILED, SYNC_STATE_ERROR} = Account const AC_SYNC_LOOP_INTERVAL_MS = 10 * 1000 // 10 sec @@ -37,7 +37,6 @@ class SyncWorker { this._currentTask = null this._mailListenerConn = null this._interruptible = new Interruptible() - this._localDeltas = new LocalSyncDeltaEmitter(db, account.id) this._logger = global.Logger.forAccount(account) this._startTime = Date.now()