mirror of
https://github.com/Foundry376/Mailspring.git
synced 2024-09-24 17:26:06 +08:00
a3874ce1de
When APIRoot uses https, the port was forced to 443 even when APIRoot specified another port. This happened because the line `options.port = 443` overwrote the correct options.port extracted by url.parse. The fix removes the problematic line. The default port is still 443, as can be seen in https.js. If we want a different default port in the future, we can use `options.port = options.port || 443`.
167 lines
4.6 KiB
CoffeeScript
167 lines
4.6 KiB
CoffeeScript
{Emitter} = require 'event-kit'
|
|
{DatabaseStore} = require 'nylas-exports'
|
|
url = require 'url'
|
|
_ = require 'underscore'
|
|
|
|
# Maintains a long-lived HTTP(S) request against the API's
# `/delta/streaming` endpoint for a single account, parses the
# newline-delimited JSON deltas it receives, and emits them in batches
# through the `deltas-stopped-arriving` event.
#
# Collaborators (as used by this class):
# - `api` must provide `makeRequest`, `accessTokenForAccountId` and `APIRoot`.
# - `config` must provide `ready`, `getCursor`, `setCursor` and `setStatus`.
class NylasLongConnection

  @Status =
    None: 'none'
    Connecting: 'connecting'
    Connected: 'connected'
    Closed: 'closed' # Socket has been closed for any reason
    Ended: 'ended' # We have received 'end()' and will never open again.

  # api       - object used to issue requests and look up access tokens.
  # accountId - id of the account whose delta stream is followed.
  # config    - object used to persist the cursor and the connection status.
  constructor: (api, accountId, config) ->
    @_api = api
    @_accountId = accountId
    @_config = config
    @_emitter = new Emitter
    @_status = NylasLongConnection.Status.None
    @_req = null
    @_pingTimeout = null
    @_buffer = null

    # Deltas are accumulated and only flushed once no new delta has been
    # queued for one second, so listeners receive them in batches.
    @_deltas = []
    @_flushDeltasDebounced = _.debounce =>
      return if @_deltas.length is 0
      last = @_deltas[@_deltas.length - 1]

      @_emitter.emit('deltas-stopped-arriving', @_deltas)
      # Persist the cursor of the newest delta so a later connection
      # resumes from it.
      @_config.setCursor(last.cursor)
      @_deltas = []
    , 1000

    @

  # Returns the account id this connection was created for.
  accountId: ->
    @_accountId

  # Returns true when the config already holds a (truthy) cursor.
  hasCursor: ->
    !!@_config.getCursor()

  # Invokes `callback(cursor)` with the stored cursor, or, when none is
  # stored, requests `/delta/latest_cursor`, stores the result and then
  # invokes the callback. No failure handler is registered for the request.
  withCursor: (callback) ->
    cursor = @_config.getCursor()
    return callback(cursor) if cursor

    @_api.makeRequest
      path: "/delta/latest_cursor"
      accountId: @_accountId
      method: 'POST'
      success: ({cursor}) =>
        console.log("Obtained stream cursor #{cursor}.")
        @_config.setCursor(cursor)
        callback(cursor)

  # Returns the current status (one of NylasLongConnection.Status).
  status: ->
    # Fixed: this used to return `@status`, i.e. the method itself.
    @_status

  # Records a new status and forwards it to the config. No-op when the
  # status is unchanged.
  setStatus: (status) ->
    return if @_status is status
    @_status = status
    @_config.setStatus(status)

  # Registers `callback` for batches of deltas; returns whatever the
  # emitter's `on` returns.
  onDeltas: (callback) ->
    @_emitter.on('deltas-stopped-arriving', callback)

  # Splits the receive buffer on newlines, parses every complete line as
  # JSON and queues the resulting deltas for the debounced flush.
  #
  # Throws an Error when a parsed delta carries no cursor. Lines that are
  # not valid JSON are logged and skipped.
  onProcessBuffer: =>
    bufferJSONs = @_buffer.split('\n')

    # We can't parse the last block - we don't know whether we've
    # received the entire delta or only part of it. Wait
    # until we have more.
    @_buffer = bufferJSONs.pop()

    for deltaJSON in bufferJSONs
      continue if deltaJSON.length is 0
      delta = null
      try
        delta = JSON.parse(deltaJSON)
      catch e
        console.log("#{deltaJSON} could not be parsed as JSON.", e)
      if delta
        throw (new Error 'Received delta with no cursor!') unless delta.cursor
        @_deltas.push(delta)
        @_flushDeltasDebounced()

  # Opens the streaming request. Does nothing unless the config is ready,
  # the status is None or Closed, an access token exists and no request
  # is currently held.
  start: ->
    return unless @_config.ready()
    return unless @_status in [NylasLongConnection.Status.None, NylasLongConnection.Status.Closed]

    token = @_api.accessTokenForAccountId(@_accountId)
    return if not token?
    return if @_req

    @withCursor (cursor) =>
      # The cursor may arrive asynchronously; bail out if `end()` was
      # called in the meantime. Fixed: this used to compare `@status`
      # (a function), so the guard could never trigger.
      return if @_status is NylasLongConnection.Status.Ended

      options = url.parse("#{@_api.APIRoot}/delta/streaming?cursor=#{cursor}&exclude_folders=false&exclude_metadata=false&exclude_account=false")
      options.auth = "#{token}:"

      if @_api.APIRoot.indexOf('https') is -1
        lib = require 'http'
      else
        lib = require 'https'

      @_req = lib.request options, (res) =>
        if res.statusCode isnt 200
          res.on 'data', (chunk) =>
            # Fixed: `> 0` missed a message starting at index 0.
            if chunk.toString().indexOf('Invalid cursor') isnt -1
              error = new Error('Delta Connection: Cursor is invalid. Need to blow away local cache.')
              NylasEnv.config.unset("nylas.#{@_accountId}.cursor")
              DatabaseStore._handleSetupError(error)
          @close()
          return

        @_buffer = ''
        processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false})
        res.setEncoding('utf8')
        res.on 'close', => @close()
        res.on 'data', (chunk) =>
          # Any incoming data (including pings) re-arms the watchdog.
          @closeIfDataStops()
          # Ignore redundant newlines sent as pings. Want to avoid
          # calls to @onProcessBuffer that contain no actual updates
          # Fixed: `@_buffer[-1]` is always undefined; read the real
          # last character instead.
          lastChar = @_buffer[@_buffer.length - 1]
          return if chunk is '\n' and (@_buffer.length is 0 or lastChar is '\n')
          @_buffer += chunk
          processBufferThrottled()

      @_req.setTimeout(60*60*1000)
      @_req.setSocketKeepAlive(true)
      @_req.on 'error', => @close()
      @_req.on 'socket', (socket) =>
        @setStatus(NylasLongConnection.Status.Connecting)
        socket.on 'connect', =>
          @setStatus(NylasLongConnection.Status.Connected)
          @closeIfDataStops()
      @_req.end()

  # Marks the connection Closed and releases the request. A Closed
  # connection may be reopened with `start()`.
  close: ->
    return if @_status is NylasLongConnection.Status.Closed
    # Ended is terminal: aborting the request in `cleanup` can trigger the
    # 'error'/'close' handlers, which call `close()`. Without this guard
    # the status would fall back to Closed and `start()` could reopen.
    return if @_status is NylasLongConnection.Status.Ended
    @setStatus(NylasLongConnection.Status.Closed)
    @cleanup()

  # (Re)arms a 15 second watchdog that closes the connection unless this
  # method is called again before it fires.
  closeIfDataStops: =>
    clearTimeout(@_pingTimeout) if @_pingTimeout
    @_pingTimeout = setTimeout =>
      @_pingTimeout = null
      @close()
    , 15 * 1000

  # Marks the connection Ended and releases the request.
  end: ->
    @setStatus(NylasLongConnection.Status.Ended)
    @cleanup()

  # Cancels the watchdog, resets the buffer and tears down the request.
  cleanup: ->
    # Fixed: the handle comes from setTimeout, so clear it with
    # clearTimeout (was clearInterval).
    clearTimeout(@_pingTimeout) if @_pingTimeout
    @_pingTimeout = null
    @_buffer = ''
    if @_req
      @_req.end()
      @_req.abort()
      @_req = null
|
# Expose the class itself as this module's export.
module.exports = NylasLongConnection
|