mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-09-11 15:14:31 +08:00
fix(long-polling): Close streaming connections, wait for changes to stop
Summary: fix(*): Minor onboarding react issue Test Plan: Run existing tests Reviewers: evan Reviewed By: evan Differential Revision: https://review.inboxapp.com/D1208
This commit is contained in:
parent
f97a1bec22
commit
b97f607a6e
3 changed files with 61 additions and 25 deletions
|
@ -125,7 +125,7 @@ ContainerView = React.createClass
|
||||||
<div className="page" key={@state.page}>
|
<div className="page" key={@state.page}>
|
||||||
<div className="check">
|
<div className="check">
|
||||||
<svg preserveAspectRatio="xMidYMid" width="61" height="52" viewBox="0 0 61 52" className="check-icon">
|
<svg preserveAspectRatio="xMidYMid" width="61" height="52" viewBox="0 0 61 52" className="check-icon">
|
||||||
<path d="M56.560,-0.010 C37.498,10.892 26.831,26.198 20.617,33.101 C20.617,33.101 5.398,23.373 5.398,23.373 C5.398,23.373 0.010,29.051 0.010,29.051 C0.010,29.051 24.973,51.981 24.973,51.981 C29.501,41.166 42.502,21.583 60.003,6.565 C60.003,6.565 56.560,-0.010 56.560,-0.010 Z" id="path-1" class="cls-2" fill-rule="evenodd"/>
|
<path d="M56.560,-0.010 C37.498,10.892 26.831,26.198 20.617,33.101 C20.617,33.101 5.398,23.373 5.398,23.373 C5.398,23.373 0.010,29.051 0.010,29.051 C0.010,29.051 24.973,51.981 24.973,51.981 C29.501,41.166 42.502,21.583 60.003,6.565 C60.003,6.565 56.560,-0.010 56.560,-0.010 Z" id="path-1" className="cls-2" fill-rule="evenodd"/>
|
||||||
</svg>
|
</svg>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -17,6 +17,8 @@ class InboxAPI
|
||||||
@
|
@
|
||||||
|
|
||||||
_onConfigChanged: =>
|
_onConfigChanged: =>
|
||||||
|
prev = {@APIToken, @AppID, @APIRoot}
|
||||||
|
|
||||||
@APIToken = atom.config.get('inbox.token')
|
@APIToken = atom.config.get('inbox.token')
|
||||||
env = atom.config.get('inbox.env')
|
env = atom.config.get('inbox.env')
|
||||||
if env in ['production']
|
if env in ['production']
|
||||||
|
@ -26,7 +28,9 @@ class InboxAPI
|
||||||
@AppID = '54miogmnotxuo5st254trcmb9'
|
@AppID = '54miogmnotxuo5st254trcmb9'
|
||||||
@APIRoot = 'https://api-staging.inboxapp.com'
|
@APIRoot = 'https://api-staging.inboxapp.com'
|
||||||
|
|
||||||
if @APIToken && (atom.state.mode == 'editor')
|
current = {@APIToken, @AppID, @APIRoot}
|
||||||
|
|
||||||
|
if atom.state.mode is 'editor' and not _.isEqual(prev, current)
|
||||||
@makeRequest
|
@makeRequest
|
||||||
path: "/n"
|
path: "/n"
|
||||||
returnsModel: true
|
returnsModel: true
|
||||||
|
@ -35,13 +39,21 @@ class InboxAPI
|
||||||
error: =>
|
error: =>
|
||||||
@_startLongPolling()
|
@_startLongPolling()
|
||||||
|
|
||||||
|
_stopLongPolling: ->
|
||||||
|
for namespace, connection of @APILongConnections
|
||||||
|
connection.end()
|
||||||
|
@APILongConnections = {}
|
||||||
|
|
||||||
_startLongPolling: ->
|
_startLongPolling: ->
|
||||||
return unless atom.state.mode == 'editor'
|
return unless atom.state.mode == 'editor'
|
||||||
return if atom.getLoadSettings().isSpec
|
return if atom.getLoadSettings().isSpec
|
||||||
|
|
||||||
DatabaseStore = require './stores/database-store'
|
DatabaseStore = require './stores/database-store'
|
||||||
Namespace = require './models/namespace'
|
Namespace = require './models/namespace'
|
||||||
|
|
||||||
DatabaseStore.findAll(Namespace).then (namespaces) =>
|
DatabaseStore.findAll(Namespace).then (namespaces) =>
|
||||||
|
@_stopLongPolling()
|
||||||
|
|
||||||
namespaces.forEach (namespace) =>
|
namespaces.forEach (namespace) =>
|
||||||
connection = new InboxLongConnection(@, namespace.id)
|
connection = new InboxLongConnection(@, namespace.id)
|
||||||
@APILongConnections[namespace.id] = connection
|
@APILongConnections[namespace.id] = connection
|
||||||
|
@ -54,10 +66,13 @@ class InboxAPI
|
||||||
Actions.longPollStateChanged(state)
|
Actions.longPollStateChanged(state)
|
||||||
if state == InboxLongConnection.State.Connected
|
if state == InboxLongConnection.State.Connected
|
||||||
Actions.restartTaskQueue()
|
Actions.restartTaskQueue()
|
||||||
connection.onDelta (delta) =>
|
|
||||||
@_handleLongPollingChange(namespace.id, delta)
|
connection.onDeltas (deltas) =>
|
||||||
|
@_handleDeltas(namespace.id, deltas)
|
||||||
Actions.restartTaskQueue()
|
Actions.restartTaskQueue()
|
||||||
|
|
||||||
connection.start()
|
connection.start()
|
||||||
|
|
||||||
.catch (error) -> console.error(error)
|
.catch (error) -> console.error(error)
|
||||||
|
|
||||||
# Delegates to node's request object.
|
# Delegates to node's request object.
|
||||||
|
@ -95,22 +110,35 @@ class InboxAPI
|
||||||
@_handleModelResponse(body) if options.returnsModel
|
@_handleModelResponse(body) if options.returnsModel
|
||||||
options.success(body) if options.success
|
options.success(body) if options.success
|
||||||
|
|
||||||
_handleLongPollingChange: (namespaceId, delta) ->
|
_handleDeltas: (namespaceId, deltas) ->
|
||||||
return if delta.object == 'contact'
|
console.log("Processing deltas:")
|
||||||
return if delta.object == 'event'
|
|
||||||
|
|
||||||
@_shouldAcceptModel(delta.object, delta.attributes).then =>
|
# Group deltas by object type so we can mutate our local cache efficiently
|
||||||
if delta.event == 'create'
|
deltasByObject = {}
|
||||||
@_handleModelResponse(delta.attributes)
|
deltasDeletions = []
|
||||||
else if delta.event == 'modify'
|
for delta in deltas
|
||||||
@_handleModelResponse(delta.attributes)
|
if delta.event is 'delete'
|
||||||
else if delta.event == 'delete'
|
deltasDeletions.push(delta)
|
||||||
klass = modelClassMap()[delta.object]
|
else if delta.event is 'create' or delta.event is 'modify'
|
||||||
return unless klass
|
deltasByObject[delta.object] ||= []
|
||||||
DatabaseStore.find(klass, delta.id).then (model) ->
|
deltasByObject[delta.object].push(delta.attributes)
|
||||||
DatabaseStore.unpersistModel(model)
|
|
||||||
.catch (rejectionReason) ->
|
# Remove events and contacts - we don't apply deltas to them
|
||||||
console.log("Delta to #{delta.event} a '#{delta.object}' was ignored. #{rejectionReason}", delta)
|
delete deltasByObject['contact']
|
||||||
|
delete deltasByObject['event']
|
||||||
|
|
||||||
|
# Apply all the create / modfiy events by class
|
||||||
|
for object, items of deltasByObject
|
||||||
|
console.log(" + #{items.length} #{object}")
|
||||||
|
@_handleModelResponse(items)
|
||||||
|
|
||||||
|
# Apply all of the deletions
|
||||||
|
for delta in deltasDeletions
|
||||||
|
console.log(" - 1 #{delta.object} (#{delta.id})")
|
||||||
|
klass = modelClassMap()[delta.object]
|
||||||
|
return unless klass
|
||||||
|
DatabaseStore.find(klass, delta.id).then (model) ->
|
||||||
|
DatabaseStore.unpersistModel(model) if model
|
||||||
|
|
||||||
_defaultErrorCallback: (apiError) ->
|
_defaultErrorCallback: (apiError) ->
|
||||||
console.error("Unhandled Inbox API Error:", apiError.message, apiError)
|
console.error("Unhandled Inbox API Error:", apiError.message, apiError)
|
||||||
|
|
|
@ -19,6 +19,13 @@ class InboxLongConnection
|
||||||
@_req = null
|
@_req = null
|
||||||
@_reqPingInterval = null
|
@_reqPingInterval = null
|
||||||
@_buffer = null
|
@_buffer = null
|
||||||
|
|
||||||
|
@_deltas = []
|
||||||
|
@_flushDeltasDebounced = _.debounce =>
|
||||||
|
@_emitter.emit('deltas-stopped-arriving', @_deltas)
|
||||||
|
@_deltas = []
|
||||||
|
, 1000
|
||||||
|
|
||||||
@
|
@
|
||||||
|
|
||||||
hasCursor: ->
|
hasCursor: ->
|
||||||
|
@ -50,8 +57,8 @@ class InboxLongConnection
|
||||||
onStateChange: (callback) ->
|
onStateChange: (callback) ->
|
||||||
@_emitter.on('state-change', callback)
|
@_emitter.on('state-change', callback)
|
||||||
|
|
||||||
onDelta: (callback) ->
|
onDeltas: (callback) ->
|
||||||
@_emitter.on('delta', callback)
|
@_emitter.on('deltas-stopped-arriving', callback)
|
||||||
|
|
||||||
onProcessBuffer: =>
|
onProcessBuffer: =>
|
||||||
bufferJSONs = @_buffer.split('\n')
|
bufferJSONs = @_buffer.split('\n')
|
||||||
|
@ -66,12 +73,12 @@ class InboxLongConnection
|
||||||
console.log("#{bufferJSONs[i]} could not be parsed as JSON.", e)
|
console.log("#{bufferJSONs[i]} could not be parsed as JSON.", e)
|
||||||
if delta
|
if delta
|
||||||
throw (new Error 'Received delta with no cursor!') unless delta.cursor
|
throw (new Error 'Received delta with no cursor!') unless delta.cursor
|
||||||
@_emitter.emit('delta', delta)
|
@_deltas.push(delta)
|
||||||
|
@_flushDeltasDebounced()
|
||||||
bufferCursor = delta.cursor
|
bufferCursor = delta.cursor
|
||||||
|
|
||||||
# Note: setCursor is slow and saves to disk, so we do it once at the end
|
# Note: setCursor is slow and saves to disk, so we do it once at the end
|
||||||
@setCursor(bufferCursor)
|
@setCursor(bufferCursor)
|
||||||
console.log("Long Polling Connection: Processed #{bufferJSONs.length-1} updates")
|
|
||||||
@_buffer = bufferJSONs[bufferJSONs.length - 1]
|
@_buffer = bufferJSONs[bufferJSONs.length - 1]
|
||||||
|
|
||||||
start: ->
|
start: ->
|
||||||
|
@ -93,7 +100,7 @@ class InboxLongConnection
|
||||||
return @retry() unless res.statusCode == 200
|
return @retry() unless res.statusCode == 200
|
||||||
@_buffer = ''
|
@_buffer = ''
|
||||||
res.setEncoding('utf8')
|
res.setEncoding('utf8')
|
||||||
processBufferThrottled = _.throttle(@onProcessBuffer, 500, {leading: false})
|
processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false})
|
||||||
res.on 'close', => @retry()
|
res.on 'close', => @retry()
|
||||||
res.on 'data', (chunk) =>
|
res.on 'data', (chunk) =>
|
||||||
@_buffer += chunk
|
@_buffer += chunk
|
||||||
|
@ -120,7 +127,8 @@ class InboxLongConnection
|
||||||
@start()
|
@start()
|
||||||
, 10000
|
, 10000
|
||||||
|
|
||||||
end: =>
|
end: ->
|
||||||
|
console.log("Long Polling Connection: Closed.")
|
||||||
@setState(InboxLongConnection.State.Idle)
|
@setState(InboxLongConnection.State.Idle)
|
||||||
clearInterval(@_reqPingInterval) if @_reqPingInterval
|
clearInterval(@_reqPingInterval) if @_reqPingInterval
|
||||||
@_reqPingInterval = null
|
@_reqPingInterval = null
|
||||||
|
|
Loading…
Add table
Reference in a new issue