diff --git a/.gitignore b/.gitignore index 9daa8247d..4cd9e26d8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .DS_Store node_modules +storage/a-1.sqlite diff --git a/core/database-connection-factory.js b/core/database-connection-factory.js index d9dd80a9b..59d021487 100644 --- a/core/database-connection-factory.js +++ b/core/database-connection-factory.js @@ -33,6 +33,7 @@ class DatabaseConnectionFactory { const sequelize = new Sequelize(accountId, '', '', { storage: path.join(STORAGE_DIR, `a-${accountId}.sqlite`), dialect: "sqlite", + logging: false, }); const modelsPath = path.join(__dirname, 'models/account'); @@ -55,6 +56,7 @@ class DatabaseConnectionFactory { const sequelize = new Sequelize('shared', '', '', { storage: path.join(STORAGE_DIR, 'shared.sqlite'), dialect: "sqlite", + logging: false, }); const modelsPath = path.join(__dirname, 'models/shared'); @@ -72,7 +74,6 @@ class DatabaseConnectionFactory { this._pools.shared = this._pools.shared || this._sequelizeForShared(); return this._pools.shared; } - } module.exports = new DatabaseConnectionFactory() diff --git a/core/models/account/message.js b/core/models/account/message.js index ced3bd00d..9418aaba4 100644 --- a/core/models/account/message.js +++ b/core/models/account/message.js @@ -1,3 +1,5 @@ +const crypto = require('crypto'); + module.exports = (sequelize, Sequelize) => { const Message = sequelize.define('Message', { subject: Sequelize.STRING, @@ -14,6 +16,9 @@ module.exports = (sequelize, Sequelize) => { // Message.hasMany(Contact, {as: 'from'}) Message.hasMany(MessageUID, {as: 'uids'}) }, + hashForHeaders: (headers) => { + return crypto.createHash('sha256').update(headers, 'utf8').digest('hex'); + }, }, }); diff --git a/core/models/account/message_uid.js b/core/models/account/message_uid.js index d9174061f..0c7a2352d 100644 --- a/core/models/account/message_uid.js +++ b/core/models/account/message_uid.js @@ -1,6 +1,7 @@ module.exports = (sequelize, Sequelize) => { const MessageUID = sequelize.define('MessageUID', { uid: Sequelize.STRING, + messageHash: Sequelize.STRING, flags: { type: Sequelize.STRING, get: function get() { @@ -14,13 +15,12 @@ module.exports = (sequelize, Sequelize) => { indexes: [ { unique: true, - fields: ['uid', 'MessageId', 'CategoryId'] - } + fields: ['uid', 'CategoryId', 'messageHash'], + }, ], classMethods: { - associate: ({Category, Message}) => { + associate: ({Category}) => { MessageUID.belongsTo(Category) - MessageUID.belongsTo(Message) }, }, }); diff --git a/storage/a-1.sqlite b/storage/a-1.sqlite deleted file mode 100644 index 2ef02b1ce..000000000 Binary files a/storage/a-1.sqlite and /dev/null differ diff --git a/sync/imap/connection.js b/sync/imap/connection.js new file mode 100644 index 000000000..85b4ac6a0 --- /dev/null +++ b/sync/imap/connection.js @@ -0,0 +1,179 @@ +const Imap = require('imap'); +const EventEmitter = require('events'); + +const Capabilities = { + Gmail: 'X-GM-EXT-1', + Quota: 'QUOTA', + UIDPlus: 'UIDPLUS', + Condstore: 'CONDSTORE', + Search: 'ESEARCH', + Sort: 'SORT', +} + +class IMAPConnection extends EventEmitter { + constructor(db, settings) { + super(); + + this._db = db; + this._queue = []; + this._current = null; + this._capabilities = []; + this._imap = Promise.promisifyAll(new Imap(settings)); + + this._imap.once('ready', () => { + for (const key of Object.keys(Capabilities)) { + const val = Capabilities[key]; + if (this._imap.serverSupports(val)) { + this._capabilities.push(val); + } + } + this.emit('ready'); + }); + + this._imap.once('error', (err) => { + console.log(err); + }); + + this._imap.once('end', () => { + console.log('Connection ended'); + }); + + this._imap.on('alert', (msg) => { + console.log(`IMAP SERVER SAYS: ${msg}`) + }) + + // Emitted when new mail arrives in the currently open mailbox. + // Fix https://github.com/mscdex/node-imap/issues/445 + let lastMailEventBox = null; + this._imap.on('mail', () => { + if (lastMailEventBox === this._imap._box.name) { + this.emit('mail'); + } + lastMailEventBox = this._imap._box.name + }); + + // Emitted if the UID validity value for the currently open mailbox + // changes during the current session. + this._imap.on('uidvalidity', () => this.emit('uidvalidity')) + + // Emitted when message metadata (e.g. flags) changes externally. + this._imap.on('update', () => this.emit('update')) + + this._imap.connect(); + } + + openBox(box) { + return this._imap.openBoxAsync(box, true); + } + + getBoxes() { + return this._imap.getBoxesAsync(); + } + + fetch(range, messageReadyCallback) { + return new Promise((resolve, reject) => { + const f = this._imap.fetch(range, { + bodies: ['HEADER', 'TEXT'], + }); + f.on('message', (msg, uid) => + this._receiveMessage(msg, uid, messageReadyCallback)); + f.once('error', reject); + f.once('end', resolve); + }); + } + + fetchMessages(uids, messageReadyCallback) { + if (uids.length === 0) { + return Promise.resolve(); + } + return this.fetch(uids, messageReadyCallback); + } + + fetchUIDAttributes(range) { + return new Promise((resolve, reject) => { + const latestUIDAttributes = {}; + const f = this._imap.fetch(range, {}); + f.on('message', (msg, uid) => { + msg.on('attributes', (attrs) => { + latestUIDAttributes[uid] = attrs; + }) + }); + f.once('error', reject); + f.once('end', () => { + resolve(latestUIDAttributes); + }); + }); + } + + _receiveMessage(msg, uid, callback) { + let attributes = null; + let body = null; + let headers = null; + + msg.on('attributes', (attrs) => { + attributes = attrs; + }); + msg.on('body', (stream, info) => { + const chunks = []; + + stream.on('data', (chunk) => { + chunks.push(chunk); + }); + stream.once('end', () => { + const full = Buffer.concat(chunks).toString('utf8'); + if (info.which === 'HEADER') { + headers = full; + } + if (info.which === 'TEXT') { + body = full; + } + }); + }); + msg.once('end', () => { + callback(attributes, headers, body, uid); + }); + } + + runOperation(operation) { + return new Promise((resolve, reject) => { + this._queue.push({operation, resolve, reject}); + if (this._imap.state === 'authenticated' && !this._current) { + this.processNextOperation(); + } + }); + } + + processNextOperation() { + if (this._current) { return; } + + this._current = this._queue.shift(); + + if (!this._current) { + this.emit('queue-empty'); + return; + } + + const {operation, resolve, reject} = this._current; + + console.log(`Starting task ${operation.description()}`) + const result = operation.run(this._db, this); + if (result instanceof Promise === false) { + throw new Error(`Expected ${operation.constructor.name} to return promise.`); + } + result.catch((err) => { + this._current = null; + console.error(err); + reject(); + }) + .then(() => { + this._current = null; + console.log(`Finished task ${operation.description()}`) + resolve(); + }) + .finally(() => { + this.processNextOperation(); + }); + } +} + +module.exports = IMAPConnection diff --git a/sync/imap/discover-messages-operation.js b/sync/imap/discover-messages-operation.js deleted file mode 100644 index 008aa1345..000000000 --- a/sync/imap/discover-messages-operation.js +++ /dev/null @@ -1,131 +0,0 @@ -class SyncMailboxOperation { - constructor(category) { - this._category = category; - if (!this._category) { - throw new Error("SyncMailboxOperation requires a category") - } - } - - description() { - return `SyncMailboxOperation (${this._category.name})`; - } - - _fetch(imap, range) { - return new Promise((resolve, reject) => { - const f = imap.fetch(range, { - bodies: ['HEADER', 'TEXT'], - }); - f.on('message', (msg, uid) => this._receiveMessage(msg, uid)); - f.once('error', reject); - f.once('end', resolve); - }); - } - - _unlinkAllMessages() { - const {MessageUID} = this._db; - return MessageUID.destroy({ - where: { - CategoryId: this._category.id, - }, - }) - } - - _receiveMessage(msg, uid) { - let attributes = null; - let body = null; - let headers = null; - - msg.on('attributes', (attrs) => { - attributes = attrs; - }); - msg.on('body', (stream, info) => { - const chunks = []; - - stream.on('data', (chunk) => { - chunks.push(chunk); - }); - stream.once('end', () => { - const full = Buffer.concat(chunks).toString('utf8'); - if (info.which === 'HEADER') { - headers = full; - } - if (info.which === 'TEXT') { - body = full; - } - }); - }); - msg.once('end', () => { - this._processMessage(attributes, headers, body, uid); - }); - } - - _processMessage(attributes, headers, body) { - console.log(attributes); - const {Message, MessageUID} = this._db; - - return Message.create({ - unread: attributes.flags.includes('\\Unseen'), - starred: attributes.flags.includes('\\Flagged'), - date: attributes.date, - headers: headers, - body: body, - }).then((model) => { - return MessageUID.create({ - MessageId: model.id, - CategoryId: this._category.id, - flags: attributes.flags, - uid: attributes.uid, - }); - }); - } - - // _flushProcessedMessages() { - // return sequelize.transaction((transaction) => { - // return Promise.props({ - // msgs: Message.bulkCreate(this._processedMessages, {transaction}) - // uids: MessageUID.bulkCreate(this._processedMessageUIDs, {transaction}) - // }) - // }).then(() => { - // this._processedMessages = []; - // this._processedMessageUIDs = []; - // }); - // } - - run(db, imap) { - this._db = db; - - return imap.openBoxAsync(this._category.name, true).then((box) => { - this._box = box; - - if (box.persistentUIDs === false) { - throw new Error("Mailbox does not support persistentUIDs.") - } - if (box.uidvalidity !== this._category.syncState.uidvalidity) { - return this._unlinkAllMessages(); - } - return Promise.resolve(); - }) - .then(() => { - const savedSyncState = this._category.syncState; - const currentSyncState = { - uidnext: this._box.uidnext, - uidvalidity: this._box.uidvalidity, - } - - let fetchRange = `1:*`; - if (savedSyncState.uidnext) { - if (savedSyncState.uidnext === currentSyncState.uidnext) { - return Promise.resolve(); - } - fetchRange = `${savedSyncState.uidnext}:*` - } - - return this._fetch(imap, fetchRange).then(() => { - this._category.syncState = currentSyncState; - return this._category.save(); - }); - }) - } -} - -module.exports = SyncMailboxOperation; diff --git a/sync/imap/refresh-mailboxes-operation.js b/sync/imap/refresh-mailboxes-operation.js index de531bbab..1efaf9f8a 100644 --- a/sync/imap/refresh-mailboxes-operation.js +++ b/sync/imap/refresh-mailboxes-operation.js @@ -65,9 +65,8 @@ class RefreshMailboxesOperation { run(db, imap) { this._db = db; - this._imap = imap; - return imap.getBoxesAsync().then((boxes) => { + return imap.getBoxes().then((boxes) => { const {Category, sequelize} = this._db; return sequelize.transaction((transaction) => { diff --git a/sync/imap/scan-uids-operation.js b/sync/imap/scan-uids-operation.js deleted file mode 100644 index 615a940db..000000000 --- a/sync/imap/scan-uids-operation.js +++ /dev/null @@ -1,98 +0,0 @@ -class ScanUIDsOperation { - constructor(category) { - this._category = category; - } - - description() { - return `ScanUIDsOperation (${this._category.name})`; - } - - _fetchUIDAttributes(imap, range) { - return new Promise((resolve, reject) => { - const latestUIDAttributes = {}; - const f = imap.fetch(range, {}); - f.on('message', (msg, uid) => { - msg.on('attributes', (attrs) => { - latestUIDAttributes[uid] = attrs; - }) - }); - f.once('error', reject); - f.once('end', () => { - resolve(latestUIDAttributes); - }); - }); - } - - _fetchMessages(uids) { - if (uids.length === 0) { - return Promise.resolve(); - } - console.log(`TODO! NEED TO FETCH UIDS ${uids.join(', ')}`) - return Promise.resolve(); - } - - _removeDeletedMessageUIDs(removedUIDs) { - const {MessageUID} = this._db; - - if (removedUIDs.length === 0) { - return Promise.resolve(); - } - return this._db.sequelize.transaction((transaction) => - MessageUID.destroy({where: {uid: removedUIDs}}, {transaction}) - ); - } - - _deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs) { - const removedUIDs = []; - const neededUIDs = []; - - for (const known of knownUIDs) { - if (!latestUIDAttributes[known.uid]) { - removedUIDs.push(known.uid); - continue; - } - if (latestUIDAttributes[known.uid].flags !== known.flags) { - known.flags = latestUIDAttributes[known.uid].flags; - neededUIDs.push(known.uid); - } - delete latestUIDAttributes[known.uid]; - } - - return { - neededUIDs: neededUIDs.concat(Object.keys(latestUIDAttributes)), - removedUIDs: removedUIDs, - }; - } - - // _flushProcessedMessages() { - // return sequelize.transaction((transaction) => { - // return Promise.props({ - // msgs: Message.bulkCreate(this._processedMessages, {transaction}) - // uids: MessageUID.bulkCreate(this._processedMessageUIDs, {transaction}) - // }) - // }).then(() => { - // this._processedMessages = []; - // this._processedMessageUIDs = []; - // }); - // } - - run(db, imap) { - this._db = db; - const {MessageUID} = db; - - return imap.openBoxAsync(this._category.name, true).then(() => { - return this._fetchUIDAttributes(imap, `1:*`).then((latestUIDAttributes) => { - return MessageUID.findAll({CategoryId: this._category.id}).then((knownUIDs) => { - const {removedUIDs, neededUIDs} = this._deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs); - - return Promise.props({ - deletes: this._removeDeletedMessageUIDs(removedUIDs), - changes: this._fetchMessages(neededUIDs), - }); - }); - }); - }); - } -} - -module.exports = ScanUIDsOperation; diff --git a/sync/imap/sync-mailbox-operation.js b/sync/imap/sync-mailbox-operation.js new file mode 100644 index 000000000..3ab16b52d --- /dev/null +++ b/sync/imap/sync-mailbox-operation.js @@ -0,0 +1,152 @@ +const _ = require('underscore'); + +class SyncMailboxOperation { + constructor(category) { + this._category = category; + if (!this._category) { + throw new Error("SyncMailboxOperation requires a category") + } + } + + description() { + return `SyncMailboxOperation (${this._category.name} - ${this._category.id})`; + } + + _unlinkAllMessages() { + const {MessageUID} = this._db; + return MessageUID.destroy({ + where: { + CategoryId: this._category.id, + }, + }) + } + + _removeDeletedMessageUIDs(removedUIDs) { + const {MessageUID} = this._db; + + if (removedUIDs.length === 0) { + return Promise.resolve(); + } + return this._db.sequelize.transaction((transaction) => + MessageUID.destroy({where: {uid: removedUIDs}}, {transaction}) + ); + } + + _deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs) { + const removedUIDs = []; + const neededUIDs = []; + + for (const known of knownUIDs) { + if (!latestUIDAttributes[known.uid]) { + removedUIDs.push(known.uid); + continue; + } + if (!_.isEqual(latestUIDAttributes[known.uid].flags, known.flags)) { + known.flags = latestUIDAttributes[known.uid].flags; + neededUIDs.push(known.uid); + } + + // delete entries from the attributes hash as we go. At the end, + // remaining keys will be the ones that we don't have locally. + delete latestUIDAttributes[known.uid]; + } + + return { + neededUIDs: neededUIDs.concat(Object.keys(latestUIDAttributes)), + removedUIDs: removedUIDs, + }; + } + + _processMessage(attributes, headers, body) { + const {Message, MessageUID} = this._db; + + const hash = Message.hashForHeaders(headers); + + MessageUID.create({ + messageHash: hash, + CategoryId: this._category.id, + flags: attributes.flags, + uid: attributes.uid, + }); + + return Message.create({ + unread: attributes.flags.includes('\\Unseen'), + starred: attributes.flags.includes('\\Flagged'), + date: attributes.date, + headers: headers, + body: body, + }); + } + + _openMailboxAndCheckValidity() { + return this._imap.openBox(this._category.name, true).then((box) => { + this._box = box; + + if (box.persistentUIDs === false) { + throw new Error("Mailbox does not support persistentUIDs.") + } + if (box.uidvalidity !== this._category.syncState.uidvalidity) { + return this._unlinkAllMessages(); + } + return Promise.resolve(); + }) + } + + _fetchUnseenMessages() { + const savedSyncState = this._category.syncState; + const currentSyncState = { + uidnext: this._box.uidnext, + uidvalidity: this._box.uidvalidity, + } + + console.log(" - fetching unseen messages") + + let fetchRange = `1:*`; + if (savedSyncState.uidnext) { + if (savedSyncState.uidnext === currentSyncState.uidnext) { + console.log(" --- nothing more to fetch") + return Promise.resolve(); + } + fetchRange = `${savedSyncState.uidnext}:*` + } + + return this._imap.fetch(fetchRange, this._processMessage.bind(this)).then(() => { + this._category.syncState = currentSyncState; + return this._category.save(); + }); + } + + _fetchChangesToMessages() { + const {MessageUID} = this._db; + + console.log(" - fetching changes to messages") + + return this._imap.fetchUIDAttributes(`1:*`).then((latestUIDAttributes) => { + return MessageUID.findAll({where: {CategoryId: this._category.id}}).then((knownUIDs) => { + const {removedUIDs, neededUIDs} = this._deltasInUIDsAndFlags(latestUIDAttributes, knownUIDs); + + console.log(` - found changed / new UIDs: ${neededUIDs.join(', ')}`) + console.log(` - found removed UIDs: ${removedUIDs.join(', ')}`) + + return Promise.props({ + deletes: this._removeDeletedMessageUIDs(removedUIDs), + changes: this._imap.fetchMessages(neededUIDs, this._processMessage.bind(this)), + }); + }); + }); + } + + run(db, imap) { + this._db = db; + this._imap = imap; + + return this._openMailboxAndCheckValidity() + .then(() => + this._fetchUnseenMessages() + ).then(() => + this._fetchChangesToMessages() + ) + } +} + +module.exports = SyncMailboxOperation; diff --git a/sync/package.json b/sync/package.json index 07c308f66..0dcb6228b 100644 --- a/sync/package.json +++ b/sync/package.json @@ -5,7 +5,8 @@ "main": "app.js", "dependencies": { "bluebird": "^3.4.1", - "imap": "^0.8.17" + "imap": "^0.8.17", + "underscore": "^1.8.3" }, "devDependencies": {}, "scripts": { diff --git a/sync/sync-worker.js b/sync/sync-worker.js index 4850f28cd..ad148afcc 100644 --- a/sync/sync-worker.js +++ b/sync/sync-worker.js @@ -1,120 +1,10 @@ -const Imap = require('imap'); -const EventEmitter = require('events'); - +const IMAPConnection = require('./imap/connection'); const RefreshMailboxesOperation = require('./imap/refresh-mailboxes-operation') -const DiscoverMessagesOperation = require('./imap/discover-messages-operation') -const ScanUIDsOperation = require('./imap/scan-uids-operation') - -const Capabilities = { - Gmail: 'X-GM-EXT-1', - Quota: 'QUOTA', - UIDPlus: 'UIDPLUS', - Condstore: 'CONDSTORE', - Search: 'ESEARCH', - Sort: 'SORT', -} - -class IMAPConnectionStateMachine extends EventEmitter { - constructor(db, settings) { - super(); - - this._db = db; - this._queue = []; - this._current = null; - this._capabilities = []; - this._imap = Promise.promisifyAll(new Imap(settings)); - - this._imap.once('ready', () => { - for (const key of Object.keys(Capabilities)) { - const val = Capabilities[key]; - if (this._imap.serverSupports(val)) { - this._capabilities.push(val); - } - } - this.emit('ready'); - }); - - this._imap.once('error', (err) => { - console.log(err); - }); - - this._imap.once('end', () => { - console.log('Connection ended'); - }); - - this._imap.on('alert', (msg) => { - console.log(`IMAP SERVER SAYS: ${msg}`) - }) - - // Emitted when new mail arrives in the currently open mailbox. - // Fix https://github.com/mscdex/node-imap/issues/445 - let lastMailEventBox = null; - this._imap.on('mail', () => { - if (lastMailEventBox === this._imap._box.name) { - this.emit('mail'); - } - lastMailEventBox = this._imap._box.name - }); - - // Emitted if the UID validity value for the currently open mailbox - // changes during the current session. - this._imap.on('uidvalidity', () => this.emit('uidvalidity')) - - // Emitted when message metadata (e.g. flags) changes externally. - this._imap.on('update', () => this.emit('update')) - - this._imap.connect(); - } - - getIMAP() { - return this._imap; - } - - runOperation(operation) { - return new Promise((resolve, reject) => { - this._queue.push({operation, resolve, reject}); - if (this._imap.state === 'authenticated' && !this._current) { - this.processNextOperation(); - } - }); - } - - processNextOperation() { - if (this._current) { return; } - - this._current = this._queue.shift(); - - if (!this._current) { - this.emit('queue-empty'); - return; - } - - const {operation, resolve, reject} = this._current; - - console.log(`Starting task ${operation.description()}`) - const result = operation.run(this._db, this._imap); - if (result instanceof Promise === false) { - throw new Error(`Expected ${operation.constructor.name} to return promise.`); - } - result.catch((err) => { - this._current = null; - console.error(err); - reject(); - }) - .then(() => { - this._current = null; - console.log(`Finished task ${operation.description()}`) - resolve(); - }) - .finally(() => { - this.processNextOperation(); - }); - } -} +const SyncMailboxOperation = require('./imap/sync-mailbox-operation') class SyncWorker { constructor(account, db) { - const main = new IMAPConnectionStateMachine(db, { + const main = new IMAPConnection(db, { user: 'inboxapptest1@fastmail.fm', password: 'trar2e', host: 'mail.messagingengine.com', @@ -134,13 +24,13 @@ class SyncWorker { throw new Error("Unable to find an inbox category.") } main.on('mail', () => { - main.runOperation(new DiscoverMessagesOperation(inboxCategory)); + main.runOperation(new SyncMailboxOperation(inboxCategory)); }) main.on('update', () => { - main.runOperation(new ScanUIDsOperation(inboxCategory)); + main.runOperation(new SyncMailboxOperation(inboxCategory)); }) main.on('queue-empty', () => { - main.getIMAP().openBoxAsync(inboxCategory.name, true).then(() => { + main.openBox(inboxCategory.name, true).then(() => { console.log("Idling on inbox category"); }); }); @@ -162,8 +52,7 @@ class SyncWorker { return priority.indexOf(b.role) - priority.indexOf(a.role); }) for (const cat of sorted) { - this._main.runOperation(new DiscoverMessagesOperation(cat)); - this._main.runOperation(new ScanUIDsOperation(cat)); + this._main.runOperation(new SyncMailboxOperation(cat)); } }); }