fix(qs): Handle query subscription errors, => ES6

This commit is contained in:
Ben Gotow 2016-03-22 17:14:25 -07:00
parent ed1f12d916
commit 4a8a22e588
2 changed files with 283 additions and 230 deletions

View file

@ -1,230 +0,0 @@
_ = require 'underscore'
DatabaseStore = require '../stores/database-store'
QueryRange = require './query-range'
MutableQueryResultSet = require './mutable-query-result-set'
class QuerySubscription
constructor: (@_query, @_options = {}) ->
@_set = null
@_callbacks = []
@_lastResult = null
@_updateInFlight = false
@_queuedChangeRecords = []
@_queryVersion = 1
if @_query
if @_query._count
throw new Error("QuerySubscriptionPool::add - You cannot listen to count queries.")
if @_options.initialModels
@_set = new MutableQueryResultSet()
@_set.addModelsInRange(@_options.initialModels, new QueryRange({
limit: @_options.initialModels.length,
offset: 0
query: =>
addCallback: (callback) =>
unless callback instanceof Function
throw new Error("QuerySubscription:addCallback - expects a function, received #{callback}")
if @_lastResult
process.nextTick =>
return unless @_lastResult
hasCallback: (callback) =>
@_callbacks.indexOf(callback) isnt -1
removeCallback: (callback) =>
unless callback instanceof Function
throw new Error("QuerySubscription:removeCallback - expects a function, received #{callback}")
@_callbacks = _.without(@_callbacks, callback)
callbackCount: =>
applyChangeRecord: (record) =>
return unless @_query and record.objectClass is @_query.objectClass()
return unless record.objects.length > 0
@_processChangeRecords() unless @_updateInFlight
cancelPendingUpdate: =>
@_queryVersion += 1
@_updateInFlight = false
# Scan through change records and apply them to the last result set.
# - Returns true if changes did / will result in new result set being created.
# - Returns false if no changes were made.
_processChangeRecords: =>
if @_queuedChangeRecords.length is 0
return false
if not @_set
return true
knownImpacts = 0
unknownImpacts = 0
mustRefetchAllIds = false
@_queuedChangeRecords.forEach (record) =>
if record.type is 'unpersist'
for item in record.objects
offset = @_set.offsetOfId(item.clientId)
if offset isnt -1
@_set.removeModelAtOffset(item, offset)
unknownImpacts += 1
else if record.type is 'persist'
for item in record.objects
offset = @_set.offsetOfId(item.clientId)
itemIsInSet = offset isnt -1
itemShouldBeInSet = item.matches(@_query.matchers())
if itemIsInSet and not itemShouldBeInSet
@_set.removeModelAtOffset(item, offset)
unknownImpacts += 1
else if itemShouldBeInSet and not itemIsInSet
mustRefetchAllIds = true
unknownImpacts += 1
else if itemIsInSet
oldItem = @_set.modelWithId(item.clientId)
if @_itemSortOrderHasChanged(oldItem, item)
mustRefetchAllIds = true
unknownImpacts += 1
knownImpacts += 1
# If we're not at the top of the result set, we can't be sure whether an
# item previously matched the set and doesn't anymore, impacting the items
# in the query range. We need to refetch IDs to be sure our set is correct.
if @_query.range().offset > 0 and (unknownImpacts + knownImpacts) < record.objects.length
mustRefetchAllIds = true
unknownImpacts += 1
@_queuedChangeRecords = []
if unknownImpacts > 0
@_set = null if mustRefetchAllIds
return true
else if knownImpacts > 0
return false
return false
_itemSortOrderHasChanged: (old, updated) ->
for descriptor in @_query.orderSortDescriptors()
oldSortValue = old[descriptor.attr.modelKey]
updatedSortValue = updated[descriptor.attr.modelKey]
if not (oldSortValue >= updatedSortValue && oldSortValue <= updatedSortValue)
return true
return false
update: =>
@_updateInFlight = true
version = @_queryVersion
desiredRange = @_query.range()
currentRange = @_set?.range()
areNotInfinite = currentRange and not currentRange.isInfinite() and not desiredRange.isInfinite()
previousResultIsEmpty = not @_set or @_set.modelCacheCount() is 0
missingRange = @_getMissingRange(desiredRange, currentRange)
fetchEntireModels = if areNotInfinite then true else previousResultIsEmpty
@_fetchMissingRange(missingRange, {version, fetchEntireModels})
_getMissingRange: (desiredRange, currentRange) =>
if currentRange and not currentRange.isInfinite() and not desiredRange.isInfinite()
ranges = QueryRange.rangesBySubtracting(desiredRange, currentRange)
missingRange = if ranges.length > 1 then desiredRange else ranges[0]
missingRange = desiredRange
return missingRange
_getQueryForRange: (range, fetchEntireModels) =>
rangeQuery = null
if not range.isInfinite()
rangeQuery ?= @_query.clone()
if not fetchEntireModels
rangeQuery ?= @_query.clone()
rangeQuery ?= @_query
return rangeQuery
_fetchMissingRange: (missingRange, {version, fetchEntireModels}) ->
missingRangeQuery = @_getQueryForRange(missingRange, fetchEntireModels), {format: false})
.then (results) =>
return unless @_queryVersion is version
if @_set and not @_set.range().isContiguousWith(missingRange)
@_set = null
@_set ?= new MutableQueryResultSet()
# Create result and trigger
# if A) no changes have come in during querying the missing range,
# or B) applying those changes has no effect on the result set, and this one is
# still good.
if @_queuedChangeRecords.length is 0 or @_processChangeRecords() is false
if fetchEntireModels
@_set.addModelsInRange(results, missingRange)
@_set.addIdsInRange(results, missingRange)
ids = @_set.ids().filter (id) => not @_set.modelWithId(id)
if ids.length > 0
return DatabaseStore.findAll(@_query._klass, {id: ids}).then (models) =>
return unless @_queryVersion is version
@_set.replaceModel(m) for m in models
@_updateInFlight = false
@_updateInFlight = false
_createResultAndTrigger: =>
allCompleteModels = @_set.isComplete()
allUniqueIds = _.uniq(@_set.ids()).length is @_set.ids().length
if not allUniqueIds
throw new Error("QuerySubscription: Applied all changes and result set contains duplicate IDs.")
if not allCompleteModels
throw new Error("QuerySubscription: Applied all changes and result set is missing models.")
if @_options.asResultSet
@_lastResult = @_set.immutableClone()
@_lastResult = @_query.formatResult(@_set.models())
@_callbacks.forEach (callback) =>
module.exports = QuerySubscription

View file

@ -0,0 +1,283 @@
import _ from 'underscore';
import DatabaseStore from '../stores/database-store';
import QueryRange from './query-range';
import MutableQueryResultSet from './mutable-query-result-set';
export default class QuerySubscription {
constructor(query, options = {}) {
this._query = query;
this._options = options;
this._set = null;
this._callbacks = [];
this._lastResult = null;
this._updateInFlight = false;
this._queuedChangeRecords = [];
this._queryVersion = 1;
if (this._query) {
if (this._query._count) {
throw new Error("QuerySubscriptionPool::add - You cannot listen to count queries.")
if (this._options.initialModels) {
this._set = new MutableQueryResultSet();
this._set.addModelsInRange(this._options.initialModels, new QueryRange({
limit: this._options.initialModels.length,
offset: 0,
} else {
query = () => {
return this._query;
addCallback = (callback) => {
if (!(callback instanceof Function)) {
throw new Error("QuerySubscription:addCallback - expects a function, received #{callback}");
if (this._lastResult) {
process.nextTick(() => {
if (!this._lastResult) { return; }
hasCallback = (callback) => {
return (this._callbacks.indexOf(callback) !== -1);
removeCallback = (callback) => {
if (!(callback instanceof Function)) {
throw new Error("QuerySubscription:removeCallback - expects a function, received #{callback}")
this._callbacks = _.without(this._callbacks, callback);
callbackCount = () => {
return this._callbacks.length;
applyChangeRecord = (record) => {
if (!this._query || record.objectClass !== this._query.objectClass()) {
if (record.objects.length === 0) {
if (!this._updateInFlight) {
cancelPendingUpdate = () => {
this._queryVersion += 1;
this._updateInFlight = false;
// Scan through change records and apply them to the last result set.
// - Returns true if changes did / will result in new result set being created.
// - Returns false if no changes were made.
_processChangeRecords = () => {
if (this._queuedChangeRecords.length === 0) {
return false;
if (!this._set) {
return true;
let knownImpacts = 0;
let unknownImpacts = 0;
let mustRefetchAllIds = false;
this._queuedChangeRecords.forEach((record) => {
if (record.type === 'unpersist') {
for (const item of record.objects) {
const offset = this._set.offsetOfId(item.clientId)
if (offset !== -1) {
this._set.removeModelAtOffset(item, offset);
unknownImpacts += 1;
} else if (record.type === 'persist') {
for (const item of record.objects) {
const offset = this._set.offsetOfId(item.clientId);
const itemIsInSet = offset !== -1;
const itemShouldBeInSet = item.matches(this._query.matchers());
if (itemIsInSet && !itemShouldBeInSet) {
this._set.removeModelAtOffset(item, offset)
unknownImpacts += 1
} else if (itemShouldBeInSet && !itemIsInSet) {
mustRefetchAllIds = true
unknownImpacts += 1;
} else if (itemIsInSet) {
const oldItem = this._set.modelWithId(item.clientId);
if (this._itemSortOrderHasChanged(oldItem, item)) {
mustRefetchAllIds = true
unknownImpacts += 1
} else {
knownImpacts += 1
// If we're not at the top of the result set, we can't be sure whether an
// item previously matched the set and doesn't anymore, impacting the items
// in the query range. We need to refetch IDs to be sure our set === correct.
if ((this._query.range().offset > 0) && (unknownImpacts + knownImpacts) < record.objects.length) {
mustRefetchAllIds = true
unknownImpacts += 1
this._queuedChangeRecords = [];
if (unknownImpacts > 0) {
if (mustRefetchAllIds) {
this._set = null;
return true;
if (knownImpacts > 0) {
return false;
return false;
_itemSortOrderHasChanged(old, updated) {
for (const descriptor of this._query.orderSortDescriptors()) {
const oldSortValue = old[descriptor.attr.modelKey];
const updatedSortValue = updated[descriptor.attr.modelKey];
if (!(oldSortValue >= updatedSortValue && oldSortValue <= updatedSortValue)) {
return true;
return false;
update() {
this._updateInFlight = true;
const version = this._queryVersion;
const desiredRange = this._query.range();
const currentRange = this._set ? this._set.range() : null;
const areNotInfinite = currentRange && !currentRange.isInfinite() && !desiredRange.isInfinite();
const previousResultIsEmpty = !this._set || this._set.modelCacheCount() === 0;
const missingRange = this._getMissingRange(desiredRange, currentRange);
const fetchEntireModels = areNotInfinite ? true : previousResultIsEmpty;
this._fetchMissingRange(missingRange, {version, fetchEntireModels});
_getMissingRange = (desiredRange, currentRange) => {
if (currentRange && !currentRange.isInfinite() && !desiredRange.isInfinite()) {
const ranges = QueryRange.rangesBySubtracting(desiredRange, currentRange);
return ranges.length > 1 ? desiredRange : ranges[0];
return desiredRange;
_getQueryForRange = (range, fetchEntireModels) => {
let rangeQuery = null;
if (!range.isInfinite()) {
rangeQuery = rangeQuery || this._query.clone();
if (!fetchEntireModels) {
rangeQuery = rangeQuery || this._query.clone();
rangeQuery = rangeQuery || this._query;
return rangeQuery;
_fetchMissingRange(missingRange, {version, fetchEntireModels}) {
const missingRangeQuery = this._getQueryForRange(missingRange, fetchEntireModels);, {format: false}).then((results) => {
if (this._queryVersion !== version) {
if (this._set && !this._set.range().isContiguousWith(missingRange)) {
this._set = null;
this._set = this._set || new MutableQueryResultSet();
// Create result and trigger if either of the following:
// A) no changes have come in during querying the missing range,
// B) applying those changes has no effect on the result set, and this one is
// still good.
if ((this._queuedChangeRecords.length === 0) || (this._processChangeRecords() === false)) {
if (fetchEntireModels) {
this._set.addModelsInRange(results, missingRange);
} else {
this._set.addIdsInRange(results, missingRange);
const missingIds = this._set.ids().filter(id => !this._set.modelWithId(id));
if (missingIds.length > 0) {
DatabaseStore.findAll(this._query._klass, {id: missingIds}).then((models) => {
if (this._queryVersion !== version) {
for (const m of models) {
this._updateInFlight = false;
} else {
this._updateInFlight = false;
_createResultAndTrigger = () => {
const allCompleteModels = this._set.isComplete()
const allUniqueIds = _.uniq(this._set.ids()).length === this._set.ids().length
if (!allUniqueIds) {
throw new Error("QuerySubscription: Applied all changes and result set contains duplicate IDs.");
if (!allCompleteModels) {
throw new Error("QuerySubscription: Applied all changes and result set === missing models.");
if (this._options.asResultSet) {
this._lastResult = this._set.immutableClone();
} else {
this._lastResult = this._query.formatResult(this._set.models());
this._callbacks.forEach((callback) => callback(this._lastResult));