Cleaner sync state machine

This commit is contained in:
Ben Gotow 2016-06-20 00:19:16 -07:00
parent 25270c0b75
commit 3dc03688e8
6 changed files with 443 additions and 236 deletions

View file

@ -1,7 +1,22 @@
module.exports = (sequelize, Sequelize) => {
const MessageUID = sequelize.define('MessageUID', {
uid: Sequelize.STRING,
flags: {
type: Sequelize.STRING,
get: function get() {
return JSON.parse(this.getDataValue('flags'))
},
set: function set(val) {
this.setDataValue('flags', JSON.stringify(val));
},
},
}, {
indexes: [
{
unique: true,
fields: ['uid', 'MessageId', 'CategoryId']
}
],
classMethods: {
associate: ({Category, Message}) => {
MessageUID.belongsTo(Category)

View file

@ -2,6 +2,7 @@ const path = require('path');
global.__base = path.join(__dirname, '..')
global.config = require(`${__base}/core/config/${process.env.ENV || 'development'}.json`);
global.Promise = require('bluebird');
const DatabaseConnectionFactory = require(`${__base}/core/database-connection-factory`)
const SyncWorkerPool = require('./sync-worker-pool');

View file

@ -0,0 +1,131 @@
/**
 * Downloads the messages of one IMAP mailbox (a "category") and records them
 * as `Message` rows linked to the mailbox through `MessageUID` rows.
 *
 * The operation is incremental: the `uidnext` / `uidvalidity` pair of the last
 * successful run is stored in `category.syncState` and used to fetch only the
 * UIDs that appeared since then.
 */
class SyncMailboxOperation {
  /**
   * @param {Object} category - Category model to sync. This class reads its
   *   `id`, `name` and `syncState`, assigns `syncState` and calls `save()`.
   * @throws {Error} When no category is given.
   */
  constructor(category) {
    if (!category) {
      throw new Error("SyncMailboxOperation requires a category")
    }
    this._category = category;
  }

  /**
   * @returns {string} Human readable label for this operation.
   */
  description() {
    return `SyncMailboxOperation (${this._category.name})`;
  }

  /**
   * Fetches headers and text for every message in `range` and stores them.
   *
   * @param {Object} imap - Connection exposing node-imap's `fetch(range, opts)`.
   * @param {string} range - UID range, e.g. `1:*`.
   * @returns {Promise} Resolves once the fetch has ended AND every received
   *   message has been written to the database; rejects on a fetch error or
   *   when storing any message fails.
   */
  _fetch(imap, range) {
    return new Promise((resolve, reject) => {
      const pending = [];
      const f = imap.fetch(range, {
        bodies: ['HEADER', 'TEXT'],
      });
      f.on('message', (msg, seqno) => {
        const stored = this._receiveMessage(msg, seqno);
        // Mark the promise as handled right away so a failure that happens
        // before 'end' is not reported as an unhandled rejection. The
        // Promise.all below still observes the rejection.
        stored.catch(() => {});
        pending.push(stored);
      });
      f.once('error', reject);
      f.once('end', () => {
        Promise.all(pending).then(() => resolve(), reject);
      });
    });
  }

  /**
   * Removes every MessageUID row that belongs to this category.
   *
   * @returns {Promise} Result of `MessageUID.destroy`.
   */
  _unlinkAllMessages() {
    const {MessageUID} = this._db;
    return MessageUID.destroy({
      where: {
        CategoryId: this._category.id,
      },
    })
  }

  /**
   * Collects the attributes, header block and text body of one fetched
   * message and hands them to `_processMessage` when the message ends.
   *
   * @param {Object} msg - Message emitter ('attributes', 'body', 'end').
   * @param {number} uid - Second argument of the fetch 'message' event. It is
   *   not used here: node-imap documents that value as a sequence number, and
   *   the real UID is read from the message attributes instead.
   * @returns {Promise} Settles with the outcome of `_processMessage`.
   */
  _receiveMessage(msg, uid) { // eslint-disable-line no-unused-vars
    return new Promise((resolve, reject) => {
      let attributes = null;
      let body = null;
      let headers = null;

      msg.on('attributes', (attrs) => {
        attributes = attrs;
      });
      msg.on('body', (stream, info) => {
        const chunks = [];
        stream.on('data', (chunk) => {
          chunks.push(chunk);
        });
        stream.once('end', () => {
          const full = Buffer.concat(chunks).toString('utf8');
          if (info.which === 'HEADER') {
            headers = full;
          }
          if (info.which === 'TEXT') {
            body = full;
          }
        });
      });
      msg.once('end', () => {
        // _processMessage can throw synchronously (e.g. no attributes were
        // received); turn that into a rejection instead of an exception
        // raised inside the emitter.
        try {
          resolve(this._processMessage(attributes, headers, body));
        } catch (err) {
          reject(err);
        }
      });
    });
  }

  /**
   * Persists one message and links it to this category.
   *
   * @param {Object} attributes - Fetched attributes; `flags`, `date` and
   *   `uid` are read.
   * @param {?string} headers - Raw header block, or null if none was received.
   * @param {?string} body - Raw text body, or null if none was received.
   * @returns {Promise} Resolves with the created MessageUID row.
   */
  _processMessage(attributes, headers, body) {
    const {Message, MessageUID} = this._db;
    return Message.create({
      // IMAP defines a \Seen system flag but no \Unseen flag, so a message
      // is unread exactly when \Seen is absent.
      unread: !attributes.flags.includes('\\Seen'),
      starred: attributes.flags.includes('\\Flagged'),
      date: attributes.date,
      headers: headers,
      body: body,
    }).then((model) => {
      return MessageUID.create({
        MessageId: model.id,
        CategoryId: this._category.id,
        flags: attributes.flags,
        uid: attributes.uid,
      });
    });
  }

  // TODO: batch the Message / MessageUID inserts with bulkCreate inside a
  // single transaction instead of creating rows one message at a time.

  /**
   * Opens the mailbox read-only, fetches messages that are new since the last
   * run and then stores the mailbox's current `uidnext` / `uidvalidity`.
   *
   * @param {Object} db - Database handle exposing `Message` and `MessageUID`.
   * @param {Object} imap - Connection exposing `openBoxAsync` and `fetch`.
   * @returns {Promise} Resolves when the sync state has been saved, or
   *   immediately when the mailbox has no new UIDs.
   * @throws {Error} (as a rejection) when the mailbox reports
   *   `persistentUIDs === false`.
   */
  run(db, imap) {
    this._db = db;

    return imap.openBoxAsync(this._category.name, true).then((box) => {
      this._box = box;

      if (box.persistentUIDs === false) {
        throw new Error("Mailbox does not support persistentUIDs.")
      }

      // A category that has never been synced may have no saved state yet.
      const savedSyncState = this._category.syncState || {};
      const currentSyncState = {
        uidnext: box.uidnext,
        uidvalidity: box.uidvalidity,
      };
      const uidvalidityChanged = box.uidvalidity !== savedSyncState.uidvalidity;

      const unlinked = uidvalidityChanged ? this._unlinkAllMessages() : undefined;
      return Promise.resolve(unlinked).then(() => {
        let fetchRange = `1:*`;
        // The saved uidnext is only meaningful for the uidvalidity it was
        // recorded under. After a change every link was just removed, so the
        // whole mailbox has to be fetched again.
        if (!uidvalidityChanged && savedSyncState.uidnext) {
          if (savedSyncState.uidnext === currentSyncState.uidnext) {
            return undefined;
          }
          fetchRange = `${savedSyncState.uidnext}:*`
        }

        return this._fetch(imap, fetchRange).then(() => {
          this._category.syncState = currentSyncState;
          return this._category.save();
        });
      });
    });
  }
}
module.exports = SyncMailboxOperation;

View file

@ -0,0 +1,87 @@
/**
 * Reconciles the local `Category` table with the mailbox tree reported by the
 * IMAP server: categories are created for mailboxes that are not known yet and
 * destroyed for known categories that the server no longer lists.
 */
class RefreshMailboxesOperation {
  /**
   * @returns {string} Human readable label for this operation.
   */
  description() {
    return `RefreshMailboxesOperation`;
  }

  /**
   * Derives the role of a mailbox from its attributes, falling back to the
   * mailbox name for the inbox.
   *
   * @param {string} boxName - Full mailbox name.
   * @param {Object} box - Mailbox description; only `attribs` is read.
   * @returns {?string} 'sent', 'drafts', 'junk', 'flagged', 'inbox' or null.
   */
  _roleForMailbox(boxName, box) {
    const rolesByAttrib = {
      '\\Sent': 'sent',
      '\\Drafts': 'drafts',
      '\\Junk': 'junk',
      '\\Flagged': 'flagged',
    };
    for (const attrib of (box.attribs || [])) {
      // Own-property check: attributes come from the server, and a plain
      // lookup would return inherited members for names like "constructor".
      if (Object.prototype.hasOwnProperty.call(rolesByAttrib, attrib)) {
        return rolesByAttrib[attrib];
      }
    }
    if (boxName.toLowerCase().trim() === 'inbox') {
      return 'inbox';
    }
    return null;
  }

  /**
   * Walks the mailbox tree and matches each mailbox against the known
   * categories by name.
   *
   * @param {Array<Object>} categories - Existing Category models.
   * @param {Object} boxes - Mailbox tree keyed by name; each entry may carry
   *   `attribs`, `delimiter` and `children`.
   * @returns {{next: Array, created: Array, deleted: Array}} `next` holds the
   *   category for every visited mailbox, `created` the unsaved categories
   *   built for new mailboxes, and `deleted` the known categories that no
   *   mailbox matched.
   */
  _updateCategoriesWithBoxes(categories, boxes) {
    const {Category} = this._db;

    // Index once by name; the first category wins on duplicate names, which
    // matches a front-to-back linear search.
    const categoriesByName = new Map();
    for (const cat of categories) {
      if (!categoriesByName.has(cat.name)) {
        categoriesByName.set(cat.name, cat);
      }
    }

    const stack = [];
    const created = [];
    const next = [];

    Object.keys(boxes).forEach((boxName) => {
      stack.push([boxName, boxes[boxName]]);
    });

    while (stack.length > 0) {
      const [boxName, box] = stack.pop();
      if (!box.attribs) {
        // Some boxes seem to come back as partial objects. Not sure why, but
        // I also can't access them via openMailbox. Possible node-imap i18n issue?
        continue;
      }

      if (box.children && box.attribs.includes('\\HasChildren')) {
        Object.keys(box.children).forEach((subname) => {
          stack.push([`${boxName}${box.delimiter}${subname}`, box.children[subname]]);
        });
      }

      let category = categoriesByName.get(boxName);
      if (!category) {
        category = Category.build({
          name: boxName,
          role: this._roleForMailbox(boxName, box),
        });
        created.push(category);
      }
      next.push(category);
    }

    // Todo: decide whether these are renames or deletes
    const kept = new Set(next);
    const deleted = categories.filter(cat => !kept.has(cat));

    return {next, created, deleted};
  }

  /**
   * Lists the server's mailboxes and applies the resulting creations and
   * deletions inside one database transaction.
   *
   * @param {Object} db - Database handle exposing `Category` and `sequelize`.
   * @param {Object} imap - Connection exposing `getBoxesAsync()`.
   * @returns {Promise} Resolves when every save and destroy has completed.
   */
  run(db, imap) {
    this._db = db;
    this._imap = imap;

    return imap.getBoxesAsync().then((boxes) => {
      const {Category, sequelize} = this._db;

      return sequelize.transaction((transaction) => {
        return Category.findAll({transaction}).then((categories) => {
          const {created, deleted} = this._updateCategoriesWithBoxes(categories, boxes);

          const saves = created.map(cat => cat.save({transaction}));
          const destroys = deleted.map(cat => cat.destroy({transaction}));
          return Promise.all(saves.concat(destroys));
        });
      });
    });
  }
}
module.exports = RefreshMailboxesOperation;

View file

@ -0,0 +1,98 @@
/**
 * Compares the UIDs and flags the server currently reports for a mailbox
 * with the `MessageUID` rows stored for the matching category. Rows whose UID
 * vanished from the server are destroyed; UIDs that are new or whose flags
 * changed are handed to `_fetchMessages`.
 */
class ScanUIDsOperation {
  /**
   * @param {Object} category - Category model to scan; `id` and `name` are read.
   * @throws {Error} When no category is given.
   */
  constructor(category) {
    if (!category) {
      throw new Error("ScanUIDsOperation requires a category")
    }
    this._category = category;
  }

  /**
   * @returns {string} Human readable label for this operation.
   */
  description() {
    return `ScanUIDsOperation (${this._category.name})`;
  }

  /**
   * Fetches the attributes (no bodies) of every message in `range`.
   *
   * @param {Object} imap - Connection exposing node-imap's `fetch(range, opts)`.
   * @param {string} range - UID range, e.g. `1:*`.
   * @returns {Promise<Object>} Resolves with the attributes keyed by UID.
   */
  _fetchUIDAttributes(imap, range) {
    return new Promise((resolve, reject) => {
      const latestUIDAttributes = {};
      const f = imap.fetch(range, {});
      f.on('message', (msg) => {
        msg.on('attributes', (attrs) => {
          // Key by the UID from the attributes. node-imap documents the second
          // argument of the 'message' event as a sequence number, which would
          // not match the UIDs stored in the database.
          latestUIDAttributes[attrs.uid] = attrs;
        })
      });
      f.once('error', reject);
      f.once('end', () => {
        resolve(latestUIDAttributes);
      });
    });
  }

  /**
   * Placeholder for downloading new or changed messages.
   *
   * @param {Array} uids - UIDs that need to be fetched.
   * @returns {Promise} Always resolves; currently only logs the UIDs.
   */
  _fetchMessages(uids) {
    if (uids.length === 0) {
      return Promise.resolve();
    }
    console.log(`TODO! NEED TO FETCH UIDS ${uids.join(', ')}`)
    return Promise.resolve();
  }

  /**
   * Destroys the MessageUID rows of this category whose UID is in
   * `removedUIDs`.
   *
   * @param {Array} removedUIDs - UIDs no longer present on the server.
   * @returns {Promise} Resolves when the rows have been removed.
   */
  _removeDeletedMessageUIDs(removedUIDs) {
    const {MessageUID} = this._db;
    if (removedUIDs.length === 0) {
      return Promise.resolve();
    }
    return this._db.sequelize.transaction((transaction) =>
      MessageUID.destroy({
        // UIDs are only unique within one mailbox, so the delete has to be
        // limited to this category.
        where: {uid: removedUIDs, CategoryId: this._category.id},
        // The transaction belongs in the same options object as `where`.
        transaction,
      })
    );
  }

  /**
   * Order-insensitive comparison of two flag lists.
   *
   * @param {?Array<string>} a
   * @param {?Array<string>} b
   * @returns {boolean} True when both lists hold the same flags.
   */
  _sameFlags(a, b) {
    const left = (a || []).slice().sort();
    const right = (b || []).slice().sort();
    return left.length === right.length && left.every((flag, idx) => flag === right[idx]);
  }

  /**
   * Works out which known UIDs disappeared and which UIDs need fetching.
   *
   * @param {Object} latestUIDAttributes - Server attributes keyed by UID.
   * @param {Array<Object>} knownUIDs - Stored MessageUID rows (`uid`, `flags`).
   *   The `flags` of a row whose flags changed are overwritten in memory with
   *   the server's flags; nothing is saved here.
   * @returns {{neededUIDs: Array, removedUIDs: Array}} `neededUIDs` lists the
   *   known UIDs whose flags changed followed by the UIDs that are not stored
   *   yet; `removedUIDs` lists the stored UIDs the server no longer reports.
   */
  _deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs) {
    const removedUIDs = [];
    const neededUIDs = [];
    const seenUIDs = new Set();

    for (const known of knownUIDs) {
      const latest = latestUIDAttributes[known.uid];
      if (!latest) {
        removedUIDs.push(known.uid);
        continue;
      }
      seenUIDs.add(String(known.uid));
      // Compare flag contents: both sides are freshly built arrays, so an
      // identity comparison would report every message as changed.
      if (!this._sameFlags(latest.flags, known.flags)) {
        known.flags = latest.flags;
        neededUIDs.push(known.uid);
      }
    }

    const unknownUIDs = Object.keys(latestUIDAttributes).filter(uid => !seenUIDs.has(uid));

    return {
      neededUIDs: neededUIDs.concat(unknownUIDs),
      removedUIDs: removedUIDs,
    };
  }

  // TODO: batch the Message / MessageUID inserts with bulkCreate inside a
  // single transaction once _fetchMessages actually downloads messages.

  /**
   * Opens the mailbox read-only, diffs the server's UIDs and flags against the
   * stored rows of this category and applies the result.
   *
   * @param {Object} db - Database handle exposing `MessageUID` and `sequelize`.
   * @param {Object} imap - Connection exposing `openBoxAsync` and `fetch`.
   * @returns {Promise<{deletes: *, changes: *}>} Outcome of the removal and of
   *   `_fetchMessages`.
   */
  run(db, imap) {
    this._db = db;
    const {MessageUID} = db;

    return imap.openBoxAsync(this._category.name, true).then(() => {
      return this._fetchUIDAttributes(imap, `1:*`).then((latestUIDAttributes) => {
        // Without `where` the filter is not applied and the rows of every
        // category would be loaded.
        return MessageUID.findAll({where: {CategoryId: this._category.id}}).then((knownUIDs) => {
          const {removedUIDs, neededUIDs} = this._deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs);

          // Promise.all instead of the bluebird-only Promise.props, so this
          // does not depend on what the global Promise happens to be.
          return Promise.all([
            this._removeDeletedMessageUIDs(removedUIDs),
            this._fetchMessages(neededUIDs),
          ]).then(([deletes, changes]) => ({deletes, changes}));
        });
      });
    });
  }
}
module.exports = ScanUIDsOperation;

View file

@ -1,12 +1,9 @@
const {inspect} = require('util');
const Promise = require('bluebird');
const Imap = require('imap');
const EventEmitter = require('events');
const State = {
Closed: 'closed',
Connecting: 'connecting',
Open: 'open',
}
const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation')
const DiscoverMessagesOperation = require('./imap/discover-messages-operation')
const ScanUIDsOperation = require('./imap/scan-uids-operation')
const Capabilities = {
Gmail: 'X-GM-EXT-1',
@ -17,40 +14,69 @@ const Capabilities = {
Sort: 'SORT',
}
class SyncIMAPConnection {
constructor(settings) {
class IMAPConnectionStateMachine extends EventEmitter {
constructor(db, settings) {
super();
this._db = db;
this._queue = [];
this._current = null;
this._state = State.Connecting;
this._capabilities = [];
this._imap = Promise.promisifyAll(new Imap(settings));
this._imap.once('ready', () => {
this._state = State.Open;
for (const key of Object.keys(Capabilities)) {
const val = Capabilities[key];
if (this._imap.serverSupports(val)) {
this._capabilities.push(val);
}
}
this.processNextOperation();
this.emit('ready');
});
this._imap.once('error', (err) => {
console.log(err);
});
this._imap.once('end', () => {
this._state = State.Closed;
console.log('Connection ended');
});
this._imap.on('alert', (msg) => {
console.log(`IMAP SERVER SAYS: ${msg}`)
})
// Emitted when new mail arrives in the currently open mailbox.
// Fix https://github.com/mscdex/node-imap/issues/445
let lastMailEventBox = null;
this._imap.on('mail', () => {
if (lastMailEventBox === this._imap._box.name) {
this.emit('mail');
}
lastMailEventBox = this._imap._box.name
});
// Emitted if the UID validity value for the currently open mailbox
// changes during the current session.
this._imap.on('uidvalidity', () => this.emit('uidvalidity'))
// Emitted when message metadata (e.g. flags) changes externally.
this._imap.on('update', () => this.emit('update'))
this._imap.connect();
}
queueOperation(op) {
this._queue.push(op);
if (this._state === State.Open && !this._current) {
this.processNextOperation();
}
// Returns the connection object stored in `this._imap`.
getIMAP() {
return this._imap;
}
runOperation(operation) {
return new Promise((resolve, reject) => {
this._queue.push({operation, resolve, reject});
if (this._imap.state === 'authenticated' && !this._current) {
this.processNextOperation();
}
});
}
processNextOperation() {
@ -58,239 +84,88 @@ class SyncIMAPConnection {
this._current = this._queue.shift();
if (this._current) {
console.log(`Starting task ${this._current.constructor.name}`)
const result = this._current.run(this._imap);
if (result instanceof Promise === false) {
throw new Error(`processNextOperation: Expected ${this._current.constructor.name} to return promise.`);
}
result.catch((err) => {
this._current = null;
console.error(err);
});
result.then(() => {
console.log(`Finished task ${this._current.constructor.name}`)
this._current = null;
this.processNextOperation();
});
if (!this._current) {
this.emit('queue-empty');
return;
}
}
}
class SyncMailboxOperation {
constructor(db, {role} = {}) {
this._db = db;
this._category = null;
this._box = null;
}
const {operation, resolve, reject} = this._current;
_fetch(imap, range) {
return new Promise((resolve, reject) => {
const f = imap.fetch(range, {
bodies: ['HEADER', 'TEXT'],
});
f.on('message', (msg, uid) => this._receiveMessage(msg, uid));
f.once('error', reject);
f.once('end', resolve);
});
}
_unlinkAllMessages() {
const {MessageUID} = this._db;
return MessageUID.destroy({
where: {
categoryId: this._category.id,
},
})
}
_receiveMessage(msg, uid) {
let attributes = null;
let body = null;
let headers = null;
msg.on('attributes', (attrs) => {
attributes = attrs;
});
msg.on('body', (stream, type) => {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.once('end', () => {
const full = Buffer.concat(chunks).toString('utf8');
if (type === 'TEXT') {
body = full;
}
if (type === 'HEADERS') {
headers = full;
}
});
});
msg.once('end', () => {
this._processMessage(attributes, headers, body, uid);
});
}
_processMessage(attributes, headers, body) {
console.log(attributes);
const {Message, MessageUID} = this._db;
return Message.create({
unread: attributes.flags.includes('\\Unseen'),
starred: attributes.flags.includes('\\Flagged'),
date: attributes.date,
body: body,
}).then((model) => {
return MessageUID.create({
MessageId: model.id,
CategoryId: this._category.id,
uid: attributes.uid,
});
});
}
// _flushProcessedMessages() {
// return sequelize.transaction((transaction) => {
// return Promise.props({
// msgs: Message.bulkCreate(this._processedMessages, {transaction})
// uids: MessageUID.bulkCreate(this._processedMessageUIDs, {transaction})
// })
// }).then(() => {
// this._processedMessages = [];
// this._processedMessageUIDs = [];
// });
// }
run(imap) {
const {Category} = this._db;
return Promise.props({
box: imap.openBoxAsync('INBOX', true),
category: Category.find({name: 'INBOX'}),
})
.then(({category, box}) => {
if (this.box.persistentUIDs === false) {
throw new Error("Mailbox does not support persistentUIDs.")
}
this._category = category;
this._box = box;
if (box.uidvalidity !== category.syncState.uidvalidity) {
return this._unlinkAllMessages();
}
return Promise.resolve();
console.log(`Starting task ${operation.description()}`)
const result = operation.run(this._db, this._imap);
if (result instanceof Promise === false) {
throw new Error(`Expected ${operation.constructor.name} to return promise.`);
}
result.catch((err) => {
this._current = null;
console.error(err);
reject();
})
.then(() => {
const lastUIDNext = this._category.syncState.uidnext;
const currentUIDNext = this._box.uidnext
if (lastUIDNext) {
if (lastUIDNext === currentUIDNext) {
return Promise.resolve();
}
// just request mail >= UIDNext
return this._fetch(imap, `${lastUIDNext}:*`);
}
return this._fetch(imap, `1:*`);
});
}
}
class RefreshMailboxesOperation {
constructor(db) {
this._db = db;
}
_roleForMailbox(box) {
for (const attrib of (box.attribs || [])) {
const role = {
'\\Sent': 'sent',
'\\Drafts': 'drafts',
'\\Junk': 'junk',
'\\Flagged': 'flagged',
}[attrib];
if (role) {
return role;
}
}
return null;
}
_updateCategoriesWithBoxes(categories, boxes) {
const {Category} = this._db;
const stack = [];
const created = [];
const next = [];
Object.keys(boxes).forEach((name) => {
stack.push([name, boxes[name]]);
});
while (stack.length > 0) {
const [name, box] = stack.pop();
if (box.children) {
Object.keys(box.children).forEach((subname) => {
stack.push([`${name}/${subname}`, box.children[subname]]);
});
}
let category = categories.find((cat) => cat.name === name);
if (!category) {
category = Category.build({
name: name,
role: this._roleForMailbox(box),
});
created.push(category);
}
next.push(category);
}
// Todo: decide whether these are renames or deletes
const deleted = categories.filter(cat => !next.includes(cat));
return {next, created, deleted};
}
run(imap) {
return imap.getBoxesAsync().then((boxes) => {
const {Category, sequelize} = this._db;
return sequelize.transaction((transaction) => {
return Category.findAll({transaction}).then((categories) => {
const {created, deleted} = this._updateCategoriesWithBoxes(categories, boxes);
let promises = [Promise.resolve()]
promises = promises.concat(created.map(cat => cat.save({transaction})))
promises = promises.concat(deleted.map(cat => cat.destroy({transaction})))
return Promise.all(promises)
});
});
this._current = null;
console.log(`Finished task ${operation.description()}`)
resolve();
})
.finally(() => {
this.processNextOperation();
});
}
}
class SyncWorker {
constructor(account, db) {
this._db = db
this._conns = []
const main = new SyncIMAPConnection({
const main = new IMAPConnectionStateMachine(db, {
user: 'inboxapptest1@fastmail.fm',
password: 'trar2e',
host: 'mail.messagingengine.com',
port: 993,
tls: true,
})
main.queueOperation(new RefreshMailboxesOperation(db));
main.queueOperation(new SyncMailboxOperation(db, {
role: 'inbox',
}));
this._conns.push(main);
});
// Todo: SyncWorker should decide what operations to queue and what params
// to pass them, and how often, based on SyncPolicy model (TBD).
main.on('ready', () => {
main.runOperation(new RefreshMailboxesOperation())
.then(() =>
this._db.Category.find({where: {role: 'inbox'}})
).then((inboxCategory) => {
if (!inboxCategory) {
throw new Error("Unable to find an inbox category.")
}
main.on('mail', () => {
main.runOperation(new DiscoverMessagesOperation(inboxCategory));
})
main.on('update', () => {
main.runOperation(new ScanUIDsOperation(inboxCategory));
})
main.on('queue-empty', () => {
main.getIMAP().openBoxAsync(inboxCategory.name, true).then(() => {
console.log("Idling on inbox category");
});
});
setInterval(() => this.syncAllMailboxes(), 120 * 1000);
this.syncAllMailboxes();
});
});
this._db = db;
this._main = main;
}
syncAllMailboxes() {
const {Category} = this._db;
Category.findAll().then((categories) => {
const priority = ['inbox', 'drafts', 'sent'];
const sorted = categories.sort((a, b) => {
return priority.indexOf(b.role) - priority.indexOf(a.role);
})
for (const cat of sorted) {
this._main.runOperation(new DiscoverMessagesOperation(cat));
this._main.runOperation(new ScanUIDsOperation(cat));
}
});
}
}