Mailspring/internal_packages/worker-sync/lib/nylas-long-connection.coffee
Drew Regitsky 327eb43932 fix(sync-errors): Handle account deltas indicating sync issues
Summary:
Changes the delta code to handle new deltas on the Account object,
which are triggered by changes in sync state indicating various backend
issues. Saves the sync state in a new field on the Account object, which
is persisited in `config.cson`.

Includes several UI changes to display more information when an account has
backend sync issues. Adds better messages and new actions the user can take
based on the type of sync issue.

Additionally, fixes bug in action bridge that was preventing multi-arg global
actions from working.

Test Plan:
Manual, by testing different sync state values and triggering deltas from the
backend

Reviewers: juan, evan, bengotow

Reviewed By: evan, bengotow

Subscribers: khamidou

Differential Revision: https://phab.nylas.com/D2696
2016-03-08 16:06:04 -08:00

173 lines
4.7 KiB
CoffeeScript

{Emitter} = require 'event-kit'
url = require 'url'
_ = require 'underscore'
class NylasLongConnection
@State =
Idle: 'idle'
Ended: 'ended'
Connecting: 'connecting'
Connected: 'connected'
Retrying: 'retrying'
constructor: (api, accountId, config) ->
@_api = api
@_accountId = accountId
@_config = config
@_emitter = new Emitter
@_state = 'idle'
@_req = null
@_reqForceReconnectInterval = null
@_buffer = null
@_deltas = []
@_flushDeltasDebounced = _.debounce =>
return if @_deltas.length is 0
last = @_deltas[@_deltas.length - 1]
@_emitter.emit('deltas-stopped-arriving', @_deltas)
@_config.setCursor(last.cursor)
@_deltas = []
, 1000
@
accountId: ->
@_accountId
hasCursor: ->
!!@_config.getCursor()
withCursor: (callback) ->
cursor = @_config.getCursor()
return callback(cursor) if cursor
@_api.makeRequest
path: "/delta/latest_cursor"
accountId: @_accountId
method: 'POST'
success: ({cursor}) =>
console.log("Obtained stream cursor #{cursor}.")
@_config.setCursor(cursor)
callback(cursor)
state: ->
@state
setState: (state) ->
@_state = state
@_emitter.emit('state-change', state)
onStateChange: (callback) ->
@_emitter.on('state-change', callback)
onDeltas: (callback) ->
@_emitter.on('deltas-stopped-arriving', callback)
onProcessBuffer: =>
bufferJSONs = @_buffer.split('\n')
# We can't parse the last block - we don't know whether we've
# received the entire delta or only part of it. Wait
# until we have more.
@_buffer = bufferJSONs.pop()
for deltaJSON in bufferJSONs
continue if deltaJSON.length is 0
delta = null
try
delta = JSON.parse(deltaJSON)
catch e
console.log("#{deltaJSON} could not be parsed as JSON.", e)
if delta
throw (new Error 'Received delta with no cursor!') unless delta.cursor
@_deltas.push(delta)
@_flushDeltasDebounced()
start: ->
return unless @_config.ready()
token = @_api.accessTokenForAccountId(@_accountId)
return if not token?
return if @_state is NylasLongConnection.State.Ended
return if @_req
@withCursor (cursor) =>
return if @state is NylasLongConnection.State.Ended
console.log("Delta Connection: Starting for account #{@_accountId}, token #{token}, with cursor #{cursor}")
options = url.parse("#{@_api.APIRoot}/delta/streaming?cursor=#{cursor}&exclude_folders=false&exclude_metadata=false&exclude_account=false")
options.auth = "#{token}:"
if @_api.APIRoot.indexOf('https') is -1
lib = require 'http'
else
options.port = 443
lib = require 'https'
req = lib.request options, (res) =>
if res.statusCode isnt 200
res.on 'data', (chunk) =>
if chunk.toString().indexOf('Invalid cursor') > 0
console.log('Delta Connection: Cursor is invalid. Need to blow away local cache.')
# TODO THIS!
else
@retry()
return
@_buffer = ''
processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false})
res.setEncoding('utf8')
res.on 'close', => @retry()
res.on 'data', (chunk) =>
# Ignore redundant newlines sent as pings. Want to avoid
# calls to @onProcessBuffer that contain no actual updates
return if chunk is '\n' and (@_buffer.length is 0 or @_buffer[-1] is '\n')
@_buffer += chunk
processBufferThrottled()
req.setTimeout(60*60*1000)
req.setSocketKeepAlive(true)
req.on 'error', => @retry()
req.on 'socket', (socket) =>
@setState(NylasLongConnection.State.Connecting)
socket.on 'connect', =>
@setState(NylasLongConnection.State.Connected)
req.write("1")
@_req = req
# Currently we have trouble identifying when the connection has closed.
# Instead of trying to fix that, just reconnect every 120 seconds.
@_reqForceReconnectInterval = setInterval =>
@retry(true)
,30000
retry: (immediate = false) ->
return if @_state is NylasLongConnection.State.Ended
@setState(NylasLongConnection.State.Retrying)
@cleanup()
startDelay = if immediate then 0 else 10000
setTimeout =>
@start()
, startDelay
end: ->
console.log("Delta Connection: Closed.")
@setState(NylasLongConnection.State.Ended)
@cleanup()
cleanup: ->
clearInterval(@_reqForceReconnectInterval) if @_reqForceReconnectInterval
@_reqForceReconnectInterval = null
@_buffer = ''
if @_req
@_req.end()
@_req.abort()
@_req = null
module.exports = NylasLongConnection