From b97f607a6e483e2f39abe86fc00e93c3c197f87d Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Wed, 18 Feb 2015 13:59:58 -0800 Subject: [PATCH] fix(long-polling): Close streaming connections, wait for changes to stop Summary: fix(*): Minor onboarding react issue Test Plan: Run existing tests Reviewers: evan Reviewed By: evan Differential Revision: https://review.inboxapp.com/D1208 --- .../onboarding/lib/container-view.cjsx | 2 +- src/flux/inbox-api.coffee | 64 +++++++++++++------ src/flux/inbox-long-connection.coffee | 20 ++++-- 3 files changed, 61 insertions(+), 25 deletions(-) diff --git a/internal_packages/onboarding/lib/container-view.cjsx b/internal_packages/onboarding/lib/container-view.cjsx index aef17e3b0..6239a5262 100644 --- a/internal_packages/onboarding/lib/container-view.cjsx +++ b/internal_packages/onboarding/lib/container-view.cjsx @@ -125,7 +125,7 @@ ContainerView = React.createClass
- +
diff --git a/src/flux/inbox-api.coffee b/src/flux/inbox-api.coffee index 08965bc5e..e6514230c 100644 --- a/src/flux/inbox-api.coffee +++ b/src/flux/inbox-api.coffee @@ -17,6 +17,8 @@ class InboxAPI @ _onConfigChanged: => + prev = {@APIToken, @AppID, @APIRoot} + @APIToken = atom.config.get('inbox.token') env = atom.config.get('inbox.env') if env in ['production'] @@ -26,7 +28,9 @@ class InboxAPI @AppID = '54miogmnotxuo5st254trcmb9' @APIRoot = 'https://api-staging.inboxapp.com' - if @APIToken && (atom.state.mode == 'editor') + current = {@APIToken, @AppID, @APIRoot} + + if atom.state.mode is 'editor' and not _.isEqual(prev, current) @makeRequest path: "/n" returnsModel: true @@ -35,13 +39,21 @@ class InboxAPI error: => @_startLongPolling() + _stopLongPolling: -> + for namespace, connection of @APILongConnections + connection.end() + @APILongConnections = {} + _startLongPolling: -> return unless atom.state.mode == 'editor' return if atom.getLoadSettings().isSpec DatabaseStore = require './stores/database-store' Namespace = require './models/namespace' + DatabaseStore.findAll(Namespace).then (namespaces) => + @_stopLongPolling() + namespaces.forEach (namespace) => connection = new InboxLongConnection(@, namespace.id) @APILongConnections[namespace.id] = connection @@ -54,10 +66,13 @@ class InboxAPI Actions.longPollStateChanged(state) if state == InboxLongConnection.State.Connected Actions.restartTaskQueue() - connection.onDelta (delta) => - @_handleLongPollingChange(namespace.id, delta) + + connection.onDeltas (deltas) => + @_handleDeltas(namespace.id, deltas) Actions.restartTaskQueue() + connection.start() + .catch (error) -> console.error(error) # Delegates to node's request object. @@ -95,22 +110,35 @@ class InboxAPI @_handleModelResponse(body) if options.returnsModel options.success(body) if options.success - _handleLongPollingChange: (namespaceId, delta) -> - return if delta.object == 'contact' - return if delta.object == 'event' + _handleDeltas: (namespaceId, deltas) -> + console.log("Processing deltas:") - @_shouldAcceptModel(delta.object, delta.attributes).then => - if delta.event == 'create' - @_handleModelResponse(delta.attributes) - else if delta.event == 'modify' - @_handleModelResponse(delta.attributes) - else if delta.event == 'delete' - klass = modelClassMap()[delta.object] - return unless klass - DatabaseStore.find(klass, delta.id).then (model) -> - DatabaseStore.unpersistModel(model) - .catch (rejectionReason) -> - console.log("Delta to #{delta.event} a '#{delta.object}' was ignored. #{rejectionReason}", delta) + # Group deltas by object type so we can mutate our local cache efficiently + deltasByObject = {} + deltasDeletions = [] + for delta in deltas + if delta.event is 'delete' + deltasDeletions.push(delta) + else if delta.event is 'create' or delta.event is 'modify' + deltasByObject[delta.object] ||= [] + deltasByObject[delta.object].push(delta.attributes) + + # Remove events and contacts - we don't apply deltas to them + delete deltasByObject['contact'] + delete deltasByObject['event'] + + # Apply all the create / modfiy events by class + for object, items of deltasByObject + console.log(" + #{items.length} #{object}") + @_handleModelResponse(items) + + # Apply all of the deletions + for delta in deltasDeletions + console.log(" - 1 #{delta.object} (#{delta.id})") + klass = modelClassMap()[delta.object] + return unless klass + DatabaseStore.find(klass, delta.id).then (model) -> + DatabaseStore.unpersistModel(model) if model _defaultErrorCallback: (apiError) -> console.error("Unhandled Inbox API Error:", apiError.message, apiError) diff --git a/src/flux/inbox-long-connection.coffee b/src/flux/inbox-long-connection.coffee index 6dc94e245..d02135d3f 100644 --- a/src/flux/inbox-long-connection.coffee +++ b/src/flux/inbox-long-connection.coffee @@ -19,6 +19,13 @@ class InboxLongConnection @_req = null @_reqPingInterval = null @_buffer = null + + @_deltas = [] + @_flushDeltasDebounced = _.debounce => + @_emitter.emit('deltas-stopped-arriving', @_deltas) + @_deltas = [] + , 1000 + @ hasCursor: -> @@ -50,8 +57,8 @@ class InboxLongConnection onStateChange: (callback) -> @_emitter.on('state-change', callback) - onDelta: (callback) -> - @_emitter.on('delta', callback) + onDeltas: (callback) -> + @_emitter.on('deltas-stopped-arriving', callback) onProcessBuffer: => bufferJSONs = @_buffer.split('\n') @@ -66,12 +73,12 @@ class InboxLongConnection console.log("#{bufferJSONs[i]} could not be parsed as JSON.", e) if delta throw (new Error 'Received delta with no cursor!') unless delta.cursor - @_emitter.emit('delta', delta) + @_deltas.push(delta) + @_flushDeltasDebounced() bufferCursor = delta.cursor # Note: setCursor is slow and saves to disk, so we do it once at the end @setCursor(bufferCursor) - console.log("Long Polling Connection: Processed #{bufferJSONs.length-1} updates") @_buffer = bufferJSONs[bufferJSONs.length - 1] start: -> @@ -93,7 +100,7 @@ class InboxLongConnection return @retry() unless res.statusCode == 200 @_buffer = '' res.setEncoding('utf8') - processBufferThrottled = _.throttle(@onProcessBuffer, 500, {leading: false}) + processBufferThrottled = _.throttle(@onProcessBuffer, 400, {leading: false}) res.on 'close', => @retry() res.on 'data', (chunk) => @_buffer += chunk @@ -120,7 +127,8 @@ class InboxLongConnection @start() , 10000 - end: => + end: -> + console.log("Long Polling Connection: Closed.") @setState(InboxLongConnection.State.Idle) clearInterval(@_reqPingInterval) if @_reqPingInterval @_reqPingInterval = null