mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-01-01 13:14:16 +08:00
Initial concept of sync policy
This commit is contained in:
parent
f995f6fda1
commit
29a6448922
6 changed files with 278 additions and 79 deletions
|
@ -1,6 +1,15 @@
|
|||
module.exports = (sequelize, Sequelize) => {
|
||||
const Account = sequelize.define('Account', {
|
||||
emailAddress: Sequelize.STRING,
|
||||
syncPolicy: {
|
||||
type: Sequelize.STRING,
|
||||
get: function get() {
|
||||
return JSON.parse(this.getDataValue('syncPolicy'))
|
||||
},
|
||||
set: function set(val) {
|
||||
this.setDataValue('syncPolicy', JSON.stringify(val));
|
||||
},
|
||||
},
|
||||
}, {
|
||||
classMethods: {
|
||||
associate: ({AccountToken}) => {
|
||||
|
|
Binary file not shown.
|
@ -20,16 +20,6 @@ class IMAPConnection extends EventEmitter {
|
|||
this._capabilities = [];
|
||||
this._imap = Promise.promisifyAll(new Imap(settings));
|
||||
|
||||
this._imap.once('ready', () => {
|
||||
for (const key of Object.keys(Capabilities)) {
|
||||
const val = Capabilities[key];
|
||||
if (this._imap.serverSupports(val)) {
|
||||
this._capabilities.push(val);
|
||||
}
|
||||
}
|
||||
this.emit('ready');
|
||||
});
|
||||
|
||||
this._imap.once('error', (err) => {
|
||||
console.log(err);
|
||||
});
|
||||
|
@ -58,8 +48,34 @@ class IMAPConnection extends EventEmitter {
|
|||
|
||||
// Emitted when message metadata (e.g. flags) changes externally.
|
||||
this._imap.on('update', () => this.emit('update'))
|
||||
}
|
||||
|
||||
this._imap.connect();
|
||||
populateCapabilities() {
|
||||
this._capabilities = [];
|
||||
for (const key of Object.keys(Capabilities)) {
|
||||
const val = Capabilities[key];
|
||||
if (this._imap.serverSupports(val)) {
|
||||
this._capabilities.push(val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
connect() {
|
||||
if (!this._connectPromise) {
|
||||
this._connectPromise = new Promise((resolve) => {
|
||||
this._imap.once('ready', () => {
|
||||
this.populateCapabilities();
|
||||
resolve();
|
||||
});
|
||||
this._imap.connect();
|
||||
});
|
||||
}
|
||||
return this._connectPromise;
|
||||
}
|
||||
|
||||
end() {
|
||||
this._queue = [];
|
||||
this._imap.end();
|
||||
}
|
||||
|
||||
openBox(box) {
|
||||
|
|
|
@ -3,24 +3,36 @@ const { processMessage } = require(`${__base}/message-processor`)
|
|||
|
||||
|
||||
class SyncMailboxOperation {
|
||||
constructor(category) {
|
||||
constructor(category, options) {
|
||||
this._category = category;
|
||||
this._options = options;
|
||||
if (!this._category) {
|
||||
throw new Error("SyncMailboxOperation requires a category")
|
||||
}
|
||||
}
|
||||
|
||||
description() {
|
||||
return `SyncMailboxOperation (${this._category.name} - ${this._category.id})`;
|
||||
return `SyncMailboxOperation (${this._category.name} - ${this._category.id})\n Options: ${JSON.stringify(this._options)}`;
|
||||
}
|
||||
|
||||
_unlinkAllMessages() {
|
||||
_getLowerBoundUID() {
|
||||
const {count} = this._options.limit;
|
||||
return count ? Math.max(1, this._box.uidnext - count) : 1;
|
||||
}
|
||||
|
||||
_recoverFromUIDInvalidity() {
|
||||
// UID invalidity means the server has asked us to delete all the UIDs for
|
||||
// this folder and start from scratch. We let a garbage collector clean up
|
||||
// actual Messages, because we may just get new UIDs pointing to the same
|
||||
// messages.
|
||||
const {MessageUID} = this._db;
|
||||
return MessageUID.destroy({
|
||||
where: {
|
||||
CategoryId: this._category.id,
|
||||
},
|
||||
})
|
||||
return this._db.sequelize.transaction((transaction) =>
|
||||
MessageUID.destroy({
|
||||
where: {
|
||||
CategoryId: this._category.id,
|
||||
},
|
||||
}, {transaction})
|
||||
)
|
||||
}
|
||||
|
||||
_removeDeletedMessageUIDs(removedUIDs) {
|
||||
|
@ -30,7 +42,12 @@ class SyncMailboxOperation {
|
|||
return Promise.resolve();
|
||||
}
|
||||
return this._db.sequelize.transaction((transaction) =>
|
||||
MessageUID.destroy({where: {uid: removedUIDs}}, {transaction})
|
||||
MessageUID.destroy({
|
||||
where: {
|
||||
CategoryId: this._category.id,
|
||||
uid: removedUIDs,
|
||||
},
|
||||
}, {transaction})
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -73,7 +90,7 @@ class SyncMailboxOperation {
|
|||
return processMessage({accountId, attributes, headers, body, hash})
|
||||
}
|
||||
|
||||
_openMailboxAndCheckValidity() {
|
||||
_openMailboxAndEnsureValidity() {
|
||||
return this._imap.openBox(this._category.name, true).then((box) => {
|
||||
this._box = box;
|
||||
|
||||
|
@ -81,7 +98,7 @@ class SyncMailboxOperation {
|
|||
throw new Error("Mailbox does not support persistentUIDs.")
|
||||
}
|
||||
if (box.uidvalidity !== this._category.syncState.uidvalidity) {
|
||||
return this._unlinkAllMessages();
|
||||
return this._recoverFromUIDInvalidity();
|
||||
}
|
||||
return Promise.resolve();
|
||||
})
|
||||
|
@ -94,18 +111,18 @@ class SyncMailboxOperation {
|
|||
uidvalidity: this._box.uidvalidity,
|
||||
}
|
||||
|
||||
console.log(" - fetching unseen messages")
|
||||
|
||||
let fetchRange = `1:*`;
|
||||
let range = `${this._getLowerBoundUID()}:*`;
|
||||
if (savedSyncState.uidnext) {
|
||||
if (savedSyncState.uidnext === currentSyncState.uidnext) {
|
||||
console.log(" --- nothing more to fetch")
|
||||
return Promise.resolve();
|
||||
}
|
||||
fetchRange = `${savedSyncState.uidnext}:*`
|
||||
range = `${savedSyncState.uidnext}:*`
|
||||
}
|
||||
|
||||
return this._imap.fetch(fetchRange, this._processMessage.bind(this)).then(() => {
|
||||
console.log(` - fetching unseen messages ${range}`)
|
||||
|
||||
return this._imap.fetch(range, this._processMessage.bind(this)).then(() => {
|
||||
this._category.syncState = currentSyncState;
|
||||
return this._category.save();
|
||||
});
|
||||
|
@ -113,15 +130,16 @@ class SyncMailboxOperation {
|
|||
|
||||
_fetchChangesToMessages() {
|
||||
const {MessageUID} = this._db;
|
||||
const range = `${this._getLowerBoundUID()}:*`;
|
||||
|
||||
console.log(" - fetching changes to messages")
|
||||
console.log(` - fetching changes to messages ${range}`)
|
||||
|
||||
return this._imap.fetchUIDAttributes(`1:*`).then((latestUIDAttributes) => {
|
||||
return this._imap.fetchUIDAttributes(range).then((latestUIDAttributes) => {
|
||||
return MessageUID.findAll({where: {CategoryId: this._category.id}}).then((knownUIDs) => {
|
||||
const {removedUIDs, neededUIDs} = this._deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs);
|
||||
|
||||
console.log(` - found changed / new UIDs: ${neededUIDs.join(', ')}`)
|
||||
console.log(` - found removed UIDs: ${removedUIDs.join(', ')}`)
|
||||
console.log(` - found changed / new UIDs: ${neededUIDs.join(', ') || 'none'}`)
|
||||
console.log(` - found removed UIDs: ${removedUIDs.join(', ') || 'none'}`)
|
||||
|
||||
return Promise.props({
|
||||
deletes: this._removeDeletedMessageUIDs(removedUIDs),
|
||||
|
@ -135,7 +153,7 @@ class SyncMailboxOperation {
|
|||
this._db = db;
|
||||
this._imap = imap;
|
||||
|
||||
return this._openMailboxAndCheckValidity()
|
||||
return this._openMailboxAndEnsureValidity()
|
||||
.then(() =>
|
||||
this._fetchUnseenMessages()
|
||||
).then(() =>
|
||||
|
|
|
@ -7,6 +7,21 @@ class SyncWorkerPool {
|
|||
}
|
||||
|
||||
addWorkerForAccount(account) {
|
||||
account.syncPolicy = {
|
||||
limit: {
|
||||
after: Date.now() - 7 * 24 * 60 * 60 * 1000,
|
||||
count: 10000,
|
||||
},
|
||||
afterSync: 'idle',
|
||||
folderRecentSync: {
|
||||
every: 60 * 1000,
|
||||
},
|
||||
folderDeepSync: {
|
||||
every: 5 * 60 * 1000,
|
||||
},
|
||||
expiration: Date.now() + 60 * 60 * 1000,
|
||||
}
|
||||
|
||||
DatabaseConnectionFactory.forAccount(account.id).then((db) => {
|
||||
this._workers[account.id] = new SyncWorker(account, db);
|
||||
});
|
||||
|
|
|
@ -1,61 +1,202 @@
|
|||
const IMAPConnection = require('./imap/connection');
|
||||
const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation')
|
||||
const SyncMailboxOperation = require('./imap/sync-mailbox-operation')
|
||||
//
|
||||
// account.syncPolicy = {
|
||||
// afterSync: 'idle',
|
||||
// limit: {
|
||||
// after: Date.now() - 7 * 24 * 60 * 60 * 1000,
|
||||
// count: 10000,
|
||||
// },
|
||||
// folderRecentSync: {
|
||||
// every: 60 * 1000,
|
||||
// },
|
||||
// folderDeepSync: {
|
||||
// every: 5 * 60 * 1000,
|
||||
// },
|
||||
// expiration: Date.now() + 60 * 60 * 1000,
|
||||
// }
|
||||
|
||||
class SyncWorker {
|
||||
|
||||
constructor(account, db) {
|
||||
const main = new IMAPConnection(db, {
|
||||
user: 'inboxapptest1@fastmail.fm',
|
||||
password: 'trar2e',
|
||||
host: 'mail.messagingengine.com',
|
||||
port: 993,
|
||||
tls: true,
|
||||
});
|
||||
|
||||
// Todo: SyncWorker should decide what operations to queue and what params
|
||||
// to pass them, and how often, based on SyncPolicy model (TBD).
|
||||
|
||||
main.on('ready', () => {
|
||||
main.runOperation(new RefreshMailboxesOperation())
|
||||
.then(() =>
|
||||
this._db.Category.find({where: {role: 'inbox'}})
|
||||
).then((inboxCategory) => {
|
||||
if (!inboxCategory) {
|
||||
throw new Error("Unable to find an inbox category.")
|
||||
}
|
||||
main.on('mail', () => {
|
||||
main.runOperation(new SyncMailboxOperation(inboxCategory));
|
||||
})
|
||||
main.on('update', () => {
|
||||
main.runOperation(new SyncMailboxOperation(inboxCategory));
|
||||
})
|
||||
main.on('queue-empty', () => {
|
||||
main.openBox(inboxCategory.name, true).then(() => {
|
||||
console.log("Idling on inbox category");
|
||||
});
|
||||
});
|
||||
|
||||
setInterval(() => this.syncAllMailboxes(), 120 * 1000);
|
||||
this.syncAllMailboxes();
|
||||
});
|
||||
});
|
||||
|
||||
this._db = db;
|
||||
this._main = main;
|
||||
this._conn = null;
|
||||
this._account = account;
|
||||
this._lastFolderRecentSync = null;
|
||||
this._lastFolderDeepSync = null;
|
||||
|
||||
this._syncTimer = null;
|
||||
this._expirationTimer = null;
|
||||
|
||||
this.syncNow();
|
||||
this.scheduleExpiration();
|
||||
}
|
||||
|
||||
syncAllMailboxes() {
|
||||
const {Category} = this._db;
|
||||
Category.findAll().then((categories) => {
|
||||
const priority = ['inbox', 'drafts', 'sent'];
|
||||
const sorted = categories.sort((a, b) => {
|
||||
return priority.indexOf(b.role) - priority.indexOf(a.role);
|
||||
})
|
||||
for (const cat of sorted) {
|
||||
this._main.runOperation(new SyncMailboxOperation(cat));
|
||||
}
|
||||
// TODO: How does this get called?
|
||||
onAccountChanged() {
|
||||
this.syncNow();
|
||||
this.scheduleExpiration();
|
||||
}
|
||||
|
||||
onExpired() {
|
||||
// Returning syncs to the unclaimed queue every so often is healthy.
|
||||
// TODO: That.
|
||||
}
|
||||
|
||||
onSyncDidComplete() {
|
||||
const {afterSync} = this._account.syncPolicy;
|
||||
|
||||
if (afterSync === 'idle') {
|
||||
this.getInboxCategory().then((inboxCategory) => {
|
||||
this._conn.openBox(inboxCategory.name, true).then(() => {
|
||||
console.log(" - Idling on inbox category");
|
||||
});
|
||||
});
|
||||
} else if (afterSync === 'close') {
|
||||
console.log(" - Closing connection");
|
||||
this._conn.end();
|
||||
this._conn = null;
|
||||
} else {
|
||||
throw new Error(`onSyncDidComplete: Unknown afterSync behavior: ${afterSync}`)
|
||||
}
|
||||
}
|
||||
|
||||
onConnectionIdleUpdate() {
|
||||
this.getInboxCategory((inboxCategory) => {
|
||||
this._conn.runOperation(new SyncMailboxOperation(inboxCategory, {
|
||||
scanAllUIDs: false,
|
||||
limit: this.account.syncPolicy.options,
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
getInboxCategory() {
|
||||
return this._db.Category.find({where: {role: 'inbox'}})
|
||||
}
|
||||
|
||||
getCurrentFolderSyncOptionsForPolicy() {
|
||||
const {folderRecentSync, folderDeepSync, limit} = this._account.syncPolicy;
|
||||
|
||||
if (Date.now() - this._lastFolderDeepSync > folderDeepSync.every) {
|
||||
return {
|
||||
mode: 'deep',
|
||||
options: {
|
||||
scanAllUIDs: true,
|
||||
limit: limit,
|
||||
},
|
||||
};
|
||||
}
|
||||
if (Date.now() - this._lastFolderRecentSync > folderRecentSync.every) {
|
||||
return {
|
||||
mode: 'shallow',
|
||||
options: {
|
||||
scanAllUIDs: false,
|
||||
limit: limit,
|
||||
},
|
||||
};
|
||||
}
|
||||
return {
|
||||
mode: 'none',
|
||||
};
|
||||
}
|
||||
|
||||
ensureConnection() {
|
||||
if (!this._conn) {
|
||||
const conn = new IMAPConnection(this._db, {
|
||||
user: 'inboxapptest1@fastmail.fm',
|
||||
password: 'trar2e',
|
||||
host: 'mail.messagingengine.com',
|
||||
port: 993,
|
||||
tls: true,
|
||||
});
|
||||
conn.on('mail', () => {
|
||||
this.onConnectionIdleUpdate();
|
||||
})
|
||||
conn.on('update', () => {
|
||||
this.onConnectionIdleUpdate();
|
||||
})
|
||||
conn.on('queue-empty', () => {
|
||||
});
|
||||
|
||||
this._conn = conn;
|
||||
}
|
||||
|
||||
return this._conn.connect();
|
||||
}
|
||||
|
||||
queueOperationsForUpdates() {
|
||||
// todo: syncback operations belong here!
|
||||
return this._conn.runOperation(new RefreshMailboxesOperation())
|
||||
}
|
||||
|
||||
queueOperationsForFolderSyncs() {
|
||||
const {Category} = this._db;
|
||||
const {mode, options} = this.getCurrentFolderSyncOptionsForPolicy();
|
||||
|
||||
if (mode === 'none') {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return Category.findAll().then((categories) => {
|
||||
const priority = ['inbox', 'drafts', 'sent'];
|
||||
const sorted = categories.sort((a, b) =>
|
||||
priority.indexOf(b.role) - priority.indexOf(a.role)
|
||||
)
|
||||
return Promise.all(sorted.map((cat) =>
|
||||
this._conn.runOperation(new SyncMailboxOperation(cat, options))
|
||||
)).then(() => {
|
||||
if (mode === 'deep') {
|
||||
this._lastFolderDeepSync = Date.now();
|
||||
this._lastFolderRecentSync = Date.now();
|
||||
} else if (mode === 'shallow') {
|
||||
this._lastFolderRecentSync = Date.now();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
syncNow() {
|
||||
clearTimeout(this._syncTimer);
|
||||
|
||||
this.ensureConnection().then(() =>
|
||||
this.queueOperationsForUpdates().then(() =>
|
||||
this.queueOperationsForFolderSyncs()
|
||||
)
|
||||
).catch((err) => {
|
||||
// Sync has failed for some reason. What do we do?!
|
||||
console.error(err);
|
||||
}).finally(() => {
|
||||
this.onSyncDidComplete();
|
||||
this.scheduleNextSync();
|
||||
});
|
||||
}
|
||||
|
||||
scheduleExpiration() {
|
||||
const {expiration} = this._account.syncPolicy;
|
||||
|
||||
clearTimeout(this._expirationTimer);
|
||||
this._expirationTimer = setTimeout(() => this.onExpired(), expiration);
|
||||
}
|
||||
|
||||
scheduleNextSync() {
|
||||
const {folderRecentSync, folderDeepSync} = this._account.syncPolicy;
|
||||
|
||||
let target = Number.MAX_SAFE_INTEGER;
|
||||
|
||||
if (folderRecentSync) {
|
||||
target = Math.min(target, this._lastFolderRecentSync + folderRecentSync.every);
|
||||
}
|
||||
if (folderDeepSync) {
|
||||
target = Math.min(target, this._lastFolderDeepSync + folderDeepSync.every);
|
||||
}
|
||||
|
||||
console.log(`Next sync scheduled for ${new Date(target).toLocaleString()}`);
|
||||
|
||||
this._syncTimer = setTimeout(() => {
|
||||
this.syncNow();
|
||||
}, target - Date.now());
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = SyncWorker;
|
||||
|
|
Loading…
Reference in a new issue