diff --git a/spec/models/query-subscription-spec.coffee b/spec/models/query-subscription-spec.coffee index 9c5443cf6..98677b193 100644 --- a/spec/models/query-subscription-spec.coffee +++ b/spec/models/query-subscription-spec.coffee @@ -79,7 +79,8 @@ describe "QuerySubscription", -> new Thread(accountId: 'a', clientId: '2', lastMessageReceivedTimestamp: 2), new Thread(accountId: 'a', clientId: '1', lastMessageReceivedTimestamp: 1), ] - mustUpdate: true + mustUpdate: false + mustTrigger: true mustRefetchAllIds: false },{ name: 'Item in set saved - new sort value' @@ -93,7 +94,8 @@ describe "QuerySubscription", -> new Thread(accountId: 'a', clientId: '3', lastMessageReceivedTimestamp: 3), new Thread(accountId: 'a', clientId: '2', lastMessageReceivedTimestamp: 2), ] - mustUpdate: true + mustUpdate: false + mustTrigger: true mustRefetchAllIds: true },{ name: 'Item saved - does not match query clauses, offset > 0' @@ -182,22 +184,27 @@ describe "QuerySubscription", -> scenarios.forEach (scenario) => scenario.tests.forEach (test) => it "with #{scenario.name}, should correctly apply #{test.name}", -> - @q = new QuerySubscription(scenario.query) - @q._set = new MutableQueryResultSet() - @q._set.addModelsInRange(scenario.lastModels, new QueryRange(start: 0, end: scenario.lastModels.length)) + subscription = new QuerySubscription(scenario.query) + subscription._set = new MutableQueryResultSet() + subscription._set.addModelsInRange(scenario.lastModels, new QueryRange(start: 0, end: scenario.lastModels.length)) - spyOn(@q, 'update') - @q.applyChangeRecord(test.change) + spyOn(subscription, 'update') + spyOn(subscription, '_createResultAndTrigger') + subscription._updateInFlight = false + subscription.applyChangeRecord(test.change) if test.mustRefetchAllIds - expect(@q._set).toBe(null) + expect(subscription._set).toBe(null) else if test.nextModels is 'unchanged' - expect(@q._set.models()).toEqual(scenario.lastModels) + expect(subscription._set.models()).toEqual(scenario.lastModels) else - expect(@q._set.models()).toEqual(test.nextModels) + expect(subscription._set.models()).toEqual(test.nextModels) if test.mustUpdate - expect(@q.update).toHaveBeenCalled() + expect(subscription.update).toHaveBeenCalled() + + if test.mustTriger + expect(subscription._createResultAndTrigger).toHaveBeenCalled() describe "update", -> beforeEach -> @@ -205,18 +212,12 @@ describe "QuerySubscription", -> @_set ?= new MutableQueryResultSet() Promise.resolve() - it "should increment the version", -> - subscription = new QuerySubscription(DatabaseStore.findAll(Thread)) - expect(subscription._version).toBe(1) - subscription.update() - expect(subscription._version).toBe(2) - describe "when the query has an infinite range", -> it "should call _fetchRange for the entire range", -> subscription = new QuerySubscription(DatabaseStore.findAll(Thread)) subscription.update() advanceClock() - expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: true, version: 2}) + expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: true}) it "should fetch full full models only when the previous set is empty", -> subscription = new QuerySubscription(DatabaseStore.findAll(Thread)) @@ -224,7 +225,7 @@ describe "QuerySubscription", -> subscription._set.addModelsInRange([new Thread()], new QueryRange(start: 0, end: 1)) subscription.update() advanceClock() - expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: false, version: 2}) + expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: false}) describe "when the query has a range", -> beforeEach -> @@ -236,7 +237,7 @@ describe "QuerySubscription", -> subscription._set = null subscription.update() advanceClock() - expect(subscription._fetchRange).toHaveBeenCalledWith(@query.range(), {entireModels: true, version: 2}) + expect(subscription._fetchRange).toHaveBeenCalledWith(@query.range(), {entireModels: true}) describe "when we have a previous range", -> it "should call _fetchRange for the ranges representing the difference", -> @@ -247,8 +248,12 @@ describe "QuerySubscription", -> subscription = new QuerySubscription(@query) subscription._set = new MutableQueryResultSet() subscription._set.addModelsInRange([new Thread()], new QueryRange(start: 0, end: 1)) + + advanceClock() + subscription._fetchRange.reset() + subscription._updateInFlight = false subscription.update() advanceClock() expect(subscription._fetchRange.callCount).toBe(2) - expect(subscription._fetchRange.calls[0].args).toEqual([customRange1, {entireModels: true, version: 2}]) - expect(subscription._fetchRange.calls[1].args).toEqual([customRange2, {entireModels: true, version: 2}]) + expect(subscription._fetchRange.calls[0].args).toEqual([customRange1, {entireModels: true}]) + expect(subscription._fetchRange.calls[1].args).toEqual([customRange2, {entireModels: true}]) diff --git a/spec/utils-spec.coffee b/spec/utils-spec.coffee index c2cb312b3..54eac445f 100644 --- a/spec/utils-spec.coffee +++ b/spec/utils-spec.coffee @@ -37,7 +37,7 @@ describe "registeredObjectReviver / registeredObjectReplacer", -> it "should re-inflate Models in places they're not explicitly declared types", -> b = new JSONBlob({id: "local-ThreadsToProcess", json: [@testThread]}) jsonString = JSON.stringify(b, Utils.registeredObjectReplacer) - expectedString = '{"client_id":"local-ThreadsToProcess","json":[{"client_id":"local-1","subject":"Test 1234","participants":[{"client_id":"local-a","name":"Juan","email":"juan@nylas.com","thirdPartyData":{},"id":"local-a"},{"client_id":"local-b","name":"Ben","email":"ben@nylas.com","thirdPartyData":{},"id":"local-b"}],"id":"local-1","__constructorName":"Thread"}],"id":"local-ThreadsToProcess","__constructorName":"JSONBlob"}' + expectedString = '{"client_id":"local-ThreadsToProcess","server_id":"local-ThreadsToProcess","json":[{"client_id":"local-1","subject":"Test 1234","participants":[{"client_id":"local-a","name":"Juan","email":"juan@nylas.com","thirdPartyData":{},"id":"local-a"},{"client_id":"local-b","name":"Ben","email":"ben@nylas.com","thirdPartyData":{},"id":"local-b"}],"id":"local-1","__constructorName":"Thread"}],"id":"local-ThreadsToProcess","__constructorName":"JSONBlob"}' expect(jsonString).toEqual(expectedString) revived = JSON.parse(jsonString, Utils.registeredObjectReviver) diff --git a/src/flux/models/query-subscription.coffee b/src/flux/models/query-subscription.coffee index 885d92941..585d66c6f 100644 --- a/src/flux/models/query-subscription.coffee +++ b/src/flux/models/query-subscription.coffee @@ -8,9 +8,10 @@ verbose = false class QuerySubscription constructor: (@_query, @_options = {}) -> @_set = null - @_version = 0 @_callbacks = [] @_lastResult = null + @_updateInFlight = false + @_queuedChangeRecords = [] if @_query if @_query._count @@ -56,51 +57,66 @@ class QuerySubscription return unless @_query and record.objectClass is @_query.objectClass() return unless record.objects.length > 0 + @_queuedChangeRecords.push(record) + @_processChangeRecords() unless @_updateInFlight + + _processChangeRecords: => + return if @_queuedChangeRecords.length is 0 return @update() if not @_set - impactCount = 0 + knownImpacts = 0 + unknownImpacts = 0 mustRefetchAllIds = false - if record.type is 'unpersist' - for item in record.objects - offset = @_set.offsetOfId(item.clientId) - if offset isnt -1 - @_set.removeModelAtOffset(item, offset) - impactCount += 1 + @_queuedChangeRecords.forEach (record) => + if record.type is 'unpersist' + for item in record.objects + offset = @_set.offsetOfId(item.clientId) + if offset isnt -1 + @_set.removeModelAtOffset(item, offset) + unknownImpacts += 1 - else if record.type is 'persist' - for item in record.objects - offset = @_set.offsetOfId(item.clientId) - itemIsInSet = offset isnt -1 - itemShouldBeInSet = item.matches(@_query.matchers()) + else if record.type is 'persist' + for item in record.objects + offset = @_set.offsetOfId(item.clientId) + itemIsInSet = offset isnt -1 + itemShouldBeInSet = item.matches(@_query.matchers()) - if itemIsInSet and not itemShouldBeInSet - @_set.removeModelAtOffset(item, offset) - impactCount += 1 + if itemIsInSet and not itemShouldBeInSet + @_set.removeModelAtOffset(item, offset) + unknownImpacts += 1 - else if itemShouldBeInSet and not itemIsInSet - @_set.replaceModel(item) + else if itemShouldBeInSet and not itemIsInSet + @_set.replaceModel(item) + mustRefetchAllIds = true + unknownImpacts += 1 + + else if itemIsInSet + oldItem = @_set.modelWithId(item.clientId) + @_set.replaceModel(item) + + if @_itemSortOrderHasChanged(oldItem, item) + mustRefetchAllIds = true + unknownImpacts += 1 + else + knownImpacts += 1 + + # If we're not at the top of the result set, we can't be sure whether an + # item previously matched the set and doesn't anymore, impacting the items + # in the query range. We need to refetch IDs to be sure our set is correct. + if @_query.range().offset > 0 and (unknownImpacts + knownImpacts) < record.objects.length mustRefetchAllIds = true - impactCount += 1 + unknownImpacts += 1 - else if itemIsInSet - oldItem = @_set.modelWithId(item.clientId) - @_set.replaceModel(item) - impactCount += 1 - mustRefetchAllIds = true if @_itemSortOrderHasChanged(oldItem, item) + @_queuedChangeRecords = [] - # If we're not at the top of the result set, we can't be sure whether an - # item previously matched the set and doesn't anymore, impacting the items - # in the query range. We need to refetch IDs to be sure our set is correct. - if @_query.range().offset > 0 and impactCount < record.objects.length - impactCount += 1 - mustRefetchAllIds = true - - if impactCount > 0 + if unknownImpacts > 0 if mustRefetchAllIds @log("Clearing result set - mustRefetchAllIds") @_set = null @update() + else if knownImpacts > 0 + @_createResultAndTrigger() _itemSortOrderHasChanged: (old, updated) -> for descriptor in @_query.orderSortDescriptors() @@ -118,10 +134,9 @@ class QuerySubscription console.log(msg) if @_query._klass.name is 'Thread' update: => - version = @_version += 1 - desiredRange = @_query.range() currentRange = @_set?.range() + @_updateInFlight = true if currentRange and not currentRange.isInfinite() and not desiredRange.isInfinite() ranges = QueryRange.rangesBySubtracting(desiredRange, currentRange) @@ -131,24 +146,35 @@ class QuerySubscription entireModels = not @_set or @_set.modelCacheCount() is 0 Promise.each ranges, (range) => - return @log("Update (#{version}) - Cancelled @ Step 0") unless version is @_version - @log("Update (#{version}) - Fetching range #{range}") - @_fetchRange(range, {entireModels, version}) + @log("Update (#{@_query._klass.name}) - Fetching range #{range}") + @_fetchRange(range, {entireModels}) .then => - return @log("Update (#{version}) - Cancelled @ Step 1") unless version is @_version ids = @_set.ids().filter (id) => not @_set.modelWithId(id) - return @log("Update (#{version}) - No missing Ids") if ids.length is 0 - @log("Update (#{version}) - Fetching missing Ids: #{ids}") + return @log("Update (#{@_query._klass.name}) - No missing Ids") if ids.length is 0 + @log("Update (#{@_query._klass.name}) - Fetching missing Ids: #{ids}") return DatabaseStore.findAll(@_query._klass, {id: ids}).then (models) => - return @log("Update (#{version}) - Cancelled @ Step 1.5") unless version is @_version - @log("Update (#{version}) - Fetched missing Ids") + @log("Update (#{@_query._klass.name}) - Fetched missing Ids") @_set.replaceModel(m) for m in models .then => - return @log("Update (#{version}) - Cancelled @ Step 2") unless version is @_version - @log("Update (#{version}) - Triggering...") - @_createResultAndTrigger() + @_updateInFlight = false - _fetchRange: (range, {entireModels, version} = {}) -> + allChangesApplied = @_queuedChangeRecords.length is 0 + allCompleteModels = @_set.isComplete() + allUniqueIds = _.uniq(@_set.ids()).length is @_set.ids().length + + if allChangesApplied and not allUniqueIds + throw new Error("QuerySubscription: Applied all changes and result set contains duplicate IDs.") + + if allChangesApplied and not allCompleteModels + throw new Error("QuerySubscription: Applied all changes and result set is missing models.") + + if allChangesApplied and allCompleteModels and allUniqueIds + @log("Update (#{@_query._klass.name}) - Triggering...") + @_createResultAndTrigger() + else + @_processChangeRecords() + + _fetchRange: (range, {entireModels} = {}) -> rangeQuery = undefined unless range.isInfinite() @@ -162,11 +188,8 @@ class QuerySubscription rangeQuery ?= @_query DatabaseStore.run(rangeQuery, {format: false}).then (results) => - if version and version isnt @_version - return @log("Update (#{version}) - fetchRange Cancelled") - - unless @_set?.range().isContiguousWith(range) - @log("Clearing result set - #{range} isnt contiguous with #{@_set?.range()}") + if @_set and not @_set.range().isContiguousWith(range) + @log("Clearing result set - #{range} isnt contiguous with #{@_set.range()}") @_set = null @_set ?= new MutableQueryResultSet() @@ -178,14 +201,6 @@ class QuerySubscription @_set.clipToRange(@_query.range()) _createResultAndTrigger: => - unless @_set.isComplete() - console.warn("QuerySubscription: tried to publish a result set missing models.") - return - - ids = @_set.ids() - unless _.uniq(ids).length is ids.length - throw new Error("QuerySubscription: result set contains duplicate ids.") - if @_options.asResultSet @_set.setQuery(@_query) @_lastResult = @_set.immutableClone()