diff --git a/packages/isomorphic-core/src/delta-stream-builder.js b/packages/isomorphic-core/src/delta-stream-builder.js index 12f4f8a75..343f35050 100644 --- a/packages/isomorphic-core/src/delta-stream-builder.js +++ b/packages/isomorphic-core/src/delta-stream-builder.js @@ -2,9 +2,19 @@ const _ = require('underscore'); const Rx = require('rx') const stream = require('stream'); -function stringifyTransactions(db, transactions = []) { +/** + * A Transaction references objects that changed. This finds and inflates + * those objects. + * + * Resolves to an array of transactions with their `attributes` set to be + * the inflated model they reference. + */ +function inflateTransactions(db, accountId, transactions = []) { const transactionJSONs = transactions.map((t) => (t.toJSON ? t.toJSON() : t)) - transactionJSONs.forEach((t) => { t.cursor = t.id }); + transactionJSONs.forEach((t) => { + t.cursor = t.id; + t.accountId = accountId; + }); const byModel = _.groupBy(transactionJSONs, "object"); const byObjectIds = _.groupBy(transactionJSONs, "objectId"); @@ -22,6 +32,9 @@ function stringifyTransactions(db, transactions = []) { where: {id: modelIds}, include: includes, }).then((models) => { + if (models.length !== modelIds.length) { + console.error("Couldn't find a model for some IDs", modelName, modelIds, models) + } for (const model of models) { const transactionsForModel = byObjectIds[model.id]; for (const t of transactionsForModel) { @@ -29,19 +42,26 @@ function stringifyTransactions(db, transactions = []) { } } }); - })).then(() => { + })).then(() => transactionJSONs) +} + +function stringifyTransactions(db, accountId, transactions = []) { + return inflateTransactions(db, accountId, transactions).then((transactionJSONs) => { return `${transactionJSONs.map(JSON.stringify).join("\n")}\n`; }); } -module.exports = { - buildStream(request, {databasePromise, cursor, accountId, deltasSource}) { - return databasePromise.then((db) => { - const initialSource = db.Transaction.streamAll({where: { id: {$gt: cursor}, accountId }}); +function transactionsSinceCursor(db, cursor, accountId) { + return db.Transaction.streamAll({where: { id: {$gt: cursor}, accountId }}); +} +module.exports = { + buildAPIStream(request, {databasePromise, cursor, accountId, deltasSource}) { + return databasePromise.then((db) => { + const initialSource = transactionsSinceCursor(db, cursor, accountId); const source = Rx.Observable.merge( - initialSource.flatMap((t) => stringifyTransactions(db, t)), - deltasSource.flatMap((t) => stringifyTransactions(db, [t])), + initialSource.flatMap((ts) => stringifyTransactions(db, accountId, ts)), + deltasSource.flatMap((t) => stringifyTransactions(db, accountId, [t])), Rx.Observable.interval(1000).map(() => "\n") ) @@ -54,6 +74,14 @@ module.exports = { }); }, + buildDeltaObservable({db, cursor, accountId, deltasSource}) { + const initialSource = transactionsSinceCursor(db, cursor, accountId); + return Rx.Observable.merge( + initialSource.flatMap((ts) => inflateTransactions(db, accountId, ts)), + deltasSource.flatMap((t) => inflateTransactions(db, accountId, [t])) + ) + }, + buildCursor({databasePromise}) { return databasePromise.then(({Transaction}) => { return Transaction.findOne({order: [['id', 'DESC']]}).then((t) => { diff --git a/packages/local-sync/src/local-api/routes/delta.js b/packages/local-sync/src/local-api/routes/delta.js deleted file mode 100644 index 2fdbfd1a4..000000000 --- a/packages/local-sync/src/local-api/routes/delta.js +++ /dev/null @@ -1,41 +0,0 @@ -const Joi = require('joi'); -const TransactionConnector = require('../../shared/transaction-connector') -const {DeltaStreamBuilder} = require('isomorphic-core') - -module.exports = (server) => { - server.route({ - method: 'GET', - path: '/delta/streaming', - config: { - validate: { - query: { - cursor: Joi.string().required(), - }, - }, - }, - handler: (request, reply) => { - const account = request.auth.credentials; - - DeltaStreamBuilder.buildStream(request, { - cursor: request.query.cursor, - accountId: account.id, - databasePromise: request.getAccountDatabase(), - deltasSource: TransactionConnector.getObservableForAccountId(account.id), - }).then((stream) => { - reply(stream) - }); - }, - }); - - server.route({ - method: 'POST', - path: '/delta/latest_cursor', - handler: (request, reply) => { - DeltaStreamBuilder.buildCursor({ - databasePromise: request.getAccountDatabase(), - }).then((cursor) => { - reply({cursor}) - }); - }, - }); -}; diff --git a/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js b/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js index d1ab8af3d..d9c46300d 100644 --- a/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js +++ b/packages/local-sync/src/local-sync-worker/imap/fetch-folder-list.js @@ -129,39 +129,38 @@ class FetchFolderList { this._db = db; const boxes = await imap.getBoxes(); - const {Folder, Label, sequelize} = this._db; + const {Folder, Label} = this._db; - return sequelize.transaction(async (transaction) => { - const {folders, labels} = await PromiseUtils.props({ - folders: Folder.findAll({transaction}), - labels: Label.findAll({transaction}), - }) - const all = [].concat(folders, labels); - const {next, created, deleted} = this._updateCategoriesWithBoxes(all, boxes); + const {folders, labels} = await PromiseUtils.props({ + folders: Folder.findAll(), + labels: Label.findAll(), + }) + const all = [].concat(folders, labels); + const {next, created, deleted} = this._updateCategoriesWithBoxes(all, boxes); - const categoriesByRoles = next.reduce((obj, cat) => { - const role = this._roleByName(cat.name); - if (role in obj) { - obj[role].push(cat); - } else { - obj[role] = [cat]; - } - return obj; - }, {}) + const categoriesByRoles = next.reduce((obj, cat) => { + const role = this._roleByName(cat.name); + if (role in obj) { + obj[role].push(cat); + } else { + obj[role] = [cat]; + } + return obj; + }, {}) - this._getMissingRoles(next).forEach((role) => { - if (categoriesByRoles[role] && categoriesByRoles[role].length === 1) { - categoriesByRoles[role][0].role = role; - } - }) + this._getMissingRoles(next).forEach((role) => { + if (categoriesByRoles[role] && categoriesByRoles[role].length === 1) { + categoriesByRoles[role][0].role = role; + } + }) - await Promise.all([].concat( - created.map(cat => cat.save({transaction})), - deleted.map(cat => cat.destroy({transaction})) - )) + for (const category of created) { + await category.save() + } - return Promise.resolve() - }); + for (const category of deleted) { + await category.destroy() + } } } diff --git a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js index 4660c8215..697e7169f 100644 --- a/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js +++ b/packages/local-sync/src/local-sync-worker/imap/fetch-messages-in-folder.js @@ -45,17 +45,10 @@ class FetchMessagesInFolder { // we just remove the category ID and UID. We may re-assign the same message // the same UID. Otherwise they're eventually garbage collected. const {Message} = this._db; - await this._db.sequelize.transaction((transaction) => - Message.update({ - folderImapUID: null, - folderId: null, - }, { - transaction: transaction, - where: { - folderId: this._folder.id, - }, - }) - ) + await Message.update({ + folderImapUID: null, + folderId: null, + }, {where: {folderId: this._folder.id}}) } async _updateMessageAttributes(remoteUIDAttributes, localMessageAttributes) { @@ -159,18 +152,10 @@ class FetchMessagesInFolder { // removed_messages: removedUIDs.length, // }, `FetchMessagesInFolder: found messages no longer in the folder`) - await this._db.sequelize.transaction((transaction) => - Message.update({ - folderImapUID: null, - folderId: null, - }, { - transaction, - where: { - folderId: this._folder.id, - folderImapUID: removedUIDs, - }, - }) - ); + await Message.update({ + folderImapUID: null, + folderId: null, + }, {where: {folderId: this._folder.id, folderImapUID: removedUIDs}}) } _getDesiredMIMEParts(struct) { diff --git a/packages/local-sync/src/local-sync-worker/local-sync-delta-emitter.es6 b/packages/local-sync/src/local-sync-worker/local-sync-delta-emitter.es6 new file mode 100644 index 000000000..ffd4ec8c5 --- /dev/null +++ b/packages/local-sync/src/local-sync-worker/local-sync-delta-emitter.es6 @@ -0,0 +1,35 @@ +const TransactionConnector = require('../shared/transaction-connector') +const {DeltaStreamBuilder} = require('isomorphic-core') + +export default class LocalSyncDeltaEmitter { + constructor(db, accountId) { + this._db = db; + this._accountId = accountId; + NylasEnv.localSyncEmitter.on("startDeltasFor", this._startDeltasFor) + NylasEnv.localSyncEmitter.on("endDeltasFor", this._endDeltasFor) + /** + * The local-sync/sync-worker starts up asynchronously. We need to + * notify N1 client that there are more deltas it should be looking + * for. + */ + NylasEnv.localSyncEmitter.emit("refreshLocalDeltas", accountId) + } + + _startDeltasFor = ({accountId, cursor}) => { + if (accountId !== this._accountId) return; + if (this._disp && this._disp.dispose) this._disp.dispose() + this._disp = DeltaStreamBuilder.buildDeltaObservable({ + db: this._db, + cursor: cursor, + accountId: accountId, + deltasSource: TransactionConnector.getObservableForAccountId(accountId), + }).subscribe((deltas) => { + NylasEnv.localSyncEmitter.emit("localSyncDeltas", deltas) + }) + } + + _endDeltasFor = ({accountId}) => { + if (accountId !== this._accountId) return; + if (this._disp && this._disp.dispose) this._disp.dispose() + } +} diff --git a/packages/local-sync/src/local-sync-worker/sync-worker.js b/packages/local-sync/src/local-sync-worker/sync-worker.js index cb816a526..73ba62722 100644 --- a/packages/local-sync/src/local-sync-worker/sync-worker.js +++ b/packages/local-sync/src/local-sync-worker/sync-worker.js @@ -14,6 +14,7 @@ const FetchFolderList = require('./imap/fetch-folder-list') const FetchMessagesInFolder = require('./imap/fetch-messages-in-folder') const SyncbackTaskFactory = require('./syncback-task-factory') const SyncMetricsReporter = require('./sync-metrics-reporter'); +const LocalSyncDeltaEmitter = require('./local-sync-delta-emitter').default const RESTART_THRESHOLD = 10 @@ -29,6 +30,7 @@ class SyncWorker { this._interrupted = false this._syncInProgress = false this._syncAttemptsWhileInProgress = 0 + this._localDeltas = new LocalSyncDeltaEmitter(db, account.id) this._destroyed = false; this._syncTimer = setTimeout(() => { diff --git a/packages/local-sync/src/models/message.js b/packages/local-sync/src/models/message.js index e8fc43074..25f91ea9d 100644 --- a/packages/local-sync/src/models/message.js +++ b/packages/local-sync/src/models/message.js @@ -212,8 +212,8 @@ module.exports = (sequelize, Sequelize) => { date: this.date ? this.date.getTime() / 1000.0 : null, unread: this.unread, starred: this.starred, - folder: this.folder, - labels: this.labels, + folder: this.folder.toJSON(), + labels: this.labels.map(l => l.toJSON()), thread_id: this.threadId, }; }, diff --git a/packages/local-sync/src/models/thread.js b/packages/local-sync/src/models/thread.js index 8a5f24a55..7b246a605 100644 --- a/packages/local-sync/src/models/thread.js +++ b/packages/local-sync/src/models/thread.js @@ -144,8 +144,8 @@ module.exports = (sequelize, Sequelize) => { const response = { id: `${this.id}`, object: 'thread', - folders: this.folders, - labels: this.labels, + folders: this.folders.map(f => f.toJSON()), + labels: this.labels.map(l => l.toJSON()), account_id: this.accountId, participants: this.participants, subject: this.subject, diff --git a/packages/local-sync/src/new-message-processor/extract-contacts.js b/packages/local-sync/src/new-message-processor/extract-contacts.js index 96784b958..8a634932f 100644 --- a/packages/local-sync/src/new-message-processor/extract-contacts.js +++ b/packages/local-sync/src/new-message-processor/extract-contacts.js @@ -37,21 +37,17 @@ async function extractContacts({db, message}) { }, }) - await db.sequelize.transaction(async (transaction) => { - const promises = [] - for (const c of contactsDataById.values()) { - const existing = existingContacts.find(({id}) => id === c.id) - if (!existing) { - promises.push(Contact.create(c, {transaction})); - } else { - const updateRequired = (c.name !== existing.name); - if (updateRequired) { - promises.push(existing.update(c, {transaction})); - } + for (const c of contactsDataById.values()) { + const existing = existingContacts.find(({id}) => id === c.id) + if (!existing) { + await Contact.create(c) + } else { + const updateRequired = (c.name !== existing.name); + if (updateRequired) { + await existing.update(c) } } - await Promise.all(promises); - }) + } return message; } diff --git a/packages/local-sync/src/new-message-processor/extract-files.js b/packages/local-sync/src/new-message-processor/extract-files.js index 74bff1dc7..0502b2d81 100644 --- a/packages/local-sync/src/new-message-processor/extract-files.js +++ b/packages/local-sync/src/new-message-processor/extract-files.js @@ -29,13 +29,13 @@ function collectFilesFromStruct({db, message, struct, fileIds = new Set()}) { return collected; } -function extractFiles({db, message, imapMessage}) { +async function extractFiles({db, message, imapMessage}) { const {attributes: {struct}} = imapMessage const files = collectFilesFromStruct({db, message, struct}); if (files.length > 0) { - return db.sequelize.transaction((transaction) => - Promise.all(files.map(f => f.save({transaction}))) - ) + for (const file of files) { + await file.save() + } } return Promise.resolve() }