Mailspring/internal_packages/worker-sync/lib/nylas-sync-worker.es6
Evan Morikawa 083021d860 perf(delta): replaces API delta stream with direct in-memory one
Summary:
This replaces the API delta stream with a direct in-memory one

Depends on D3548

Test Plan: manual

Reviewers: halla, jackie

Reviewed By: halla, jackie

Subscribers: juan

Differential Revision: https://phab.nylas.com/D3549
2016-12-21 18:44:17 -08:00

112 lines
3.3 KiB
JavaScript

import _ from 'underscore';
import {
Actions,
N1CloudAPI,
DatabaseStore,
NylasLongConnection,
} from 'nylas-exports';
import DeltaStreamingConnection from './delta-streaming-connection';
import DeltaStreamingInMemoryConnection from './delta-streaming-in-memory-connection';
import DeltaProcessor from './delta-processor'
import ContactRankingsCache from './contact-rankings-cache';
/**
* This manages the syncing of N1 assets. We create one worker per email
* account. We save the state of the worker in the database.
*
* The `state` takes the following schema:
* this._state = {
* "deltaCursors": {
* n1Cloud: 523,
* localSync: 1108,
* }
* "deltaStatus": {
* n1Cloud: "closed",
* localSync: "connecting",
* }
* }
*
* It can be null to indicate
*/
export default class NylasSyncWorker {
constructor(account) {
this._state = { deltaCursors: {}, deltaStatus: {} }
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._account = account;
this._unlisten = Actions.retrySync.listen(this.refresh.bind(this), this);
this._deltaStreams = this._setupDeltaStreams(account);
this._refreshingCaches = [new ContactRankingsCache(account.id)];
NylasEnv.onBeforeUnload = (readyToUnload) => {
this._writeState().finally(readyToUnload)
}
NylasEnv.localSyncEmitter.on("refreshLocalDeltas", (accountId) => {
if (accountId !== account.id) return;
this._deltaStreams.localSync.end()
this._deltaStreams.localSync.start()
})
}
loadStateFromDatabase() {
return DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`).then(json => {
if (!json) return;
this._state = json;
if (!this._state.deltaCursors) this._state.deltaCursors = {}
if (!this._state.deltaStatus) this._state.deltaStatus = {}
});
}
account() {
return this._account;
}
refresh() {
this.cleanup();
// Cleanup defaults to an "ENDED" socket. We need to indicate it's
// merely closed and can be re-opened again immediately.
_.map(this._deltaStreams, s => s.setStatus(NylasLongConnection.Status.Closed))
return this.start();
}
start = () => {
this._refreshingCaches.map(c => c.start());
_.map(this._deltaStreams, s => s.start())
}
cleanup() {
this._unlisten();
_.map(this._deltaStreams, s => s.end())
this._refreshingCaches.map(c => c.end());
}
_setupDeltaStreams = (account) => {
const localSync = new DeltaStreamingInMemoryConnection(account.id, this._deltaStreamOpts("localSync"));
const n1Cloud = new DeltaStreamingConnection(N1CloudAPI,
account.id, this._deltaStreamOpts("n1Cloud"));
return {localSync, n1Cloud};
}
_deltaStreamOpts = (streamName) => {
return {
getCursor: () => this._state.deltaCursors[streamName],
setCursor: (val) => {
this._state.deltaCursors[streamName] = val;
this._writeStateDebounced();
},
onDeltas: DeltaProcessor.process.bind(DeltaProcessor),
onStatusChanged: (status) => {
this._state.deltaStatus[streamName] = status;
this._writeStateDebounced();
},
}
}
_writeState() {
return DatabaseStore.inTransaction(t => {
return t.persistJSONBlob(`NylasSyncWorker:${this._account.id}`, this._state);
});
}
}