[client-app] (deltas P6) Split local and cloud deltas

Summary:
This commit splits apart the `AccountDeltaConnection` class, which was
in charge of listening to both cloud /and/ local deltas by way of an
artificial interface, `DeltaStreamingInMemoryConnection`.

Splitting this into 2 modules with separate responsibilities will hopefully
make this code easier to reason about and reduce some cruft and unnecessary
indirection.

Specifically, this commit makes it so:

- `DeltaConnectionManager` is only in charge of starting and ending `DeltaStreamingConnection`s, which are solely in charge of listening to deltas from the cloud api
- `LocalSyncDeltaEmitter` no longer unnecessarily emits events for the `deltas` package to listen to but rather directly processes and saves those deltas from the K2 db to edgehill.db
- `LocalSyncDeltaEmitter` is also in charge of keeping track of the latest received cursor, under its own JSONBlob key in edgehill.db. This migrates localSync cursors saved under the old key.
- `LocalSyncDeltaEmitter` is now instantiated and managed from within the `SyncProcessManager` as opposed to the `SyncWorker`. Apart from removing extra state from the `SyncWorker`, this removes dependencies on the client-app environment from the sync-worker.
- `DeltaStreamingInMemoryConnection` and `AccountDeltaConnection` are now gone

(Sorry for the big diff! This one was a little hard to split up without landing something broken)

Depends on D4121

Test Plan: manual + unit tests planned in upcoming diff

Reviewers: halla, mark, evan, spang

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D4122
This commit is contained in:
Juan Tejada 2017-03-06 22:40:07 -08:00
parent 52b04bcb39
commit 12cbd383bf
8 changed files with 141 additions and 281 deletions

View file

@ -1,142 +0,0 @@
import _ from 'underscore';
import {
Actions,
Account,
APIError,
N1CloudAPI,
DatabaseStore,
NylasLongConnection,
} from 'nylas-exports';
import DeltaStreamingConnection from './delta-streaming-connection';
import DeltaStreamingInMemoryConnection from './delta-streaming-in-memory-connection';
import DeltaProcessor from './delta-processor'
import ContactRankingsCache from './contact-rankings-cache';
const MAX_RETRY_DELAY = 10 * 60 * 1000; // 15 minutes
const BASE_RETRY_DELAY = 1000;
/** This manages the syncing of N1 assets. We create one
* AccountDeltaConnection per email account. We save the state of the
* AccountDeltaConnection in the database.
*
* The `state` takes the following schema:
* this._state = {
* "deltaCursors": {
* n1Cloud: 523,
* localSync: 1108,
* }
* "deltaStatus": {
* n1Cloud: "closed",
* localSync: "connecting",
* }
* }
*
* It can be null to indicate
*/
export default class AccountDeltaConnection {
constructor(account) {
// TODO This class is in the process of being ripped apart, and replaced by
// DeltaStreamingConnection, and will disappear in
// the next diff, but for the purposes of making this diff smaller, I
// haven't removed it yet.
this._n1CloudConn = new DeltaStreamingConnection(account)
this._state = { deltaCursors: {}, deltaStatus: {} }
this.retryDelay = BASE_RETRY_DELAY;
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._account = account;
this._unlisten = Actions.retryDeltaConnection.listen(() => this.refresh());
this._deltaStreams = this._setupDeltaStreams(account);
this._refreshingCaches = [new ContactRankingsCache(account.id)];
NylasEnv.onBeforeUnload = (readyToUnload) => {
this._writeState().finally(readyToUnload)
}
NylasEnv.localSyncEmitter.on("refreshLocalDeltas", (accountId) => {
if (accountId !== account.id) return;
this._deltaStreams.localSync.end()
this._deltaStreams.localSync.start()
})
}
loadStateFromDatabase() {
return DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`).then(json => {
if (!json) return;
this._state = json;
if (!this._state.deltaCursors) this._state.deltaCursors = {}
if (!this._state.deltaStatus) this._state.deltaStatus = {}
});
}
account() {
return this._account;
}
refresh() {
this.cleanup();
this._unlisten = Actions.retryDeltaConnection.listen(() => this.refresh());
// Cleanup defaults to an "ENDED" socket. We need to indicate it's
// merely closed and can be re-opened again immediately.
_.map(this._deltaStreams, s => s.setStatus(NylasLongConnection.Status.Closed))
return this.start();
}
start = () => {
try {
this._n1CloudConn.start()
this._refreshingCaches.map(c => c.start());
_.map(this._deltaStreams, s => s.start())
} catch (err) {
this._onError(err)
}
}
cleanup() {
this._unlisten();
_.map(this._deltaStreams, s => s.end())
this._refreshingCaches.map(c => c.end());
}
_setupDeltaStreams = (account) => {
const localSync = new DeltaStreamingInMemoryConnection(account.id, this._deltaStreamOpts("localSync"));
return {localSync};
}
_deltaStreamOpts = (streamName) => {
return {
getCursor: () => this._state.deltaCursors[streamName],
setCursor: (val) => {
this._state.deltaCursors[streamName] = val;
this._writeStateDebounced();
this.retryDelay = BASE_RETRY_DELAY;
},
onDeltas: DeltaProcessor.process.bind(DeltaProcessor),
onStatusChanged: (status) => {
this._state.deltaStatus[streamName] = status;
this._writeStateDebounced();
},
onError: this._onError,
}
}
_onError = (err = {}) => {
if (err instanceof APIError && err.statusCode === 401) {
Actions.updateAccount(this._account.id, {
syncState: Account.SYNC_STATE_AUTH_FAILED,
syncError: err.toJSON(),
})
}
NylasEnv.reportError(`Error connecting to delta stream: ${err.message}`)
this.cleanup()
setTimeout(() => this.refresh(), this.retryDelay);
this.retryDelay = Math.min(MAX_RETRY_DELAY, this.retryDelay * 1.2)
return
}
_writeState() {
return DatabaseStore.inTransaction(t => {
return t.persistJSONBlob(`NylasSyncWorker:${this._account.id}`, this._state);
});
}
}

View file

@ -1,11 +1,11 @@
import _ from 'underscore';
import {AccountStore} from 'nylas-exports'
import AccountDeltaConnection from './account-delta-connection';
import DeltaStreamingConnection from './delta-streaming-connection';
class DeltaConnectionStore {
constructor() {
this._accountConnections = [];
this._connections = [];
this._unsubscribe = () => {}
}
@ -18,10 +18,6 @@ class DeltaConnectionStore {
this._unsubscribe()
}
_existingConnectionsForAccount(account) {
return _.find(this._accountConnections, c => c.account().id === account.id);
}
async _ensureConnections() {
if (NylasEnv.inSpecMode()) { return; }
@ -33,25 +29,27 @@ class DeltaConnectionStore {
this._isBuildingDeltaConnections = true;
try {
const originalConnections = this._accountConnections;
const currentConnections = []
const currentConnections = this._connections;
const nextConnections = []
for (const account of AccountStore.accounts()) {
const existingDeltaConnection = this._existingConnectionsForAccount(account)
if (existingDeltaConnection) {
currentConnections.push(existingDeltaConnection);
const existingConnection = (
currentConnections
.find(conn => conn.account().id === account.id)
)
if (existingConnection) {
nextConnections.push(existingConnection);
continue
}
const newDeltaConnection = new AccountDeltaConnection(account);
await newDeltaConnection.loadStateFromDatabase()
newDeltaConnection.start()
currentConnections.push(newDeltaConnection);
const newDeltaConnection = new DeltaStreamingConnection(account);
await newDeltaConnection.start()
nextConnections.push(newDeltaConnection);
}
const oldDeltaConnections = _.difference(originalConnections, currentConnections);
const oldDeltaConnections = _.difference(currentConnections, nextConnections);
for (const deltaConnection of oldDeltaConnections) {
deltaConnection.end()
}
this._accountConnections = currentConnections;
this._connections = nextConnections;
} finally {
this._isBuildingDeltaConnections = false;
}

View file

@ -18,7 +18,7 @@ const BASE_RETRY_DELAY = 1000;
class DeltaStreamingConnection {
constructor(account) {
this._account = account
this._state = {cursor: null, status: null}
this._state = null
this._longConnection = null
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._unsubscribers = []
@ -33,8 +33,15 @@ class DeltaStreamingConnection {
}
}
start() {
account() {
return this._account
}
async start() {
try {
if (!this._state) {
this._state = await this._loadState()
}
const {cursor = 0} = this._state
this._longConnection = new NylasLongConnection({
api: N1CloudAPI,
@ -70,28 +77,11 @@ class DeltaStreamingConnection {
}
end() {
this._state = null
this._disposeListeners()
this._longConnection.end()
}
async loadStateFromDatabase() {
let json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`)
if (!json) {
// Migrate from old storage key
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
if (!oldState) { return; }
const {deltaCursors = {}, deltaStatus = {}} = oldState
json = {
cursor: deltaCursors.n1Cloud || null,
status: deltaStatus.n1Cloud || null,
}
}
if (!json) { return }
this._state = json;
}
_setupListeners() {
this._unsubscribers = [
Actions.retryDeltaConnection.listen(this.restart, this),
@ -104,17 +94,6 @@ class DeltaStreamingConnection {
this._unsubscribers = []
}
_writeState() {
return DatabaseStore.inTransaction(t =>
t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state)
);
}
_setCursor = (cursor) => {
this._state.cursor = cursor;
this._writeStateDebounced();
}
_onOnlineStatusChanged = () => {
if (OnlineStatusStore.isOnline()) {
this.restart()
@ -175,6 +154,38 @@ class DeltaStreamingConnection {
setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
}
_setCursor = (cursor) => {
this._state.cursor = cursor;
this._writeStateDebounced();
}
async _loadState() {
const json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`)
if (json) {
return json
}
// Migrate from old storage key
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
if (!oldState) {
return {cursor: null, status: null};
}
const {deltaCursors = {}, deltaStatus = {}} = oldState
return {
cursor: deltaCursors.n1Cloud,
status: deltaStatus.n1Cloud,
}
}
async _writeState() {
if (!this._state) { return }
await DatabaseStore.inTransaction(t =>
t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state)
);
}
}
export default DeltaStreamingConnection

View file

@ -1,52 +0,0 @@
/**
* This implements the same interface as the DeltaStreamingConnection
*/
class DeltaStreamingInMemoryConnection {
constructor(accountId, opts) {
this._accountId = accountId
this._getCursor = opts.getCursor
this._setCursor = opts.setCursor
this._onDeltas = opts.onDeltas
this._onStatusChanged = opts.onStatusChanged
this._status = "none"
}
onDeltas = (allDeltas = []) => {
const deltas = allDeltas.filter((d) => d.accountId === this._accountId);
this._onDeltas(deltas, {source: "localSync"});
const last = deltas[deltas.length - 1]
if (last) this._setCursor(last.cursor);
}
get accountId() {
return this._accountId;
}
get status() {
return this._status;
}
setStatus(status) {
this._status = status
this._onStatusChanged(status)
}
start() {
this._disp = NylasEnv.localSyncEmitter.on("localSyncDeltas", this.onDeltas);
NylasEnv.localSyncEmitter.emit("startDeltasFor", {
cursor: this._getCursor() || 0,
accountId: this._accountId,
})
this.setStatus("connected")
}
end() {
if (this._disp && this._disp.dispose) this._disp.dispose()
NylasEnv.localSyncEmitter.emit("endDeltasFor", {
accountId: this._accountId,
})
this.setStatus("ended")
}
}
export default DeltaStreamingInMemoryConnection

View file

@ -226,7 +226,7 @@ export default class NylasEnvConstructor {
this.timer = remote.getGlobal('application').timer;
this.localSyncEmitter = new Emitter();
this.globalWindowEmitter = new Emitter();
if (!this.inSpecMode()) {
this.actionBridge = new ActionBridge(ipcRenderer);

View file

@ -1,35 +1,73 @@
const TransactionConnector = require('../shared/transaction-connector')
const {DeltaStreamBuilder} = require('isomorphic-core')
import _ from 'underscore'
import {DeltaStreamBuilder} from 'isomorphic-core'
import {DatabaseStore, DeltaProcessor} from 'nylas-exports'
import TransactionConnector from '../shared/transaction-connector'
export default class LocalSyncDeltaEmitter {
constructor(db, accountId) {
constructor(account, db) {
this._db = db;
this._accountId = accountId;
NylasEnv.localSyncEmitter.on("startDeltasFor", this._startDeltasFor)
NylasEnv.localSyncEmitter.on("endDeltasFor", this._endDeltasFor)
/**
* The local-sync/sync-worker starts up asynchronously. We need to
* notify N1 client that there are more deltas it should be looking
* for.
*/
NylasEnv.localSyncEmitter.emit("refreshLocalDeltas", accountId)
this._state = null
this._account = account;
this._disposable = {dispose: () => {}}
this._writeStateDebounced = _.debounce(this._writeState, 100)
}
_startDeltasFor = ({accountId, cursor}) => {
if (accountId !== this._accountId) return;
if (this._disp && this._disp.dispose) this._disp.dispose()
this._disp = DeltaStreamBuilder.buildDeltaObservable({
async activate() {
if (this._disposable && this._disposable.dispose) {
this._disposable.dispose()
}
if (!this._state) {
this._state = await this._loadState()
}
const {cursor = 0} = this._state
this._disposable = DeltaStreamBuilder.buildDeltaObservable({
cursor,
db: this._db,
cursor: cursor,
accountId: accountId,
deltasSource: TransactionConnector.getObservableForAccountId(accountId),
}).subscribe((deltas) => {
NylasEnv.localSyncEmitter.emit("localSyncDeltas", deltas)
accountId: this._account.id,
deltasSource: TransactionConnector.getObservableForAccountId(this._account.id),
})
.subscribe((deltas) => {
this._onDeltasReceived(deltas)
})
}
_endDeltasFor = ({accountId}) => {
if (accountId !== this._accountId) return;
if (this._disp && this._disp.dispose) this._disp.dispose()
deactivate() {
this._state = null
if (this._disposable && this._disposable.dispose) {
this._disposable.dispose()
}
}
_onDeltasReceived(deltas = []) {
const last = deltas[deltas.length - 1]
if (last) {
this._state.cursor = last.cursor;
this._writeStateDebounced();
}
DeltaProcessor.process(deltas, {source: "localSync"})
}
async _loadState() {
const json = await DatabaseStore.findJSONBlob(`LocalSyncStatus:${this._account.id}`)
if (json) {
return json
}
// Migrate from old storage key
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
if (!oldState) {
return {}
}
const {deltaCursors = {}} = oldState
return {cursor: deltaCursors.localSync}
}
async _writeState() {
if (!this._state) { return }
await DatabaseStore.inTransaction(t =>
t.persistJSONBlob(`LocalSyncStatus:${this._account.id}`, this._state)
);
}
}

View file

@ -3,27 +3,28 @@ const fs = require('fs')
const {remote} = require('electron')
const {Actions, OnlineStatusStore} = require('nylas-exports')
const SyncWorker = require('./sync-worker');
const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default
const LocalDatabaseConnector = require('../shared/local-database-connector')
class SyncProcessManager {
constructor() {
this._workers = {};
this._accounts = []
this._exiting = false;
this._resettingEmailCache = false
this._workers = {};
this._localSyncDeltaEmitters = new Map()
OnlineStatusStore.listen(this._onOnlineStatusChanged, this)
Actions.resetEmailCache.listen(this._resetEmailCache, this)
Actions.debugSync.listen(this._onDebugSync, this)
Actions.wakeLocalSyncWorkerForAccount.listen((accountId) =>
this.wakeWorkerForAccount(accountId, {interrupt: true})
);
Actions.resetEmailCache.listen(this._resetEmailCache, this)
Actions.debugSync.listen(this._onDebugSync, this)
OnlineStatusStore.listen(this._onOnlineStatusChanged, this)
}
_onOnlineStatusChanged() {
if (OnlineStatusStore.isOnline()) {
this._accounts.forEach(({id}) => {
Object.keys(this._workers).forEach((id) => {
this.wakeWorkerForAccount(id, {reason: 'Came back online'})
})
}
@ -96,13 +97,16 @@ class SyncProcessManager {
const logger = global.Logger.forAccount(account)
try {
const db = await LocalDatabaseConnector.forAccount(account.id);
if (this._workers[account.id]) {
logger.warn(`SyncProcessManager: Worker for account already exists`)
logger.warn(`SyncProcessManager.addWorkerForAccount: Worker for account already exists - skipping`)
return
}
this._accounts.push(account)
const db = await LocalDatabaseConnector.forAccount(account.id);
this._workers[account.id] = new SyncWorker(account, db, this);
const localSyncDeltaEmitter = new LocalSyncDeltaEmitter(account, db)
await localSyncDeltaEmitter.activate()
this._localSyncDeltaEmitters.set(account.id, localSyncDeltaEmitter)
logger.log(`SyncProcessManager: Claiming Account Succeeded`)
} catch (err) {
logger.error(`SyncProcessManager: Claiming Account Failed`, err)
@ -114,9 +118,13 @@ class SyncProcessManager {
await this._workers[accountId].cleanup();
this._workers[accountId] = null;
}
if (this._localSyncDeltaEmitters.has(accountId)) {
this._localSyncDeltaEmitters.get(accountId).deactivate();
this._localSyncDeltaEmitters.delete(accountId)
}
}
}
window.syncProcessManager = new SyncProcessManager();
window.dbs = window.syncProcessManager.dbs.bind(window.syncProcessManager)
module.exports = window.syncProcessManager
window.$n.SyncProcessManager = new SyncProcessManager();
module.exports = window.$n.SyncProcessManager

View file

@ -19,7 +19,7 @@ import {
import Interruptible from '../shared/interruptible'
import SyncTaskFactory from './sync-task-factory';
import SyncbackTaskRunner from './syncback-task-runner'
import LocalSyncDeltaEmitter from './local-sync-delta-emitter'
const {SYNC_STATE_RUNNING, SYNC_STATE_AUTH_FAILED, SYNC_STATE_ERROR} = Account
const AC_SYNC_LOOP_INTERVAL_MS = 10 * 1000 // 10 sec
@ -37,7 +37,6 @@ class SyncWorker {
this._currentTask = null
this._mailListenerConn = null
this._interruptible = new Interruptible()
this._localDeltas = new LocalSyncDeltaEmitter(db, account.id)
this._logger = global.Logger.forAccount(account)
this._startTime = Date.now()