2016-05-16 14:25:30 +08:00
|
|
|
{Emitter} = require 'event-kit'
|
2016-07-14 13:02:48 +08:00
|
|
|
{IdentityStore, DatabaseStore} = require 'nylas-exports'
|
2016-05-16 14:25:30 +08:00
|
|
|
url = require 'url'
|
|
|
|
_ = require 'underscore'
|
|
|
|
|
|
|
|
# Keeps a long-lived HTTP(S) request open against the API's
# `/delta/streaming` endpoint for one account, accumulates the
# newline-separated JSON deltas that arrive on it, and hands them to
# listeners in batches once the stream goes quiet.
#
# The `config` object passed to the constructor is only ever used through
# `ready()`, `getCursor()`, `setCursor(cursor)` and `setStatus(status)`.
class NylasLongConnection

  @Status =
    None: 'none'
    Connecting: 'connecting'
    Connected: 'connected'
    Closed: 'closed' # Socket has been closed for any reason
    Ended: 'ended' # We have received 'end()' and will never open again.

  # api       - object providing `makeRequest`, `accessTokenForAccountId`
  #             and `APIRoot`.
  # accountId - id of the account whose deltas are streamed.
  # config    - cursor/status persistence hooks (see class comment).
  constructor: (api, accountId, config) ->
    @_api = api
    @_accountId = accountId
    @_config = config
    @_emitter = new Emitter
    @_status = NylasLongConnection.Status.None
    @_req = null
    @_pingTimeout = null
    @_buffer = null

    # Deltas parsed so far but not yet delivered to listeners.
    @_deltas = []

    # Debounced by one second: listeners are notified only after deltas
    # have stopped arriving, and the stored cursor is advanced to the
    # cursor of the last delta in the delivered batch.
    @_flushDeltasDebounced = _.debounce =>
      return if @_deltas.length is 0
      last = @_deltas[@_deltas.length - 1]

      @_emitter.emit('deltas-stopped-arriving', @_deltas)
      @_config.setCursor(last.cursor)
      @_deltas = []

    , 1000

    @

  # Returns the account id this connection was created for.
  accountId: ->
    @_accountId

  # Returns true when the config currently holds a truthy cursor.
  hasCursor: ->
    !!@_config.getCursor()

  # Invokes `callback(cursor)` with the stored cursor. When none is stored,
  # requests one from `/delta/latest_cursor`, stores it, and then invokes
  # the callback. No failure handler is registered for that request, so
  # the callback is not invoked through this path if it fails.
  withCursor: (callback) ->
    cursor = @_config.getCursor()
    return callback(cursor) if cursor

    @_api.makeRequest
      path: "/delta/latest_cursor"
      accountId: @_accountId
      method: 'POST'
      success: ({cursor}) =>
        console.log("Obtained stream cursor #{cursor}.")
        @_config.setCursor(cursor)
        callback(cursor)

  # Returns the current connection status (one of `NylasLongConnection.Status`).
  status: ->
    # Must read the `_status` field; `@status` is this method itself.
    @_status

  # Records a new status and forwards it to the config. No-op when the
  # status is unchanged.
  setStatus: (status) ->
    return if @_status is status
    @_status = status
    @_config.setStatus(status)

  # Registers `callback` to receive each batch (array) of deltas. Returns
  # whatever the emitter's `on` returns.
  onDeltas: (callback) ->
    @_emitter.on('deltas-stopped-arriving', callback)

  # Splits the receive buffer on newlines, parses every complete line as a
  # JSON delta and queues it for the debounced flush.
  #
  # Throws an Error when a parsed delta has no `cursor`. Lines that fail to
  # parse are logged and skipped.
  onProcessBuffer: =>
    # The buffer is null until a stream has been opened; nothing to do then.
    return unless @_buffer

    bufferJSONs = @_buffer.split('\n')

    # We can't parse the last block - we don't know whether we've
    # received the entire delta or only part of it. Wait
    # until we have more.
    @_buffer = bufferJSONs.pop()

    for deltaJSON in bufferJSONs
      continue if deltaJSON.length is 0
      delta = null
      try
        delta = JSON.parse(deltaJSON)
      catch e
        console.log("#{deltaJSON} could not be parsed as JSON.", e)
      if delta
        throw (new Error 'Received delta with no cursor!') unless delta.cursor
        @_deltas.push(delta)
        @_flushDeltasDebounced()

  # Opens the streaming request. Does nothing unless the config is ready,
  # the status is None or Closed, an access token exists for the account,
  # and no request is already in flight.
  start: ->
    return unless @_config.ready()
    return unless @_status in [NylasLongConnection.Status.None, NylasLongConnection.Status.Closed]

    token = @_api.accessTokenForAccountId(@_accountId)
    return if not token?
    return if @_req

    @withCursor (cursor) =>
      # The cursor may be fetched asynchronously; `end()` can have been
      # called in the meantime, in which case we must not connect.
      return if @_status is NylasLongConnection.Status.Ended

      options = url.parse("#{@_api.APIRoot}/delta/streaming?cursor=#{cursor}&exclude_folders=false&exclude_metadata=false&exclude_account=false")
      options.auth = "#{token}:#{IdentityStore.identity().token}"

      # Pick the transport matching the API root's scheme.
      if @_api.APIRoot.indexOf('https') is -1
        lib = require 'http'
      else
        lib = require 'https'

      @_req = lib.request options, (res) =>
        if res.statusCode isnt 200
          res.on 'data', (chunk) =>
            # `isnt -1` rather than `> 0` so a match at index 0 counts too.
            if chunk.toString().indexOf('Invalid cursor') isnt -1
              error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.')
              NylasEnv.config.unset("nylas.#{@_accountId}.cursor")
              DatabaseStore._handleSetupError(error)
          # NOTE(review): the request is closed right after the handler is
          # registered - confirm the error body is still delivered to it.
          @close()
          return

        @_buffer = ''
        # Process at most once every 400ms, on the trailing edge only.
        processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false})
        res.setEncoding('utf8')
        res.on 'close', => @close()
        res.on 'data', (chunk) =>
          # Any data, including a ping, proves the stream is alive.
          @closeIfDataStops()

          # Ignore redundant newlines sent as pings. Want to avoid
          # calls to @onProcessBuffer that contain no actual updates
          endsWithNewline = @_buffer[@_buffer.length - 1] is '\n'
          return if chunk is '\n' and (@_buffer.length is 0 or endsWithNewline)

          @_buffer += chunk
          processBufferThrottled()

      @_req.setTimeout(60*60*1000)
      @_req.setSocketKeepAlive(true)
      @_req.on 'error', => @close()
      @_req.on 'socket', (socket) =>
        @setStatus(NylasLongConnection.Status.Connecting)
        socket.on 'connect', =>
          @setStatus(NylasLongConnection.Status.Connected)
          @closeIfDataStops()
      @_req.end()

  # Marks the connection Closed and releases the request. A closed
  # connection may be reopened with `start()`.
  close: ->
    # Also ignore Ended: socket events that fire after `end()` must not
    # downgrade the status to Closed, which `start()` treats as reopenable.
    return if @_status in [NylasLongConnection.Status.Closed, NylasLongConnection.Status.Ended]
    @setStatus(NylasLongConnection.Status.Closed)
    @cleanup()

  # (Re)arms a 15 second timer that closes the connection when it fires.
  # Called when the socket connects and on every received chunk, so the
  # timer only fires after 15 seconds without data.
  closeIfDataStops: =>
    clearTimeout(@_pingTimeout) if @_pingTimeout
    @_pingTimeout = setTimeout =>
      @_pingTimeout = null
      @close()
    , 15 * 1000

  # Marks the connection Ended and releases the request.
  end: ->
    @setStatus(NylasLongConnection.Status.Ended)
    @cleanup()

  # Cancels the pending data timer, empties the receive buffer and
  # ends/aborts the in-flight request, if any.
  cleanup: ->
    # The handle comes from setTimeout, so clear it with clearTimeout.
    clearTimeout(@_pingTimeout) if @_pingTimeout
    @_pingTimeout = null
    @_buffer = ''
    if @_req
      @_req.end()
      @_req.abort()
      @_req = null

module.exports = NylasLongConnection
|