Mailspring/internal_packages/worker-sync/lib/delta-processor.es6
Juan Tejada b1882656ee fix(deltas) Prevent app from crashing when processing deltas
Summary:
Some strange scenario is causing us to attempt to redefine the object property `_delta` on our `delta.attributes` object inside `DeltaProcessor._decorateDeltas`, which throws an error, and consequently throws the app into an error loop. I haven't been able to track down how to reproduce this, but when it happens it completely takes the app down.

From testing with tomasz's copy of N1, this seemed to have originated from the lack of Account tokens, which was hard to discover because the `_deltas` object completely floods the console with errors.

From what I gather, `_decorateDeltas` is only called from a single place, `DeltaProcessor.process`, which is also only called from a single place, and its unclear how it would be called multiple times with the same delta object references.

In order to prevent this error from completely destroying the app, I've added the `configurable` option when defining the property, so that redifining it doesn't throw an error. It should be safe to redefine this property with the latest delta reference, and will also allow us to discover other errors more easily in the console like the lack of account tokens which might be the original cause of this problem.

Test Plan: manual

Reviewers: mark, spang, evan, halla

Reviewed By: evan, halla

Differential Revision: https://phab.nylas.com/D3717
2017-01-16 19:37:37 -08:00

291 lines
9 KiB
JavaScript

import _ from 'underscore';
import {
Actions,
Thread,
Message,
NylasAPIHelpers,
DatabaseStore,
MailRulesProcessor,
} from 'nylas-exports';
/**
* This injests deltas from multiple sources. One is from local-sync, the
* other is from n1-cloud. Both sources use
* isomorphic-core/src/delta-stream-builder to generate the delta stream.
*
* In both cases we are given the JSON serialized form of a `Transaction`
* model. An example Thread delta would look like:
*
* modelDelta = {
* id: 518,
* event: "modify",
* object: "thread",
* objectId: 2887,
* changedFields: ["subject", "unread"],
* attributes: {
* id: 2887,
* object: 'thread',
* account_id: 2,
* subject: "Hello World",
* unread: true,
* ...
* }
* }
*
* An example Metadata delta would look like:
*
* metadataDelta = {
* id: 519,
* event: "create",
* object: "metadata",
* objectId: 8876,
* changedFields: ["version", "object"],
* attributes: {
* id: 8876,
* value: {link_clicks: 1},
* object: "metadata",
* version: 2,
* plugin_id: "link-tracking",
* object_id: 2887,
* object_type: "thread"
* account_id: 2,
* }
* }
*
* The `object` may be "thread", "message", "metadata", or any other model
* type we support
*/
class DeltaProcessor {
constructor() {
this.activationTime = Date.now()
}
async process(rawDeltas = []) {
try {
const deltas = await this._decorateDeltas(rawDeltas);
Actions.longPollReceivedRawDeltas(deltas);
const {
modelDeltas,
accountDeltas,
metadataDeltas,
} = this._extractDeltaTypes(deltas);
this._handleAccountDeltas(accountDeltas);
const models = await this._saveModels(modelDeltas);
await this._saveMetadata(metadataDeltas);
await this._notifyOfNewMessages(models.created);
this._notifyOfSyncbackRequestDeltas(models)
} catch (err) {
console.error(rawDeltas)
console.error("DeltaProcessor: Process failed.", err)
NylasEnv.reportError(err);
} finally {
Actions.longPollProcessedDeltas()
}
}
/**
* Create a (non-enumerable) reference from the attributes which we
* carry forward back to their original deltas. This allows us to
* mark the deltas that the app ignores later in the process.
*/
_decorateDeltas(rawDeltas) {
rawDeltas.forEach((delta) => {
if (!delta.attributes) return;
Object.defineProperty(delta.attributes, '_delta', {
configurable: true,
get() { return delta; },
});
})
return rawDeltas
}
_extractDeltaTypes(rawDeltas) {
const modelDeltas = []
const accountDeltas = []
const metadataDeltas = []
rawDeltas.forEach((delta) => {
if (delta.object === "metadata") {
metadataDeltas.push(delta)
} else if (delta.object === "account") {
accountDeltas.push(delta)
} else {
modelDeltas.push(delta)
}
})
return {modelDeltas, metadataDeltas, accountDeltas}
}
_handleAccountDeltas = (accountDeltas) => {
const {modify} = this._clusterDeltas(accountDeltas);
if (!modify.account) return;
for (const accountJSON of _.values(modify.account)) {
Actions.updateAccount(accountJSON.account_id, {syncState: accountJSON.sync_state});
if (accountJSON.sync_state !== "running") {
Actions.recordUserEvent('Account Sync Errored', {
accountId: accountJSON.account_id,
syncState: accountJSON.sync_state,
});
}
}
}
_notifyOfSyncbackRequestDeltas({created, updated} = {}) {
const createdRequests = created.syncbackRequest || []
const updatedRequests = updated.syncbackRequest || []
const syncbackRequests = createdRequests.concat(updatedRequests)
if (syncbackRequests.length === 0) { return }
Actions.didReceiveSyncbackRequestDeltas(syncbackRequests)
}
async _saveModels(modelDeltas) {
const {create, modify, destroy} = this._clusterDeltas(modelDeltas);
const created = await Promise.props(_.mapObject(create, (val) =>
NylasAPIHelpers.handleModelResponse(_.values(val))
))
const updated = await Promise.props(_.mapObject(modify, (val) =>
NylasAPIHelpers.handleModelResponse(_.values(val))
));
await Promise.map(destroy, this._handleDestroyDelta);
return {created, updated};
}
async _saveMetadata(deltas) {
const all = {};
for (const delta of deltas.filter(d => d.event === 'create')) {
all[delta.attributes.object_id] = delta.attributes;
}
for (const delta of deltas.filter(d => d.event === 'modify')) {
all[delta.attributes.object_id] = delta.attributes;
}
const allByObjectType = _.groupBy(_.values(all), "object_type")
return Promise.map(Object.keys(allByObjectType), (objType) => {
const jsons = allByObjectType[objType]
const klass = NylasAPIHelpers.apiObjectToClassMap[objType];
const objectIds = jsons.map(j => j.object_id)
return DatabaseStore.inTransaction((t) => {
return this._findModelsForMetadata(t, klass, objectIds).then((modelsByObjectId) => {
const models = [];
Object.keys(modelsByObjectId).forEach((objectId) => {
const model = modelsByObjectId[objectId];
const metadataJSON = all[objectId];
const modelWithMetadata = model.applyPluginMetadata(metadataJSON.plugin_id, metadataJSON.value);
const localMetadatum = modelWithMetadata.metadataObjectForPluginId(metadataJSON.plugin_id);
localMetadatum.version = metadataJSON.version;
models.push(model);
})
return t.persistModels(models)
});
});
})
}
/**
@param ids An array of metadata object_ids
@returns A map of the object_ids to models in the database, resolving the
IDs as necessary. Must be a hashmap because the metadata object_ids may not
actually be present in the resulting models.
*/
_findModelsForMetadata(t, klass, ids) {
if (klass === Thread) {
// go through the Message table first, since local Thread IDs may be
// the (static) ID of any Message in the thread
// We prepend 't:' to thread IDs to avoid global object ID conflicts
const messageIds = ids.map(i => i.slice(2))
return t.findAll(Message, {id: messageIds}).then((messages) => {
if (messages.length !== messageIds.length) {
throw new Error(`Didn't find message for each thread. Thread IDs from remote: ${ids}`);
}
const threadIds = messages.map(m => m.threadId);
return t.findAll(Thread, {id: threadIds}).then((threads) => {
const map = {};
for (const thread of threads) {
const pluginObjectId = ids[threadIds.indexOf(thread.id)];
map[pluginObjectId] = thread;
}
return map;
});
});
}
return t.findAll(klass, {id: ids}).then((models) => {
const map = {};
for (const model of models) {
const pluginObjectId = model.id;
map[pluginObjectId] = model;
}
return map;
});
}
/**
* Group deltas by object type so we can mutate the cache efficiently.
* NOTE: This code must not just accumulate creates, modifies and
* destroys but also de-dupe them. We cannot call
* "persistModels(itemA, itemA, itemB)" or it will throw an exception
*/
_clusterDeltas(deltas) {
const create = {};
const modify = {};
const destroy = [];
for (const delta of deltas) {
if (delta.event === 'create') {
if (!create[delta.object]) { create[delta.object] = {}; }
create[delta.object][delta.attributes.id] = delta.attributes;
} else if (delta.event === 'modify') {
if (!modify[delta.object]) { modify[delta.object] = {}; }
modify[delta.object][delta.attributes.id] = delta.attributes;
} else if (delta.event === 'delete') {
destroy.push(delta);
}
}
return {create, modify, destroy};
}
async _notifyOfNewMessages(created) {
const incomingMessages = created.message || [];
// Filter for new messages that are not sent by the current user
const newUnread = incomingMessages.filter((msg) => {
const isUnread = msg.unread === true;
const isNew = msg.date && msg.date.valueOf() >= this.activationTime;
const isFromMe = msg.isFromMe();
return isUnread && isNew && !isFromMe;
});
if (newUnread.length === 0) {
return;
}
try {
await MailRulesProcessor.processMessages(created.message || [])
} catch (err) {
console.error("DeltaProcessor: Running mail rules on incoming mail failed.")
}
Actions.onNewMailDeltas(created)
}
_handleDestroyDelta(delta) {
const klass = NylasAPIHelpers.apiObjectToClassMap[delta.object];
if (!klass) { return Promise.resolve(); }
return DatabaseStore.inTransaction(t => {
return t.find(klass, delta.objectId).then((model) => {
if (!model) { return Promise.resolve(); }
return t.unpersistModel(model);
});
});
}
}
export default new DeltaProcessor()