Chunk sync of large mailboxes, keep track of synced UID range, not just uidnext

This commit is contained in:
Ben Gotow 2016-06-28 15:30:51 -07:00
parent 0f4ed7f4a1
commit a10543c1c8
2 changed files with 109 additions and 79 deletions

View file

@ -1,7 +1,7 @@
const mimelib = require('mimelib');
const SNIPPET_SIZE = 100
function Contact({name, address}) {
function Contact({name, address} = {}) {
return {
name,
email: address,

View file

@ -46,7 +46,7 @@ class FetchMessagesInCategory {
)
}
_createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes) {
_updateMessageAttributes(remoteUIDAttributes, localMessageAttributes) {
const messageAttributesMap = {};
for (const msg of localMessageAttributes) {
messageAttributesMap[msg.CategoryUID] = msg;
@ -74,18 +74,17 @@ class FetchMessagesInCategory {
}
})
console.log(` -- found ${createdUIDs.length} new messages`)
console.log(` -- found ${changedMessages.length} flag changes`)
console.log(` --- found ${changedMessages.length || 'no'} flag changes`)
if (createdUIDs.length > 0) {
console.log(` --- found ${createdUIDs.length} new messages. These will not be processed because we assume that they will be assigned uid = uidnext, and will be picked up in the next sync when we discover unseen messages.`)
}
return Promise.props({
creates: this._fetchMessagesAndQueueForProcessing(createdUIDs),
updates: this._db.sequelize.transaction((transaction) =>
Promise.all(changedMessages.map(m => m.save({
fields: MessageFlagAttributes,
transaction,
})))
),
})
return this._db.sequelize.transaction((transaction) =>
Promise.all(changedMessages.map(m => m.save({
fields: MessageFlagAttributes,
transaction,
})))
);
}
_removeDeletedMessages(remoteUIDAttributes, localMessageAttributes) {
@ -95,7 +94,7 @@ class FetchMessagesInCategory {
.filter(msg => !remoteUIDAttributes[msg.CategoryUID])
.map(msg => msg.CategoryUID)
console.log(` -- found ${removedUIDs.length} messages no longer in the folder`)
console.log(` --- found ${removedUIDs.length} messages no longer in the folder`)
if (removedUIDs.length === 0) {
return Promise.resolve();
@ -148,6 +147,7 @@ class FetchMessagesInCategory {
return;
}
const key = JSON.stringify(desiredParts);
console.log(key);
uidsByPart[key] = uidsByPart[key] || [];
uidsByPart[key].push(attributes.uid);
});
@ -159,14 +159,19 @@ class FetchMessagesInCategory {
const bodies = ['HEADER'].concat(desiredParts.map(p => p.id));
console.log(`Fetching parts ${key} for ${uids.length} messages`)
// note: the order of UIDs in the array doesn't matter, Gmail always
// returns them in ascending (oldest => newest) order.
const $body = this._box.fetch(uids, {bodies, struct: true})
$body.subscribe((msg) => {
console.log(`Fetched message ${msg.attributes.uid}`)
msg.body = {};
for (const {id, mimetype} of desiredParts) {
msg.body[mimetype] = msg.parts[id];
}
this._processMessage(msg);
});
return $body.toPromise();
})
});
@ -245,88 +250,83 @@ class FetchMessagesInCategory {
})
}
_fetchUnseenMessages() {
_fetchUnsyncedMessages() {
const savedSyncState = this._category.syncState;
const boxSyncState = {
uidnext: this._box.uidnext,
uidvalidity: this._box.uidvalidity,
}
const isFirstSync = !savedSyncState.fetchedmax;
const boxUidnext = this._box.uidnext;
const boxUidvalidity = this._box.uidvalidity;
const {limit} = this._options;
let range = `${this._getLowerBoundUID(limit)}:*`;
const desiredRanges = [];
console.log(` - fetching unseen messages ${range}`)
console.log(` - Fetching messages. Currently have range: ${savedSyncState.fetchedmin}:${savedSyncState.fetchedmax}`)
if (savedSyncState.uidnext) {
if (savedSyncState.uidnext === boxSyncState.uidnext) {
console.log(" --- uidnext matches, nothing more to fetch")
return Promise.resolve();
// Todo: In the future, this is where logic should go that limits
// sync based on number of messages / age of messages.
if (isFirstSync) {
const lowerbound = Math.max(1, boxUidnext - 150);
desiredRanges.push({min: lowerbound, max: boxUidnext})
} else {
if (savedSyncState.fetchedmax < boxUidnext) {
desiredRanges.push({min: savedSyncState.fetchedmax, max: boxUidnext})
} else {
console.log(" --- fetchedmax == uidnext, nothing more recent to fetch.")
}
if (savedSyncState.fetchedmin > 1) {
const lowerbound = Math.max(1, savedSyncState.fetchedmin - 1000);
desiredRanges.push({min: lowerbound, max: savedSyncState.fetchedmin})
} else {
console.log(" --- fetchedmin == 1, nothing older to fetch.")
}
range = `${savedSyncState.uidnext}:*`
}
return this._fetchMessagesAndQueueForProcessing(range).then(() => {
console.log(` - finished fetching unseen messages`);
return this.updateCategorySyncState({
uidnext: boxSyncState.uidnext,
uidvalidity: boxSyncState.uidvalidity,
timeFetchedUnseen: Date.now(),
});
})
return Promise.each(desiredRanges, ({min, max}) => {
console.log(` --- fetching range: ${min}:${max}`);
return this._fetchMessagesAndQueueForProcessing(`${min}:${max}`).then(() => {
const {fetchedmin, fetchedmax} = this._category.syncState;
return this.updateCategorySyncState({
fetchedmin: fetchedmin ? Math.min(fetchedmin, min) : min,
fetchedmax: fetchedmax ? Math.max(fetchedmax, max) : max,
uidvalidity: boxUidvalidity,
timeFetchedUnseen: Date.now(),
});
})
}).then(() => {
console.log(` - Fetching messages finished`);
});
}
_runScan() {
const {fetchedmin, fetchedmax} = this._category.syncState;
if (!fetchedmin || !fetchedmax) {
throw new Error("Unseen messages must be fetched at least once before the first update/delete scan.")
}
return this._shouldRunDeepScan() ? this._runDeepScan() : this._runShallowScan()
}
_shouldRunDeepScan() {
const {timeDeepScan} = this._category.syncState;
return Date.now() - (timeDeepScan || 0) > this._options.deepFolderScan
return Date.now() - (timeDeepScan || 0) > this._options.deepFolderScan;
}
_runDeepScan(range) {
const {Message} = this._db;
console.log("fetchUIDAttributes START")
return this._box.fetchUIDAttributes(range)
.then((remoteUIDAttributes) => {
console.log(`fetchUIDAttributes FINISHED - ${Object.keys(remoteUIDAttributes).length} items returned`)
return Message.findAll({
where: {CategoryId: this._category.id},
attributes: MessageFlagAttributes,
})
.then((localMessageAttributes) => (
Promise.props({
upserts: this._createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes),
deletes: this._removeDeletedMessages(remoteUIDAttributes, localMessageAttributes),
})
))
.then(() => {
console.log(` - finished fetching changes to messages ${range}`);
return this.updateCategorySyncState({
highestmodseq: this._box.highestmodseq,
timeDeepScan: Date.now(),
timeShallowScan: Date.now(),
})
})
});
}
_fetchChangesToMessages() {
_runShallowScan() {
const {highestmodseq} = this._category.syncState;
const nextHighestmodseq = this._box.highestmodseq;
const range = `${this._getLowerBoundUID(this._options.limit)}:*`;
console.log(` - fetching changes to messages ${range}`)
if (this._shouldRunDeepScan()) {
return this._runDeepScan(range)
}
let shallowFetch = null;
if (this._imap.serverSupports(Capabilities.Condstore)) {
console.log(` - Shallow attribute scan (using CONDSTORE)`)
if (nextHighestmodseq === highestmodseq) {
console.log(" --- highestmodseq matches, nothing more to fetch")
return Promise.resolve();
}
shallowFetch = this._box.fetchUIDAttributes(range, {changedsince: highestmodseq});
shallowFetch = this._box.fetchUIDAttributes(`1:*`, {changedsince: highestmodseq});
} else {
shallowFetch = this._box.fetchUIDAttributes(`${this._getLowerBoundUID(1000)}:*`);
const range = `${this._getLowerBoundUID(1000)}:*`;
console.log(` - Shallow attribute scan (using range: ${range})`)
shallowFetch = this._box.fetchUIDAttributes(range);
}
return shallowFetch
@ -336,10 +336,10 @@ class FetchMessagesInCategory {
attributes: MessageFlagAttributes,
})
.then((localMessageAttributes) => (
this._createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes)
this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes)
))
.then(() => {
console.log(` - finished fetching changes to messages ${range}`);
console.log(` - finished fetching changes to messages`);
return this.updateCategorySyncState({
highestmodseq: nextHighestmodseq,
timeShallowScan: Date.now(),
@ -348,6 +348,36 @@ class FetchMessagesInCategory {
))
}
_runDeepScan() {
const {Message} = this._db;
const {fetchedmin, fetchedmax} = this._category.syncState;
const range = `${fetchedmin}:${fetchedmax}`;
console.log(` - Deep attribute scan: fetching attributes in range: ${range}`)
return this._box.fetchUIDAttributes(range)
.then((remoteUIDAttributes) => {
return Message.findAll({
where: {CategoryId: this._category.id},
attributes: MessageFlagAttributes,
})
.then((localMessageAttributes) => (
Promise.props({
updates: this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes),
deletes: this._removeDeletedMessages(remoteUIDAttributes, localMessageAttributes),
})
))
.then(() => {
console.log(` - Deep scan finished.`);
return this.updateCategorySyncState({
highestmodseq: this._box.highestmodseq,
timeDeepScan: Date.now(),
timeShallowScan: Date.now(),
})
})
});
}
updateCategorySyncState(newState) {
if (_.isMatch(this._category.syncState, newState)) {
return Promise.resolve();
@ -360,11 +390,11 @@ class FetchMessagesInCategory {
this._db = db;
this._imap = imap;
return this._openMailboxAndEnsureValidity()
.then((box) => {
return this._openMailboxAndEnsureValidity().then((box) => {
this._box = box
return this._fetchUnseenMessages()
.then(() => this._fetchChangesToMessages())
return this._fetchUnsyncedMessages().then(() =>
this._runScan()
)
})
}
}