mirror of
https://github.com/Foundry376/Mailspring.git
synced 2024-11-12 04:25:31 +08:00
327eb43932
Summary: Changes the delta code to handle new deltas on the Account object, which are triggered by changes in sync state indicating various backend issues. Saves the sync state in a new field on the Account object, which is persisited in `config.cson`. Includes several UI changes to display more information when an account has backend sync issues. Adds better messages and new actions the user can take based on the type of sync issue. Additionally, fixes bug in action bridge that was preventing multi-arg global actions from working. Test Plan: Manual, by testing different sync state values and triggering deltas from the backend Reviewers: juan, evan, bengotow Reviewed By: evan, bengotow Subscribers: khamidou Differential Revision: https://phab.nylas.com/D2696
172 lines
4.7 KiB
CoffeeScript
172 lines
4.7 KiB
CoffeeScript
{Emitter} = require 'event-kit'
|
|
url = require 'url'
|
|
_ = require 'underscore'
|
|
|
|
class NylasLongConnection
|
|
|
|
@State =
|
|
Idle: 'idle'
|
|
Ended: 'ended'
|
|
Connecting: 'connecting'
|
|
Connected: 'connected'
|
|
Retrying: 'retrying'
|
|
|
|
constructor: (api, accountId, config) ->
|
|
@_api = api
|
|
@_accountId = accountId
|
|
@_config = config
|
|
@_emitter = new Emitter
|
|
@_state = 'idle'
|
|
@_req = null
|
|
@_reqForceReconnectInterval = null
|
|
@_buffer = null
|
|
|
|
@_deltas = []
|
|
@_flushDeltasDebounced = _.debounce =>
|
|
|
|
return if @_deltas.length is 0
|
|
last = @_deltas[@_deltas.length - 1]
|
|
|
|
@_emitter.emit('deltas-stopped-arriving', @_deltas)
|
|
@_config.setCursor(last.cursor)
|
|
@_deltas = []
|
|
|
|
, 1000
|
|
|
|
@
|
|
|
|
accountId: ->
|
|
@_accountId
|
|
|
|
hasCursor: ->
|
|
!!@_config.getCursor()
|
|
|
|
withCursor: (callback) ->
|
|
cursor = @_config.getCursor()
|
|
return callback(cursor) if cursor
|
|
|
|
@_api.makeRequest
|
|
path: "/delta/latest_cursor"
|
|
accountId: @_accountId
|
|
method: 'POST'
|
|
success: ({cursor}) =>
|
|
console.log("Obtained stream cursor #{cursor}.")
|
|
@_config.setCursor(cursor)
|
|
callback(cursor)
|
|
|
|
state: ->
|
|
@state
|
|
|
|
setState: (state) ->
|
|
@_state = state
|
|
@_emitter.emit('state-change', state)
|
|
|
|
onStateChange: (callback) ->
|
|
@_emitter.on('state-change', callback)
|
|
|
|
onDeltas: (callback) ->
|
|
@_emitter.on('deltas-stopped-arriving', callback)
|
|
|
|
onProcessBuffer: =>
|
|
bufferJSONs = @_buffer.split('\n')
|
|
|
|
# We can't parse the last block - we don't know whether we've
|
|
# received the entire delta or only part of it. Wait
|
|
# until we have more.
|
|
@_buffer = bufferJSONs.pop()
|
|
|
|
for deltaJSON in bufferJSONs
|
|
continue if deltaJSON.length is 0
|
|
delta = null
|
|
try
|
|
delta = JSON.parse(deltaJSON)
|
|
catch e
|
|
console.log("#{deltaJSON} could not be parsed as JSON.", e)
|
|
if delta
|
|
throw (new Error 'Received delta with no cursor!') unless delta.cursor
|
|
@_deltas.push(delta)
|
|
@_flushDeltasDebounced()
|
|
|
|
start: ->
|
|
return unless @_config.ready()
|
|
|
|
token = @_api.accessTokenForAccountId(@_accountId)
|
|
return if not token?
|
|
return if @_state is NylasLongConnection.State.Ended
|
|
return if @_req
|
|
|
|
@withCursor (cursor) =>
|
|
return if @state is NylasLongConnection.State.Ended
|
|
console.log("Delta Connection: Starting for account #{@_accountId}, token #{token}, with cursor #{cursor}")
|
|
options = url.parse("#{@_api.APIRoot}/delta/streaming?cursor=#{cursor}&exclude_folders=false&exclude_metadata=false&exclude_account=false")
|
|
options.auth = "#{token}:"
|
|
|
|
if @_api.APIRoot.indexOf('https') is -1
|
|
lib = require 'http'
|
|
else
|
|
options.port = 443
|
|
lib = require 'https'
|
|
|
|
req = lib.request options, (res) =>
|
|
if res.statusCode isnt 200
|
|
res.on 'data', (chunk) =>
|
|
if chunk.toString().indexOf('Invalid cursor') > 0
|
|
console.log('Delta Connection: Cursor is invalid. Need to blow away local cache.')
|
|
# TODO THIS!
|
|
else
|
|
@retry()
|
|
return
|
|
|
|
@_buffer = ''
|
|
processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false})
|
|
res.setEncoding('utf8')
|
|
res.on 'close', => @retry()
|
|
res.on 'data', (chunk) =>
|
|
# Ignore redundant newlines sent as pings. Want to avoid
|
|
# calls to @onProcessBuffer that contain no actual updates
|
|
return if chunk is '\n' and (@_buffer.length is 0 or @_buffer[-1] is '\n')
|
|
@_buffer += chunk
|
|
processBufferThrottled()
|
|
|
|
req.setTimeout(60*60*1000)
|
|
req.setSocketKeepAlive(true)
|
|
req.on 'error', => @retry()
|
|
req.on 'socket', (socket) =>
|
|
@setState(NylasLongConnection.State.Connecting)
|
|
socket.on 'connect', =>
|
|
@setState(NylasLongConnection.State.Connected)
|
|
req.write("1")
|
|
|
|
@_req = req
|
|
|
|
# Currently we have trouble identifying when the connection has closed.
|
|
# Instead of trying to fix that, just reconnect every 120 seconds.
|
|
@_reqForceReconnectInterval = setInterval =>
|
|
@retry(true)
|
|
,30000
|
|
|
|
retry: (immediate = false) ->
|
|
return if @_state is NylasLongConnection.State.Ended
|
|
@setState(NylasLongConnection.State.Retrying)
|
|
@cleanup()
|
|
|
|
startDelay = if immediate then 0 else 10000
|
|
setTimeout =>
|
|
@start()
|
|
, startDelay
|
|
|
|
end: ->
|
|
console.log("Delta Connection: Closed.")
|
|
@setState(NylasLongConnection.State.Ended)
|
|
@cleanup()
|
|
|
|
cleanup: ->
|
|
clearInterval(@_reqForceReconnectInterval) if @_reqForceReconnectInterval
|
|
@_reqForceReconnectInterval = null
|
|
@_buffer = ''
|
|
if @_req
|
|
@_req.end()
|
|
@_req.abort()
|
|
@_req = null
|
|
|
|
module.exports = NylasLongConnection
|