fix(db): Pool, flush change records rather than cancel queries

This commit is contained in:
Ben Gotow 2016-01-28 19:18:50 -08:00
parent 45d897ce0a
commit 140162dcd3
3 changed files with 102 additions and 82 deletions

View file

@ -79,7 +79,8 @@ describe "QuerySubscription", ->
new Thread(accountId: 'a', clientId: '2', lastMessageReceivedTimestamp: 2),
new Thread(accountId: 'a', clientId: '1', lastMessageReceivedTimestamp: 1),
]
mustUpdate: true
mustUpdate: false
mustTrigger: true
mustRefetchAllIds: false
},{
name: 'Item in set saved - new sort value'
@ -93,7 +94,8 @@ describe "QuerySubscription", ->
new Thread(accountId: 'a', clientId: '3', lastMessageReceivedTimestamp: 3),
new Thread(accountId: 'a', clientId: '2', lastMessageReceivedTimestamp: 2),
]
mustUpdate: true
mustUpdate: false
mustTrigger: true
mustRefetchAllIds: true
},{
name: 'Item saved - does not match query clauses, offset > 0'
@ -182,22 +184,27 @@ describe "QuerySubscription", ->
scenarios.forEach (scenario) =>
scenario.tests.forEach (test) =>
it "with #{scenario.name}, should correctly apply #{test.name}", ->
@q = new QuerySubscription(scenario.query)
@q._set = new MutableQueryResultSet()
@q._set.addModelsInRange(scenario.lastModels, new QueryRange(start: 0, end: scenario.lastModels.length))
subscription = new QuerySubscription(scenario.query)
subscription._set = new MutableQueryResultSet()
subscription._set.addModelsInRange(scenario.lastModels, new QueryRange(start: 0, end: scenario.lastModels.length))
spyOn(@q, 'update')
@q.applyChangeRecord(test.change)
spyOn(subscription, 'update')
spyOn(subscription, '_createResultAndTrigger')
subscription._updateInFlight = false
subscription.applyChangeRecord(test.change)
if test.mustRefetchAllIds
expect(@q._set).toBe(null)
expect(subscription._set).toBe(null)
else if test.nextModels is 'unchanged'
expect(@q._set.models()).toEqual(scenario.lastModels)
expect(subscription._set.models()).toEqual(scenario.lastModels)
else
expect(@q._set.models()).toEqual(test.nextModels)
expect(subscription._set.models()).toEqual(test.nextModels)
if test.mustUpdate
expect(@q.update).toHaveBeenCalled()
expect(subscription.update).toHaveBeenCalled()
if test.mustTriger
expect(subscription._createResultAndTrigger).toHaveBeenCalled()
describe "update", ->
beforeEach ->
@ -205,18 +212,12 @@ describe "QuerySubscription", ->
@_set ?= new MutableQueryResultSet()
Promise.resolve()
it "should increment the version", ->
subscription = new QuerySubscription(DatabaseStore.findAll(Thread))
expect(subscription._version).toBe(1)
subscription.update()
expect(subscription._version).toBe(2)
describe "when the query has an infinite range", ->
it "should call _fetchRange for the entire range", ->
subscription = new QuerySubscription(DatabaseStore.findAll(Thread))
subscription.update()
advanceClock()
expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: true, version: 2})
expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: true})
it "should fetch full full models only when the previous set is empty", ->
subscription = new QuerySubscription(DatabaseStore.findAll(Thread))
@ -224,7 +225,7 @@ describe "QuerySubscription", ->
subscription._set.addModelsInRange([new Thread()], new QueryRange(start: 0, end: 1))
subscription.update()
advanceClock()
expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: false, version: 2})
expect(subscription._fetchRange).toHaveBeenCalledWith(QueryRange.infinite(), {entireModels: false})
describe "when the query has a range", ->
beforeEach ->
@ -236,7 +237,7 @@ describe "QuerySubscription", ->
subscription._set = null
subscription.update()
advanceClock()
expect(subscription._fetchRange).toHaveBeenCalledWith(@query.range(), {entireModels: true, version: 2})
expect(subscription._fetchRange).toHaveBeenCalledWith(@query.range(), {entireModels: true})
describe "when we have a previous range", ->
it "should call _fetchRange for the ranges representing the difference", ->
@ -247,8 +248,12 @@ describe "QuerySubscription", ->
subscription = new QuerySubscription(@query)
subscription._set = new MutableQueryResultSet()
subscription._set.addModelsInRange([new Thread()], new QueryRange(start: 0, end: 1))
advanceClock()
subscription._fetchRange.reset()
subscription._updateInFlight = false
subscription.update()
advanceClock()
expect(subscription._fetchRange.callCount).toBe(2)
expect(subscription._fetchRange.calls[0].args).toEqual([customRange1, {entireModels: true, version: 2}])
expect(subscription._fetchRange.calls[1].args).toEqual([customRange2, {entireModels: true, version: 2}])
expect(subscription._fetchRange.calls[0].args).toEqual([customRange1, {entireModels: true}])
expect(subscription._fetchRange.calls[1].args).toEqual([customRange2, {entireModels: true}])

View file

@ -37,7 +37,7 @@ describe "registeredObjectReviver / registeredObjectReplacer", ->
it "should re-inflate Models in places they're not explicitly declared types", ->
b = new JSONBlob({id: "local-ThreadsToProcess", json: [@testThread]})
jsonString = JSON.stringify(b, Utils.registeredObjectReplacer)
expectedString = '{"client_id":"local-ThreadsToProcess","json":[{"client_id":"local-1","subject":"Test 1234","participants":[{"client_id":"local-a","name":"Juan","email":"juan@nylas.com","thirdPartyData":{},"id":"local-a"},{"client_id":"local-b","name":"Ben","email":"ben@nylas.com","thirdPartyData":{},"id":"local-b"}],"id":"local-1","__constructorName":"Thread"}],"id":"local-ThreadsToProcess","__constructorName":"JSONBlob"}'
expectedString = '{"client_id":"local-ThreadsToProcess","server_id":"local-ThreadsToProcess","json":[{"client_id":"local-1","subject":"Test 1234","participants":[{"client_id":"local-a","name":"Juan","email":"juan@nylas.com","thirdPartyData":{},"id":"local-a"},{"client_id":"local-b","name":"Ben","email":"ben@nylas.com","thirdPartyData":{},"id":"local-b"}],"id":"local-1","__constructorName":"Thread"}],"id":"local-ThreadsToProcess","__constructorName":"JSONBlob"}'
expect(jsonString).toEqual(expectedString)
revived = JSON.parse(jsonString, Utils.registeredObjectReviver)

View file

@ -8,9 +8,10 @@ verbose = false
class QuerySubscription
constructor: (@_query, @_options = {}) ->
@_set = null
@_version = 0
@_callbacks = []
@_lastResult = null
@_updateInFlight = false
@_queuedChangeRecords = []
if @_query
if @_query._count
@ -56,51 +57,66 @@ class QuerySubscription
return unless @_query and record.objectClass is @_query.objectClass()
return unless record.objects.length > 0
@_queuedChangeRecords.push(record)
@_processChangeRecords() unless @_updateInFlight
_processChangeRecords: =>
return if @_queuedChangeRecords.length is 0
return @update() if not @_set
impactCount = 0
knownImpacts = 0
unknownImpacts = 0
mustRefetchAllIds = false
if record.type is 'unpersist'
for item in record.objects
offset = @_set.offsetOfId(item.clientId)
if offset isnt -1
@_set.removeModelAtOffset(item, offset)
impactCount += 1
@_queuedChangeRecords.forEach (record) =>
if record.type is 'unpersist'
for item in record.objects
offset = @_set.offsetOfId(item.clientId)
if offset isnt -1
@_set.removeModelAtOffset(item, offset)
unknownImpacts += 1
else if record.type is 'persist'
for item in record.objects
offset = @_set.offsetOfId(item.clientId)
itemIsInSet = offset isnt -1
itemShouldBeInSet = item.matches(@_query.matchers())
else if record.type is 'persist'
for item in record.objects
offset = @_set.offsetOfId(item.clientId)
itemIsInSet = offset isnt -1
itemShouldBeInSet = item.matches(@_query.matchers())
if itemIsInSet and not itemShouldBeInSet
@_set.removeModelAtOffset(item, offset)
impactCount += 1
if itemIsInSet and not itemShouldBeInSet
@_set.removeModelAtOffset(item, offset)
unknownImpacts += 1
else if itemShouldBeInSet and not itemIsInSet
@_set.replaceModel(item)
else if itemShouldBeInSet and not itemIsInSet
@_set.replaceModel(item)
mustRefetchAllIds = true
unknownImpacts += 1
else if itemIsInSet
oldItem = @_set.modelWithId(item.clientId)
@_set.replaceModel(item)
if @_itemSortOrderHasChanged(oldItem, item)
mustRefetchAllIds = true
unknownImpacts += 1
else
knownImpacts += 1
# If we're not at the top of the result set, we can't be sure whether an
# item previously matched the set and doesn't anymore, impacting the items
# in the query range. We need to refetch IDs to be sure our set is correct.
if @_query.range().offset > 0 and (unknownImpacts + knownImpacts) < record.objects.length
mustRefetchAllIds = true
impactCount += 1
unknownImpacts += 1
else if itemIsInSet
oldItem = @_set.modelWithId(item.clientId)
@_set.replaceModel(item)
impactCount += 1
mustRefetchAllIds = true if @_itemSortOrderHasChanged(oldItem, item)
@_queuedChangeRecords = []
# If we're not at the top of the result set, we can't be sure whether an
# item previously matched the set and doesn't anymore, impacting the items
# in the query range. We need to refetch IDs to be sure our set is correct.
if @_query.range().offset > 0 and impactCount < record.objects.length
impactCount += 1
mustRefetchAllIds = true
if impactCount > 0
if unknownImpacts > 0
if mustRefetchAllIds
@log("Clearing result set - mustRefetchAllIds")
@_set = null
@update()
else if knownImpacts > 0
@_createResultAndTrigger()
_itemSortOrderHasChanged: (old, updated) ->
for descriptor in @_query.orderSortDescriptors()
@ -118,10 +134,9 @@ class QuerySubscription
console.log(msg) if @_query._klass.name is 'Thread'
update: =>
version = @_version += 1
desiredRange = @_query.range()
currentRange = @_set?.range()
@_updateInFlight = true
if currentRange and not currentRange.isInfinite() and not desiredRange.isInfinite()
ranges = QueryRange.rangesBySubtracting(desiredRange, currentRange)
@ -131,24 +146,35 @@ class QuerySubscription
entireModels = not @_set or @_set.modelCacheCount() is 0
Promise.each ranges, (range) =>
return @log("Update (#{version}) - Cancelled @ Step 0") unless version is @_version
@log("Update (#{version}) - Fetching range #{range}")
@_fetchRange(range, {entireModels, version})
@log("Update (#{@_query._klass.name}) - Fetching range #{range}")
@_fetchRange(range, {entireModels})
.then =>
return @log("Update (#{version}) - Cancelled @ Step 1") unless version is @_version
ids = @_set.ids().filter (id) => not @_set.modelWithId(id)
return @log("Update (#{version}) - No missing Ids") if ids.length is 0
@log("Update (#{version}) - Fetching missing Ids: #{ids}")
return @log("Update (#{@_query._klass.name}) - No missing Ids") if ids.length is 0
@log("Update (#{@_query._klass.name}) - Fetching missing Ids: #{ids}")
return DatabaseStore.findAll(@_query._klass, {id: ids}).then (models) =>
return @log("Update (#{version}) - Cancelled @ Step 1.5") unless version is @_version
@log("Update (#{version}) - Fetched missing Ids")
@log("Update (#{@_query._klass.name}) - Fetched missing Ids")
@_set.replaceModel(m) for m in models
.then =>
return @log("Update (#{version}) - Cancelled @ Step 2") unless version is @_version
@log("Update (#{version}) - Triggering...")
@_createResultAndTrigger()
@_updateInFlight = false
_fetchRange: (range, {entireModels, version} = {}) ->
allChangesApplied = @_queuedChangeRecords.length is 0
allCompleteModels = @_set.isComplete()
allUniqueIds = _.uniq(@_set.ids()).length is @_set.ids().length
if allChangesApplied and not allUniqueIds
throw new Error("QuerySubscription: Applied all changes and result set contains duplicate IDs.")
if allChangesApplied and not allCompleteModels
throw new Error("QuerySubscription: Applied all changes and result set is missing models.")
if allChangesApplied and allCompleteModels and allUniqueIds
@log("Update (#{@_query._klass.name}) - Triggering...")
@_createResultAndTrigger()
else
@_processChangeRecords()
_fetchRange: (range, {entireModels} = {}) ->
rangeQuery = undefined
unless range.isInfinite()
@ -162,11 +188,8 @@ class QuerySubscription
rangeQuery ?= @_query
DatabaseStore.run(rangeQuery, {format: false}).then (results) =>
if version and version isnt @_version
return @log("Update (#{version}) - fetchRange Cancelled")
unless @_set?.range().isContiguousWith(range)
@log("Clearing result set - #{range} isnt contiguous with #{@_set?.range()}")
if @_set and not @_set.range().isContiguousWith(range)
@log("Clearing result set - #{range} isnt contiguous with #{@_set.range()}")
@_set = null
@_set ?= new MutableQueryResultSet()
@ -178,14 +201,6 @@ class QuerySubscription
@_set.clipToRange(@_query.range())
_createResultAndTrigger: =>
unless @_set.isComplete()
console.warn("QuerySubscription: tried to publish a result set missing models.")
return
ids = @_set.ids()
unless _.uniq(ids).length is ids.length
throw new Error("QuerySubscription: result set contains duplicate ids.")
if @_options.asResultSet
@_set.setQuery(@_query)
@_lastResult = @_set.immutableClone()