perf(delta): replaces API delta stream with a direct in-memory one

Summary:
This replaces the HTTP API delta stream (the /delta/streaming route) with a direct in-memory one: local-sync now builds the delta observable itself and pushes inflated deltas to N1 via NylasEnv.localSyncEmitter instead of serving them over the local API.
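
A minimal sketch of the new in-memory path, mirroring the LocalSyncDeltaEmitter added in this diff (the db, cursor, and accountId bindings are assumed to already be in scope; the subscriber body is illustrative):

    const TransactionConnector = require('../shared/transaction-connector')
    const {DeltaStreamBuilder} = require('isomorphic-core')

    // Build the delta observable directly against the account database and the
    // live transaction source, instead of long-polling GET /delta/streaming.
    const subscription = DeltaStreamBuilder.buildDeltaObservable({
      db,        // account database handle (assumed already opened)
      cursor,    // id of the last transaction the consumer has processed
      accountId,
      deltasSource: TransactionConnector.getObservableForAccountId(accountId),
    }).subscribe((deltas) => {
      // deltas are inflated transaction JSONs (see inflateTransactions below),
      // pushed in-process to N1 rather than serialized over HTTP.
      NylasEnv.localSyncEmitter.emit("localSyncDeltas", deltas)
    })

    // Dispose when the consumer no longer wants deltas for this account.
    subscription.dispose()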

Addresses T7300

Test Plan: manual

Reviewers: jackie, halla, juan

Reviewed By: halla

Differential Revision: https://phab.nylas.com/D3548
Evan Morikawa 2016-12-21 18:42:52 -08:00
parent 49c61fde0c
commit c299fd9ebe
10 changed files with 126 additions and 122 deletions

View file

@@ -2,9 +2,19 @@ const _ = require('underscore');
const Rx = require('rx')
const stream = require('stream');
function stringifyTransactions(db, transactions = []) {
/**
* A Transaction references objects that changed. This finds and inflates
* those objects.
*
* Resolves to an array of transactions with their `attributes` set to be
* the inflated model they reference.
*/
function inflateTransactions(db, accountId, transactions = []) {
const transactionJSONs = transactions.map((t) => (t.toJSON ? t.toJSON() : t))
transactionJSONs.forEach((t) => { t.cursor = t.id });
transactionJSONs.forEach((t) => {
t.cursor = t.id;
t.accountId = accountId;
});
const byModel = _.groupBy(transactionJSONs, "object");
const byObjectIds = _.groupBy(transactionJSONs, "objectId");
@@ -22,6 +32,9 @@ function stringifyTransactions(db, transactions = []) {
where: {id: modelIds},
include: includes,
}).then((models) => {
if (models.length !== modelIds.length) {
console.error("Couldn't find a model for some IDs", modelName, modelIds, models)
}
for (const model of models) {
const transactionsForModel = byObjectIds[model.id];
for (const t of transactionsForModel) {
@@ -29,19 +42,26 @@ function stringifyTransactions(db, transactions = []) {
}
}
});
})).then(() => {
})).then(() => transactionJSONs)
}
function stringifyTransactions(db, accountId, transactions = []) {
return inflateTransactions(db, accountId, transactions).then((transactionJSONs) => {
return `${transactionJSONs.map(JSON.stringify).join("\n")}\n`;
});
}
module.exports = {
buildStream(request, {databasePromise, cursor, accountId, deltasSource}) {
return databasePromise.then((db) => {
const initialSource = db.Transaction.streamAll({where: { id: {$gt: cursor}, accountId }});
function transactionsSinceCursor(db, cursor, accountId) {
return db.Transaction.streamAll({where: { id: {$gt: cursor}, accountId }});
}
module.exports = {
buildAPIStream(request, {databasePromise, cursor, accountId, deltasSource}) {
return databasePromise.then((db) => {
const initialSource = transactionsSinceCursor(db, cursor, accountId);
const source = Rx.Observable.merge(
initialSource.flatMap((t) => stringifyTransactions(db, t)),
deltasSource.flatMap((t) => stringifyTransactions(db, [t])),
initialSource.flatMap((ts) => stringifyTransactions(db, accountId, ts)),
deltasSource.flatMap((t) => stringifyTransactions(db, accountId, [t])),
Rx.Observable.interval(1000).map(() => "\n")
)
@@ -54,6 +74,14 @@ module.exports = {
});
},
buildDeltaObservable({db, cursor, accountId, deltasSource}) {
const initialSource = transactionsSinceCursor(db, cursor, accountId);
return Rx.Observable.merge(
initialSource.flatMap((ts) => inflateTransactions(db, accountId, ts)),
deltasSource.flatMap((t) => inflateTransactions(db, accountId, [t]))
)
},
buildCursor({databasePromise}) {
return databasePromise.then(({Transaction}) => {
return Transaction.findOne({order: [['id', 'DESC']]}).then((t) => {

View file

@@ -1,41 +0,0 @@
const Joi = require('joi');
const TransactionConnector = require('../../shared/transaction-connector')
const {DeltaStreamBuilder} = require('isomorphic-core')
module.exports = (server) => {
server.route({
method: 'GET',
path: '/delta/streaming',
config: {
validate: {
query: {
cursor: Joi.string().required(),
},
},
},
handler: (request, reply) => {
const account = request.auth.credentials;
DeltaStreamBuilder.buildStream(request, {
cursor: request.query.cursor,
accountId: account.id,
databasePromise: request.getAccountDatabase(),
deltasSource: TransactionConnector.getObservableForAccountId(account.id),
}).then((stream) => {
reply(stream)
});
},
});
server.route({
method: 'POST',
path: '/delta/latest_cursor',
handler: (request, reply) => {
DeltaStreamBuilder.buildCursor({
databasePromise: request.getAccountDatabase(),
}).then((cursor) => {
reply({cursor})
});
},
});
};

View file

@@ -129,39 +129,38 @@ class FetchFolderList {
this._db = db;
const boxes = await imap.getBoxes();
const {Folder, Label, sequelize} = this._db;
const {Folder, Label} = this._db;
return sequelize.transaction(async (transaction) => {
const {folders, labels} = await PromiseUtils.props({
folders: Folder.findAll({transaction}),
labels: Label.findAll({transaction}),
})
const all = [].concat(folders, labels);
const {next, created, deleted} = this._updateCategoriesWithBoxes(all, boxes);
const {folders, labels} = await PromiseUtils.props({
folders: Folder.findAll(),
labels: Label.findAll(),
})
const all = [].concat(folders, labels);
const {next, created, deleted} = this._updateCategoriesWithBoxes(all, boxes);
const categoriesByRoles = next.reduce((obj, cat) => {
const role = this._roleByName(cat.name);
if (role in obj) {
obj[role].push(cat);
} else {
obj[role] = [cat];
}
return obj;
}, {})
const categoriesByRoles = next.reduce((obj, cat) => {
const role = this._roleByName(cat.name);
if (role in obj) {
obj[role].push(cat);
} else {
obj[role] = [cat];
}
return obj;
}, {})
this._getMissingRoles(next).forEach((role) => {
if (categoriesByRoles[role] && categoriesByRoles[role].length === 1) {
categoriesByRoles[role][0].role = role;
}
})
this._getMissingRoles(next).forEach((role) => {
if (categoriesByRoles[role] && categoriesByRoles[role].length === 1) {
categoriesByRoles[role][0].role = role;
}
})
await Promise.all([].concat(
created.map(cat => cat.save({transaction})),
deleted.map(cat => cat.destroy({transaction}))
))
for (const category of created) {
await category.save()
}
return Promise.resolve()
});
for (const category of deleted) {
await category.destroy()
}
}
}

View file

@@ -45,17 +45,10 @@ class FetchMessagesInFolder {
// we just remove the category ID and UID. We may re-assign the same message
// the same UID. Otherwise they're eventually garbage collected.
const {Message} = this._db;
await this._db.sequelize.transaction((transaction) =>
Message.update({
folderImapUID: null,
folderId: null,
}, {
transaction: transaction,
where: {
folderId: this._folder.id,
},
})
)
await Message.update({
folderImapUID: null,
folderId: null,
}, {where: {folderId: this._folder.id}})
}
async _updateMessageAttributes(remoteUIDAttributes, localMessageAttributes) {
@@ -159,18 +152,10 @@ class FetchMessagesInFolder {
// removed_messages: removedUIDs.length,
// }, `FetchMessagesInFolder: found messages no longer in the folder`)
await this._db.sequelize.transaction((transaction) =>
Message.update({
folderImapUID: null,
folderId: null,
}, {
transaction,
where: {
folderId: this._folder.id,
folderImapUID: removedUIDs,
},
})
);
await Message.update({
folderImapUID: null,
folderId: null,
}, {where: {folderId: this._folder.id, folderImapUID: removedUIDs}})
}
_getDesiredMIMEParts(struct) {

View file

@@ -0,0 +1,35 @@
const TransactionConnector = require('../shared/transaction-connector')
const {DeltaStreamBuilder} = require('isomorphic-core')
export default class LocalSyncDeltaEmitter {
constructor(db, accountId) {
this._db = db;
this._accountId = accountId;
NylasEnv.localSyncEmitter.on("startDeltasFor", this._startDeltasFor)
NylasEnv.localSyncEmitter.on("endDeltasFor", this._endDeltasFor)
/**
* The local-sync/sync-worker starts up asynchronously. We need to
* notify N1 client that there are more deltas it should be looking
* for.
*/
NylasEnv.localSyncEmitter.emit("refreshLocalDeltas", accountId)
}
_startDeltasFor = ({accountId, cursor}) => {
if (accountId !== this._accountId) return;
if (this._disp && this._disp.dispose) this._disp.dispose()
this._disp = DeltaStreamBuilder.buildDeltaObservable({
db: this._db,
cursor: cursor,
accountId: accountId,
deltasSource: TransactionConnector.getObservableForAccountId(accountId),
}).subscribe((deltas) => {
NylasEnv.localSyncEmitter.emit("localSyncDeltas", deltas)
})
}
_endDeltasFor = ({accountId}) => {
if (accountId !== this._accountId) return;
if (this._disp && this._disp.dispose) this._disp.dispose()
}
}

View file

@@ -14,6 +14,7 @@ const FetchFolderList = require('./imap/fetch-folder-list')
const FetchMessagesInFolder = require('./imap/fetch-messages-in-folder')
const SyncbackTaskFactory = require('./syncback-task-factory')
const SyncMetricsReporter = require('./sync-metrics-reporter');
const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default
const RESTART_THRESHOLD = 10
@@ -29,6 +30,7 @@ class SyncWorker {
this._interrupted = false
this._syncInProgress = false
this._syncAttemptsWhileInProgress = 0
this._localDeltas = new LocalSyncDeltaEmitter(db, account.id)
this._destroyed = false;
this._syncTimer = setTimeout(() => {

View file

@@ -212,8 +212,8 @@ module.exports = (sequelize, Sequelize) => {
date: this.date ? this.date.getTime() / 1000.0 : null,
unread: this.unread,
starred: this.starred,
folder: this.folder,
labels: this.labels,
folder: this.folder.toJSON(),
labels: this.labels.map(l => l.toJSON()),
thread_id: this.threadId,
};
},

View file

@@ -144,8 +144,8 @@ module.exports = (sequelize, Sequelize) => {
const response = {
id: `${this.id}`,
object: 'thread',
folders: this.folders,
labels: this.labels,
folders: this.folders.map(f => f.toJSON()),
labels: this.labels.map(l => l.toJSON()),
account_id: this.accountId,
participants: this.participants,
subject: this.subject,

View file

@@ -37,21 +37,17 @@ async function extractContacts({db, message}) {
},
})
await db.sequelize.transaction(async (transaction) => {
const promises = []
for (const c of contactsDataById.values()) {
const existing = existingContacts.find(({id}) => id === c.id)
if (!existing) {
promises.push(Contact.create(c, {transaction}));
} else {
const updateRequired = (c.name !== existing.name);
if (updateRequired) {
promises.push(existing.update(c, {transaction}));
}
for (const c of contactsDataById.values()) {
const existing = existingContacts.find(({id}) => id === c.id)
if (!existing) {
await Contact.create(c)
} else {
const updateRequired = (c.name !== existing.name);
if (updateRequired) {
await existing.update(c)
}
}
await Promise.all(promises);
})
}
return message;
}

View file

@@ -29,13 +29,13 @@ function collectFilesFromStruct({db, message, struct, fileIds = new Set()}) {
return collected;
}
function extractFiles({db, message, imapMessage}) {
async function extractFiles({db, message, imapMessage}) {
const {attributes: {struct}} = imapMessage
const files = collectFilesFromStruct({db, message, struct});
if (files.length > 0) {
return db.sequelize.transaction((transaction) =>
Promise.all(files.map(f => f.save({transaction})))
)
for (const file of files) {
await file.save()
}
}
return Promise.resolve()
}