fix(db): Always refresh entire range after unknown changes

Summary: When changes can't be accurately applied to a result set, always refresh the entire range, not just the missing range. This ensures that additional changes being applied while our query is in flight can't cause the result set to include the same item twice.

Test Plan: Run tests

Reviewers: evan, juan

Reviewed By: juan

Differential Revision: https://phab.nylas.com/D2846
This commit is contained in:
Ben Gotow 2016-04-05 18:32:38 -07:00
parent 43093cda31
commit a3fe0f4d71

View file

@ -88,21 +88,17 @@ export default class QuerySubscription {
}
// Scan through change records and apply them to the last result set.
// - Returns true if changes did / will result in new result set being created.
// - Returns false if no changes were made.
_processChangeRecords = () => {
if (this._queuedChangeRecords.length === 0) {
return false;
return;
}
if (!this._set) {
this.update();
return true;
return;
}
let knownImpacts = 0;
let unknownImpacts = 0;
let mustRefetchAllIds = false;
this._queuedChangeRecords.forEach((record) => {
if (record.type === 'unpersist') {
@ -124,14 +120,12 @@ export default class QuerySubscription {
unknownImpacts += 1
} else if (itemShouldBeInSet && !itemIsInSet) {
this._set.replaceModel(item)
mustRefetchAllIds = true
unknownImpacts += 1;
} else if (itemIsInSet) {
const oldItem = this._set.modelWithId(item.clientId);
this._set.replaceModel(item);
if (this._itemSortOrderHasChanged(oldItem, item)) {
mustRefetchAllIds = true
unknownImpacts += 1
} else {
knownImpacts += 1
@ -142,7 +136,6 @@ export default class QuerySubscription {
// item previously matched the set and doesn't anymore, impacting the items
// in the query range. We need to refetch IDs to be sure our set === correct.
if ((this._query.range().offset > 0) && (unknownImpacts + knownImpacts) < record.objects.length) {
mustRefetchAllIds = true
unknownImpacts += 1
}
}
@ -151,17 +144,10 @@ export default class QuerySubscription {
this._queuedChangeRecords = [];
if (unknownImpacts > 0) {
if (mustRefetchAllIds) {
this._set = null;
}
this.update();
return true;
}
if (knownImpacts > 0) {
this.update({mustRefetchEntireRange: true});
} else if (knownImpacts > 0) {
this._createResultAndTrigger();
return false;
}
return false;
}
_itemSortOrderHasChanged(old, updated) {
@ -177,18 +163,29 @@ export default class QuerySubscription {
return false;
}
update() {
update({mustRefetchEntireRange} = {}) {
this._updateInFlight = true;
const version = this._queryVersion;
const desiredRange = this._query.range();
const currentRange = this._set ? this._set.range() : null;
const areNotInfinite = currentRange && !currentRange.isInfinite() && !desiredRange.isInfinite();
const previousResultIsEmpty = !this._set || this._set.modelCacheCount() === 0;
const missingRange = this._getMissingRange(desiredRange, currentRange);
const fetchEntireModels = areNotInfinite ? true : previousResultIsEmpty;
const hasNonInfiniteRange = currentRange && !currentRange.isInfinite() && !desiredRange.isInfinite();
this._fetchMissingRange(missingRange, {version, fetchEntireModels});
// If we have a limited range, and changes don't require that we refetch
// the entire range, just fetch the missing items. This is the path typically
// used while scrolling.
if (hasNonInfiniteRange && !mustRefetchEntireRange) {
const missingRange = this._getMissingRange(desiredRange, currentRange);
this._fetchRange(missingRange, {
version: this._queryVersion,
fetchEntireModels: true,
});
} else {
const haveNoModels = !this._set || this._set.modelCacheCount() === 0;
this._fetchRange(desiredRange, {
version: this._queryVersion,
fetchEntireModels: haveNoModels,
});
}
}
_getMissingRange = (desiredRange, currentRange) => {
@ -213,62 +210,64 @@ export default class QuerySubscription {
return rangeQuery;
}
_fetchMissingRange(missingRange, {version, fetchEntireModels}) {
const missingRangeQuery = this._getQueryForRange(missingRange, fetchEntireModels);
_fetchRange(range, {version, fetchEntireModels}) {
const rangeQuery = this._getQueryForRange(range, fetchEntireModels);
DatabaseStore.run(missingRangeQuery, {format: false}).then((results) => {
DatabaseStore.run(rangeQuery, {format: false}).then((results) => {
if (this._queryVersion !== version) {
return;
}
if (this._set && !this._set.range().isContiguousWith(missingRange)) {
if (this._set && !this._set.range().isContiguousWith(range)) {
this._set = null;
}
this._set = this._set || new MutableQueryResultSet();
// Create result and trigger if either of the following:
// A) no changes have come in during querying the missing range,
// B) applying those changes has no effect on the result set, and this one is
// still good.
if ((this._queuedChangeRecords.length === 0) || (this._processChangeRecords() === false)) {
if (fetchEntireModels) {
this._set.addModelsInRange(results, missingRange);
} else {
this._set.addIdsInRange(results, missingRange);
}
this._set.clipToRange(this._query.range());
const missingIds = this._set.ids().filter(id => !this._set.modelWithId(id));
if (missingIds.length > 0) {
DatabaseStore.findAll(this._query._klass, {id: missingIds}).then((models) => {
if (this._queryVersion !== version) {
return;
}
for (const m of models) {
this._set.replaceModel(m);
}
this._updateInFlight = false;
this._createResultAndTrigger();
});
} else {
this._updateInFlight = false;
this._createResultAndTrigger();
}
if (fetchEntireModels) {
this._set.addModelsInRange(results, range);
} else {
this._set.addIdsInRange(results, range);
}
this._set.clipToRange(this._query.range());
this._fetchMissingModels().then((models) => {
if (this._queryVersion !== version) {
return;
}
for (const m of models) {
this._set.replaceModel(m);
}
this._createResultAndTrigger();
});
});
}
_fetchMissingModels() {
const missingIds = this._set.ids().filter(id => !this._set.modelWithId(id));
if (missingIds.length === 0) {
return Promise.resolve([]);
}
return DatabaseStore.findAll(this._query._klass, {id: missingIds});
}
_createResultAndTrigger = () => {
const allCompleteModels = this._set.isComplete()
const allUniqueIds = _.uniq(this._set.ids()).length === this._set.ids().length
let error = null;
if (!allCompleteModels) {
error = new Error("QuerySubscription: Applied all changes and result set === missing models.");
}
if (!allUniqueIds) {
throw new Error("QuerySubscription: Applied all changes and result set contains duplicate IDs.");
error = new Error("QuerySubscription: Applied all changes and result set contains duplicate IDs.");
}
if (!allCompleteModels) {
throw new Error("QuerySubscription: Applied all changes and result set === missing models.");
if (error) {
NylasEnv.reportError(error);
this._set = null;
this.update();
return;
}
if (this._options.asResultSet) {
@ -279,5 +278,11 @@ export default class QuerySubscription {
}
this._callbacks.forEach((callback) => callback(this._lastResult));
// process any additional change records that have arrived
if (this._updateInFlight) {
this._updateInFlight = false;
this._processChangeRecords();
}
}
}