mirror of
https://github.com/Foundry376/Mailspring.git
synced 2024-09-21 07:46:06 +08:00
Get rid of more code no longer needed
This commit is contained in:
parent
6aa666837e
commit
2d1ce345c3
|
@ -1,9 +0,0 @@
|
||||||
import SyncHealthChecker from './sync-health-checker'
|
|
||||||
|
|
||||||
export function activate() {
|
|
||||||
SyncHealthChecker.start()
|
|
||||||
}
|
|
||||||
|
|
||||||
export function deactivate() {
|
|
||||||
SyncHealthChecker.stop()
|
|
||||||
}
|
|
|
@ -1,114 +0,0 @@
|
||||||
import {ipcRenderer} from 'electron'
|
|
||||||
import {IdentityStore, AccountStore, Actions, NylasAPI, NylasAPIRequest} from 'nylas-exports'
|
|
||||||
|
|
||||||
const CHECK_HEALTH_INTERVAL = 5 * 60 * 1000;
|
|
||||||
|
|
||||||
class SyncHealthChecker {
|
|
||||||
constructor() {
|
|
||||||
this._lastSyncActivity = null
|
|
||||||
this._interval = null
|
|
||||||
}
|
|
||||||
|
|
||||||
start() {
|
|
||||||
if (this._interval) {
|
|
||||||
console.warn('SyncHealthChecker has already been started')
|
|
||||||
} else {
|
|
||||||
this._interval = setInterval(this._checkSyncHealth, CHECK_HEALTH_INTERVAL)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
stop() {
|
|
||||||
clearInterval(this._interval)
|
|
||||||
this._interval = null
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is a separate function so the request can be manipulated in the specs
|
|
||||||
_buildRequest = () => {
|
|
||||||
return new NylasAPIRequest({
|
|
||||||
api: NylasAPI,
|
|
||||||
options: {
|
|
||||||
accountId: AccountStore.accounts()[0].id,
|
|
||||||
path: `/health`,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
_checkSyncHealth = async () => {
|
|
||||||
try {
|
|
||||||
if (!IdentityStore.identity()) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
const request = this._buildRequest()
|
|
||||||
const response = await request.run()
|
|
||||||
this._lastSyncActivity = response
|
|
||||||
} catch (err) {
|
|
||||||
if (/ECONNREFUSED/i.test(err.toString())) {
|
|
||||||
this._onWorkerWindowUnavailable()
|
|
||||||
} else {
|
|
||||||
err.message = `Error checking sync health: ${err.message}`
|
|
||||||
NylasEnv.reportError(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_onWorkerWindowUnavailable() {
|
|
||||||
let extraData = {};
|
|
||||||
|
|
||||||
// Extract data that we want to report. We'll report the entire
|
|
||||||
// _lastSyncActivity object, but it'll probably be useful if we can segment
|
|
||||||
// by the data in the oldest or newest entry, so we report those as
|
|
||||||
// individual values too.
|
|
||||||
const lastActivityEntries = Object.entries(this._lastSyncActivity || {})
|
|
||||||
if (lastActivityEntries.length > 0) {
|
|
||||||
const times = lastActivityEntries.map((entry) => entry[1].time)
|
|
||||||
const now = Date.now()
|
|
||||||
|
|
||||||
const maxTime = Math.max(...times)
|
|
||||||
const mostRecentEntry = lastActivityEntries.find((entry) => entry[1].time === maxTime)
|
|
||||||
const [mostRecentActivityAccountId, {
|
|
||||||
activity: mostRecentActivity,
|
|
||||||
time: mostRecentActivityTime,
|
|
||||||
}] = mostRecentEntry;
|
|
||||||
const mostRecentDuration = now - mostRecentActivityTime
|
|
||||||
|
|
||||||
const minTime = Math.min(...times)
|
|
||||||
const leastRecentEntry = lastActivityEntries.find((entry) => entry[1].time === minTime)
|
|
||||||
const [leastRecentActivityAccountId, {
|
|
||||||
activity: leastRecentActivity,
|
|
||||||
time: leastRecentActivityTime,
|
|
||||||
}] = leastRecentEntry;
|
|
||||||
const leastRecentDuration = now - leastRecentActivityTime
|
|
||||||
|
|
||||||
extraData = {
|
|
||||||
mostRecentActivity,
|
|
||||||
mostRecentActivityTime,
|
|
||||||
mostRecentActivityAccountId,
|
|
||||||
mostRecentDuration,
|
|
||||||
leastRecentActivity,
|
|
||||||
leastRecentActivityTime,
|
|
||||||
leastRecentActivityAccountId,
|
|
||||||
leastRecentDuration,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
NylasEnv.reportError(new Error('Worker window was unavailable'), {
|
|
||||||
// This information isn't as useful in Sentry, but include it here until
|
|
||||||
// the data is actually sent to Mixpanel. (See the TODO below)
|
|
||||||
lastActivityPerAccount: this._lastSyncActivity,
|
|
||||||
...extraData,
|
|
||||||
})
|
|
||||||
|
|
||||||
// TODO: This doesn't make it to Mixpanel because our analytics process
|
|
||||||
// lives in the worker window. We should move analytics to the main process.
|
|
||||||
// https://phab.nylas.com/T8029
|
|
||||||
Actions.recordUserEvent('Worker Window Unavailable', {
|
|
||||||
lastActivityPerAccount: this._lastSyncActivity,
|
|
||||||
...extraData,
|
|
||||||
})
|
|
||||||
|
|
||||||
console.log(`Detected worker window was unavailable. Restarting it.`, this._lastSyncActivity)
|
|
||||||
ipcRenderer.send('ensure-worker-window')
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export default new SyncHealthChecker()
|
|
|
@ -1,11 +0,0 @@
|
||||||
{
|
|
||||||
"name": "sync-health-checker",
|
|
||||||
"version": "0.1.0",
|
|
||||||
"main": "./lib/main",
|
|
||||||
"description": "Periodically ping the sync process to ensure it's running",
|
|
||||||
"license": "GPL-3.0",
|
|
||||||
"private": true,
|
|
||||||
"engines": {
|
|
||||||
"nylas": "*"
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,44 +0,0 @@
|
||||||
import {ipcRenderer} from 'electron'
|
|
||||||
import SyncHealthChecker from '../lib/sync-health-checker'
|
|
||||||
|
|
||||||
const requestWithErrorResponse = () => {
|
|
||||||
return {
|
|
||||||
run: async () => {
|
|
||||||
throw new Error('ECONNREFUSED');
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const activityData = {account1: {time: 1490305104619, activity: ['activity']}}
|
|
||||||
|
|
||||||
const requestWithDataResponse = () => {
|
|
||||||
return {
|
|
||||||
run: async () => {
|
|
||||||
return activityData
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
describe('SyncHealthChecker', () => {
|
|
||||||
describe('when the worker window is not available', () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
spyOn(SyncHealthChecker, '_buildRequest').andCallFake(requestWithErrorResponse)
|
|
||||||
spyOn(ipcRenderer, 'send')
|
|
||||||
spyOn(NylasEnv, 'reportError')
|
|
||||||
})
|
|
||||||
it('attempts to restart it', async () => {
|
|
||||||
await SyncHealthChecker._checkSyncHealth();
|
|
||||||
expect(NylasEnv.reportError.calls.length).toEqual(1)
|
|
||||||
expect(ipcRenderer.send.calls[0].args[0]).toEqual('ensure-worker-window')
|
|
||||||
})
|
|
||||||
})
|
|
||||||
describe('when data is returned', () => {
|
|
||||||
beforeEach(() => {
|
|
||||||
spyOn(SyncHealthChecker, '_buildRequest').andCallFake(requestWithDataResponse)
|
|
||||||
})
|
|
||||||
it('stores the data', async () => {
|
|
||||||
await SyncHealthChecker._checkSyncHealth();
|
|
||||||
expect(SyncHealthChecker._lastSyncActivity).toEqual(activityData)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
})
|
|
|
@ -1,441 +0,0 @@
|
||||||
_ = require 'underscore'
|
|
||||||
{NylasAPI, NylasAPIHelpers, NylasAPIRequest, Actions, DatabaseStore, DatabaseWriter, Account, Thread} = require 'nylas-exports'
|
|
||||||
DeltaStreamingConnection = require('../../src/services/delta-streaming-connection').default
|
|
||||||
|
|
||||||
# TODO these are badly out of date, we need to rewrite them
|
|
||||||
xdescribe "DeltaStreamingConnection", ->
|
|
||||||
beforeEach ->
|
|
||||||
@apiRequests = []
|
|
||||||
spyOn(NylasAPIRequest.prototype, "run").andCallFake ->
|
|
||||||
@apiRequests.push({requestOptions: this.options})
|
|
||||||
@localSyncCursorStub = undefined
|
|
||||||
@n1CloudCursorStub = undefined
|
|
||||||
# spyOn(DeltaStreamingConnection.prototype, '_fetchMetadata').andReturn(Promise.resolve())
|
|
||||||
spyOn(DatabaseWriter.prototype, 'persistJSONBlob').andReturn(Promise.resolve())
|
|
||||||
spyOn(DatabaseStore, 'findJSONBlob').andCallFake (key) =>
|
|
||||||
if key is "NylasSyncWorker:#{TEST_ACCOUNT_ID}"
|
|
||||||
return Promise.resolve _.extend {}, {
|
|
||||||
"deltaCursors": {
|
|
||||||
"localSync": @localSyncCursorStub,
|
|
||||||
"n1Cloud": @n1CloudCursorStub,
|
|
||||||
}
|
|
||||||
"initialized": true,
|
|
||||||
"contacts":
|
|
||||||
busy: true
|
|
||||||
complete: false
|
|
||||||
"calendars":
|
|
||||||
busy:false
|
|
||||||
complete: true
|
|
||||||
}
|
|
||||||
else if key.indexOf('ContactRankings') is 0
|
|
||||||
return Promise.resolve([])
|
|
||||||
else
|
|
||||||
return throw new Error("Not stubbed! #{key}")
|
|
||||||
|
|
||||||
|
|
||||||
spyOn(DeltaStreamingConnection.prototype, 'start')
|
|
||||||
@account = new Account(id: TEST_ACCOUNT_CLIENT_ID, organizationUnit: 'label')
|
|
||||||
@worker = new DeltaStreamingConnection(@account)
|
|
||||||
@worker.loadStateFromDatabase()
|
|
||||||
advanceClock()
|
|
||||||
@worker.start()
|
|
||||||
@worker._metadata = {"a": [{"id":"b"}]}
|
|
||||||
@deltaStreams = @worker._deltaStreams
|
|
||||||
advanceClock()
|
|
||||||
|
|
||||||
it "should reset `busy` to false when reading state from disk", ->
|
|
||||||
@worker = new DeltaStreamingConnection(@account)
|
|
||||||
spyOn(@worker, '_resume')
|
|
||||||
@worker.loadStateFromDatabase()
|
|
||||||
advanceClock()
|
|
||||||
expect(@worker._state.contacts.busy).toEqual(false)
|
|
||||||
|
|
||||||
describe "start", ->
|
|
||||||
it "should open the delta connection", ->
|
|
||||||
@worker.start()
|
|
||||||
advanceClock()
|
|
||||||
expect(@deltaStreams.localSync.start).toHaveBeenCalled()
|
|
||||||
expect(@deltaStreams.n1Cloud.start).toHaveBeenCalled()
|
|
||||||
|
|
||||||
it "should start querying for model collections that haven't been fully cached", ->
|
|
||||||
waitsForPromise => @worker.start().then =>
|
|
||||||
expect(@apiRequests.length).toBe(7)
|
|
||||||
modelsRequested = _.compact _.map @apiRequests, ({model}) -> model
|
|
||||||
expect(modelsRequested).toEqual(['threads', 'messages', 'folders', 'labels', 'drafts', 'contacts', 'events'])
|
|
||||||
|
|
||||||
expect(modelsRequested).toEqual(['threads', 'messages', 'folders', 'labels', 'drafts', 'contacts', 'events'])
|
|
||||||
|
|
||||||
it "should fetch 1000 labels and folders, to prevent issues where Inbox is not in the first page", ->
|
|
||||||
labelsRequest = _.find @apiRequests, (r) -> r.model is 'labels'
|
|
||||||
expect(labelsRequest.params.limit).toBe(1000)
|
|
||||||
|
|
||||||
it "should mark incomplete collections as `busy`", ->
|
|
||||||
@worker.start()
|
|
||||||
advanceClock()
|
|
||||||
nextState = @worker._state
|
|
||||||
|
|
||||||
for collection in ['contacts','threads','drafts', 'labels']
|
|
||||||
expect(nextState[collection].busy).toEqual(true)
|
|
||||||
|
|
||||||
it "should initialize count and fetched to 0", ->
|
|
||||||
@worker.start()
|
|
||||||
advanceClock()
|
|
||||||
nextState = @worker._state
|
|
||||||
|
|
||||||
for collection in ['contacts','threads','drafts', 'labels']
|
|
||||||
expect(nextState[collection].fetched).toEqual(0)
|
|
||||||
expect(nextState[collection].count).toEqual(0)
|
|
||||||
|
|
||||||
it "after failures, it should attempt to resume periodically but back off as failures continue", ->
|
|
||||||
simulateNetworkFailure = =>
|
|
||||||
@apiRequests[0].requestOptions.error({statusCode: 400})
|
|
||||||
@apiRequests = []
|
|
||||||
|
|
||||||
spyOn(@worker, '_resume').andCallThrough()
|
|
||||||
spyOn(Math, 'random').andReturn(1.0)
|
|
||||||
@worker.start()
|
|
||||||
|
|
||||||
expectThings = (resumeCallCount, randomCallCount) =>
|
|
||||||
expect(@worker._resume.callCount).toBe(resumeCallCount)
|
|
||||||
expect(Math.random.callCount).toBe(randomCallCount)
|
|
||||||
|
|
||||||
expect(@worker._resume.callCount).toBe(1, 1)
|
|
||||||
simulateNetworkFailure(); expectThings(1, 1)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(2, 1)
|
|
||||||
simulateNetworkFailure(); expectThings(2, 2)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(2, 2)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(3, 2)
|
|
||||||
simulateNetworkFailure(); expectThings(3, 3)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(3, 3)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(3, 3)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(4, 3)
|
|
||||||
simulateNetworkFailure(); expectThings(4, 4)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(4, 4)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(4, 4)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(4, 4)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(4, 4)
|
|
||||||
advanceClock(4000); advanceClock(); expectThings(5, 4)
|
|
||||||
|
|
||||||
it "handles the request as a failure if we try and grab labels or folders without an 'inbox'", ->
|
|
||||||
spyOn(@worker, '_resume').andCallThrough()
|
|
||||||
@worker.start()
|
|
||||||
expect(@worker._resume.callCount).toBe(1)
|
|
||||||
request = _.findWhere(@apiRequests, model: 'labels')
|
|
||||||
request.requestOptions.success([])
|
|
||||||
expect(@worker._resume.callCount).toBe(1)
|
|
||||||
advanceClock(30000); advanceClock()
|
|
||||||
expect(@worker._resume.callCount).toBe(2)
|
|
||||||
|
|
||||||
it "handles the request as a success if we try and grab labels or folders and it includes the 'inbox'", ->
|
|
||||||
spyOn(@worker, '_resume').andCallThrough()
|
|
||||||
@worker.start()
|
|
||||||
expect(@worker._resume.callCount).toBe(1)
|
|
||||||
request = _.findWhere(@apiRequests, model: 'labels')
|
|
||||||
request.requestOptions.success([{name: "inbox"}, {name: "archive"}])
|
|
||||||
expect(@worker._resume.callCount).toBe(1)
|
|
||||||
advanceClock(30000); advanceClock()
|
|
||||||
expect(@worker._resume.callCount).toBe(1)
|
|
||||||
|
|
||||||
describe "delta streaming cursor", ->
|
|
||||||
it "should read the cursor from the database", ->
|
|
||||||
spyOn(DeltaStreamingConnection.prototype, 'latestCursor').andReturn Promise.resolve()
|
|
||||||
|
|
||||||
@localSyncCursorStub = undefined
|
|
||||||
@n1CloudCursorStub = undefined
|
|
||||||
|
|
||||||
# no cursor present
|
|
||||||
worker = new DeltaStreamingConnection(@account)
|
|
||||||
deltaStreams = worker._deltaStreams
|
|
||||||
expect(deltaStreams.localSync.hasCursor()).toBe(false)
|
|
||||||
expect(deltaStreams.n1Cloud.hasCursor()).toBe(false)
|
|
||||||
worker.loadStateFromDatabase()
|
|
||||||
advanceClock()
|
|
||||||
expect(deltaStreams.localSync.hasCursor()).toBe(false)
|
|
||||||
expect(deltaStreams.n1Cloud.hasCursor()).toBe(false)
|
|
||||||
|
|
||||||
# cursor present in database
|
|
||||||
@localSyncCursorStub = "new-school"
|
|
||||||
@n1CloudCursorStub = 123
|
|
||||||
|
|
||||||
worker = new DeltaStreamingConnection(@account)
|
|
||||||
deltaStreams = worker._deltaStreams
|
|
||||||
expect(deltaStreams.localSync.hasCursor()).toBe(false)
|
|
||||||
expect(deltaStreams.n1Cloud.hasCursor()).toBe(false)
|
|
||||||
worker.loadStateFromDatabase()
|
|
||||||
advanceClock()
|
|
||||||
expect(deltaStreams.localSync.hasCursor()).toBe(true)
|
|
||||||
expect(deltaStreams.n1Cloud.hasCursor()).toBe(true)
|
|
||||||
expect(deltaStreams.localSync._getCursor()).toEqual('new-school')
|
|
||||||
expect(deltaStreams.n1Cloud._getCursor()).toEqual(123)
|
|
||||||
|
|
||||||
it "should set the cursor to the last cursor after receiving deltas", ->
|
|
||||||
spyOn(DeltaStreamingConnection.prototype, 'latestCursor').andReturn Promise.resolve()
|
|
||||||
worker = new DeltaStreamingConnection(@account)
|
|
||||||
advanceClock()
|
|
||||||
deltaStreams = worker._deltaStreams
|
|
||||||
deltas = [{cursor: '1'}, {cursor: '2'}]
|
|
||||||
deltaStreams.localSync._emitter.emit('results-stopped-arriving', deltas)
|
|
||||||
deltaStreams.n1Cloud._emitter.emit('results-stopped-arriving', deltas)
|
|
||||||
advanceClock()
|
|
||||||
expect(deltaStreams.localSync._getCursor()).toEqual('2')
|
|
||||||
expect(deltaStreams.n1Cloud._getCursor()).toEqual('2')
|
|
||||||
|
|
||||||
describe "_resume", ->
|
|
||||||
it "should fetch metadata first and fetch other collections when metadata is ready", ->
|
|
||||||
fetchAllMetadataCallback = null
|
|
||||||
spyOn(@worker, '_fetchCollectionPage')
|
|
||||||
@worker._state = {}
|
|
||||||
@worker._resume()
|
|
||||||
expect(@worker._fetchMetadata).toHaveBeenCalled()
|
|
||||||
expect(@worker._fetchCollectionPage.calls.length).toBe(0)
|
|
||||||
advanceClock()
|
|
||||||
expect(@worker._fetchCollectionPage.calls.length).not.toBe(0)
|
|
||||||
|
|
||||||
it "should fetch collections for which `_shouldFetchCollection` returns true", ->
|
|
||||||
spyOn(@worker, '_fetchCollectionPage')
|
|
||||||
spyOn(@worker, '_shouldFetchCollection').andCallFake (collection) =>
|
|
||||||
return collection.model in ['threads', 'labels', 'drafts']
|
|
||||||
@worker._resume()
|
|
||||||
advanceClock()
|
|
||||||
advanceClock()
|
|
||||||
expect(@worker._fetchCollectionPage.calls.map (call) -> call.args[0]).toEqual(['threads', 'labels', 'drafts'])
|
|
||||||
|
|
||||||
it "should be called when Actions.retryDeltaConnection is received", ->
|
|
||||||
spyOn(DeltaStreamingConnection.prototype, 'latestCursor').andReturn Promise.resolve()
|
|
||||||
|
|
||||||
# TODO why do we need to call through?
|
|
||||||
spyOn(@worker, '_resume').andCallThrough()
|
|
||||||
Actions.retryDeltaConnection()
|
|
||||||
expect(@worker._resume).toHaveBeenCalled()
|
|
||||||
|
|
||||||
describe "_shouldFetchCollection", ->
|
|
||||||
it "should return false if the collection sync is already in progress", ->
|
|
||||||
@worker._state.threads = {
|
|
||||||
'busy': true
|
|
||||||
'complete': false
|
|
||||||
}
|
|
||||||
expect(@worker._shouldFetchCollection({model: 'threads'})).toBe(false)
|
|
||||||
|
|
||||||
it "should return false if the collection sync is already complete", ->
|
|
||||||
@worker._state.threads = {
|
|
||||||
'busy': false
|
|
||||||
'complete': true
|
|
||||||
}
|
|
||||||
expect(@worker._shouldFetchCollection({model: 'threads'})).toBe(false)
|
|
||||||
|
|
||||||
it "should return true otherwise", ->
|
|
||||||
@worker._state.threads = {
|
|
||||||
'busy': false
|
|
||||||
'complete': false
|
|
||||||
}
|
|
||||||
expect(@worker._shouldFetchCollection({model: 'threads'})).toBe(true)
|
|
||||||
@worker._state.threads = undefined
|
|
||||||
expect(@worker._shouldFetchCollection({model: 'threads'})).toBe(true)
|
|
||||||
|
|
||||||
describe "_fetchCollection", ->
|
|
||||||
beforeEach ->
|
|
||||||
@apiRequests = []
|
|
||||||
|
|
||||||
it "should pass any metadata it preloaded", ->
|
|
||||||
@worker._state.threads = {
|
|
||||||
'busy': false
|
|
||||||
'complete': false
|
|
||||||
}
|
|
||||||
@worker._fetchCollection({model: 'threads'})
|
|
||||||
expect(@apiRequests[0].model).toBe('threads')
|
|
||||||
expect(@apiRequests[0].requestOptions.metadataToAttach).toBe(@worker._metadata)
|
|
||||||
|
|
||||||
describe "when there is no request history (`lastRequestRange`)", ->
|
|
||||||
it "should start the first request for models", ->
|
|
||||||
@worker._state.threads = {
|
|
||||||
'busy': false
|
|
||||||
'complete': false
|
|
||||||
}
|
|
||||||
@worker._fetchCollection({model: 'threads'})
|
|
||||||
expect(@apiRequests[0].model).toBe('threads')
|
|
||||||
expect(@apiRequests[0].params.offset).toBe(0)
|
|
||||||
|
|
||||||
describe "when it was previously trying to fetch a page (`lastRequestRange`)", ->
|
|
||||||
beforeEach ->
|
|
||||||
@worker._state.threads =
|
|
||||||
'count': 1200
|
|
||||||
'fetched': 100
|
|
||||||
'busy': false
|
|
||||||
'complete': false
|
|
||||||
'error': new Error("Something bad")
|
|
||||||
'lastRequestRange':
|
|
||||||
offset: 100
|
|
||||||
limit: 50
|
|
||||||
|
|
||||||
it "should start paginating from the request that was interrupted", ->
|
|
||||||
@worker._fetchCollection({model: 'threads'})
|
|
||||||
expect(@apiRequests[0].model).toBe('threads')
|
|
||||||
expect(@apiRequests[0].params.offset).toBe(100)
|
|
||||||
expect(@apiRequests[0].params.limit).toBe(50)
|
|
||||||
|
|
||||||
it "should not reset the `count`, `fetched` or start fetching the count", ->
|
|
||||||
@worker._fetchCollection({model: 'threads'})
|
|
||||||
expect(@worker._state.threads.fetched).toBe(100)
|
|
||||||
expect(@worker._state.threads.count).toBe(1200)
|
|
||||||
expect(@apiRequests.length).toBe(1)
|
|
||||||
|
|
||||||
describe 'when maxFetchCount option is specified', ->
|
|
||||||
it "should only fetch maxFetch count on the first request if it is less than initialPageSize", ->
|
|
||||||
@worker._state.messages =
|
|
||||||
count: 1000
|
|
||||||
fetched: 0
|
|
||||||
@worker._fetchCollection({model: 'messages', initialPageSize: 30, maxFetchCount: 25})
|
|
||||||
expect(@apiRequests[0].params.offset).toBe 0
|
|
||||||
expect(@apiRequests[0].params.limit).toBe 25
|
|
||||||
|
|
||||||
it "sould only fetch the maxFetchCount when restoring from saved state", ->
|
|
||||||
@worker._state.messages =
|
|
||||||
count: 1000
|
|
||||||
fetched: 470
|
|
||||||
lastRequestRange: {
|
|
||||||
limit: 50,
|
|
||||||
offset: 470,
|
|
||||||
}
|
|
||||||
@worker._fetchCollection({model: 'messages', maxFetchCount: 500})
|
|
||||||
expect(@apiRequests[0].params.offset).toBe 470
|
|
||||||
expect(@apiRequests[0].params.limit).toBe 30
|
|
||||||
|
|
||||||
describe "_fetchCollectionPage", ->
|
|
||||||
beforeEach ->
|
|
||||||
@apiRequests = []
|
|
||||||
|
|
||||||
describe 'when maxFetchCount option is specified', ->
|
|
||||||
it 'should not fetch next page if maxFetchCount has been reached', ->
|
|
||||||
@worker._state.messages =
|
|
||||||
count: 1000
|
|
||||||
fetched: 470
|
|
||||||
@worker._fetchCollectionPage('messages', {limit: 30, offset: 470}, {maxFetchCount: 500})
|
|
||||||
{success} = @apiRequests[0].requestOptions
|
|
||||||
success({length: 30})
|
|
||||||
expect(@worker._state.messages.fetched).toBe 500
|
|
||||||
advanceClock(2000); advanceClock()
|
|
||||||
expect(@apiRequests.length).toBe 1
|
|
||||||
|
|
||||||
it 'should limit by maxFetchCount when requesting the next page', ->
|
|
||||||
@worker._state.messages =
|
|
||||||
count: 1000
|
|
||||||
fetched: 450
|
|
||||||
@worker._fetchCollectionPage('messages', {limit: 30, offset: 450 }, {maxFetchCount: 500})
|
|
||||||
{success} = @apiRequests[0].requestOptions
|
|
||||||
success({length: 30})
|
|
||||||
expect(@worker._state.messages.fetched).toBe 480
|
|
||||||
advanceClock(2000); advanceClock()
|
|
||||||
expect(@apiRequests[1].params.offset).toBe 480
|
|
||||||
expect(@apiRequests[1].params.limit).toBe 20
|
|
||||||
|
|
||||||
describe "when an API request completes", ->
|
|
||||||
beforeEach ->
|
|
||||||
@worker.start()
|
|
||||||
advanceClock()
|
|
||||||
@request = @apiRequests[0]
|
|
||||||
@apiRequests = []
|
|
||||||
|
|
||||||
describe "successfully, with models", ->
|
|
||||||
it "should start out by requesting a small number of items", ->
|
|
||||||
expect(@request.params.limit).toBe DeltaStreamingConnection.INITIAL_PAGE_SIZE
|
|
||||||
|
|
||||||
it "should request the next page", ->
|
|
||||||
pageSize = @request.params.limit
|
|
||||||
models = []
|
|
||||||
models.push(new Thread) for i in [0..(pageSize-1)]
|
|
||||||
@request.requestOptions.success(models)
|
|
||||||
advanceClock(2000); advanceClock()
|
|
||||||
expect(@apiRequests.length).toBe(1)
|
|
||||||
expect(@apiRequests[0].params.offset).toEqual @request.params.offset + pageSize
|
|
||||||
|
|
||||||
it "increase the limit on the next page load by 50%", ->
|
|
||||||
pageSize = @request.params.limit
|
|
||||||
models = []
|
|
||||||
models.push(new Thread) for i in [0..(pageSize-1)]
|
|
||||||
@request.requestOptions.success(models)
|
|
||||||
advanceClock(2000); advanceClock()
|
|
||||||
expect(@apiRequests.length).toBe(1)
|
|
||||||
expect(@apiRequests[0].params.limit).toEqual pageSize * 1.5,
|
|
||||||
|
|
||||||
it "never requests more then MAX_PAGE_SIZE", ->
|
|
||||||
pageSize = @request.params.limit = DeltaStreamingConnection.MAX_PAGE_SIZE
|
|
||||||
models = []
|
|
||||||
models.push(new Thread) for i in [0..(pageSize-1)]
|
|
||||||
@request.requestOptions.success(models)
|
|
||||||
advanceClock(2000); advanceClock()
|
|
||||||
expect(@apiRequests.length).toBe(1)
|
|
||||||
expect(@apiRequests[0].params.limit).toEqual DeltaStreamingConnection.MAX_PAGE_SIZE
|
|
||||||
|
|
||||||
it "should update the fetched count on the collection", ->
|
|
||||||
expect(@worker._state.threads.fetched).toEqual(0)
|
|
||||||
pageSize = @request.params.limit
|
|
||||||
models = []
|
|
||||||
models.push(new Thread) for i in [0..(pageSize-1)]
|
|
||||||
@request.requestOptions.success(models)
|
|
||||||
expect(@worker._state.threads.fetched).toEqual(pageSize)
|
|
||||||
|
|
||||||
describe "successfully, with fewer models than requested", ->
|
|
||||||
beforeEach ->
|
|
||||||
models = []
|
|
||||||
models.push(new Thread) for i in [0..100]
|
|
||||||
@request.requestOptions.success(models)
|
|
||||||
|
|
||||||
it "should not request another page", ->
|
|
||||||
expect(@apiRequests.length).toBe(0)
|
|
||||||
|
|
||||||
it "should update the state to complete", ->
|
|
||||||
expect(@worker._state.threads.busy).toEqual(false)
|
|
||||||
expect(@worker._state.threads.complete).toEqual(true)
|
|
||||||
|
|
||||||
it "should update the fetched count on the collection", ->
|
|
||||||
expect(@worker._state.threads.fetched).toEqual(101)
|
|
||||||
|
|
||||||
describe "successfully, with no models", ->
|
|
||||||
it "should not request another page", ->
|
|
||||||
@request.requestOptions.success([])
|
|
||||||
expect(@apiRequests.length).toBe(0)
|
|
||||||
|
|
||||||
it "should update the state to complete", ->
|
|
||||||
@request.requestOptions.success([])
|
|
||||||
expect(@worker._state.threads.busy).toEqual(false)
|
|
||||||
expect(@worker._state.threads.complete).toEqual(true)
|
|
||||||
|
|
||||||
describe "with an error", ->
|
|
||||||
it "should log the error to the state, along with the range that failed", ->
|
|
||||||
err = new Error("Oh no a network error")
|
|
||||||
@request.requestOptions.error(err)
|
|
||||||
expect(@worker._state.threads.busy).toEqual(false)
|
|
||||||
expect(@worker._state.threads.complete).toEqual(false)
|
|
||||||
expect(@worker._state.threads.error).toEqual(err.toString())
|
|
||||||
expect(@worker._state.threads.lastRequestRange).toEqual({offset: 0, limit: 30})
|
|
||||||
|
|
||||||
it "should not request another page", ->
|
|
||||||
@request.requestOptions.error(new Error("Oh no a network error"))
|
|
||||||
expect(@apiRequests.length).toBe(0)
|
|
||||||
|
|
||||||
describe "succeeds after a previous error", ->
|
|
||||||
beforeEach ->
|
|
||||||
@worker._state.threads.error = new Error("Something bad happened")
|
|
||||||
@worker._state.threads.lastRequestRange = {limit: 10, offset: 10}
|
|
||||||
@request.requestOptions.success([])
|
|
||||||
advanceClock(1)
|
|
||||||
|
|
||||||
it "should clear any previous error and updates lastRequestRange", ->
|
|
||||||
expect(@worker._state.threads.error).toEqual(null)
|
|
||||||
expect(@worker._state.threads.lastRequestRange).toEqual({offset: 0, limit: 30})
|
|
||||||
|
|
||||||
describe "cleanup", ->
|
|
||||||
it "should termiate the delta connection", ->
|
|
||||||
spyOn(@deltaStreams.localSync, 'end')
|
|
||||||
spyOn(@deltaStreams.n1Cloud, 'end')
|
|
||||||
@worker.cleanup()
|
|
||||||
expect(@deltaStreams.localSync.end).toHaveBeenCalled()
|
|
||||||
expect(@deltaStreams.n1Cloud.end).toHaveBeenCalled()
|
|
||||||
|
|
||||||
it "should stop trying to restart failed collection syncs", ->
|
|
||||||
spyOn(console, 'log')
|
|
||||||
spyOn(@worker, '_resume').andCallThrough()
|
|
||||||
@worker.cleanup()
|
|
||||||
advanceClock(50000); advanceClock()
|
|
||||||
expect(@worker._resume.callCount).toBe(0)
|
|
||||||
|
|
|
@ -18,9 +18,7 @@ import AutoUpdateManager from './auto-update-manager';
|
||||||
import SystemTrayManager from './system-tray-manager';
|
import SystemTrayManager from './system-tray-manager';
|
||||||
import DefaultClientHelper from '../default-client-helper';
|
import DefaultClientHelper from '../default-client-helper';
|
||||||
import NylasProtocolHandler from './nylas-protocol-handler';
|
import NylasProtocolHandler from './nylas-protocol-handler';
|
||||||
import PackageMigrationManager from './package-migration-manager';
|
|
||||||
import ConfigPersistenceManager from './config-persistence-manager';
|
import ConfigPersistenceManager from './config-persistence-manager';
|
||||||
import preventLegacyN1Migration from './prevent-legacy-n1-migration';
|
|
||||||
|
|
||||||
let clipboard = null;
|
let clipboard = null;
|
||||||
|
|
||||||
|
@ -67,14 +65,9 @@ export default class Application extends EventEmitter {
|
||||||
this.configPersistenceManager = new ConfigPersistenceManager({configDirPath, resourcePath});
|
this.configPersistenceManager = new ConfigPersistenceManager({configDirPath, resourcePath});
|
||||||
config.load();
|
config.load();
|
||||||
|
|
||||||
preventLegacyN1Migration(configDirPath)
|
|
||||||
|
|
||||||
this.configMigrator = new ConfigMigrator(this.config, this.databaseReader);
|
this.configMigrator = new ConfigMigrator(this.config, this.databaseReader);
|
||||||
this.configMigrator.migrate()
|
this.configMigrator.migrate()
|
||||||
|
|
||||||
this.packageMigrationManager = new PackageMigrationManager({config, configDirPath, version})
|
|
||||||
this.packageMigrationManager.migrate()
|
|
||||||
|
|
||||||
let initializeInBackground = options.background;
|
let initializeInBackground = options.background;
|
||||||
if (initializeInBackground === undefined) {
|
if (initializeInBackground === undefined) {
|
||||||
initializeInBackground = false;
|
initializeInBackground = false;
|
||||||
|
|
|
@ -1,115 +0,0 @@
|
||||||
import fs from 'fs'
|
|
||||||
import path from 'path'
|
|
||||||
import semver from 'semver'
|
|
||||||
|
|
||||||
|
|
||||||
const PACKAGE_MIGRATIONS = [
|
|
||||||
{
|
|
||||||
"version": "0.4.50",
|
|
||||||
"package-migrations": [{
|
|
||||||
"new-name": "composer-markdown",
|
|
||||||
"old-name": "N1-Markdown-Composer",
|
|
||||||
"enabled-by-default": false,
|
|
||||||
}],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"version": "0.4.204",
|
|
||||||
"package-migrations": [{
|
|
||||||
"new-name": "nylas-private-salesforce",
|
|
||||||
"old-name": "nylas-private-salesforce",
|
|
||||||
"enabled-by-default": false,
|
|
||||||
}],
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"version": "2.0.1",
|
|
||||||
"package-migrations": [
|
|
||||||
{
|
|
||||||
"new-name": "thread-snooze",
|
|
||||||
"old-name": "thread-snooze",
|
|
||||||
"enabled-by-default": true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"new-name": "send-reminders",
|
|
||||||
"old-name": "send-reminders",
|
|
||||||
"enabled-by-default": true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"new-name": "send-later",
|
|
||||||
"old-name": "send-later",
|
|
||||||
"enabled-by-default": true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
class PackageMigrationManager {
|
|
||||||
|
|
||||||
constructor({config, version, configDirPath} = {}) {
|
|
||||||
this.config = config
|
|
||||||
this.configDirPath = configDirPath
|
|
||||||
this.version = version
|
|
||||||
this.savedMigrationVersion = this.config.get('core.packageMigrationVersion')
|
|
||||||
}
|
|
||||||
|
|
||||||
getMigrationsToRun() {
|
|
||||||
let migrations;
|
|
||||||
if (this.savedMigrationVersion) {
|
|
||||||
migrations = PACKAGE_MIGRATIONS
|
|
||||||
.filter((migration) => semver.gt(migration.version, this.savedMigrationVersion))
|
|
||||||
.map(migration => migration['package-migrations'])
|
|
||||||
} else {
|
|
||||||
migrations = PACKAGE_MIGRATIONS.map(migration => migration['package-migrations'])
|
|
||||||
}
|
|
||||||
return [].concat(...migrations)
|
|
||||||
}
|
|
||||||
|
|
||||||
migrate() {
|
|
||||||
if (this.savedMigrationVersion === this.version) { return }
|
|
||||||
const migrations = this.getMigrationsToRun()
|
|
||||||
const oldPackNames = migrations.map((mig) => mig['old-name'])
|
|
||||||
const disabledPackNames = this.config.get('core.disabledPackages') || []
|
|
||||||
let oldEnabledPackNames = []
|
|
||||||
|
|
||||||
if (fs.existsSync(path.join(this.configDirPath, 'packages'))) {
|
|
||||||
// Find any external packages that have been manually installed
|
|
||||||
const toMigrate = fs.readdirSync(path.join(this.configDirPath, 'packages'))
|
|
||||||
.filter((packName) => oldPackNames.includes(packName))
|
|
||||||
.filter((packName) => packName[0] !== '.')
|
|
||||||
|
|
||||||
// Move old installed packages to a deprecated folder
|
|
||||||
const deprecatedPath = path.join(this.configDirPath, 'packages-deprecated')
|
|
||||||
if (!fs.existsSync(deprecatedPath)) {
|
|
||||||
fs.mkdirSync(deprecatedPath);
|
|
||||||
}
|
|
||||||
toMigrate.forEach((packName) => {
|
|
||||||
const prevPath = path.join(this.configDirPath, 'packages', packName)
|
|
||||||
const nextPath = path.join(deprecatedPath, packName)
|
|
||||||
fs.renameSync(prevPath, nextPath);
|
|
||||||
});
|
|
||||||
|
|
||||||
oldEnabledPackNames = toMigrate.filter((packName) => (
|
|
||||||
!(disabledPackNames).includes(packName)
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enable any packages that were migrated from an old install and were
|
|
||||||
// enabled, or that should be enabled by default
|
|
||||||
migrations.forEach((migration) => {
|
|
||||||
// If the old install was enabled, keep it that way
|
|
||||||
if (oldEnabledPackNames.includes(migration['old-name'])) { return }
|
|
||||||
// If we want to enable the package by default,
|
|
||||||
if (migration['enabled-by-default']) {
|
|
||||||
if (disabledPackNames.includes(migration['old-name'])) {
|
|
||||||
this.config.removeAtKeyPath('core.disabledPackages', migration['old-name'])
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
const newName = migration['new-name']
|
|
||||||
this.config.pushAtKeyPath('core.disabledPackages', newName);
|
|
||||||
})
|
|
||||||
|
|
||||||
this.config.set('core.packageMigrationVersion', this.version)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export default PackageMigrationManager
|
|
|
@ -1,22 +0,0 @@
|
||||||
import fs from 'fs'
|
|
||||||
import path from 'path'
|
|
||||||
|
|
||||||
// This function prevents old N1 from destroying its own config and copying the
|
|
||||||
// one from Nylas Mail 2.0. The expected workflow now is to migrate from old
|
|
||||||
// N1 (1.5.0) to Nylas Mail (2.0) instead of the other way around
|
|
||||||
// See https://github.com/nylas/nylas-mail/blob/n1-pro/src/browser/nylas-pro-migrator.es6 for details
|
|
||||||
export default function preventLegacyN1Migration(configDirPath) {
|
|
||||||
try {
|
|
||||||
const legacyConfigPath = path.join(configDirPath, '..', '.nylas', 'config.json')
|
|
||||||
if (!fs.existsSync(legacyConfigPath)) { return }
|
|
||||||
const legacyConfig = require(legacyConfigPath) || {} // eslint-disable-line
|
|
||||||
if (!legacyConfig['*']) {
|
|
||||||
legacyConfig['*'] = {}
|
|
||||||
}
|
|
||||||
legacyConfig['*'].nylasMailBasicMigrationTime = Date.now()
|
|
||||||
fs.writeFileSync(legacyConfigPath, JSON.stringify(legacyConfig))
|
|
||||||
} catch (err) {
|
|
||||||
console.error('Error preventing legacy N1 migration')
|
|
||||||
console.error(err)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,132 +0,0 @@
|
||||||
import _ from 'underscore'
|
|
||||||
import Rx from 'rx-lite'
|
|
||||||
import NylasStore from 'nylas-store'
|
|
||||||
import AccountStore from './account-store'
|
|
||||||
import DatabaseStore from './database-store'
|
|
||||||
import DeltaStreamingConnection from '../../services/delta-streaming-connection'
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* DeltaConnectionStore manages delta connections and
|
|
||||||
* keeps track of the status of delta connections
|
|
||||||
* per account. It will trigger whenever delta conenction
|
|
||||||
* status changes.
|
|
||||||
*
|
|
||||||
* The connection status for any given account has the following shape:
|
|
||||||
*
|
|
||||||
* {
|
|
||||||
* cursor: 0,
|
|
||||||
* status: 'connected',
|
|
||||||
* }
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
class DeltaConnectionStore extends NylasStore {
|
|
||||||
|
|
||||||
constructor() {
|
|
||||||
super()
|
|
||||||
this._unsubscribers = []
|
|
||||||
this._connectionStatesByAccountId = {}
|
|
||||||
this._connectionsByAccountId = new Map()
|
|
||||||
this._connectionStatusSubscriptionsByAccountId = new Map()
|
|
||||||
|
|
||||||
this._isBuildingDeltaConnections = false
|
|
||||||
|
|
||||||
this._triggerDebounced = _.debounce(this.trigger, 100)
|
|
||||||
}
|
|
||||||
|
|
||||||
async activate() {
|
|
||||||
if (!NylasEnv.isWorkWindow()) { return }
|
|
||||||
this._unsubsribers = [
|
|
||||||
this.listenTo(AccountStore, () => this._onAccountsChanged()),
|
|
||||||
]
|
|
||||||
const accountIds = AccountStore.accountIds()
|
|
||||||
this._setupConnectionStatusSubscriptions({newAccountIds: accountIds})
|
|
||||||
await this._setupDeltaStreamingConnections({newAccountIds: accountIds})
|
|
||||||
}
|
|
||||||
|
|
||||||
deactivate() {
|
|
||||||
if (!NylasEnv.isWorkWindow()) { return }
|
|
||||||
this._unsubsribers.forEach(usub => usub())
|
|
||||||
for (const subscription of this._connectionStatusSubscriptionsByAccountId.values()) {
|
|
||||||
subscription.dispose()
|
|
||||||
}
|
|
||||||
this._connectionStatusSubscriptionsByAccountId.clear()
|
|
||||||
}
|
|
||||||
|
|
||||||
getDeltaConnectionStates() {
|
|
||||||
return this._connectionStatesByAccountId
|
|
||||||
}
|
|
||||||
|
|
||||||
_updateState(accountId, nextState) {
|
|
||||||
const currentState = this._connectionStatesByAccountId[accountId] || {}
|
|
||||||
if (_.isEqual(currentState, nextState)) { return }
|
|
||||||
this._connectionStatesByAccountId[accountId] = nextState
|
|
||||||
this._triggerDebounced()
|
|
||||||
}
|
|
||||||
|
|
||||||
async _onAccountsChanged() {
|
|
||||||
const currentIds = Array.from(this._connectionStatusSubscriptionsByAccountId.keys())
|
|
||||||
const nextIds = AccountStore.accountIds()
|
|
||||||
const newAccountIds = _.difference(nextIds, currentIds)
|
|
||||||
const removedAccountIds = _.difference(currentIds, nextIds)
|
|
||||||
|
|
||||||
this._setupConnectionStatusSubscriptions({newAccountIds, removedAccountIds})
|
|
||||||
await this._setupDeltaStreamingConnections({newAccountIds, removedAccountIds})
|
|
||||||
}
|
|
||||||
|
|
||||||
_setupConnectionStatusSubscriptions({newAccountIds = [], removedAccountIds = []} = {}) {
|
|
||||||
removedAccountIds.forEach((accountId) => {
|
|
||||||
if (this._connectionStatusSubscriptionsByAccountId.has(accountId)) {
|
|
||||||
this._connectionStatusSubscriptionsByAccountId.get(accountId).dispose()
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this._connectionStatesByAccountId[accountId]) {
|
|
||||||
delete this._connectionStatesByAccountId[accountId]
|
|
||||||
this._triggerDebounced()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
newAccountIds.forEach((accountId) => {
|
|
||||||
if (this._connectionStatusSubscriptionsByAccountId.has(accountId)) { return; }
|
|
||||||
const query = DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${accountId}`)
|
|
||||||
const subscription = Rx.Observable.fromQuery(query)
|
|
||||||
.subscribe((json) => {
|
|
||||||
// We need to copy `json` otherwise the query observable will mutate
|
|
||||||
// the reference to that object
|
|
||||||
this._updateState(accountId, {...json})
|
|
||||||
})
|
|
||||||
this._connectionStatusSubscriptionsByAccountId.set(accountId, subscription)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async _setupDeltaStreamingConnections({newAccountIds = [], removedAccountIds = []} = {}) {
|
|
||||||
if (NylasEnv.inSpecMode()) { return; }
|
|
||||||
|
|
||||||
// We need a function lock on this because on bootup, many legitimate
|
|
||||||
// events coming in may result in this function being called multiple times
|
|
||||||
// in quick succession, which can cause us to start multiple syncs for the
|
|
||||||
// same account
|
|
||||||
if (this._isBuildingDeltaConnections) { return }
|
|
||||||
this._isBuildingDeltaConnections = true;
|
|
||||||
|
|
||||||
try {
|
|
||||||
for (const accountId of newAccountIds) {
|
|
||||||
const account = AccountStore.accountForId(accountId)
|
|
||||||
const newDeltaConnection = new DeltaStreamingConnection(account);
|
|
||||||
await newDeltaConnection.start()
|
|
||||||
this._connectionsByAccountId.set(accountId, newDeltaConnection)
|
|
||||||
}
|
|
||||||
for (const accountId of removedAccountIds) {
|
|
||||||
if (this._connectionsByAccountId.has(accountId)) {
|
|
||||||
const connection = this._connectionsByAccountId.get(accountId)
|
|
||||||
connection.end()
|
|
||||||
this._connectionsByAccountId.delete(accountId)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
this._isBuildingDeltaConnections = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export default new DeltaConnectionStore()
|
|
|
@ -162,7 +162,6 @@ lazyLoadAndRegisterStore(`PreferencesUIStore`, 'preferences-ui-store');
|
||||||
lazyLoadAndRegisterStore(`FocusedContentStore`, 'focused-content-store');
|
lazyLoadAndRegisterStore(`FocusedContentStore`, 'focused-content-store');
|
||||||
lazyLoadAndRegisterStore(`MessageBodyProcessor`, 'message-body-processor');
|
lazyLoadAndRegisterStore(`MessageBodyProcessor`, 'message-body-processor');
|
||||||
lazyLoadAndRegisterStore(`FocusedContactsStore`, 'focused-contacts-store');
|
lazyLoadAndRegisterStore(`FocusedContactsStore`, 'focused-contacts-store');
|
||||||
lazyLoadAndRegisterStore(`DeltaConnectionStore`, 'delta-connection-store');
|
|
||||||
lazyLoadAndRegisterStore(`FolderSyncProgressStore`, 'folder-sync-progress-store');
|
lazyLoadAndRegisterStore(`FolderSyncProgressStore`, 'folder-sync-progress-store');
|
||||||
lazyLoadAndRegisterStore(`FocusedPerspectiveStore`, 'focused-perspective-store');
|
lazyLoadAndRegisterStore(`FocusedPerspectiveStore`, 'focused-perspective-store');
|
||||||
lazyLoadAndRegisterStore(`SearchableComponentStore`, 'searchable-component-store');
|
lazyLoadAndRegisterStore(`SearchableComponentStore`, 'searchable-component-store');
|
||||||
|
|
|
@ -1,215 +0,0 @@
|
||||||
import _ from 'underscore'
|
|
||||||
import {ExponentialBackoffScheduler} from 'isomorphic-core'
|
|
||||||
import N1CloudAPI from '../n1-cloud-api'
|
|
||||||
import Actions from '../flux/actions'
|
|
||||||
import {APIError} from '../flux/errors'
|
|
||||||
import Account from '../flux/models/account'
|
|
||||||
import DeltaProcessor from './delta-processor'
|
|
||||||
import DatabaseStore from '../flux/stores/database-store'
|
|
||||||
import IdentityStore from '../flux/stores/identity-store'
|
|
||||||
import OnlineStatusStore from '../flux/stores/online-status-store'
|
|
||||||
import NylasLongConnection from '../flux/nylas-long-connection'
|
|
||||||
|
|
||||||
|
|
||||||
const MAX_RETRY_DELAY = 10 * 60 * 1000;
|
|
||||||
const BASE_RETRY_DELAY = 1000;
|
|
||||||
|
|
||||||
class DeltaStreamingConnection {
|
|
||||||
constructor(account) {
|
|
||||||
this._account = account
|
|
||||||
this._state = null
|
|
||||||
this._longConnection = null
|
|
||||||
this._retryTimeout = null
|
|
||||||
this._unsubscribers = []
|
|
||||||
this._writeStateDebounced = _.debounce(this._writeState, 100)
|
|
||||||
this._backoffScheduler = new ExponentialBackoffScheduler({
|
|
||||||
baseDelay: BASE_RETRY_DELAY,
|
|
||||||
maxDelay: MAX_RETRY_DELAY,
|
|
||||||
})
|
|
||||||
|
|
||||||
this._setupListeners()
|
|
||||||
NylasEnv.onBeforeUnload = (readyToUnload) => {
|
|
||||||
this._writeState().finally(readyToUnload)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async start() {
|
|
||||||
try {
|
|
||||||
if (!IdentityStore.identity()) {
|
|
||||||
console.warn(`Can't start DeltaStreamingConnection without a Nylas Identity`)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if (!this._state) {
|
|
||||||
this._state = await this._loadState()
|
|
||||||
}
|
|
||||||
const cursor = this._state.cursor || 0
|
|
||||||
this._clearRetryTimeout()
|
|
||||||
this._longConnection = new NylasLongConnection({
|
|
||||||
api: N1CloudAPI,
|
|
||||||
accountId: this._account.id,
|
|
||||||
path: `/delta/streaming?cursor=${cursor}`,
|
|
||||||
throttleResultsInterval: 1000,
|
|
||||||
closeIfDataStopsInterval: 15 * 1000,
|
|
||||||
onError: this._onError,
|
|
||||||
onResults: this._onResults,
|
|
||||||
onStatusChanged: this._onStatusChanged,
|
|
||||||
})
|
|
||||||
this._longConnection.start()
|
|
||||||
} catch (err) {
|
|
||||||
this._onError(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
restart() {
|
|
||||||
try {
|
|
||||||
this._restarting = true
|
|
||||||
this.close();
|
|
||||||
this._disposeListeners()
|
|
||||||
this._setupListeners()
|
|
||||||
this.start();
|
|
||||||
} finally {
|
|
||||||
this._restarting = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
close() {
|
|
||||||
this._clearRetryTimeout()
|
|
||||||
this._disposeListeners()
|
|
||||||
if (this._longConnection) {
|
|
||||||
this._longConnection.close()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
end() {
|
|
||||||
this._clearRetryTimeout()
|
|
||||||
this._disposeListeners()
|
|
||||||
if (this._longConnection) {
|
|
||||||
this._longConnection.end()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_setupListeners() {
|
|
||||||
this._unsubscribers = [
|
|
||||||
Actions.retryDeltaConnection.listen(this.restart, this),
|
|
||||||
OnlineStatusStore.listen(this._onOnlineStatusChanged, this),
|
|
||||||
IdentityStore.listen(this._onIdentityChanged, this),
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
_disposeListeners() {
|
|
||||||
this._unsubscribers.forEach(usub => usub())
|
|
||||||
this._unsubscribers = []
|
|
||||||
}
|
|
||||||
|
|
||||||
_clearRetryTimeout() {
|
|
||||||
clearTimeout(this._retryTimeout)
|
|
||||||
this._retryTimeout = null
|
|
||||||
}
|
|
||||||
|
|
||||||
_onOnlineStatusChanged = () => {
|
|
||||||
if (OnlineStatusStore.isOnline()) {
|
|
||||||
this.restart()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_onIdentityChanged = () => {
|
|
||||||
if (IdentityStore.identity()) {
|
|
||||||
this.restart()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_onStatusChanged = (status) => {
|
|
||||||
if (this._restarting) { return; }
|
|
||||||
this._state.status = status;
|
|
||||||
this._writeStateDebounced();
|
|
||||||
const {Closed, Connected} = NylasLongConnection.Status
|
|
||||||
if (status === Connected) {
|
|
||||||
Actions.updateAccount(this._account.id, {
|
|
||||||
n1CloudState: Account.N1_CLOUD_STATE_RUNNING,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if (status === Closed) {
|
|
||||||
if (this._retryTimeout) { return }
|
|
||||||
this._clearRetryTimeout()
|
|
||||||
this._retryTimeout = setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_onResults = (deltas = []) => {
|
|
||||||
this._backoffScheduler.reset()
|
|
||||||
|
|
||||||
const last = _.last(deltas);
|
|
||||||
if (last && last.cursor) {
|
|
||||||
this._setCursor(last.cursor)
|
|
||||||
}
|
|
||||||
DeltaProcessor.process(deltas, {source: 'n1Cloud'})
|
|
||||||
}
|
|
||||||
|
|
||||||
_onError = (err = {}) => {
|
|
||||||
if (err.message && err.message.includes('Invalid cursor')) {
|
|
||||||
// TODO is this still necessary?
|
|
||||||
const error = new Error('DeltaStreamingConnection: Cursor is invalid. Need to blow away local cache.');
|
|
||||||
NylasEnv.reportError(error)
|
|
||||||
this._setCursor(0)
|
|
||||||
const app = require('electron').remote.getGlobal('application') // eslint-disable-line
|
|
||||||
app.rebuildDatabase({showErrorDialog: false})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err.message = `Error connecting to delta stream: ${err.message}`
|
|
||||||
if (!(err instanceof APIError)) {
|
|
||||||
NylasEnv.reportError(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (err.shouldReportError()) {
|
|
||||||
// TODO move this check into NylasEnv.reportError()?
|
|
||||||
NylasEnv.reportError(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (err.statusCode === 401) {
|
|
||||||
Actions.updateAccount(this._account.id, {
|
|
||||||
n1CloudState: Account.N1_CLOUD_STATE_AUTH_FAILED,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_setCursor = (cursor) => {
|
|
||||||
this._state.cursor = cursor;
|
|
||||||
this._writeStateDebounced();
|
|
||||||
}
|
|
||||||
|
|
||||||
async _loadState() {
|
|
||||||
const json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`)
|
|
||||||
if (json) {
|
|
||||||
return {
|
|
||||||
cursor: json.cursor || undefined,
|
|
||||||
status: json.status || undefined,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Migrate from old storage key
|
|
||||||
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
|
|
||||||
if (!oldState) {
|
|
||||||
return {
|
|
||||||
cursor: undefined,
|
|
||||||
status: undefined,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
const {deltaCursors = {}, deltaStatus = {}} = oldState
|
|
||||||
return {
|
|
||||||
cursor: deltaCursors.n1Cloud,
|
|
||||||
status: deltaStatus.n1Cloud,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async _writeState() {
|
|
||||||
if (!this._state) { return }
|
|
||||||
await DatabaseStore.inTransaction(t =>
|
|
||||||
t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export default DeltaStreamingConnection
|
|
Loading…
Reference in a new issue