mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-01-04 07:10:06 +08:00
18677e51a6
Summary: Previously we were storing sync cursors in config.cson. They were by far the most frequently updated piece of data in config. To make things worse, all the writing was happening in the worker window - the main window was just seeing the changes on disk and reloading. I believe there's an edge case which causes the main window to read the config file when it's mid-write and empty. This causes the accounts array to become empty in the AccountStore and lots of downstream issues. It's also then possible for the main window to write a config change of it's own and empty the file permanently. Test Plan: A few new tests to make sure this is backwards compatible. Reviewers: juan, evan Reviewed By: juan, evan Differential Revision: https://phab.nylas.com/D2642
172 lines
4.7 KiB
CoffeeScript
172 lines
4.7 KiB
CoffeeScript
{Emitter} = require 'event-kit'
|
|
url = require 'url'
|
|
_ = require 'underscore'
|
|
|
|
class NylasLongConnection
|
|
|
|
@State =
|
|
Idle: 'idle'
|
|
Ended: 'ended'
|
|
Connecting: 'connecting'
|
|
Connected: 'connected'
|
|
Retrying: 'retrying'
|
|
|
|
constructor: (api, accountId, config) ->
|
|
@_api = api
|
|
@_accountId = accountId
|
|
@_config = config
|
|
@_emitter = new Emitter
|
|
@_state = 'idle'
|
|
@_req = null
|
|
@_reqForceReconnectInterval = null
|
|
@_buffer = null
|
|
|
|
@_deltas = []
|
|
@_flushDeltasDebounced = _.debounce =>
|
|
|
|
return if @_deltas.length is 0
|
|
last = @_deltas[@_deltas.length - 1]
|
|
|
|
@_emitter.emit('deltas-stopped-arriving', @_deltas)
|
|
@_config.setCursor(last.cursor)
|
|
@_deltas = []
|
|
|
|
, 1000
|
|
|
|
@
|
|
|
|
accountId: ->
|
|
@_accountId
|
|
|
|
hasCursor: ->
|
|
!!@_config.getCursor()
|
|
|
|
withCursor: (callback) ->
|
|
cursor = @_config.getCursor()
|
|
return callback(cursor) if cursor
|
|
|
|
@_api.makeRequest
|
|
path: "/delta/latest_cursor"
|
|
accountId: @_accountId
|
|
method: 'POST'
|
|
success: ({cursor}) =>
|
|
console.log("Obtained stream cursor #{cursor}.")
|
|
@_config.setCursor(cursor)
|
|
callback(cursor)
|
|
|
|
state: ->
|
|
@state
|
|
|
|
setState: (state) ->
|
|
@_state = state
|
|
@_emitter.emit('state-change', state)
|
|
|
|
onStateChange: (callback) ->
|
|
@_emitter.on('state-change', callback)
|
|
|
|
onDeltas: (callback) ->
|
|
@_emitter.on('deltas-stopped-arriving', callback)
|
|
|
|
onProcessBuffer: =>
|
|
bufferJSONs = @_buffer.split('\n')
|
|
|
|
# We can't parse the last block - we don't know whether we've
|
|
# received the entire delta or only part of it. Wait
|
|
# until we have more.
|
|
@_buffer = bufferJSONs.pop()
|
|
|
|
for deltaJSON in bufferJSONs
|
|
continue if deltaJSON.length is 0
|
|
delta = null
|
|
try
|
|
delta = JSON.parse(deltaJSON)
|
|
catch e
|
|
console.log("#{deltaJSON} could not be parsed as JSON.", e)
|
|
if delta
|
|
throw (new Error 'Received delta with no cursor!') unless delta.cursor
|
|
@_deltas.push(delta)
|
|
@_flushDeltasDebounced()
|
|
|
|
start: ->
|
|
return unless @_config.ready()
|
|
|
|
token = @_api.accessTokenForAccountId(@_accountId)
|
|
return if not token?
|
|
return if @_state is NylasLongConnection.State.Ended
|
|
return if @_req
|
|
|
|
@withCursor (cursor) =>
|
|
return if @state is NylasLongConnection.State.Ended
|
|
console.log("Delta Connection: Starting for account #{@_accountId}, token #{token}, with cursor #{cursor}")
|
|
options = url.parse("#{@_api.APIRoot}/delta/streaming?cursor=#{cursor}&exclude_folders=false&exclude_metadata=false")
|
|
options.auth = "#{token}:"
|
|
|
|
if @_api.APIRoot.indexOf('https') is -1
|
|
lib = require 'http'
|
|
else
|
|
options.port = 443
|
|
lib = require 'https'
|
|
|
|
req = lib.request options, (res) =>
|
|
if res.statusCode isnt 200
|
|
res.on 'data', (chunk) =>
|
|
if chunk.toString().indexOf('Invalid cursor') > 0
|
|
console.log('Delta Connection: Cursor is invalid. Need to blow away local cache.')
|
|
# TODO THIS!
|
|
else
|
|
@retry()
|
|
return
|
|
|
|
@_buffer = ''
|
|
processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false})
|
|
res.setEncoding('utf8')
|
|
res.on 'close', => @retry()
|
|
res.on 'data', (chunk) =>
|
|
# Ignore redundant newlines sent as pings. Want to avoid
|
|
# calls to @onProcessBuffer that contain no actual updates
|
|
return if chunk is '\n' and (@_buffer.length is 0 or @_buffer[-1] is '\n')
|
|
@_buffer += chunk
|
|
processBufferThrottled()
|
|
|
|
req.setTimeout(60*60*1000)
|
|
req.setSocketKeepAlive(true)
|
|
req.on 'error', => @retry()
|
|
req.on 'socket', (socket) =>
|
|
@setState(NylasLongConnection.State.Connecting)
|
|
socket.on 'connect', =>
|
|
@setState(NylasLongConnection.State.Connected)
|
|
req.write("1")
|
|
|
|
@_req = req
|
|
|
|
# Currently we have trouble identifying when the connection has closed.
|
|
# Instead of trying to fix that, just reconnect every 120 seconds.
|
|
@_reqForceReconnectInterval = setInterval =>
|
|
@retry(true)
|
|
,30000
|
|
|
|
retry: (immediate = false) ->
|
|
return if @_state is NylasLongConnection.State.Ended
|
|
@setState(NylasLongConnection.State.Retrying)
|
|
@cleanup()
|
|
|
|
startDelay = if immediate then 0 else 10000
|
|
setTimeout =>
|
|
@start()
|
|
, startDelay
|
|
|
|
end: ->
|
|
console.log("Delta Connection: Closed.")
|
|
@setState(NylasLongConnection.State.Ended)
|
|
@cleanup()
|
|
|
|
cleanup: ->
|
|
clearInterval(@_reqForceReconnectInterval) if @_reqForceReconnectInterval
|
|
@_reqForceReconnectInterval = null
|
|
@_buffer = ''
|
|
if @_req
|
|
@_req.end()
|
|
@_req.abort()
|
|
@_req = null
|
|
|
|
module.exports = NylasLongConnection
|