const Rx = require('rx') const Imap = require('imap'); const _ = require('underscore'); const xoauth2 = require('xoauth2'); const EventEmitter = require('events'); class IMAPConnectionNotReadyError extends Error { constructor(funcName) { super(`${funcName} - You must call connect() first.`); // hack so that the error matches the ones used by node-imap this.source = 'socket'; } } class IMAPBox { constructor(imapConn, box) { this._imap = imapConn this._box = box return new Proxy(this, { get(target, name) { const prop = Reflect.get(target, name) if (!prop) { return Reflect.get(target._box, name) } if (_.isFunction(prop) && target._imap._box.name !== target._box.name) { return () => Promise.reject( new Error(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`) ) } return prop }, }) } /** * @param {array|string} range - can be a single message identifier, * a message identifier range (e.g. '2504:2507' or '*' or '2504:*'), * an array of message identifiers, or an array of message identifier ranges. * @return {Observable} that will feed each message as it becomes ready */ fetch(range, options) { if (!options) { throw new Error("IMAPBox.fetch now requires an options object.") } if (range.length === 0) { return Rx.Observable.empty() } return Rx.Observable.create((observer) => { const f = this._imap.fetch(range, options); f.on('message', (imapMessage) => { const parts = {}; let headers = null; let attributes = null; imapMessage.on('attributes', (attrs) => { attributes = attrs; }); imapMessage.on('body', (stream, info) => { const chunks = []; stream.on('data', (chunk) => { chunks.push(chunk); }); stream.once('end', () => { const full = Buffer.concat(chunks).toString('utf8'); if (info.which === 'HEADER') { headers = full; } else { parts[info.which] = full; } }); }); imapMessage.once('end', () => { observer.onNext({attributes, headers, parts}); }); }) f.once('error', (error) => observer.onError(error)) f.once('end', () => observer.onCompleted()) }) } fetchStream({uid, options}) { if (!uid) { throw new Error("IMAPConnection.fetchStream requires a message uid.") } if (!options) { throw new Error("IMAPConnection.fetchStream requires an options object.") } return new Promise((resolve, reject) => { const f = this._imap.fetch(uid, options); f.on('message', (imapMessage) => { imapMessage.on('body', (stream) => { resolve(stream) }) }) f.once('error', reject) }) } /** * @return {Promise} that resolves to requested message */ fetchMessage(uid) { return this.fetch([uid], { bodies: ['HEADER', 'TEXT'], }).toPromise() } /** * @param {array|string} range - can be a single message identifier, * a message identifier range (e.g. '2504:2507' or '*' or '2504:*'), * an array of message identifiers, or an array of message identifier ranges. * @return {Promise} that resolves to a map of uid -> attributes for every * message in the range */ fetchUIDAttributes(range) { return new Promise((resolve, reject) => { const attributesByUID = {}; const f = this._imap.fetch(range, {}); f.on('message', (msg) => { msg.on('attributes', (attrs) => { attributesByUID[attrs.uid] = attrs; }) }); f.once('error', reject); f.once('end', () => resolve(attributesByUID)); }); } addFlags(range, flags) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPBox::addFlags`) } return this._imap.addFlagsAsync(range, flags) } delFlags(range, flags) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPBox::delFlags`) } return this._imap.delFlagsAsync(range, flags) } moveFromBox(range, folderName) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`) } return this._imap.moveAsync(range, folderName) } closeBox({expunge = true} = {}) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPBox::closeBox`) } return this._imap.closeBoxAsync(expunge) } } const Capabilities = { Gmail: 'X-GM-EXT-1', Quota: 'QUOTA', UIDPlus: 'UIDPLUS', Condstore: 'CONDSTORE', Search: 'ESEARCH', Sort: 'SORT', } class IMAPConnection extends EventEmitter { static connect(...args) { return new IMAPConnection(...args).connect() } constructor({db, settings, logger} = {}) { super(); if (!(settings instanceof Object)) { throw new Error("IMAPConnection: Must be instantiated with `settings`") } if (!logger) { throw new Error("IMAPConnection: Must be instantiated with `logger`") } this._logger = logger; this._db = db; this._queue = []; this._currentOperation = null; this._settings = settings; this._imap = null; this._connectPromise = null; } connect() { if (!this._connectPromise) { this._connectPromise = this._resolveIMAPSettings() .then((settings) => this._buildUnderlyingConnection(settings)) } return this._connectPromise; } _resolveIMAPSettings() { const result = { host: this._settings.imap_host, port: this._settings.imap_port, user: this._settings.imap_username, password: this._settings.imap_password, tls: this._settings.ssl_required, } if (this._settings.refresh_token) { const xoauthFields = ['client_id', 'client_secret', 'imap_username', 'refresh_token']; if (Object.keys(_.pick(this._settings, xoauthFields)).length !== 4) { return Promise.reject(new Error(`IMAPConnection: Expected ${xoauthFields.join(',')} when given refresh_token`)) } return new Promise((resolve, reject) => { xoauth2.createXOAuth2Generator({ clientId: this._settings.client_id, clientSecret: this._settings.client_secret, user: this._settings.imap_username, refreshToken: this._settings.refresh_token, }).getToken((err, token) => { if (err) { return reject(err) } delete result.password; result.xoauth2 = token; return resolve(result); }); }); } return Promise.resolve(result); } _buildUnderlyingConnection(settings) { return new Promise((resolve, reject) => { this._imap = Promise.promisifyAll(new Imap(settings)); this._imap.once('end', () => { this._logger.info('Underlying IMAP Connection ended'); this._connectPromise = null; this._imap = null; }); this._imap.on('alert', (msg) => { this._logger.info({imap_server_msg: msg}, `IMAP server message`) }) // Emitted when new mail arrives in the currently open mailbox. // Fix https://github.com/mscdex/node-imap/issues/445 let lastMailEventBox = null; this._imap.on('mail', () => { if (lastMailEventBox === this._imap._box.name) { this.emit('mail'); } lastMailEventBox = this._imap._box.name }); // Emitted if the UID validity value for the currently open mailbox // changes during the current session. this._imap.on('uidvalidity', () => this.emit('uidvalidity')) // Emitted when message metadata (e.g. flags) changes externally. this._imap.on('update', () => this.emit('update')) this._imap.once('ready', () => resolve(this)); this._imap.once('error', reject); this._imap.connect(); }); } end() { if (this._imap) { this._imap.end(); this._imap = null; } this._queue = []; this._connectPromise = null; } serverSupports(capability) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPConnection::serverSupports`) } this._imap.serverSupports(capability); } /** * @return {Promise} that resolves to instance of IMAPBox */ openBox(folderName, {readOnly = false} = {}) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPConnection::openBox`) } return this._imap.openBoxAsync(folderName, readOnly).then((box) => new IMAPBox(this._imap, box) ) } getBoxes() { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPConnection::getBoxes`) } return this._imap.getBoxesAsync() } addBox(folderName) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPConnection::addBox`) } return this._imap.addBoxAsync(folderName) } renameBox(oldFolderName, newFolderName) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPConnection::renameBox`) } return this._imap.renameBoxAsync(oldFolderName, newFolderName) } delBox(folderName) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPConnection::delBox`) } return this._imap.delBoxAsync(folderName) } runOperation(operation) { if (!this._imap) { throw new IMAPConnectionNotReadyError(`IMAPConnection::runOperation`) } return new Promise((resolve, reject) => { this._queue.push({operation, resolve, reject}); if (this._imap.state === 'authenticated' && !this._currentOperation) { this.processNextOperation(); } }); } processNextOperation() { if (this._currentOperation) { return; } this._currentOperation = this._queue.shift(); if (!this._currentOperation) { this.emit('queue-empty'); return; } const {operation, resolve, reject} = this._currentOperation; const result = operation.run(this._db, this); if (result instanceof Promise === false) { reject(new Error(`Expected ${operation.constructor.name} to return promise.`)) } result.then(() => { this._currentOperation = null; this._logger.info({ operation_type: operation.constructor.name, operation_description: operation.description(), }, `Finished sync operation`) resolve(); this.processNextOperation(); }) .catch((err) => { this._currentOperation = null; this._logger.error({ err, operation_type: operation.constructor.name, operation_description: operation.description(), }, `Sync operation errored`) reject(err); }) } } IMAPConnection.Capabilities = Capabilities; module.exports = IMAPConnection