Mailspring/internal_packages/worker-sync/lib/delta-processor.es6
Halla Moore 9066078911 fix(delta-processor): Return created models in the expected format
Summary:
We were returning an array of arrays, when really we wanted an object
of arrays keyed by model type. This also involved removing an implementation
of mapObject that was overwriting underscore's implementation of it.

Test Plan: local, having others try it.

Reviewers: juan

Reviewed By: juan

Differential Revision: https://phab.nylas.com/D3509
2016-12-15 12:12:37 -08:00

273 lines
8.6 KiB
JavaScript

import _ from 'underscore';
import {
Actions,
Thread,
Message,
NylasAPIHelpers,
DatabaseStore,
MailRulesProcessor,
} from 'nylas-exports';
/**
* This ingests deltas from multiple sources. One is from local-sync, the
* other is from n1-cloud. Both sources use
* isomorphic-core/src/delta-stream-builder to generate the delta stream.
*
* In both cases we are given the JSON serialized form of a `Transaction`
* model. An example Thread delta would look like:
*
* modelDelta = {
* id: 518,
* event: "modify",
* object: "thread",
* objectId: 2887,
* changedFields: ["subject", "unread"],
* attributes: {
* id: 2887,
* object: 'thread',
* account_id: 2,
* subject: "Hello World",
* unread: true,
* ...
* }
* }
*
* An example Metadata delta would look like:
*
* metadataDelta = {
* id: 519,
* event: "create",
* object: "metadata",
* objectId: 8876,
* changedFields: ["version", "object"],
* attributes: {
* id: 8876,
* value: {link_clicks: 1},
* object: "metadata",
* version: 2,
* plugin_id: "link-tracking",
* object_id: 2887,
* object_type: "thread",
* account_id: 2,
* }
* }
*
* The `object` may be "thread", "message", "metadata", or any other model
* type we support
*/
class DeltaProcessor {
  /**
   * Runs one batch of raw deltas through the whole pipeline: decorate,
   * split by kind, apply account updates, persist models, persist metadata,
   * then fire the "new messages" and "syncback request" notifications.
   *
   * Failures are logged and reported through `NylasEnv.reportError` rather
   * than rethrown, and `Actions.longPollProcessedDeltas` always fires.
   *
   * @param {Array<Object>} rawDeltas JSON-serialized transaction deltas.
   * @returns {Promise<void>}
   */
  async process(rawDeltas = []) {
    try {
      const deltas = await this._decorateDeltas(rawDeltas);
      Actions.longPollReceivedRawDeltas(deltas);
      Actions.longPollReceivedRawDeltasPing(deltas.length);

      const {
        modelDeltas,
        accountDeltas,
        metadataDeltas,
      } = this._extractDeltaTypes(deltas);
      this._handleAccountDeltas(accountDeltas);

      const models = await this._saveModels(modelDeltas);
      await this._saveMetadata(metadataDeltas);
      await this._notifyOfNewMessages(models.created);
      this._notifyOfSyncbackRequestDeltas(models);
    } catch (err) {
      console.error(rawDeltas);
      console.error("DeltaProcessor: Process failed.", err);
      NylasEnv.reportError(err);
    } finally {
      Actions.longPollProcessedDeltas();
    }
  }

  /**
   * Create a (non-enumerable) reference from the attributes which we
   * carry forward back to their original deltas. This allows us to
   * mark the deltas that the app ignores later in the process.
   *
   * @param {Array<Object>} rawDeltas Deltas to decorate in place.
   * @returns {Array<Object>} The same array that was passed in.
   */
  _decorateDeltas(rawDeltas) {
    for (const delta of rawDeltas) {
      if (!delta.attributes) { continue; }
      Object.defineProperty(delta.attributes, '_delta', {
        // Configurable so that decorating the same attributes object a
        // second time replaces the getter instead of throwing
        // "Cannot redefine property".
        configurable: true,
        get() { return delta; },
      });
    }
    return rawDeltas;
  }

  /**
   * Splits deltas into three buckets based on `delta.object`: "metadata",
   * "account", and everything else (treated as model deltas).
   *
   * @param {Array<Object>} rawDeltas
   * @returns {{modelDeltas: Array, metadataDeltas: Array, accountDeltas: Array}}
   */
  _extractDeltaTypes(rawDeltas) {
    const modelDeltas = [];
    const accountDeltas = [];
    const metadataDeltas = [];
    for (const delta of rawDeltas) {
      if (delta.object === "metadata") {
        metadataDeltas.push(delta);
      } else if (delta.object === "account") {
        accountDeltas.push(delta);
      } else {
        modelDeltas.push(delta);
      }
    }
    return {modelDeltas, metadataDeltas, accountDeltas};
  }

  /**
   * Pushes the sync state of every modified account to
   * `Actions.updateAccount`, and records a user event for any account
   * whose state is not "running". Only "modify" deltas are considered.
   *
   * @param {Array<Object>} accountDeltas
   */
  _handleAccountDeltas = (accountDeltas) => {
    const {modify} = this._clusterDeltas(accountDeltas);
    if (!modify.account) { return; }
    for (const accountJSON of _.values(modify.account)) {
      Actions.updateAccount(accountJSON.account_id, {syncState: accountJSON.sync_state});
      if (accountJSON.sync_state !== "running") {
        Actions.recordUserEvent('Account Sync Errored', {
          accountId: accountJSON.account_id,
          syncState: accountJSON.sync_state,
        });
      }
    }
  }

  /**
   * Fires `Actions.didReceiveSyncbackRequestDeltas` with the created and
   * updated syncback requests (created first), if there are any.
   *
   * `created` and `updated` each default to an empty object so that a
   * missing or partial argument is a no-op rather than a TypeError.
   *
   * @param {{created: Object, updated: Object}} models Saved models keyed
   *   by model type, as returned by `_saveModels`.
   */
  _notifyOfSyncbackRequestDeltas({created = {}, updated = {}} = {}) {
    const createdRequests = created.syncbackRequest || [];
    const updatedRequests = updated.syncbackRequest || [];
    const syncbackRequests = createdRequests.concat(updatedRequests);
    if (syncbackRequests.length === 0) { return; }
    Actions.didReceiveSyncbackRequestDeltas(syncbackRequests);
  }

  /**
   * Persists created and modified models through
   * `NylasAPIHelpers.handleModelResponse`, one call per model type, and
   * then applies every destroy delta.
   *
   * NOTE(review): `Promise.props` and `Promise.map` are not standard
   * Promise statics; this presumably relies on the global Promise being
   * Bluebird. Confirm before running elsewhere.
   *
   * @param {Array<Object>} modelDeltas
   * @returns {Promise<{created: Object, updated: Object}>} Objects keyed by
   *   model type whose values are whatever `handleModelResponse` resolved.
   */
  async _saveModels(modelDeltas) {
    const {create, modify, destroy} = this._clusterDeltas(modelDeltas);

    const created = await Promise.props(_.mapObject(create, (val) =>
      NylasAPIHelpers.handleModelResponse(_.values(val))
    ));
    const updated = await Promise.props(_.mapObject(modify, (val) =>
      NylasAPIHelpers.handleModelResponse(_.values(val))
    ));
    await Promise.map(destroy, this._handleDestroyDelta);

    return {created, updated};
  }

  /**
   * Applies "create" and "modify" metadata deltas to the local models they
   * refer to and persists those models, one transaction per object type.
   *
   * Pending metadata is tracked per (object type, object id, plugin id), so
   * several plugins can attach metadata to the same object in one batch
   * without overwriting each other. For the same triple, a "modify" wins
   * over a "create", and a later delta wins over an earlier one.
   *
   * Metadata for object types that have no class in
   * `NylasAPIHelpers.apiObjectToClassMap` is skipped, mirroring
   * `_handleDestroyDelta`.
   *
   * @param {Array<Object>} deltas Metadata deltas.
   * @returns {Promise<Array>} Resolves once every object type is persisted.
   */
  async _saveMetadata(deltas) {
    // objType -> Map(String(object_id) -> {objectId, jsonByPluginId})
    const entriesByType = new Map();

    // All creates are folded in before any modify so modifies take priority.
    for (const event of ['create', 'modify']) {
      for (const delta of deltas) {
        if (delta.event !== event) { continue; }
        const json = delta.attributes;

        let entriesByObjectId = entriesByType.get(json.object_type);
        if (!entriesByObjectId) {
          entriesByObjectId = new Map();
          entriesByType.set(json.object_type, entriesByObjectId);
        }

        // Keys are stringified because the lookup below uses the keys of a
        // plain object, which are always strings.
        const key = String(json.object_id);
        let entry = entriesByObjectId.get(key);
        if (!entry) {
          entry = {objectId: json.object_id, jsonByPluginId: new Map()};
          entriesByObjectId.set(key, entry);
        }
        entry.jsonByPluginId.set(json.plugin_id, json);
      }
    }

    return Promise.map(Array.from(entriesByType.keys()), (objType) => {
      const klass = NylasAPIHelpers.apiObjectToClassMap[objType];
      if (!klass) { return Promise.resolve(); }

      const entriesByObjectId = entriesByType.get(objType);
      const objectIds = Array.from(entriesByObjectId.values(), (entry) => entry.objectId);

      return DatabaseStore.inTransaction((t) => {
        return this._findModelsForMetadata(t, klass, objectIds).then((modelsByObjectId) => {
          // A Set, because several object ids may resolve to the same model
          // and persistModels must not receive the same model twice.
          const models = new Set();
          for (const objectId of Object.keys(modelsByObjectId)) {
            const entry = entriesByObjectId.get(objectId);
            if (!entry) { continue; }
            const model = modelsByObjectId[objectId];
            for (const metadataJSON of entry.jsonByPluginId.values()) {
              const modelWithMetadata = model.applyPluginMetadata(metadataJSON.plugin_id, metadataJSON.value);
              const localMetadatum = modelWithMetadata.metadataObjectForPluginId(metadataJSON.plugin_id);
              localMetadatum.version = metadataJSON.version;
            }
            models.add(model);
          }
          return t.persistModels(Array.from(models));
        });
      });
    });
  }

  /**
  @param t The database transaction to query through
  @param klass The model class the metadata is attached to
  @param ids An array of metadata object_ids
  @returns A map of the object_ids to models in the database, resolving the
  IDs as necessary. Must be a hashmap because the metadata object_ids may not
  actually be present in the resulting models.
  */
  _findModelsForMetadata(t, klass, ids) {
    if (klass === Thread) {
      // go through the Message table first, since local Thread IDs may be
      // the (static) ID of any Message in the thread
      // We prepend 't:' to thread IDs to avoid global object ID conflicts
      const messageIds = ids.map(i => i.slice(2));
      const pluginObjectIdByMessageId = new Map(
        ids.map((id, idx) => [messageIds[idx], id])
      );

      return t.findAll(Message, {id: messageIds}).then((messages) => {
        if (messages.length !== messageIds.length) {
          throw new Error(`Didn't find message for each thread. Thread IDs from remote: ${ids}`);
        }

        // Pair each remote id with its thread through the message's own id,
        // never by array position: nothing here guarantees that findAll
        // returns rows in the order the ids were requested.
        const pluginObjectIdsByThreadId = new Map();
        for (const message of messages) {
          const pluginObjectId = pluginObjectIdByMessageId.get(String(message.id));
          if (pluginObjectId === undefined) { continue; }
          const existing = pluginObjectIdsByThreadId.get(message.threadId);
          if (existing) {
            existing.push(pluginObjectId);
          } else {
            pluginObjectIdsByThreadId.set(message.threadId, [pluginObjectId]);
          }
        }

        const threadIds = Array.from(pluginObjectIdsByThreadId.keys());
        return t.findAll(Thread, {id: threadIds}).then((threads) => {
          const map = {};
          for (const thread of threads) {
            for (const pluginObjectId of pluginObjectIdsByThreadId.get(thread.id) || []) {
              map[pluginObjectId] = thread;
            }
          }
          return map;
        });
      });
    }

    return t.findAll(klass, {id: ids}).then((models) => {
      const map = {};
      for (const model of models) {
        map[model.id] = model;
      }
      return map;
    });
  }

  /**
   * Group deltas by object type so we can mutate the cache efficiently.
   * NOTE: This code must not just accumulate creates, modifies and
   * destroys but also de-dupe them. We cannot call
   * "persistModels(itemA, itemA, itemB)" or it will throw an exception
   *
   * Creates and modifies are de-duped by `attributes.id` within each
   * object type (the last delta wins); deletes are collected as-is.
   * Deltas with any other event are ignored.
   *
   * @param {Array<Object>} deltas
   * @returns {{create: Object, modify: Object, destroy: Array}}
   */
  _clusterDeltas(deltas) {
    const create = {};
    const modify = {};
    const destroy = [];
    const bucketsByEvent = {create, modify};

    for (const delta of deltas) {
      if (delta.event === 'delete') {
        destroy.push(delta);
      } else if (delta.event === 'create' || delta.event === 'modify') {
        const bucket = bucketsByEvent[delta.event];
        if (!bucket[delta.object]) { bucket[delta.object] = {}; }
        bucket[delta.object][delta.attributes.id] = delta.attributes;
      }
    }
    return {create, modify, destroy};
  }

  /**
   * Runs mail rules over the newly created messages, then announces all
   * created models via `Actions.didPassivelyReceiveCreateDeltas`.
   *
   * A mail-rules failure is deliberately non-fatal; it is logged together
   * with the underlying error and the announcement still goes out.
   *
   * @param {Object} created Created models keyed by model type.
   */
  async _notifyOfNewMessages(created = {}) {
    try {
      await MailRulesProcessor.processMessages(created.message || []);
    } catch (err) {
      console.error("DeltaProcessor: Running mail rules on incoming mail failed.", err);
    }
    Actions.didPassivelyReceiveCreateDeltas(created);
  }

  /**
   * Removes the model referenced by a "delete" delta from the database.
   * Does nothing when the object type has no known class or the model is
   * not found. Does not use `this`, so it is safe to pass unbound.
   *
   * @param {Object} delta A delta carrying `object` and `objectId`.
   * @returns {Promise}
   */
  _handleDestroyDelta(delta) {
    const klass = NylasAPIHelpers.apiObjectToClassMap[delta.object];
    if (!klass) { return Promise.resolve(); }

    return DatabaseStore.inTransaction(t => {
      return t.find(klass, delta.objectId).then((model) => {
        if (!model) { return Promise.resolve(); }
        return t.unpersistModel(model);
      });
    });
  }
}
// Export a single shared DeltaProcessor instance rather than the class itself.
export default new DeltaProcessor()