diff --git a/packages/nylas-core/imap-box.js b/packages/nylas-core/imap-box.js new file mode 100644 index 000000000..ec9283749 --- /dev/null +++ b/packages/nylas-core/imap-box.js @@ -0,0 +1,156 @@ +const _ = require('underscore'); + +const { + IMAPConnectionNotReadyError, +} = require('./imap-errors'); + +class IMAPBox { + constructor(imapConn, box) { + this._conn = imapConn + this._box = box + + return new Proxy(this, { + get(target, name) { + const prop = Reflect.get(target, name) + if (!prop) { + return Reflect.get(target._box, name) + } + if (_.isFunction(prop) && target._conn._imap._box.name !== target._box.name) { + return () => Promise.reject( + new Error(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`) + ) + } + return prop + }, + }) + } + + /** + * @param {array|string} range - can be a single message identifier, + * a message identifier range (e.g. '2504:2507' or '*' or '2504:*'), + * an array of message identifiers, or an array of message identifier ranges. + * @return {Observable} that will feed each message as it becomes ready + */ + fetchEach(range, options, forEachMessageCallback) { + if (!options) { + throw new Error("IMAPBox.fetch now requires an options object.") + } + if (range.length === 0) { + return Promise.resolve() + } + + return this._conn.createConnectionPromise((resolve, reject) => { + const f = this._conn._imap.fetch(range, options); + f.on('message', (imapMessage) => { + const parts = {}; + let headers = null; + let attributes = null; + imapMessage.on('attributes', (attrs) => { + attributes = attrs; + }); + imapMessage.on('body', (stream, info) => { + const chunks = []; + + stream.on('data', (chunk) => { + chunks.push(chunk); + }); + + stream.once('end', () => { + const full = Buffer.concat(chunks).toString('utf8'); + if (info.which === 'HEADER') { + headers = full; + } else { + parts[info.which] = full; + } + }); + }); + imapMessage.once('end', () => { + forEachMessageCallback({attributes, headers, parts}); + }); + }) + f.once('error', reject); + f.once('end', resolve); + }); + } + + /** + * @return {Promise} that resolves to requested message + */ + fetchMessage(uid) { + if (!uid) { + throw new Error("IMAPConnection.fetchMessage requires a message uid.") + } + return this.fetchEach([uid], { + bodies: ['HEADER', 'TEXT'], + }) + } + + fetchMessageStream(uid, options) { + if (!uid) { + throw new Error("IMAPConnection.fetchStream requires a message uid.") + } + if (!options) { + throw new Error("IMAPConnection.fetchStream requires an options object.") + } + return this._conn.createConnectionPromise((resolve, reject) => { + const f = this._conn._imap.fetch(uid, options); + f.on('message', (imapMessage) => { + imapMessage.on('body', (stream) => { + resolve(stream) + }) + }) + f.once('error', reject) + }) + } + + /** + * @param {array|string} range - can be a single message identifier, + * a message identifier range (e.g. '2504:2507' or '*' or '2504:*'), + * an array of message identifiers, or an array of message identifier ranges. + * @return {Promise} that resolves to a map of uid -> attributes for every + * message in the range + */ + fetchUIDAttributes(range) { + return this._conn.createConnectionPromise((resolve, reject) => { + const attributesByUID = {}; + const f = this._conn._imap.fetch(range, {}); + f.on('message', (msg) => { + msg.on('attributes', (attrs) => { + attributesByUID[attrs.uid] = attrs; + }) + }); + f.once('error', reject); + f.once('end', () => resolve(attributesByUID)); + }); + } + + addFlags(range, flags) { + if (!this._conn._imap) { + throw new IMAPConnectionNotReadyError(`IMAPBox::addFlags`) + } + return this._conn._imap.addFlagsAsync(range, flags) + } + + delFlags(range, flags) { + if (!this._conn._imap) { + throw new IMAPConnectionNotReadyError(`IMAPBox::delFlags`) + } + return this._conn._imap.delFlagsAsync(range, flags) + } + + moveFromBox(range, folderName) { + if (!this._conn._imap) { + throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`) + } + return this._conn._imap.moveAsync(range, folderName) + } + + closeBox({expunge = true} = {}) { + if (!this._conn._imap) { + throw new IMAPConnectionNotReadyError(`IMAPBox::closeBox`) + } + return this._conn._imap.closeBoxAsync(expunge) + } +} + +module.exports = IMAPBox; diff --git a/packages/nylas-core/imap-connection.js b/packages/nylas-core/imap-connection.js index bbbef526f..8c37acd28 100644 --- a/packages/nylas-core/imap-connection.js +++ b/packages/nylas-core/imap-connection.js @@ -1,163 +1,13 @@ -const Rx = require('rx') const Imap = require('imap'); const _ = require('underscore'); const xoauth2 = require('xoauth2'); const EventEmitter = require('events'); -class IMAPConnectionNotReadyError extends Error { - constructor(funcName) { - super(`${funcName} - You must call connect() first.`); - - // hack so that the error matches the ones used by node-imap - this.source = 'socket'; - } -} - -class IMAPBox { - - constructor(imapConn, box) { - this._imap = imapConn - this._box = box - return new Proxy(this, { - get(target, name) { - const prop = Reflect.get(target, name) - if (!prop) { - return Reflect.get(target._box, name) - } - if (_.isFunction(prop) && target._imap._box.name !== target._box.name) { - return () => Promise.reject( - new Error(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`) - ) - } - return prop - }, - }) - } - - /** - * @param {array|string} range - can be a single message identifier, - * a message identifier range (e.g. '2504:2507' or '*' or '2504:*'), - * an array of message identifiers, or an array of message identifier ranges. - * @return {Observable} that will feed each message as it becomes ready - */ - fetch(range, options) { - if (!options) { - throw new Error("IMAPBox.fetch now requires an options object.") - } - if (range.length === 0) { - return Rx.Observable.empty() - } - return Rx.Observable.create((observer) => { - const f = this._imap.fetch(range, options); - f.on('message', (imapMessage) => { - const parts = {}; - let headers = null; - let attributes = null; - imapMessage.on('attributes', (attrs) => { - attributes = attrs; - }); - imapMessage.on('body', (stream, info) => { - const chunks = []; - - stream.on('data', (chunk) => { - chunks.push(chunk); - }); - - stream.once('end', () => { - const full = Buffer.concat(chunks).toString('utf8'); - if (info.which === 'HEADER') { - headers = full; - } else { - parts[info.which] = full; - } - }); - }); - imapMessage.once('end', () => { - observer.onNext({attributes, headers, parts}); - }); - }) - f.once('error', (error) => observer.onError(error)) - f.once('end', () => observer.onCompleted()) - }) - } - - fetchStream({uid, options}) { - if (!uid) { - throw new Error("IMAPConnection.fetchStream requires a message uid.") - } - if (!options) { - throw new Error("IMAPConnection.fetchStream requires an options object.") - } - return new Promise((resolve, reject) => { - const f = this._imap.fetch(uid, options); - f.on('message', (imapMessage) => { - imapMessage.on('body', (stream) => { - resolve(stream) - }) - }) - f.once('error', reject) - }) - } - - /** - * @return {Promise} that resolves to requested message - */ - fetchMessage(uid) { - return this.fetch([uid], { - bodies: ['HEADER', 'TEXT'], - }).toPromise() - } - - /** - * @param {array|string} range - can be a single message identifier, - * a message identifier range (e.g. '2504:2507' or '*' or '2504:*'), - * an array of message identifiers, or an array of message identifier ranges. - * @return {Promise} that resolves to a map of uid -> attributes for every - * message in the range - */ - fetchUIDAttributes(range) { - return new Promise((resolve, reject) => { - const attributesByUID = {}; - const f = this._imap.fetch(range, {}); - f.on('message', (msg) => { - msg.on('attributes', (attrs) => { - attributesByUID[attrs.uid] = attrs; - }) - }); - f.once('error', reject); - f.once('end', () => resolve(attributesByUID)); - }); - } - - addFlags(range, flags) { - if (!this._imap) { - throw new IMAPConnectionNotReadyError(`IMAPBox::addFlags`) - } - return this._imap.addFlagsAsync(range, flags) - } - - delFlags(range, flags) { - if (!this._imap) { - throw new IMAPConnectionNotReadyError(`IMAPBox::delFlags`) - } - return this._imap.delFlagsAsync(range, flags) - } - - moveFromBox(range, folderName) { - if (!this._imap) { - throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`) - } - return this._imap.moveAsync(range, folderName) - } - - closeBox({expunge = true} = {}) { - if (!this._imap) { - throw new IMAPConnectionNotReadyError(`IMAPBox::closeBox`) - } - return this._imap.closeBoxAsync(expunge) - } -} - +const IMAPBox = require('./imap-box'); +const { + IMAPConnectionNotReadyError, + IMAPConnectionEndedError, +} = require('./imap-errors'); const Capabilities = { Gmail: 'X-GM-EXT-1', @@ -195,8 +45,9 @@ class IMAPConnection extends EventEmitter { connect() { if (!this._connectPromise) { - this._connectPromise = this._resolveIMAPSettings() - .then((settings) => this._buildUnderlyingConnection(settings)) + this._connectPromise = this._resolveIMAPSettings().then((settings) => + this._buildUnderlyingConnection(settings) + ); } return this._connectPromise; } @@ -237,16 +88,6 @@ class IMAPConnection extends EventEmitter { return new Promise((resolve, reject) => { this._imap = Promise.promisifyAll(new Imap(settings)); - this._imap.once('end', () => { - this._logger.info('Underlying IMAP Connection ended'); - this._connectPromise = null; - this._imap = null; - }); - - this._imap.on('alert', (msg) => { - this._logger.info({imap_server_msg: msg}, `IMAP server message`) - }) - // Emitted when new mail arrives in the currently open mailbox. // Fix https://github.com/mscdex/node-imap/issues/445 let lastMailEventBox = null; @@ -263,8 +104,26 @@ class IMAPConnection extends EventEmitter { // Emitted when message metadata (e.g. flags) changes externally. this._imap.on('update', () => this.emit('update')) - this._imap.once('ready', () => resolve(this)); - this._imap.once('error', reject); + + this._imap.once('ready', () => { + resolve(this) + }); + + this._imap.once('error', (err) => { + this.end(); + reject(err); + }); + + this._imap.once('end', () => { + this._logger.info('Underlying IMAP Connection ended'); + this._connectPromise = null; + this._imap = null; + }); + + this._imap.on('alert', (msg) => { + this._logger.info({imap_server_msg: msg}, `IMAP server message`) + }); + this._imap.connect(); }); } @@ -293,7 +152,7 @@ class IMAPConnection extends EventEmitter { throw new IMAPConnectionNotReadyError(`IMAPConnection::openBox`) } return this._imap.openBoxAsync(folderName, readOnly).then((box) => - new IMAPBox(this._imap, box) + new IMAPBox(this, box) ) } @@ -337,6 +196,45 @@ class IMAPConnection extends EventEmitter { }); } + /* + Equivalent to new Promise, but allows you to easily create promises + which are also rejected when the IMAP connection is closed or ends. + This is important because node-imap sometimes just hangs the current + fetch / action forever after emitting an `end` event. + */ + createConnectionPromise(callback) { + if (!this._imap) { + throw new IMAPConnectionNotReadyError(`IMAPConnection::createConnectionPromise`) + } + + let onEnded = null; + let onErrored = null; + + return new Promise((resolve, reject) => { + let returned = false; + onEnded = () => { + returned = true; + reject(new IMAPConnectionEndedError()); + }; + onErrored = (error) => { + returned = true; + reject(error || new Error("Unspecified IMAP error.")); + }; + + this._imap.once('error', onEnded); + this._imap.once('end', onErrored); + + const cresolve = (...args) => (!returned ? resolve(...args) : null) + const creject = (...args) => (!returned ? reject(...args) : null) + return callback(cresolve, creject) + }).finally(() => { + if (this._imap) { + this._imap.removeListener('error', onEnded); + this._imap.removeListener('end', onErrored); + } + }); + } + processNextOperation() { if (this._currentOperation) { return; diff --git a/packages/nylas-core/imap-errors.js b/packages/nylas-core/imap-errors.js new file mode 100644 index 000000000..3161deff5 --- /dev/null +++ b/packages/nylas-core/imap-errors.js @@ -0,0 +1,21 @@ + +// "Source" is a hack so that the error matches the ones used by node-imap + +class IMAPConnectionNotReadyError extends Error { + constructor(funcName) { + super(`${funcName} - You must call connect() first.`); + this.source = 'socket'; + } +} + +class IMAPConnectionEndedError extends Error { + constructor(msg = "The IMAP Connection was ended.") { + super(msg); + this.source = 'socket'; + } +} + +module.exports = { + IMAPConnectionNotReadyError, + IMAPConnectionEndedError, +}; diff --git a/packages/nylas-core/models/account/file.js b/packages/nylas-core/models/account/file.js index 655e8b13d..ba52ed4d2 100644 --- a/packages/nylas-core/models/account/file.js +++ b/packages/nylas-core/models/account/file.js @@ -24,12 +24,9 @@ module.exports = (sequelize, Sequelize) => { .then(({message, connection}) => { return message.getFolder() .then((folder) => connection.openBox(folder.name)) - .then((imapBox) => imapBox.fetchStream({ - uid: message.folderImapUID, - options: { - bodies: [this.partId], - struct: true, - }, + .then((imapBox) => imapBox.fetchMessageStream(message.folderImapUID, { + bodies: [this.partId], + struct: true, })) .then((stream) => { if (stream) { diff --git a/packages/nylas-sync/imap/fetch-messages-in-folder.js b/packages/nylas-sync/imap/fetch-messages-in-folder.js index 038dde66d..9ce3fe108 100644 --- a/packages/nylas-sync/imap/fetch-messages-in-folder.js +++ b/packages/nylas-sync/imap/fetch-messages-in-folder.js @@ -166,8 +166,7 @@ class FetchMessagesInFolder { _fetchMessagesAndQueueForProcessing(range) { const uidsByPart = {}; - const $structs = this._box.fetch(range, {struct: true}) - $structs.subscribe(({attributes}) => { + return this._box.fetchEach(range, {struct: true}, ({attributes}) => { const desiredParts = this._getDesiredMIMEParts(attributes.struct); if (desiredParts.length === 0) { return; @@ -175,13 +174,13 @@ class FetchMessagesInFolder { const key = JSON.stringify(desiredParts); uidsByPart[key] = uidsByPart[key] || []; uidsByPart[key].push(attributes.uid); - }); - - return $structs.toPromise().then(() => { + }) + .then(() => { return Promise.each(Object.keys(uidsByPart), (key) => { const uids = uidsByPart[key]; const desiredParts = JSON.parse(key); const bodies = ['HEADER'].concat(desiredParts.map(p => p.id)); + this._logger.info({ key, num_messages: uids.length, @@ -190,17 +189,14 @@ class FetchMessagesInFolder { // note: the order of UIDs in the array doesn't matter, Gmail always // returns them in ascending (oldest => newest) order. - const $body = this._box.fetch(uids, {bodies, struct: true}) - $body.subscribe((msg) => { + return this._box.fetchEach(uids, {bodies, struct: true}, (msg) => { msg.body = {}; for (const {id, mimetype} of desiredParts) { msg.body[mimetype] = msg.parts[id]; } this._processMessage(msg); }); - - return $body.toPromise(); - }) + }); }); }