mirror of
https://github.com/Foundry376/Mailspring.git
synced 2024-09-24 01:06:07 +08:00
5a2a52ebab
Summary: don't send unnecessary crap over IPC Test Plan: manual Reviewers: spang, mark, juan, halla Reviewed By: juan, halla Differential Revision: https://phab.nylas.com/D3705
290 lines
9 KiB
JavaScript
290 lines
9 KiB
JavaScript
import _ from 'underscore';
|
|
import {
|
|
Actions,
|
|
Thread,
|
|
Message,
|
|
NylasAPIHelpers,
|
|
DatabaseStore,
|
|
MailRulesProcessor,
|
|
} from 'nylas-exports';
|
|
|
|
/**
|
|
* This injests deltas from multiple sources. One is from local-sync, the
|
|
* other is from n1-cloud. Both sources use
|
|
* isomorphic-core/src/delta-stream-builder to generate the delta stream.
|
|
*
|
|
* In both cases we are given the JSON serialized form of a `Transaction`
|
|
* model. An example Thread delta would look like:
|
|
*
|
|
* modelDelta = {
|
|
* id: 518,
|
|
* event: "modify",
|
|
* object: "thread",
|
|
* objectId: 2887,
|
|
* changedFields: ["subject", "unread"],
|
|
* attributes: {
|
|
* id: 2887,
|
|
* object: 'thread',
|
|
* account_id: 2,
|
|
* subject: "Hello World",
|
|
* unread: true,
|
|
* ...
|
|
* }
|
|
* }
|
|
*
|
|
* An example Metadata delta would look like:
|
|
*
|
|
* metadataDelta = {
|
|
* id: 519,
|
|
* event: "create",
|
|
* object: "metadata",
|
|
* objectId: 8876,
|
|
* changedFields: ["version", "object"],
|
|
* attributes: {
|
|
* id: 8876,
|
|
* value: {link_clicks: 1},
|
|
* object: "metadata",
|
|
* version: 2,
|
|
* plugin_id: "link-tracking",
|
|
* object_id: 2887,
|
|
* object_type: "thread"
|
|
* account_id: 2,
|
|
* }
|
|
* }
|
|
*
|
|
* The `object` may be "thread", "message", "metadata", or any other model
|
|
* type we support
|
|
*/
|
|
class DeltaProcessor {
|
|
constructor() {
|
|
this.activationTime = Date.now()
|
|
}
|
|
|
|
async process(rawDeltas = []) {
|
|
try {
|
|
const deltas = await this._decorateDeltas(rawDeltas);
|
|
Actions.longPollReceivedRawDeltas(deltas);
|
|
|
|
const {
|
|
modelDeltas,
|
|
accountDeltas,
|
|
metadataDeltas,
|
|
} = this._extractDeltaTypes(deltas);
|
|
this._handleAccountDeltas(accountDeltas);
|
|
|
|
const models = await this._saveModels(modelDeltas);
|
|
await this._saveMetadata(metadataDeltas);
|
|
await this._notifyOfNewMessages(models.created);
|
|
this._notifyOfSyncbackRequestDeltas(models)
|
|
} catch (err) {
|
|
console.error(rawDeltas)
|
|
console.error("DeltaProcessor: Process failed.", err)
|
|
NylasEnv.reportError(err);
|
|
} finally {
|
|
Actions.longPollProcessedDeltas()
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create a (non-enumerable) reference from the attributes which we
|
|
* carry forward back to their original deltas. This allows us to
|
|
* mark the deltas that the app ignores later in the process.
|
|
*/
|
|
_decorateDeltas(rawDeltas) {
|
|
rawDeltas.forEach((delta) => {
|
|
if (!delta.attributes) return;
|
|
Object.defineProperty(delta.attributes, '_delta', {
|
|
get() { return delta; },
|
|
});
|
|
})
|
|
return rawDeltas
|
|
}
|
|
|
|
_extractDeltaTypes(rawDeltas) {
|
|
const modelDeltas = []
|
|
const accountDeltas = []
|
|
const metadataDeltas = []
|
|
rawDeltas.forEach((delta) => {
|
|
if (delta.object === "metadata") {
|
|
metadataDeltas.push(delta)
|
|
} else if (delta.object === "account") {
|
|
accountDeltas.push(delta)
|
|
} else {
|
|
modelDeltas.push(delta)
|
|
}
|
|
})
|
|
return {modelDeltas, metadataDeltas, accountDeltas}
|
|
}
|
|
|
|
_handleAccountDeltas = (accountDeltas) => {
|
|
const {modify} = this._clusterDeltas(accountDeltas);
|
|
if (!modify.account) return;
|
|
for (const accountJSON of _.values(modify.account)) {
|
|
Actions.updateAccount(accountJSON.account_id, {syncState: accountJSON.sync_state});
|
|
if (accountJSON.sync_state !== "running") {
|
|
Actions.recordUserEvent('Account Sync Errored', {
|
|
accountId: accountJSON.account_id,
|
|
syncState: accountJSON.sync_state,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
_notifyOfSyncbackRequestDeltas({created, updated} = {}) {
|
|
const createdRequests = created.syncbackRequest || []
|
|
const updatedRequests = updated.syncbackRequest || []
|
|
const syncbackRequests = createdRequests.concat(updatedRequests)
|
|
if (syncbackRequests.length === 0) { return }
|
|
|
|
Actions.didReceiveSyncbackRequestDeltas(syncbackRequests)
|
|
}
|
|
|
|
async _saveModels(modelDeltas) {
|
|
const {create, modify, destroy} = this._clusterDeltas(modelDeltas);
|
|
|
|
const created = await Promise.props(_.mapObject(create, (val) =>
|
|
NylasAPIHelpers.handleModelResponse(_.values(val))
|
|
))
|
|
|
|
const updated = await Promise.props(_.mapObject(modify, (val) =>
|
|
NylasAPIHelpers.handleModelResponse(_.values(val))
|
|
));
|
|
|
|
await Promise.map(destroy, this._handleDestroyDelta);
|
|
|
|
return {created, updated};
|
|
}
|
|
|
|
async _saveMetadata(deltas) {
|
|
const all = {};
|
|
|
|
for (const delta of deltas.filter(d => d.event === 'create')) {
|
|
all[delta.attributes.object_id] = delta.attributes;
|
|
}
|
|
for (const delta of deltas.filter(d => d.event === 'modify')) {
|
|
all[delta.attributes.object_id] = delta.attributes;
|
|
}
|
|
const allByObjectType = _.groupBy(_.values(all), "object_type")
|
|
|
|
return Promise.map(Object.keys(allByObjectType), (objType) => {
|
|
const jsons = allByObjectType[objType]
|
|
const klass = NylasAPIHelpers.apiObjectToClassMap[objType];
|
|
const objectIds = jsons.map(j => j.object_id)
|
|
|
|
return DatabaseStore.inTransaction((t) => {
|
|
return this._findModelsForMetadata(t, klass, objectIds).then((modelsByObjectId) => {
|
|
const models = [];
|
|
Object.keys(modelsByObjectId).forEach((objectId) => {
|
|
const model = modelsByObjectId[objectId];
|
|
const metadataJSON = all[objectId];
|
|
const modelWithMetadata = model.applyPluginMetadata(metadataJSON.plugin_id, metadataJSON.value);
|
|
const localMetadatum = modelWithMetadata.metadataObjectForPluginId(metadataJSON.plugin_id);
|
|
localMetadatum.version = metadataJSON.version;
|
|
models.push(model);
|
|
})
|
|
return t.persistModels(models)
|
|
});
|
|
});
|
|
})
|
|
}
|
|
|
|
/**
|
|
@param ids An array of metadata object_ids
|
|
@returns A map of the object_ids to models in the database, resolving the
|
|
IDs as necessary. Must be a hashmap because the metadata object_ids may not
|
|
actually be present in the resulting models.
|
|
*/
|
|
_findModelsForMetadata(t, klass, ids) {
|
|
if (klass === Thread) {
|
|
// go through the Message table first, since local Thread IDs may be
|
|
// the (static) ID of any Message in the thread
|
|
// We prepend 't:' to thread IDs to avoid global object ID conflicts
|
|
const messageIds = ids.map(i => i.slice(2))
|
|
return t.findAll(Message, {id: messageIds}).then((messages) => {
|
|
if (messages.length !== messageIds.length) {
|
|
throw new Error(`Didn't find message for each thread. Thread IDs from remote: ${ids}`);
|
|
}
|
|
const threadIds = messages.map(m => m.threadId);
|
|
return t.findAll(Thread, {id: threadIds}).then((threads) => {
|
|
const map = {};
|
|
for (const thread of threads) {
|
|
const pluginObjectId = ids[threadIds.indexOf(thread.id)];
|
|
map[pluginObjectId] = thread;
|
|
}
|
|
return map;
|
|
});
|
|
});
|
|
}
|
|
return t.findAll(klass, {id: ids}).then((models) => {
|
|
const map = {};
|
|
for (const model of models) {
|
|
const pluginObjectId = model.id;
|
|
map[pluginObjectId] = model;
|
|
}
|
|
return map;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Group deltas by object type so we can mutate the cache efficiently.
|
|
* NOTE: This code must not just accumulate creates, modifies and
|
|
* destroys but also de-dupe them. We cannot call
|
|
* "persistModels(itemA, itemA, itemB)" or it will throw an exception
|
|
*/
|
|
_clusterDeltas(deltas) {
|
|
const create = {};
|
|
const modify = {};
|
|
const destroy = [];
|
|
for (const delta of deltas) {
|
|
if (delta.event === 'create') {
|
|
if (!create[delta.object]) { create[delta.object] = {}; }
|
|
create[delta.object][delta.attributes.id] = delta.attributes;
|
|
} else if (delta.event === 'modify') {
|
|
if (!modify[delta.object]) { modify[delta.object] = {}; }
|
|
modify[delta.object][delta.attributes.id] = delta.attributes;
|
|
} else if (delta.event === 'delete') {
|
|
destroy.push(delta);
|
|
}
|
|
}
|
|
|
|
return {create, modify, destroy};
|
|
}
|
|
|
|
async _notifyOfNewMessages(created) {
|
|
const incomingMessages = created.message || [];
|
|
|
|
// Filter for new messages that are not sent by the current user
|
|
const newUnread = incomingMessages.filter((msg) => {
|
|
const isUnread = msg.unread === true;
|
|
const isNew = msg.date && msg.date.valueOf() >= this.activationTime;
|
|
const isFromMe = msg.isFromMe();
|
|
return isUnread && isNew && !isFromMe;
|
|
});
|
|
|
|
if (newUnread.length === 0) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
await MailRulesProcessor.processMessages(created.message || [])
|
|
} catch (err) {
|
|
console.error("DeltaProcessor: Running mail rules on incoming mail failed.")
|
|
}
|
|
Actions.onNewMailDeltas(created)
|
|
}
|
|
|
|
_handleDestroyDelta(delta) {
|
|
const klass = NylasAPIHelpers.apiObjectToClassMap[delta.object];
|
|
if (!klass) { return Promise.resolve(); }
|
|
|
|
return DatabaseStore.inTransaction(t => {
|
|
return t.find(klass, delta.objectId).then((model) => {
|
|
if (!model) { return Promise.resolve(); }
|
|
return t.unpersistModel(model);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
export default new DeltaProcessor()
|