From 29a6448922c7ba64c7ebbcf378435750e7627a84 Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Mon, 20 Jun 2016 17:33:23 -0700 Subject: [PATCH] Initial concept of sync policy --- core/models/shared/account.js | 9 ++ storage/shared.sqlite | Bin 4096 -> 4096 bytes sync/imap/connection.js | 38 +++-- sync/imap/sync-mailbox-operation.js | 60 ++++--- sync/sync-worker-pool.js | 15 ++ sync/sync-worker.js | 235 ++++++++++++++++++++++------ 6 files changed, 278 insertions(+), 79 deletions(-) diff --git a/core/models/shared/account.js b/core/models/shared/account.js index 9195662d9..0f5a56940 100644 --- a/core/models/shared/account.js +++ b/core/models/shared/account.js @@ -1,6 +1,15 @@ module.exports = (sequelize, Sequelize) => { const Account = sequelize.define('Account', { emailAddress: Sequelize.STRING, + syncPolicy: { + type: Sequelize.STRING, + get: function get() { + return JSON.parse(this.getDataValue('syncPolicy')) + }, + set: function set(val) { + this.setDataValue('syncPolicy', JSON.stringify(val)); + }, + }, }, { classMethods: { associate: ({AccountToken}) => { diff --git a/storage/shared.sqlite b/storage/shared.sqlite index 60b0d053d6888e58dfeaa081eb714239b65f4a39..677d35ba3b4e2a681f44ff8cbc09eb8d9345d43a 100644 GIT binary patch delta 146 zcmZorXi%6S&B#1a##xw|LHCgyF9QPuGvhV}W-X>0jN3Lgx-f3O$XLqA8O+EoE-TB} zR5{t7xs@1%dm_Kh8I8CW2CCx`(6!n-9R delta 177 zcmZorXi%6S&B!uQ##xw!LHCgyF9QPuGvh@DW>==qj2AZxvcxlP-o{wU$Z5gIE-ov} z*yKFfgt>IGEtB=+|IF+xPZ^k>0u|k6R+nUCWiaI9OG?d4&o9X@cSz4o%*@eC&d)V8 z1ewXi{E&h9GxI~B;;YP>0!+;6oQ}!K`K5U!A^F* { - for (const key of Object.keys(Capabilities)) { - const val = Capabilities[key]; - if (this._imap.serverSupports(val)) { - this._capabilities.push(val); - } - } - this.emit('ready'); - }); - this._imap.once('error', (err) => { console.log(err); }); @@ -58,8 +48,34 @@ class IMAPConnection extends EventEmitter { // Emitted when message metadata (e.g. flags) changes externally. this._imap.on('update', () => this.emit('update')) + } - this._imap.connect(); + populateCapabilities() { + this._capabilities = []; + for (const key of Object.keys(Capabilities)) { + const val = Capabilities[key]; + if (this._imap.serverSupports(val)) { + this._capabilities.push(val); + } + } + } + + connect() { + if (!this._connectPromise) { + this._connectPromise = new Promise((resolve) => { + this._imap.once('ready', () => { + this.populateCapabilities(); + resolve(); + }); + this._imap.connect(); + }); + } + return this._connectPromise; + } + + end() { + this._queue = []; + this._imap.end(); } openBox(box) { diff --git a/sync/imap/sync-mailbox-operation.js b/sync/imap/sync-mailbox-operation.js index 632f9fccd..ca7c4f962 100644 --- a/sync/imap/sync-mailbox-operation.js +++ b/sync/imap/sync-mailbox-operation.js @@ -3,24 +3,36 @@ const { processMessage } = require(`${__base}/message-processor`) class SyncMailboxOperation { - constructor(category) { + constructor(category, options) { this._category = category; + this._options = options; if (!this._category) { throw new Error("SyncMailboxOperation requires a category") } } description() { - return `SyncMailboxOperation (${this._category.name} - ${this._category.id})`; + return `SyncMailboxOperation (${this._category.name} - ${this._category.id})\n Options: ${JSON.stringify(this._options)}`; } - _unlinkAllMessages() { + _getLowerBoundUID() { + const {count} = this._options.limit; + return count ? Math.max(1, this._box.uidnext - count) : 1; + } + + _recoverFromUIDInvalidity() { + // UID invalidity means the server has asked us to delete all the UIDs for + // this folder and start from scratch. We let a garbage collector clean up + // actual Messages, because we may just get new UIDs pointing to the same + // messages. const {MessageUID} = this._db; - return MessageUID.destroy({ - where: { - CategoryId: this._category.id, - }, - }) + return this._db.sequelize.transaction((transaction) => + MessageUID.destroy({ + where: { + CategoryId: this._category.id, + }, + }, {transaction}) + ) } _removeDeletedMessageUIDs(removedUIDs) { @@ -30,7 +42,12 @@ class SyncMailboxOperation { return Promise.resolve(); } return this._db.sequelize.transaction((transaction) => - MessageUID.destroy({where: {uid: removedUIDs}}, {transaction}) + MessageUID.destroy({ + where: { + CategoryId: this._category.id, + uid: removedUIDs, + }, + }, {transaction}) ); } @@ -73,7 +90,7 @@ class SyncMailboxOperation { return processMessage({accountId, attributes, headers, body, hash}) } - _openMailboxAndCheckValidity() { + _openMailboxAndEnsureValidity() { return this._imap.openBox(this._category.name, true).then((box) => { this._box = box; @@ -81,7 +98,7 @@ class SyncMailboxOperation { throw new Error("Mailbox does not support persistentUIDs.") } if (box.uidvalidity !== this._category.syncState.uidvalidity) { - return this._unlinkAllMessages(); + return this._recoverFromUIDInvalidity(); } return Promise.resolve(); }) @@ -94,18 +111,18 @@ class SyncMailboxOperation { uidvalidity: this._box.uidvalidity, } - console.log(" - fetching unseen messages") - - let fetchRange = `1:*`; + let range = `${this._getLowerBoundUID()}:*`; if (savedSyncState.uidnext) { if (savedSyncState.uidnext === currentSyncState.uidnext) { console.log(" --- nothing more to fetch") return Promise.resolve(); } - fetchRange = `${savedSyncState.uidnext}:*` + range = `${savedSyncState.uidnext}:*` } - return this._imap.fetch(fetchRange, this._processMessage.bind(this)).then(() => { + console.log(` - fetching unseen messages ${range}`) + + return this._imap.fetch(range, this._processMessage.bind(this)).then(() => { this._category.syncState = currentSyncState; return this._category.save(); }); @@ -113,15 +130,16 @@ class SyncMailboxOperation { _fetchChangesToMessages() { const {MessageUID} = this._db; + const range = `${this._getLowerBoundUID()}:*`; - console.log(" - fetching changes to messages") + console.log(` - fetching changes to messages ${range}`) - return this._imap.fetchUIDAttributes(`1:*`).then((latestUIDAttributes) => { + return this._imap.fetchUIDAttributes(range).then((latestUIDAttributes) => { return MessageUID.findAll({where: {CategoryId: this._category.id}}).then((knownUIDs) => { const {removedUIDs, neededUIDs} = this._deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs); - console.log(` - found changed / new UIDs: ${neededUIDs.join(', ')}`) - console.log(` - found removed UIDs: ${removedUIDs.join(', ')}`) + console.log(` - found changed / new UIDs: ${neededUIDs.join(', ') || 'none'}`) + console.log(` - found removed UIDs: ${removedUIDs.join(', ') || 'none'}`) return Promise.props({ deletes: this._removeDeletedMessageUIDs(removedUIDs), @@ -135,7 +153,7 @@ class SyncMailboxOperation { this._db = db; this._imap = imap; - return this._openMailboxAndCheckValidity() + return this._openMailboxAndEnsureValidity() .then(() => this._fetchUnseenMessages() ).then(() => diff --git a/sync/sync-worker-pool.js b/sync/sync-worker-pool.js index 8423d0ceb..e76b7f4c8 100644 --- a/sync/sync-worker-pool.js +++ b/sync/sync-worker-pool.js @@ -7,6 +7,21 @@ class SyncWorkerPool { } addWorkerForAccount(account) { + account.syncPolicy = { + limit: { + after: Date.now() - 7 * 24 * 60 * 60 * 1000, + count: 10000, + }, + afterSync: 'idle', + folderRecentSync: { + every: 60 * 1000, + }, + folderDeepSync: { + every: 5 * 60 * 1000, + }, + expiration: Date.now() + 60 * 60 * 1000, + } + DatabaseConnectionFactory.forAccount(account.id).then((db) => { this._workers[account.id] = new SyncWorker(account, db); }); diff --git a/sync/sync-worker.js b/sync/sync-worker.js index ad148afcc..88bd1bbd2 100644 --- a/sync/sync-worker.js +++ b/sync/sync-worker.js @@ -1,61 +1,202 @@ const IMAPConnection = require('./imap/connection'); const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation') const SyncMailboxOperation = require('./imap/sync-mailbox-operation') +// +// account.syncPolicy = { +// afterSync: 'idle', +// limit: { +// after: Date.now() - 7 * 24 * 60 * 60 * 1000, +// count: 10000, +// }, +// folderRecentSync: { +// every: 60 * 1000, +// }, +// folderDeepSync: { +// every: 5 * 60 * 1000, +// }, +// expiration: Date.now() + 60 * 60 * 1000, +// } class SyncWorker { + constructor(account, db) { - const main = new IMAPConnection(db, { - user: 'inboxapptest1@fastmail.fm', - password: 'trar2e', - host: 'mail.messagingengine.com', - port: 993, - tls: true, - }); - - // Todo: SyncWorker should decide what operations to queue and what params - // to pass them, and how often, based on SyncPolicy model (TBD). - - main.on('ready', () => { - main.runOperation(new RefreshMailboxesOperation()) - .then(() => - this._db.Category.find({where: {role: 'inbox'}}) - ).then((inboxCategory) => { - if (!inboxCategory) { - throw new Error("Unable to find an inbox category.") - } - main.on('mail', () => { - main.runOperation(new SyncMailboxOperation(inboxCategory)); - }) - main.on('update', () => { - main.runOperation(new SyncMailboxOperation(inboxCategory)); - }) - main.on('queue-empty', () => { - main.openBox(inboxCategory.name, true).then(() => { - console.log("Idling on inbox category"); - }); - }); - - setInterval(() => this.syncAllMailboxes(), 120 * 1000); - this.syncAllMailboxes(); - }); - }); - this._db = db; - this._main = main; + this._conn = null; + this._account = account; + this._lastFolderRecentSync = null; + this._lastFolderDeepSync = null; + + this._syncTimer = null; + this._expirationTimer = null; + + this.syncNow(); + this.scheduleExpiration(); } - syncAllMailboxes() { - const {Category} = this._db; - Category.findAll().then((categories) => { - const priority = ['inbox', 'drafts', 'sent']; - const sorted = categories.sort((a, b) => { - return priority.indexOf(b.role) - priority.indexOf(a.role); - }) - for (const cat of sorted) { - this._main.runOperation(new SyncMailboxOperation(cat)); - } + // TODO: How does this get called? + onAccountChanged() { + this.syncNow(); + this.scheduleExpiration(); + } + + onExpired() { + // Returning syncs to the unclaimed queue every so often is healthy. + // TODO: That. + } + + onSyncDidComplete() { + const {afterSync} = this._account.syncPolicy; + + if (afterSync === 'idle') { + this.getInboxCategory().then((inboxCategory) => { + this._conn.openBox(inboxCategory.name, true).then(() => { + console.log(" - Idling on inbox category"); + }); + }); + } else if (afterSync === 'close') { + console.log(" - Closing connection"); + this._conn.end(); + this._conn = null; + } else { + throw new Error(`onSyncDidComplete: Unknown afterSync behavior: ${afterSync}`) + } + } + + onConnectionIdleUpdate() { + this.getInboxCategory((inboxCategory) => { + this._conn.runOperation(new SyncMailboxOperation(inboxCategory, { + scanAllUIDs: false, + limit: this.account.syncPolicy.options, + })); }); } + + getInboxCategory() { + return this._db.Category.find({where: {role: 'inbox'}}) + } + + getCurrentFolderSyncOptionsForPolicy() { + const {folderRecentSync, folderDeepSync, limit} = this._account.syncPolicy; + + if (Date.now() - this._lastFolderDeepSync > folderDeepSync.every) { + return { + mode: 'deep', + options: { + scanAllUIDs: true, + limit: limit, + }, + }; + } + if (Date.now() - this._lastFolderRecentSync > folderRecentSync.every) { + return { + mode: 'shallow', + options: { + scanAllUIDs: false, + limit: limit, + }, + }; + } + return { + mode: 'none', + }; + } + + ensureConnection() { + if (!this._conn) { + const conn = new IMAPConnection(this._db, { + user: 'inboxapptest1@fastmail.fm', + password: 'trar2e', + host: 'mail.messagingengine.com', + port: 993, + tls: true, + }); + conn.on('mail', () => { + this.onConnectionIdleUpdate(); + }) + conn.on('update', () => { + this.onConnectionIdleUpdate(); + }) + conn.on('queue-empty', () => { + }); + + this._conn = conn; + } + + return this._conn.connect(); + } + + queueOperationsForUpdates() { + // todo: syncback operations belong here! + return this._conn.runOperation(new RefreshMailboxesOperation()) + } + + queueOperationsForFolderSyncs() { + const {Category} = this._db; + const {mode, options} = this.getCurrentFolderSyncOptionsForPolicy(); + + if (mode === 'none') { + return Promise.resolve(); + } + + return Category.findAll().then((categories) => { + const priority = ['inbox', 'drafts', 'sent']; + const sorted = categories.sort((a, b) => + priority.indexOf(b.role) - priority.indexOf(a.role) + ) + return Promise.all(sorted.map((cat) => + this._conn.runOperation(new SyncMailboxOperation(cat, options)) + )).then(() => { + if (mode === 'deep') { + this._lastFolderDeepSync = Date.now(); + this._lastFolderRecentSync = Date.now(); + } else if (mode === 'shallow') { + this._lastFolderRecentSync = Date.now(); + } + }); + }); + } + + syncNow() { + clearTimeout(this._syncTimer); + + this.ensureConnection().then(() => + this.queueOperationsForUpdates().then(() => + this.queueOperationsForFolderSyncs() + ) + ).catch((err) => { + // Sync has failed for some reason. What do we do?! + console.error(err); + }).finally(() => { + this.onSyncDidComplete(); + this.scheduleNextSync(); + }); + } + + scheduleExpiration() { + const {expiration} = this._account.syncPolicy; + + clearTimeout(this._expirationTimer); + this._expirationTimer = setTimeout(() => this.onExpired(), expiration); + } + + scheduleNextSync() { + const {folderRecentSync, folderDeepSync} = this._account.syncPolicy; + + let target = Number.MAX_SAFE_INTEGER; + + if (folderRecentSync) { + target = Math.min(target, this._lastFolderRecentSync + folderRecentSync.every); + } + if (folderDeepSync) { + target = Math.min(target, this._lastFolderDeepSync + folderDeepSync.every); + } + + console.log(`Next sync scheduled for ${new Date(target).toLocaleString()}`); + + this._syncTimer = setTimeout(() => { + this.syncNow(); + }, target - Date.now()); + } } module.exports = SyncWorker;