Unify operations, fix a few bugs, logging issues

This commit is contained in:
Ben Gotow 2016-06-20 14:44:02 -07:00
parent 6577bd9358
commit de8e09d6b5
12 changed files with 353 additions and 355 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
.DS_Store
node_modules
storage/a-1.sqlite

View file

@ -33,6 +33,7 @@ class DatabaseConnectionFactory {
const sequelize = new Sequelize(accountId, '', '', {
storage: path.join(STORAGE_DIR, `a-${accountId}.sqlite`),
dialect: "sqlite",
logging: false,
});
const modelsPath = path.join(__dirname, 'models/account');
@ -55,6 +56,7 @@ class DatabaseConnectionFactory {
const sequelize = new Sequelize('shared', '', '', {
storage: path.join(STORAGE_DIR, 'shared.sqlite'),
dialect: "sqlite",
logging: false,
});
const modelsPath = path.join(__dirname, 'models/shared');
@ -72,7 +74,6 @@ class DatabaseConnectionFactory {
this._pools.shared = this._pools.shared || this._sequelizeForShared();
return this._pools.shared;
}
}
module.exports = new DatabaseConnectionFactory()

View file

@ -1,3 +1,5 @@
const crypto = require('crypto');
module.exports = (sequelize, Sequelize) => {
const Message = sequelize.define('Message', {
subject: Sequelize.STRING,
@ -14,6 +16,9 @@ module.exports = (sequelize, Sequelize) => {
// Message.hasMany(Contact, {as: 'from'})
Message.hasMany(MessageUID, {as: 'uids'})
},
hashForHeaders: (headers) => {
return crypto.createHash('sha256').update(headers, 'utf8').digest('hex');
},
},
});

View file

@ -1,6 +1,7 @@
module.exports = (sequelize, Sequelize) => {
const MessageUID = sequelize.define('MessageUID', {
uid: Sequelize.STRING,
messageHash: Sequelize.STRING,
flags: {
type: Sequelize.STRING,
get: function get() {
@ -14,13 +15,12 @@ module.exports = (sequelize, Sequelize) => {
indexes: [
{
unique: true,
fields: ['uid', 'MessageId', 'CategoryId']
}
fields: ['uid', 'CategoryId', 'messageHash'],
},
],
classMethods: {
associate: ({Category, Message}) => {
associate: ({Category}) => {
MessageUID.belongsTo(Category)
MessageUID.belongsTo(Message)
},
},
});

Binary file not shown.

179
sync/imap/connection.js Normal file
View file

@ -0,0 +1,179 @@
const Imap = require('imap');
const EventEmitter = require('events');
const Capabilities = {
Gmail: 'X-GM-EXT-1',
Quota: 'QUOTA',
UIDPlus: 'UIDPLUS',
Condstore: 'CONDSTORE',
Search: 'ESEARCH',
Sort: 'SORT',
}
class IMAPConnection extends EventEmitter {
constructor(db, settings) {
super();
this._db = db;
this._queue = [];
this._current = null;
this._capabilities = [];
this._imap = Promise.promisifyAll(new Imap(settings));
this._imap.once('ready', () => {
for (const key of Object.keys(Capabilities)) {
const val = Capabilities[key];
if (this._imap.serverSupports(val)) {
this._capabilities.push(val);
}
}
this.emit('ready');
});
this._imap.once('error', (err) => {
console.log(err);
});
this._imap.once('end', () => {
console.log('Connection ended');
});
this._imap.on('alert', (msg) => {
console.log(`IMAP SERVER SAYS: ${msg}`)
})
// Emitted when new mail arrives in the currently open mailbox.
// Fix https://github.com/mscdex/node-imap/issues/445
let lastMailEventBox = null;
this._imap.on('mail', () => {
if (lastMailEventBox === this._imap._box.name) {
this.emit('mail');
}
lastMailEventBox = this._imap._box.name
});
// Emitted if the UID validity value for the currently open mailbox
// changes during the current session.
this._imap.on('uidvalidity', () => this.emit('uidvalidity'))
// Emitted when message metadata (e.g. flags) changes externally.
this._imap.on('update', () => this.emit('update'))
this._imap.connect();
}
openBox(box) {
return this._imap.openBoxAsync(box, true);
}
getBoxes() {
return this._imap.getBoxesAsync();
}
fetch(range, messageReadyCallback) {
return new Promise((resolve, reject) => {
const f = this._imap.fetch(range, {
bodies: ['HEADER', 'TEXT'],
});
f.on('message', (msg, uid) =>
this._receiveMessage(msg, uid, messageReadyCallback));
f.once('error', reject);
f.once('end', resolve);
});
}
fetchMessages(uids, messageReadyCallback) {
if (uids.length === 0) {
return Promise.resolve();
}
return this.fetch(uids, messageReadyCallback);
}
fetchUIDAttributes(range) {
return new Promise((resolve, reject) => {
const latestUIDAttributes = {};
const f = this._imap.fetch(range, {});
f.on('message', (msg, uid) => {
msg.on('attributes', (attrs) => {
latestUIDAttributes[uid] = attrs;
})
});
f.once('error', reject);
f.once('end', () => {
resolve(latestUIDAttributes);
});
});
}
_receiveMessage(msg, uid, callback) {
let attributes = null;
let body = null;
let headers = null;
msg.on('attributes', (attrs) => {
attributes = attrs;
});
msg.on('body', (stream, info) => {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.once('end', () => {
const full = Buffer.concat(chunks).toString('utf8');
if (info.which === 'HEADER') {
headers = full;
}
if (info.which === 'TEXT') {
body = full;
}
});
});
msg.once('end', () => {
callback(attributes, headers, body, uid);
});
}
runOperation(operation) {
return new Promise((resolve, reject) => {
this._queue.push({operation, resolve, reject});
if (this._imap.state === 'authenticated' && !this._current) {
this.processNextOperation();
}
});
}
processNextOperation() {
if (this._current) { return; }
this._current = this._queue.shift();
if (!this._current) {
this.emit('queue-empty');
return;
}
const {operation, resolve, reject} = this._current;
console.log(`Starting task ${operation.description()}`)
const result = operation.run(this._db, this);
if (result instanceof Promise === false) {
throw new Error(`Expected ${operation.constructor.name} to return promise.`);
}
result.catch((err) => {
this._current = null;
console.error(err);
reject();
})
.then(() => {
this._current = null;
console.log(`Finished task ${operation.description()}`)
resolve();
})
.finally(() => {
this.processNextOperation();
});
}
}
module.exports = IMAPConnection

View file

@ -1,131 +0,0 @@
class SyncMailboxOperation {
constructor(category) {
this._category = category;
if (!this._category) {
throw new Error("SyncMailboxOperation requires a category")
}
}
description() {
return `SyncMailboxOperation (${this._category.name})`;
}
_fetch(imap, range) {
return new Promise((resolve, reject) => {
const f = imap.fetch(range, {
bodies: ['HEADER', 'TEXT'],
});
f.on('message', (msg, uid) => this._receiveMessage(msg, uid));
f.once('error', reject);
f.once('end', resolve);
});
}
_unlinkAllMessages() {
const {MessageUID} = this._db;
return MessageUID.destroy({
where: {
CategoryId: this._category.id,
},
})
}
_receiveMessage(msg, uid) {
let attributes = null;
let body = null;
let headers = null;
msg.on('attributes', (attrs) => {
attributes = attrs;
});
msg.on('body', (stream, info) => {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.once('end', () => {
const full = Buffer.concat(chunks).toString('utf8');
if (info.which === 'HEADER') {
headers = full;
}
if (info.which === 'TEXT') {
body = full;
}
});
});
msg.once('end', () => {
this._processMessage(attributes, headers, body, uid);
});
}
_processMessage(attributes, headers, body) {
console.log(attributes);
const {Message, MessageUID} = this._db;
return Message.create({
unread: attributes.flags.includes('\\Unseen'),
starred: attributes.flags.includes('\\Flagged'),
date: attributes.date,
headers: headers,
body: body,
}).then((model) => {
return MessageUID.create({
MessageId: model.id,
CategoryId: this._category.id,
flags: attributes.flags,
uid: attributes.uid,
});
});
}
// _flushProcessedMessages() {
// return sequelize.transaction((transaction) => {
// return Promise.props({
// msgs: Message.bulkCreate(this._processedMessages, {transaction})
// uids: MessageUID.bulkCreate(this._processedMessageUIDs, {transaction})
// })
// }).then(() => {
// this._processedMessages = [];
// this._processedMessageUIDs = [];
// });
// }
run(db, imap) {
this._db = db;
return imap.openBoxAsync(this._category.name, true).then((box) => {
this._box = box;
if (box.persistentUIDs === false) {
throw new Error("Mailbox does not support persistentUIDs.")
}
if (box.uidvalidity !== this._category.syncState.uidvalidity) {
return this._unlinkAllMessages();
}
return Promise.resolve();
})
.then(() => {
const savedSyncState = this._category.syncState;
const currentSyncState = {
uidnext: this._box.uidnext,
uidvalidity: this._box.uidvalidity,
}
let fetchRange = `1:*`;
if (savedSyncState.uidnext) {
if (savedSyncState.uidnext === currentSyncState.uidnext) {
return Promise.resolve();
}
fetchRange = `${savedSyncState.uidnext}:*`
}
return this._fetch(imap, fetchRange).then(() => {
this._category.syncState = currentSyncState;
return this._category.save();
});
})
}
}
module.exports = SyncMailboxOperation;

View file

@ -65,9 +65,8 @@ class RefreshMailboxesOperation {
run(db, imap) {
this._db = db;
this._imap = imap;
return imap.getBoxesAsync().then((boxes) => {
return imap.getBoxes().then((boxes) => {
const {Category, sequelize} = this._db;
return sequelize.transaction((transaction) => {

View file

@ -1,98 +0,0 @@
class ScanUIDsOperation {
constructor(category) {
this._category = category;
}
description() {
return `ScanUIDsOperation (${this._category.name})`;
}
_fetchUIDAttributes(imap, range) {
return new Promise((resolve, reject) => {
const latestUIDAttributes = {};
const f = imap.fetch(range, {});
f.on('message', (msg, uid) => {
msg.on('attributes', (attrs) => {
latestUIDAttributes[uid] = attrs;
})
});
f.once('error', reject);
f.once('end', () => {
resolve(latestUIDAttributes);
});
});
}
_fetchMessages(uids) {
if (uids.length === 0) {
return Promise.resolve();
}
console.log(`TODO! NEED TO FETCH UIDS ${uids.join(', ')}`)
return Promise.resolve();
}
_removeDeletedMessageUIDs(removedUIDs) {
const {MessageUID} = this._db;
if (removedUIDs.length === 0) {
return Promise.resolve();
}
return this._db.sequelize.transaction((transaction) =>
MessageUID.destroy({where: {uid: removedUIDs}}, {transaction})
);
}
_deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs) {
const removedUIDs = [];
const neededUIDs = [];
for (const known of knownUIDs) {
if (!latestUIDAttributes[known.uid]) {
removedUIDs.push(known.uid);
continue;
}
if (latestUIDAttributes[known.uid].flags !== known.flags) {
known.flags = latestUIDAttributes[known.uid].flags;
neededUIDs.push(known.uid);
}
delete latestUIDAttributes[known.uid];
}
return {
neededUIDs: neededUIDs.concat(Object.keys(latestUIDAttributes)),
removedUIDs: removedUIDs,
};
}
// _flushProcessedMessages() {
// return sequelize.transaction((transaction) => {
// return Promise.props({
// msgs: Message.bulkCreate(this._processedMessages, {transaction})
// uids: MessageUID.bulkCreate(this._processedMessageUIDs, {transaction})
// })
// }).then(() => {
// this._processedMessages = [];
// this._processedMessageUIDs = [];
// });
// }
run(db, imap) {
this._db = db;
const {MessageUID} = db;
return imap.openBoxAsync(this._category.name, true).then(() => {
return this._fetchUIDAttributes(imap, `1:*`).then((latestUIDAttributes) => {
return MessageUID.findAll({CategoryId: this._category.id}).then((knownUIDs) => {
const {removedUIDs, neededUIDs} = this._deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs);
return Promise.props({
deletes: this._removeDeletedMessageUIDs(removedUIDs),
changes: this._fetchMessages(neededUIDs),
});
});
});
});
}
}
module.exports = ScanUIDsOperation;

View file

@ -0,0 +1,152 @@
const _ = require('underscore');
class SyncMailboxOperation {
constructor(category) {
this._category = category;
if (!this._category) {
throw new Error("SyncMailboxOperation requires a category")
}
}
description() {
return `SyncMailboxOperation (${this._category.name} - ${this._category.id})`;
}
_unlinkAllMessages() {
const {MessageUID} = this._db;
return MessageUID.destroy({
where: {
CategoryId: this._category.id,
},
})
}
_removeDeletedMessageUIDs(removedUIDs) {
const {MessageUID} = this._db;
if (removedUIDs.length === 0) {
return Promise.resolve();
}
return this._db.sequelize.transaction((transaction) =>
MessageUID.destroy({where: {uid: removedUIDs}}, {transaction})
);
}
_deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs) {
const removedUIDs = [];
const neededUIDs = [];
for (const known of knownUIDs) {
if (!latestUIDAttributes[known.uid]) {
removedUIDs.push(known.uid);
continue;
}
if (!_.isEqual(latestUIDAttributes[known.uid].flags, known.flags)) {
known.flags = latestUIDAttributes[known.uid].flags;
neededUIDs.push(known.uid);
}
// delete entries from the attributes hash as we go. At the end,
// remaining keys will be the ones that we don't have locally.
delete latestUIDAttributes[known.uid];
}
return {
neededUIDs: neededUIDs.concat(Object.keys(latestUIDAttributes)),
removedUIDs: removedUIDs,
};
}
_processMessage(attributes, headers, body) {
const {Message, MessageUID} = this._db;
const hash = Message.hashForHeaders(headers);
MessageUID.create({
messageHash: hash,
CategoryId: this._category.id,
flags: attributes.flags,
uid: attributes.uid,
});
return Message.create({
unread: attributes.flags.includes('\\Unseen'),
starred: attributes.flags.includes('\\Flagged'),
date: attributes.date,
headers: headers,
body: body,
});
}
_openMailboxAndCheckValidity() {
return this._imap.openBox(this._category.name, true).then((box) => {
this._box = box;
if (box.persistentUIDs === false) {
throw new Error("Mailbox does not support persistentUIDs.")
}
if (box.uidvalidity !== this._category.syncState.uidvalidity) {
return this._unlinkAllMessages();
}
return Promise.resolve();
})
}
_fetchUnseenMessages() {
const savedSyncState = this._category.syncState;
const currentSyncState = {
uidnext: this._box.uidnext,
uidvalidity: this._box.uidvalidity,
}
console.log(" - fetching unseen messages")
let fetchRange = `1:*`;
if (savedSyncState.uidnext) {
if (savedSyncState.uidnext === currentSyncState.uidnext) {
console.log(" --- nothing more to fetch")
return Promise.resolve();
}
fetchRange = `${savedSyncState.uidnext}:*`
}
return this._imap.fetch(fetchRange, this._processMessage.bind(this)).then(() => {
this._category.syncState = currentSyncState;
return this._category.save();
});
}
_fetchChangesToMessages() {
const {MessageUID} = this._db;
console.log(" - fetching changes to messages")
return this._imap.fetchUIDAttributes(`1:*`).then((latestUIDAttributes) => {
return MessageUID.findAll({where: {CategoryId: this._category.id}}).then((knownUIDs) => {
const {removedUIDs, neededUIDs} = this._deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs);
console.log(` - found changed / new UIDs: ${neededUIDs.join(', ')}`)
console.log(` - found removed UIDs: ${removedUIDs.join(', ')}`)
return Promise.props({
deletes: this._removeDeletedMessageUIDs(removedUIDs),
changes: this._imap.fetchMessages(neededUIDs, this._processMessage.bind(this)),
});
});
});
}
run(db, imap) {
this._db = db;
this._imap = imap;
return this._openMailboxAndCheckValidity()
.then(() =>
this._fetchUnseenMessages()
).then(() =>
this._fetchChangesToMessages()
)
}
}
module.exports = SyncMailboxOperation;

View file

@ -5,7 +5,8 @@
"main": "app.js",
"dependencies": {
"bluebird": "^3.4.1",
"imap": "^0.8.17"
"imap": "^0.8.17",
"underscore": "^1.8.3"
},
"devDependencies": {},
"scripts": {

View file

@ -1,120 +1,10 @@
const Imap = require('imap');
const EventEmitter = require('events');
const IMAPConnection = require('./imap/connection');
const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation')
const DiscoverMessagesOperation = require('./imap/discover-messages-operation')
const ScanUIDsOperation = require('./imap/scan-uids-operation')
const Capabilities = {
Gmail: 'X-GM-EXT-1',
Quota: 'QUOTA',
UIDPlus: 'UIDPLUS',
Condstore: 'CONDSTORE',
Search: 'ESEARCH',
Sort: 'SORT',
}
class IMAPConnectionStateMachine extends EventEmitter {
constructor(db, settings) {
super();
this._db = db;
this._queue = [];
this._current = null;
this._capabilities = [];
this._imap = Promise.promisifyAll(new Imap(settings));
this._imap.once('ready', () => {
for (const key of Object.keys(Capabilities)) {
const val = Capabilities[key];
if (this._imap.serverSupports(val)) {
this._capabilities.push(val);
}
}
this.emit('ready');
});
this._imap.once('error', (err) => {
console.log(err);
});
this._imap.once('end', () => {
console.log('Connection ended');
});
this._imap.on('alert', (msg) => {
console.log(`IMAP SERVER SAYS: ${msg}`)
})
// Emitted when new mail arrives in the currently open mailbox.
// Fix https://github.com/mscdex/node-imap/issues/445
let lastMailEventBox = null;
this._imap.on('mail', () => {
if (lastMailEventBox === this._imap._box.name) {
this.emit('mail');
}
lastMailEventBox = this._imap._box.name
});
// Emitted if the UID validity value for the currently open mailbox
// changes during the current session.
this._imap.on('uidvalidity', () => this.emit('uidvalidity'))
// Emitted when message metadata (e.g. flags) changes externally.
this._imap.on('update', () => this.emit('update'))
this._imap.connect();
}
getIMAP() {
return this._imap;
}
runOperation(operation) {
return new Promise((resolve, reject) => {
this._queue.push({operation, resolve, reject});
if (this._imap.state === 'authenticated' && !this._current) {
this.processNextOperation();
}
});
}
processNextOperation() {
if (this._current) { return; }
this._current = this._queue.shift();
if (!this._current) {
this.emit('queue-empty');
return;
}
const {operation, resolve, reject} = this._current;
console.log(`Starting task ${operation.description()}`)
const result = operation.run(this._db, this._imap);
if (result instanceof Promise === false) {
throw new Error(`Expected ${operation.constructor.name} to return promise.`);
}
result.catch((err) => {
this._current = null;
console.error(err);
reject();
})
.then(() => {
this._current = null;
console.log(`Finished task ${operation.description()}`)
resolve();
})
.finally(() => {
this.processNextOperation();
});
}
}
const SyncMailboxOperation = require('./imap/sync-mailbox-operation')
class SyncWorker {
constructor(account, db) {
const main = new IMAPConnectionStateMachine(db, {
const main = new IMAPConnection(db, {
user: 'inboxapptest1@fastmail.fm',
password: 'trar2e',
host: 'mail.messagingengine.com',
@ -134,13 +24,13 @@ class SyncWorker {
throw new Error("Unable to find an inbox category.")
}
main.on('mail', () => {
main.runOperation(new DiscoverMessagesOperation(inboxCategory));
main.runOperation(new SyncMailboxOperation(inboxCategory));
})
main.on('update', () => {
main.runOperation(new ScanUIDsOperation(inboxCategory));
main.runOperation(new SyncMailboxOperation(inboxCategory));
})
main.on('queue-empty', () => {
main.getIMAP().openBoxAsync(inboxCategory.name, true).then(() => {
main.openBox(inboxCategory.name, true).then(() => {
console.log("Idling on inbox category");
});
});
@ -162,8 +52,7 @@ class SyncWorker {
return priority.indexOf(b.role) - priority.indexOf(a.role);
})
for (const cat of sorted) {
this._main.runOperation(new DiscoverMessagesOperation(cat));
this._main.runOperation(new ScanUIDsOperation(cat));
this._main.runOperation(new SyncMailboxOperation(cat));
}
});
}