Mailspring/internal_packages/worker-sync/lib/nylas-sync-worker-pool.coffee
Juan Tejada 693bd18267 fix(deltas): Consolidate nylas-long-connection code
- Completely remove nylas-long-connection.coffee file and consolidate all logic into
nylas-long-connection.es6 and worker-sync/delta-streaming-connection.es6:
  - Separate it into DeltaStreamingConnection which is specific to the worker-sync
    package for handling deltas
  - NylasLongConnection is a "reusable" persistent connection to our API
    which is also used by the search package
  - This is a basically a fixed revision of https://phab.nylas.com/D2875,
    see difff for more info
- Makes it so delta streaming does not retry so aggressively on 403s, which
  happens whenever a user’s trial has expired
2016-07-27 02:56:55 -07:00

156 lines
5.4 KiB
CoffeeScript

_ = require 'underscore'
{NylasAPI,
Actions,
AccountStore,
DatabaseStore,
MailRulesProcessor} = require 'nylas-exports'
NylasSyncWorker = require './nylas-sync-worker'
class NylasSyncWorkerPool
constructor: ->
@_workers = []
AccountStore.listen(@_onAccountsChanged, @)
@_onAccountsChanged()
_onAccountsChanged: ->
return if NylasEnv.inSpecMode()
accounts = AccountStore.accounts()
workers = _.map(accounts, @workerForAccount)
# Stop the workers that are not in the new workers list.
# These accounts are no longer in our database, so we shouldn't
# be listening.
old = _.without(@_workers, workers...)
worker.cleanup() for worker in old
@_workers = workers
workers: =>
@_workers
workerForAccount: (account) =>
worker = _.find @_workers, (c) -> c.account().id is account.id
return worker if worker
worker = new NylasSyncWorker(NylasAPI, account)
connection = worker.connection()
connection.onDeltas (deltas) =>
@_handleDeltas(deltas)
@_workers.push(worker)
worker.start()
worker
_cleanupAccountWorkers: ->
for worker in @_workers
worker.cleanup()
@_workers = []
_handleDeltas: (deltas) ->
Actions.longPollReceivedRawDeltas(deltas)
Actions.longPollReceivedRawDeltasPing(deltas.length)
# Create a (non-enumerable) reference from the attributes which we carry forward
# back to their original deltas. This allows us to mark the deltas that the
# app ignores later in the process.
deltas.forEach (delta) ->
if delta.attributes
Object.defineProperty(delta.attributes, '_delta', { get: -> delta })
{create, modify, destroy} = @_clusterDeltas(deltas)
# Remove any metadata deltas. These have to be handled at the end, since metadata
# is stored within the object that it points to (which may not exist yet)
metadata = []
for deltas in [create, modify]
if deltas['metadata']
metadata = metadata.concat(_.values(deltas['metadata']))
delete deltas['metadata']
# Remove any account deltas, which are only used to notify broken/fixed sync state
# on accounts
delete create['account']
delete destroy['account']
if modify['account']
@_handleAccountDeltas(_.values(modify['account']))
delete modify['account']
# Apply all the deltas to create objects. Gets promises for handling
# each type of model in the `create` hash, waits for them all to resolve.
create[type] = NylasAPI._handleModelResponse(_.values(dict)) for type, dict of create
Promise.props(create).then (created) =>
# Apply all the deltas to modify objects. Gets promises for handling
# each type of model in the `modify` hash, waits for them all to resolve.
modify[type] = NylasAPI._handleModelResponse(_.values(dict)) for type, dict of modify
Promise.props(modify).then (modified) =>
Promise.all(@_handleDeltaMetadata(metadata)).then =>
# Now that we've persisted creates/updates, fire an action
# that allows other parts of the app to update based on new models
# (notifications)
if _.flatten(_.values(created)).length > 0
MailRulesProcessor.processMessages(created['message'] ? []).finally =>
Actions.didPassivelyReceiveNewModels(created)
# Apply all of the deletions
destroyPromises = destroy.map(@_handleDeltaDeletion)
Promise.settle(destroyPromises).then =>
Actions.longPollProcessedDeltas()
_clusterDeltas: (deltas) ->
# Group deltas by object type so we can mutate the cache efficiently.
# NOTE: This code must not just accumulate creates, modifies and destroys
# but also de-dupe them. We cannot call "persistModels(itemA, itemA, itemB)"
# or it will throw an exception - use the last received copy of each model
# we see.
create = {}
modify = {}
destroy = []
for delta in deltas
if delta.event is 'create'
create[delta.object] ||= {}
create[delta.object][delta.attributes.id] = delta.attributes
else if delta.event is 'modify'
modify[delta.object] ||= {}
modify[delta.object][delta.attributes.id] = delta.attributes
else if delta.event is 'delete'
destroy.push(delta)
{create, modify, destroy}
_handleDeltaMetadata: (metadata) =>
metadata.map (metadatum) =>
klass = NylasAPI._apiObjectToClassMap[metadatum.object_type]
DatabaseStore.inTransaction (t) =>
t.find(klass, metadatum.object_id).then (model) ->
return Promise.resolve() unless model
model = model.applyPluginMetadata(metadatum.application_id, metadatum.value)
localMetadatum = model.metadataObjectForPluginId(metadatum.application_id)
localMetadatum.version = metadatum.version
t.persistModel(model)
_handleAccountDeltas: (deltas) =>
for delta in deltas
Actions.updateAccount(delta.account_id, {syncState: delta.sync_state})
if delta.sync_state isnt "running"
Actions.recordUserEvent('Account Sync Errored', {
accountId: delta.account_id
syncState: delta.sync_state
})
_handleDeltaDeletion: (delta) =>
klass = NylasAPI._apiObjectToClassMap[delta.object]
return unless klass
DatabaseStore.inTransaction (t) =>
t.find(klass, delta.id).then (model) ->
return Promise.resolve() unless model
return t.unpersistModel(model)
module.exports = NylasSyncWorkerPool