{Emitter} = require 'event-kit' url = require 'url' _ = require 'underscore' class NylasLongConnection @State = Idle: 'idle' Ended: 'ended' Connecting: 'connecting' Connected: 'connected' Retrying: 'retrying' constructor: (api, accountId) -> @_api = api @_accountId = accountId @_cursorKey = "nylas.#{@_accountId}.cursor" @_emitter = new Emitter @_state = 'idle' @_req = null @_reqForceReconnectInterval = null @_buffer = null @_deltas = [] @_flushDeltasDebounced = _.debounce => @_emitter.emit('deltas-stopped-arriving', @_deltas) @_deltas = [] , 1000 @ accountId: -> @_accountId hasCursor: -> !!atom.config.get(@_cursorKey) withCursor: (callback) -> cursor = atom.config.get(@_cursorKey) return callback(cursor) if cursor stamp = Math.round(new Date().getTime() / 1000.0) @_api.makeRequest path: "/delta/generate_cursor" accountId: @_accountId method: 'POST' body: { "start": stamp } success: (json) => @setCursor(json['cursor']) callback(json['cursor']) console.log("Retrieved cursor #{json['cursor']} from \ `generate_cursor` with timestamp: #{stamp}") setCursor: (cursor) -> atom.config.set(@_cursorKey, cursor) state: -> @state setState: (state) -> @_state = state @_emitter.emit('state-change', state) onStateChange: (callback) -> @_emitter.on('state-change', callback) onDeltas: (callback) -> @_emitter.on('deltas-stopped-arriving', callback) onProcessBuffer: => bufferJSONs = @_buffer.split('\n') bufferCursor = null # We can't parse the last block - we don't know whether we've # received the entire delta or only part of it. Wait # until we have more. @_buffer = bufferJSONs.pop() for deltaJSON in bufferJSONs continue if deltaJSON.length is 0 delta = null try delta = JSON.parse(deltaJSON) catch e console.log("#{deltaJSON} could not be parsed as JSON.", e) if delta throw (new Error 'Received delta with no cursor!') unless delta.cursor @_deltas.push(delta) @_flushDeltasDebounced() bufferCursor = delta.cursor # Note: setCursor is slow and saves to disk, so we do it once at the end if bufferCursor @setCursor(bufferCursor) start: -> token = @_api.accessTokenForAccountId(@_accountId) return if not token? return if @_state is NylasLongConnection.State.Ended return if @_req @withCursor (cursor) => return if @state is NylasLongConnection.State.Ended console.log("Long Polling Connection: Starting for account #{@_accountId}, token #{token}, with cursor #{cursor}") options = url.parse("#{@_api.APIRoot}/delta/streaming?cursor=#{cursor}&exclude_types=event&exclude_folders=false") options.auth = "#{token}:" if @_api.APIRoot.indexOf('https') is -1 lib = require 'http' else options.port = 443 lib = require 'https' req = lib.request options, (res) => if res.statusCode isnt 200 res.on 'data', (chunk) => if chunk.toString().indexOf('Invalid cursor') > 0 console.log('Long Polling Connection: Cursor is invalid. Need to blow away local cache.') # TODO THIS! else @retry() return @_buffer = '' processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false}) res.setEncoding('utf8') res.on 'close', => @retry() res.on 'data', (chunk) => # Ignore redundant newlines sent as pings. Want to avoid # calls to @onProcessBuffer that contain no actual updates return if chunk is '\n' and (@_buffer.length is 0 or @_buffer[-1] is '\n') @_buffer += chunk processBufferThrottled() req.setTimeout(60*60*1000) req.setSocketKeepAlive(true) req.on 'error', => @retry() req.on 'socket', (socket) => @setState(NylasLongConnection.State.Connecting) socket.on 'connect', => @setState(NylasLongConnection.State.Connected) req.write("1") @_req = req # Currently we have trouble identifying when the connection has closed. # Instead of trying to fix that, just reconnect every 120 seconds. @_reqForceReconnectInterval = setInterval => @retry(true) ,30000 retry: (immediate = false) -> return if @_state is NylasLongConnection.State.Ended @setState(NylasLongConnection.State.Retrying) @cleanup() startDelay = if immediate then 0 else 10000 setTimeout => @start() , startDelay end: -> console.log("Long Polling Connection: Closed.") @setState(NylasLongConnection.State.Ended) @cleanup() cleanup: -> clearInterval(@_reqForceReconnectInterval) if @_reqForceReconnectInterval @_reqForceReconnectInterval = null @_buffer = '' if @_req @_req.end() @_req.abort() @_req = null module.exports = NylasLongConnection