From a10543c1c826a04389683b5b7d90e21c7ecff01f Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Tue, 28 Jun 2016 15:30:51 -0700 Subject: [PATCH] Chunk sync of large mailboxes, keep track of synced UID range, not just uidnext --- .../processors/parsing.js | 2 +- .../imap/fetch-messages-in-category.js | 186 ++++++++++-------- 2 files changed, 109 insertions(+), 79 deletions(-) diff --git a/packages/nylas-message-processor/processors/parsing.js b/packages/nylas-message-processor/processors/parsing.js index 44f191d73..a54106e0e 100644 --- a/packages/nylas-message-processor/processors/parsing.js +++ b/packages/nylas-message-processor/processors/parsing.js @@ -1,7 +1,7 @@ const mimelib = require('mimelib'); const SNIPPET_SIZE = 100 -function Contact({name, address}) { +function Contact({name, address} = {}) { return { name, email: address, diff --git a/packages/nylas-sync/imap/fetch-messages-in-category.js b/packages/nylas-sync/imap/fetch-messages-in-category.js index d638e01ac..da1fcfef9 100644 --- a/packages/nylas-sync/imap/fetch-messages-in-category.js +++ b/packages/nylas-sync/imap/fetch-messages-in-category.js @@ -46,7 +46,7 @@ class FetchMessagesInCategory { ) } - _createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes) { + _updateMessageAttributes(remoteUIDAttributes, localMessageAttributes) { const messageAttributesMap = {}; for (const msg of localMessageAttributes) { messageAttributesMap[msg.CategoryUID] = msg; @@ -74,18 +74,17 @@ class FetchMessagesInCategory { } }) - console.log(` -- found ${createdUIDs.length} new messages`) - console.log(` -- found ${changedMessages.length} flag changes`) + console.log(` --- found ${changedMessages.length || 'no'} flag changes`) + if (createdUIDs.length > 0) { + console.log(` --- found ${createdUIDs.length} new messages. These will not be processed because we assume that they will be assigned uid = uidnext, and will be picked up in the next sync when we discover unseen messages.`) + } - return Promise.props({ - creates: this._fetchMessagesAndQueueForProcessing(createdUIDs), - updates: this._db.sequelize.transaction((transaction) => - Promise.all(changedMessages.map(m => m.save({ - fields: MessageFlagAttributes, - transaction, - }))) - ), - }) + return this._db.sequelize.transaction((transaction) => + Promise.all(changedMessages.map(m => m.save({ + fields: MessageFlagAttributes, + transaction, + }))) + ); } _removeDeletedMessages(remoteUIDAttributes, localMessageAttributes) { @@ -95,7 +94,7 @@ class FetchMessagesInCategory { .filter(msg => !remoteUIDAttributes[msg.CategoryUID]) .map(msg => msg.CategoryUID) - console.log(` -- found ${removedUIDs.length} messages no longer in the folder`) + console.log(` --- found ${removedUIDs.length} messages no longer in the folder`) if (removedUIDs.length === 0) { return Promise.resolve(); @@ -148,6 +147,7 @@ class FetchMessagesInCategory { return; } const key = JSON.stringify(desiredParts); + console.log(key); uidsByPart[key] = uidsByPart[key] || []; uidsByPart[key].push(attributes.uid); }); @@ -159,14 +159,19 @@ class FetchMessagesInCategory { const bodies = ['HEADER'].concat(desiredParts.map(p => p.id)); console.log(`Fetching parts ${key} for ${uids.length} messages`) + // note: the order of UIDs in the array doesn't matter, Gmail always + // returns them in ascending (oldest => newest) order. + const $body = this._box.fetch(uids, {bodies, struct: true}) $body.subscribe((msg) => { + console.log(`Fetched message ${msg.attributes.uid}`) msg.body = {}; for (const {id, mimetype} of desiredParts) { msg.body[mimetype] = msg.parts[id]; } this._processMessage(msg); }); + return $body.toPromise(); }) }); @@ -245,88 +250,83 @@ class FetchMessagesInCategory { }) } - _fetchUnseenMessages() { + _fetchUnsyncedMessages() { const savedSyncState = this._category.syncState; - const boxSyncState = { - uidnext: this._box.uidnext, - uidvalidity: this._box.uidvalidity, - } + const isFirstSync = !savedSyncState.fetchedmax; + const boxUidnext = this._box.uidnext; + const boxUidvalidity = this._box.uidvalidity; - const {limit} = this._options; - let range = `${this._getLowerBoundUID(limit)}:*`; + const desiredRanges = []; - console.log(` - fetching unseen messages ${range}`) + console.log(` - Fetching messages. Currently have range: ${savedSyncState.fetchedmin}:${savedSyncState.fetchedmax}`) - if (savedSyncState.uidnext) { - if (savedSyncState.uidnext === boxSyncState.uidnext) { - console.log(" --- uidnext matches, nothing more to fetch") - return Promise.resolve(); + // Todo: In the future, this is where logic should go that limits + // sync based on number of messages / age of messages. + + if (isFirstSync) { + const lowerbound = Math.max(1, boxUidnext - 150); + desiredRanges.push({min: lowerbound, max: boxUidnext}) + } else { + if (savedSyncState.fetchedmax < boxUidnext) { + desiredRanges.push({min: savedSyncState.fetchedmax, max: boxUidnext}) + } else { + console.log(" --- fetchedmax == uidnext, nothing more recent to fetch.") + } + if (savedSyncState.fetchedmin > 1) { + const lowerbound = Math.max(1, savedSyncState.fetchedmin - 1000); + desiredRanges.push({min: lowerbound, max: savedSyncState.fetchedmin}) + } else { + console.log(" --- fetchedmin == 1, nothing older to fetch.") } - range = `${savedSyncState.uidnext}:*` } - return this._fetchMessagesAndQueueForProcessing(range).then(() => { - console.log(` - finished fetching unseen messages`); - return this.updateCategorySyncState({ - uidnext: boxSyncState.uidnext, - uidvalidity: boxSyncState.uidvalidity, - timeFetchedUnseen: Date.now(), - }); - }) + return Promise.each(desiredRanges, ({min, max}) => { + console.log(` --- fetching range: ${min}:${max}`); + + return this._fetchMessagesAndQueueForProcessing(`${min}:${max}`).then(() => { + const {fetchedmin, fetchedmax} = this._category.syncState; + return this.updateCategorySyncState({ + fetchedmin: fetchedmin ? Math.min(fetchedmin, min) : min, + fetchedmax: fetchedmax ? Math.max(fetchedmax, max) : max, + uidvalidity: boxUidvalidity, + timeFetchedUnseen: Date.now(), + }); + }) + }).then(() => { + console.log(` - Fetching messages finished`); + }); + } + + _runScan() { + const {fetchedmin, fetchedmax} = this._category.syncState; + if (!fetchedmin || !fetchedmax) { + throw new Error("Unseen messages must be fetched at least once before the first update/delete scan.") + } + return this._shouldRunDeepScan() ? this._runDeepScan() : this._runShallowScan() } _shouldRunDeepScan() { const {timeDeepScan} = this._category.syncState; - return Date.now() - (timeDeepScan || 0) > this._options.deepFolderScan + return Date.now() - (timeDeepScan || 0) > this._options.deepFolderScan; } - _runDeepScan(range) { - const {Message} = this._db; - console.log("fetchUIDAttributes START") - return this._box.fetchUIDAttributes(range) - .then((remoteUIDAttributes) => { - console.log(`fetchUIDAttributes FINISHED - ${Object.keys(remoteUIDAttributes).length} items returned`) - return Message.findAll({ - where: {CategoryId: this._category.id}, - attributes: MessageFlagAttributes, - }) - .then((localMessageAttributes) => ( - Promise.props({ - upserts: this._createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes), - deletes: this._removeDeletedMessages(remoteUIDAttributes, localMessageAttributes), - }) - )) - .then(() => { - console.log(` - finished fetching changes to messages ${range}`); - return this.updateCategorySyncState({ - highestmodseq: this._box.highestmodseq, - timeDeepScan: Date.now(), - timeShallowScan: Date.now(), - }) - }) - }); - } - - _fetchChangesToMessages() { + _runShallowScan() { const {highestmodseq} = this._category.syncState; const nextHighestmodseq = this._box.highestmodseq; - const range = `${this._getLowerBoundUID(this._options.limit)}:*`; - - console.log(` - fetching changes to messages ${range}`) - - if (this._shouldRunDeepScan()) { - return this._runDeepScan(range) - } let shallowFetch = null; + if (this._imap.serverSupports(Capabilities.Condstore)) { + console.log(` - Shallow attribute scan (using CONDSTORE)`) if (nextHighestmodseq === highestmodseq) { console.log(" --- highestmodseq matches, nothing more to fetch") return Promise.resolve(); } - shallowFetch = this._box.fetchUIDAttributes(range, {changedsince: highestmodseq}); + shallowFetch = this._box.fetchUIDAttributes(`1:*`, {changedsince: highestmodseq}); } else { - shallowFetch = this._box.fetchUIDAttributes(`${this._getLowerBoundUID(1000)}:*`); + const range = `${this._getLowerBoundUID(1000)}:*`; + console.log(` - Shallow attribute scan (using range: ${range})`) + shallowFetch = this._box.fetchUIDAttributes(range); } return shallowFetch @@ -336,10 +336,10 @@ class FetchMessagesInCategory { attributes: MessageFlagAttributes, }) .then((localMessageAttributes) => ( - this._createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes) + this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes) )) .then(() => { - console.log(` - finished fetching changes to messages ${range}`); + console.log(` - finished fetching changes to messages`); return this.updateCategorySyncState({ highestmodseq: nextHighestmodseq, timeShallowScan: Date.now(), @@ -348,6 +348,36 @@ class FetchMessagesInCategory { )) } + _runDeepScan() { + const {Message} = this._db; + const {fetchedmin, fetchedmax} = this._category.syncState; + const range = `${fetchedmin}:${fetchedmax}`; + + console.log(` - Deep attribute scan: fetching attributes in range: ${range}`) + + return this._box.fetchUIDAttributes(range) + .then((remoteUIDAttributes) => { + return Message.findAll({ + where: {CategoryId: this._category.id}, + attributes: MessageFlagAttributes, + }) + .then((localMessageAttributes) => ( + Promise.props({ + updates: this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes), + deletes: this._removeDeletedMessages(remoteUIDAttributes, localMessageAttributes), + }) + )) + .then(() => { + console.log(` - Deep scan finished.`); + return this.updateCategorySyncState({ + highestmodseq: this._box.highestmodseq, + timeDeepScan: Date.now(), + timeShallowScan: Date.now(), + }) + }) + }); + } + updateCategorySyncState(newState) { if (_.isMatch(this._category.syncState, newState)) { return Promise.resolve(); @@ -360,11 +390,11 @@ class FetchMessagesInCategory { this._db = db; this._imap = imap; - return this._openMailboxAndEnsureValidity() - .then((box) => { + return this._openMailboxAndEnsureValidity().then((box) => { this._box = box - return this._fetchUnseenMessages() - .then(() => this._fetchChangesToMessages()) + return this._fetchUnsyncedMessages().then(() => + this._runScan() + ) }) } }