Remove DeltaProcessor, JS side will not write to database

This commit is contained in:
Ben Gotow 2017-06-27 11:34:20 -07:00
parent efbea58e1e
commit f6340d5555
5 changed files with 1 additions and 573 deletions

View file

@ -1,231 +0,0 @@
_ = require 'underscore'
fs = require 'fs'
path = require 'path'
{NylasAPIHelpers,
Thread,
DatabaseWriter,
Actions,
Message,
Thread} = require 'nylas-exports'
DeltaProcessor = require('../../src/services/delta-processor').default
fixturesPath = path.resolve(__dirname, '..', 'fixtures')
describe "DeltaProcessor", ->
describe "handleDeltas", ->
beforeEach ->
@sampleDeltas = JSON.parse(fs.readFileSync("#{fixturesPath}/sample-deltas.json"))
@sampleClustered = JSON.parse(fs.readFileSync("#{fixturesPath}/sample-deltas-clustered.json"))
it "should immediately fire the received raw deltas event", ->
spyOn(Actions, 'longPollReceivedRawDeltas')
spyOn(DeltaProcessor, '_clusterDeltas').andReturn({create: {}, modify: {}, destroy: []})
waitsForPromise ->
DeltaProcessor.process(@sampleDeltas, {source: 'n1Cloud'})
runs ->
expect(Actions.longPollReceivedRawDeltas).toHaveBeenCalled()
xit "should call helper methods for all creates first, then modifications, then destroys", ->
spyOn(Actions, 'longPollProcessedDeltas')
handleDeltaDeletionPromises = []
resolveDeltaDeletionPromises = ->
fn() for fn in handleDeltaDeletionPromises
handleDeltaDeletionPromises = []
spyOn(DeltaProcessor, '_handleDestroyDelta').andCallFake ->
new Promise (resolve, reject) ->
handleDeltaDeletionPromises.push(resolve)
handleModelResponsePromises = []
resolveModelResponsePromises = ->
fn() for fn in handleModelResponsePromises
handleModelResponsePromises = []
spyOn(NylasAPIHelpers, 'handleModelResponse').andCallFake ->
new Promise (resolve, reject) ->
handleModelResponsePromises.push(resolve)
spyOn(DeltaProcessor, '_clusterDeltas').andReturn(JSON.parse(JSON.stringify(@sampleClustered)))
DeltaProcessor.process(@sampleDeltas)
createTypes = Object.keys(@sampleClustered['create'])
expect(NylasAPIHelpers.handleModelResponse.calls.length).toEqual(createTypes.length)
expect(NylasAPIHelpers.handleModelResponse.calls[0].args[0]).toEqual(_.values(@sampleClustered['create'][createTypes[0]]))
expect(DeltaProcessor._handleDestroyDelta.calls.length).toEqual(0)
NylasAPIHelpers.handleModelResponse.reset()
resolveModelResponsePromises()
advanceClock()
modifyTypes = Object.keys(@sampleClustered['modify'])
expect(NylasAPIHelpers.handleModelResponse.calls.length).toEqual(modifyTypes.length)
expect(NylasAPIHelpers.handleModelResponse.calls[0].args[0]).toEqual(_.values(@sampleClustered['modify'][modifyTypes[0]]))
expect(DeltaProcessor._handleDestroyDelta.calls.length).toEqual(0)
NylasAPIHelpers.handleModelResponse.reset()
resolveModelResponsePromises()
advanceClock()
destroyCount = @sampleClustered['destroy'].length
expect(DeltaProcessor._handleDestroyDelta.calls.length).toEqual(destroyCount)
expect(DeltaProcessor._handleDestroyDelta.calls[0].args[0]).toEqual(@sampleClustered['destroy'][0])
expect(Actions.longPollProcessedDeltas).not.toHaveBeenCalled()
resolveDeltaDeletionPromises()
advanceClock()
expect(Actions.longPollProcessedDeltas).toHaveBeenCalled()
describe "clusterDeltas", ->
beforeEach ->
@sampleDeltas = JSON.parse(fs.readFileSync("#{fixturesPath}/sample-deltas.json"))
@expectedClustered = JSON.parse(fs.readFileSync("#{fixturesPath}/sample-deltas-clustered.json"))
it "should collect create/modify events into a hash by model type", ->
{create, modify} = DeltaProcessor._clusterDeltas(@sampleDeltas)
expect(create).toEqual(@expectedClustered.create)
expect(modify).toEqual(@expectedClustered.modify)
it "should collect destroys into an array", ->
{destroy} = DeltaProcessor._clusterDeltas(@sampleDeltas)
expect(destroy).toEqual(@expectedClustered.destroy)
describe "handleDeltaDeletion", ->
beforeEach ->
@thread = new Thread(id: 'idhere')
@delta =
"cursor": "bb95ddzqtr2gpmvgrng73t6ih",
"object": "thread",
"event": "delete",
"objectId": @thread.id,
"timestamp": "2015-08-26T17:36:45.297Z"
spyOn(DatabaseWriter.prototype, 'unpersistModel')
it "should resolve if the object cannot be found", ->
spyOn(DatabaseWriter.prototype, 'find').andCallFake (klass, id) =>
return Promise.resolve(null)
waitsForPromise =>
DeltaProcessor._handleDestroyDelta(@delta)
runs =>
expect(DatabaseWriter.prototype.find).toHaveBeenCalledWith(Thread, 'idhere')
expect(DatabaseWriter.prototype.unpersistModel).not.toHaveBeenCalled()
it "should call unpersistModel if the object exists", ->
spyOn(DatabaseWriter.prototype, 'find').andCallFake (klass, id) =>
return Promise.resolve(@thread)
waitsForPromise =>
DeltaProcessor._handleDestroyDelta(@delta)
runs =>
expect(DatabaseWriter.prototype.find).toHaveBeenCalledWith(Thread, 'idhere')
expect(DatabaseWriter.prototype.unpersistModel).toHaveBeenCalledWith(@thread)
describe "handleModelResponse", ->
# SEE spec/nylas-api-spec.coffee
describe "receives metadata deltas", ->
beforeEach ->
@stubDB = {}
spyOn(DatabaseWriter.prototype, 'find').andCallFake (klass, id) =>
return @stubDB[id]
spyOn(DatabaseWriter.prototype, 'findAll').andCallFake (klass, where) =>
ids = where.id
models = []
ids.forEach (id) =>
model = @stubDB[id]
if model
models.push(model)
return models
spyOn(DatabaseWriter.prototype, 'persistModels').andCallFake (models) =>
models.forEach (model) =>
@stubDB[model.id] = model
return Promise.resolve()
@messageMetadataDelta =
id: 519,
event: "create",
object: "metadata",
objectId: 8876,
changedFields: ["version", "object"],
attributes:
id: 8876,
value: {link_clicks: 1},
object: "metadata",
version: 2,
plugin_id: "link-tracking",
object_id: '2887',
object_type: "message",
account_id: 2
@threadMetadataDelta =
id: 392,
event: "create",
object: "metadata",
objectId: 3845,
changedFields: ["version", "object"],
attributes:
id: 3845,
value: {shouldNotify: true},
object: "metadata",
version: 2,
plugin_id: "send-reminders",
object_id: 't:3984',
object_type: "thread"
account_id: 2,
it "saves metadata to existing Messages", ->
message = new Message({id: @messageMetadataDelta.attributes.object_id})
@stubDB[message.id] = message
waitsForPromise =>
DeltaProcessor.process([@messageMetadataDelta])
runs ->
message = @stubDB[message.id] # refresh reference
expect(message.pluginMetadata.length).toEqual(1)
expect(message.metadataForPluginId('link-tracking')).toEqual({link_clicks: 1})
it "saves metadata to existing Threads", ->
thread = new Thread({id: @threadMetadataDelta.attributes.object_id})
@stubDB[thread.id] = thread
waitsForPromise =>
DeltaProcessor.process([@threadMetadataDelta])
runs ->
thread = @stubDB[thread.id] # refresh reference
expect(thread.pluginMetadata.length).toEqual(1)
expect(thread.metadataForPluginId('send-reminders')).toEqual({shouldNotify: true})
it "knows how to reconcile different thread ids", ->
thread = new Thread({id: 't:1948'})
@stubDB[thread.id] = thread
message = new Message({
id: @threadMetadataDelta.attributes.object_id.substring(2),
threadId: thread.id
})
@stubDB[message.id] = message
waitsForPromise =>
DeltaProcessor.process([@threadMetadataDelta])
runs ->
thread = @stubDB[thread.id] # refresh reference
expect(thread.pluginMetadata.length).toEqual(1)
expect(thread.metadataForPluginId('send-reminders')).toEqual({shouldNotify: true})
it "creates ghost Messages if necessary", ->
waitsForPromise =>
DeltaProcessor.process([@messageMetadataDelta])
runs ->
message = @stubDB[@messageMetadataDelta.attributes.object_id]
expect(message).toBeDefined()
expect(message.pluginMetadata.length).toEqual(1)
expect(message.metadataForPluginId('link-tracking')).toEqual({link_clicks: 1})
it "creates ghost Threads if necessary", ->
waitsForPromise =>
DeltaProcessor.process([@threadMetadataDelta])
runs ->
thread = @stubDB[@threadMetadataDelta.attributes.object_id]
expect(thread.pluginMetadata.length).toEqual(1)
expect(thread.metadataForPluginId('send-reminders')).toEqual({shouldNotify: true})

View file

@ -102,8 +102,6 @@ class Actions {
*Scope: Main Window*
*/
static longPollReceivedRawDeltas = ActionScopeMainWindow;
static longPollProcessedDeltas = ActionScopeMainWindow;
static willMakeAPIRequest = ActionScopeMainWindow;
static didMakeAPIRequest = ActionScopeMainWindow;
static checkOnlineStatus = ActionScopeWindow;
@ -111,13 +109,6 @@ class Actions {
static wakeLocalSyncWorkerForAccount = ActionScopeGlobal;
/*
Public: Retry the initial sync
*Scope: Global*
*/
static retryDeltaConnection = ActionScopeGlobal;
/*
Public: Open the preferences view.

View file

@ -284,7 +284,7 @@ class AccountStore extends NylasStore {
account.syncState = Account.SYNC_STATE_RUNNING
account.fromJSON(json)
// Restart the connection in case account credentials have changed
Actions.retryDeltaConnection()
// todo bg
}
this._save()

View file

@ -197,7 +197,6 @@ lazyLoad(`SoundRegistry`, 'registries/sound-registry');
lazyLoad(`MailRulesTemplates`, 'mail-rules-templates');
lazyLoad(`MailRulesProcessor`, 'mail-rules-processor');
lazyLoad(`MailboxPerspective`, 'mailbox-perspective');
lazyLoad(`DeltaProcessor`, 'services/delta-processor');
lazyLoad(`NativeNotifications`, 'native-notifications');
lazyLoad(`SanitizeTransformer`, 'services/sanitize-transformer');
lazyLoad(`QuotedHTMLTransformer`, 'services/quoted-html-transformer');

View file

@ -1,331 +0,0 @@
import _ from 'underscore';
import Actions from '../flux/actions'
import Thread from '../flux/models/thread'
import Message from '../flux/models/message'
import DatabaseStore from '../flux/stores/database-store'
import * as NylasAPIHelpers from '../flux/nylas-api-helpers'
/**
* This injests deltas from multiple sources. One is from local-sync, the
* other is from n1-cloud. Both sources use delta-stream-builder to generate the delta stream.
*
* In both cases we are given the JSON serialized form of a `Transaction`
* model. An example Thread delta would look like:
*
* modelDelta = {
* id: 518,
* event: "modify",
* object: "thread",
* objectId: 2887,
* changedFields: ["subject", "unread"],
* attributes: {
* id: 2887,
* object: 'thread',
* account_id: 2,
* subject: "Hello World",
* unread: true,
* ...
* }
* }
*
* An example Metadata delta would look like:
*
* metadataDelta = {
* id: 519,
* event: "create",
* object: "metadata",
* objectId: 8876,
* changedFields: ["version", "object"],
* attributes: {
* id: 8876,
* value: {link_clicks: 1},
* object: "metadata",
* version: 2,
* plugin_id: "link-tracking",
* object_id: 2887,
* object_type: "thread"
* account_id: 2,
* }
* }
*
* The `object` may be "thread", "message", "metadata", or any other model
* type we support
*/
class DeltaProcessor {
constructor() {
this.activationTime = Date.now()
}
async process(rawDeltas = [], {source} = {}) {
try {
const deltas = await this._decorateDeltas(rawDeltas);
if (source === "n1Cloud") {
Actions.longPollReceivedRawDeltas(deltas);
}
const {
modelDeltas,
accountDeltas,
metadataDeltas,
} = this._extractDeltaTypes(deltas);
this._handleAccountDeltas(accountDeltas);
const models = await this._saveModels(modelDeltas);
await this._saveMetadata(metadataDeltas);
await this._notifyOfNewMessages(models.created);
this._notifyOfSyncbackRequestDeltas(models)
} catch (err) {
console.error(rawDeltas)
console.error("DeltaProcessor: Process failed.", err)
NylasEnv.reportError(err);
} finally {
Actions.longPollProcessedDeltas()
}
}
/**
* Create a (non-enumerable) reference from the attributes which we
* carry forward back to their original deltas. This allows us to
* mark the deltas that the app ignores later in the process.
*/
_decorateDeltas(rawDeltas) {
rawDeltas.forEach((delta) => {
if (!delta.attributes) return;
Object.defineProperty(delta.attributes, '_delta', {
configurable: true,
get() { return delta; },
});
})
return rawDeltas
}
_extractDeltaTypes(rawDeltas) {
const modelDeltas = []
const accountDeltas = []
const metadataDeltas = []
rawDeltas.forEach((delta) => {
if (delta.object === "metadata") {
metadataDeltas.push(delta)
} else if (delta.object === "account") {
accountDeltas.push(delta)
} else {
modelDeltas.push(delta)
}
})
return {modelDeltas, metadataDeltas, accountDeltas}
}
_handleAccountDeltas = (accountDeltas) => {
const {modify} = this._clusterDeltas(accountDeltas);
if (!modify.account) return;
for (const accountJSON of _.values(modify.account)) {
Actions.updateAccount(accountJSON.account_id, {syncState: accountJSON.sync_state});
if (accountJSON.sync_state !== "running") {
Actions.recordUserEvent('Account Sync Errored', {
accountId: accountJSON.account_id,
syncState: accountJSON.sync_state,
});
}
}
}
_notifyOfSyncbackRequestDeltas({created, updated} = {}) {
const createdRequests = created.syncbackRequest || []
const updatedRequests = updated.syncbackRequest || []
const syncbackRequests = createdRequests.concat(updatedRequests)
if (syncbackRequests.length === 0) { return }
Actions.didReceiveSyncbackRequestDeltas(syncbackRequests)
}
async _saveModels(modelDeltas) {
const {create, modify, destroy} = this._clusterDeltas(modelDeltas);
const created = await Promise.props(_.mapObject(create, (val) =>
NylasAPIHelpers.handleModelResponse(_.values(val))
))
const updated = await Promise.props(_.mapObject(modify, (val) =>
NylasAPIHelpers.handleModelResponse(_.values(val))
));
await Promise.map(destroy, this._handleDestroyDelta);
return {created, updated};
}
async _saveMetadata(deltas) {
const all = {};
for (const delta of deltas.filter(d => d.event === 'create')) {
all[delta.attributes.object_id] = delta.attributes;
}
for (const delta of deltas.filter(d => d.event === 'modify')) {
all[delta.attributes.object_id] = delta.attributes;
}
const allByObjectType = _.groupBy(_.values(all), "object_type")
return Promise.map(Object.keys(allByObjectType), (objType) => {
const jsons = allByObjectType[objType]
const Klass = NylasAPIHelpers.apiObjectToClassMap[objType];
const objectIds = jsons.map(j => j.object_id)
return DatabaseStore.inTransaction((t) => {
return this._findOrCreateModelsForMetadata(t, Klass, objectIds).then((modelsByObjectId) => {
const models = [];
Object.keys(modelsByObjectId).forEach((objectId) => {
const model = modelsByObjectId[objectId];
const metadataJSON = all[objectId];
const modelWithMetadata = model.applyPluginMetadata(metadataJSON.plugin_id, metadataJSON.value);
const localMetadatum = modelWithMetadata.metadataObjectForPluginId(metadataJSON.plugin_id);
localMetadatum.version = metadataJSON.version;
models.push(modelWithMetadata);
})
return t.persistModels(models)
});
});
})
}
/**
@param ids An array of metadata object_ids that correspond to threads
@returns A map of the object_ids to threads in the database, resolving the
IDs as necessary. If the thread does not yet exist, we create a ghost thread that
contains only the id and the metadata.
*/
async _findOrCreateThreadsForMetadata(t, ids) {
// Since threads can have different ids, we need to find the equivalent threads
// through their messages. First, find the messages that correspond to the thread
// ids (which are of the format `t:${messageId}`)
const messageIds = ids.map(i => i.substring(2))
const messages = await t.findAll(Message, {id: messageIds})
// Create a map of which local thread ids are equivalent to the ids from the server
const localIdToRemoteId = {}
messages.forEach(msg => { localIdToRemoteId[msg.threadId] = `t:${msg.id}` })
// Then map the actual thread models to the server ids
const threads = await t.findAll(Thread, {id: Object.keys(localIdToRemoteId)})
const map = {};
for (const thread of threads) {
const pluginObjectId = localIdToRemoteId[thread.id]
map[pluginObjectId] = thread;
}
// Create ghost models for threads that we haven't synced yet
const missingIds = ids.filter(id => !map[id])
const newThreads = [];
const newMessages = [];
missingIds.forEach(id => {
// Build both the thread and corresponding message. We won't be able to find
// the ghost thread if the message with the corresponding id doesn't exist.
const thread = new Thread();
thread.id = id;
thread.categories = [];
const message = new Message();
message.id = id.substring(2);
message.threadId = id;
map[id] = thread;
newThreads.push(thread);
newMessages.push(message);
})
if (newThreads.length > 0) {
await t.persistModels(newThreads);
await t.persistModels(newMessages);
}
return map;
}
/**
@param ids An array of metadata object_ids
@returns A map of the object_ids to models in the database, resolving the
IDs as necessary. If the model does not yet exist, we create a ghost model that
contains only the id and the metadata.
*/
async _findOrCreateModelsForMetadata(t, Klass, ids) {
if (Klass === Thread) {
return this._findOrCreateThreadsForMetadata(t, ids)
}
const models = await t.findAll(Klass, {id: ids})
const map = {};
for (const model of models) {
const pluginObjectId = model.id;
map[pluginObjectId] = model;
}
// Build ghost models for objects we haven't synced yet
const missingIds = ids.filter(id => !map[id])
const instances = []
missingIds.forEach(id => {
const klass = new Klass({id: id})
map[id] = klass
instances.push(klass)
})
if (instances.length > 0) {
await t.persistModels(instances)
}
return map;
}
/**
* Group deltas by object type so we can mutate the cache efficiently.
* NOTE: This code must not just accumulate creates, modifies and
* destroys but also de-dupe them. We cannot call
* "persistModels(itemA, itemA, itemB)" or it will throw an exception
*/
_clusterDeltas(deltas) {
const create = {};
const modify = {};
const destroy = [];
for (const delta of deltas) {
if (delta.event === 'create') {
if (!create[delta.object]) { create[delta.object] = {}; }
create[delta.object][delta.attributes.id] = delta.attributes;
} else if (delta.event === 'modify') {
if (!modify[delta.object]) { modify[delta.object] = {}; }
modify[delta.object][delta.attributes.id] = delta.attributes;
} else if (delta.event === 'delete') {
destroy.push(delta);
}
}
return {create, modify, destroy};
}
async _notifyOfNewMessages(created) {
const incomingMessages = created.message || [];
// Filter for new messages that are not sent by the current user
const newUnread = incomingMessages.filter((msg) => {
const isUnread = msg.unread === true;
const isNew = msg.date && msg.date.valueOf() >= this.activationTime;
const isFromMe = msg.isFromMe();
return isUnread && isNew && !isFromMe;
});
if (newUnread.length === 0) {
return;
}
Actions.onNewMailDeltas(created)
}
_handleDestroyDelta(delta) {
const klass = NylasAPIHelpers.apiObjectToClassMap[delta.object];
if (!klass) { return Promise.resolve(); }
return DatabaseStore.inTransaction(t => {
return t.find(klass, delta.objectId).then((model) => {
if (!model) { return Promise.resolve(); }
return t.unpersistModel(model);
});
});
}
}
export default new DeltaProcessor()