[iso-core] (imap-P3) Fixup IMAPConnection and IMAPBox

Summary:
This commit is an attempt to cleanup duplicated code and crufty code
inside IMAPConnection and IMAPBox

Specifically:
- It replaces `_createConnectionPromise` with a more aptly named (imo) `_withPreparedConnection` helper, which provides the user a node-imap connection that will correctly time out and handle `error` and `end` events. Most of these changes are just changing existing code to use the new interface.
- Adds a subtle change to how we handle `end` and `error` events on the connections. Previously, we manually called `this.end()` on `error`, but not on `end`. From what I could gather from the old comment documenting `_createConnectionPromise`, we should /also/ call `this.end()` on `end` because node-imap doesn't clean up correctly and can leave the connection hanging (taking care not to introduce a recursion loop by `end`ing on `end`). Additionally, it no longer listens to the events via `once` but via `on`, which should be okay given that the listeners get cleared at the end.

This /might/ fix some instances of sync freezing up (T7837).

Depends on D4035

Test Plan: manual -- this really needs some unit tests 😢

Reviewers: spang, halla, mark, evan

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D4036
This commit is contained in:
Juan Tejada 2017-02-23 01:34:26 -08:00
parent 18095cac3e
commit 8ade5d6486
2 changed files with 208 additions and 258 deletions

View file

@ -34,6 +34,10 @@ class IMAPBox {
})
}
_withPreparedConnection(cb) {
return this._conn._withPreparedConnection(cb)
}
/**
* @param {array|string} range - can be a single message identifier,
* a message identifier range (e.g. '2504:2507' or '*' or '2504:*'),
@ -43,7 +47,7 @@ class IMAPBox {
* message as it comes in
* @return {Promise} that will feed each message as it becomes ready
*/
fetchEach(range, options, forEachMessageCallback) {
async fetchEach(range, options, forEachMessageCallback) {
if (!options) {
throw new Error("IMAPBox.fetch now requires an options object.")
}
@ -51,75 +55,76 @@ class IMAPBox {
return Promise.resolve()
}
return this._conn._createConnectionPromise((resolve, reject) => {
const f = this._conn._imap.fetch(range, options);
f.on('message', (imapMessage) => {
const parts = {};
let headers = null;
let attributes = null;
imapMessage.on('attributes', (attrs) => {
attributes = attrs;
});
imapMessage.on('body', (stream, info) => {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
return this._withPreparedConnection((imap) => {
return new Promise((resolve, reject) => {
const f = imap.fetch(range, options);
f.on('message', (imapMessage) => {
const parts = {};
let headers = null;
let attributes = null;
imapMessage.on('attributes', (attrs) => {
attributes = attrs;
});
imapMessage.on('body', (stream, info) => {
const chunks = [];
stream.once('end', () => {
const full = Buffer.concat(chunks);
if (info.which === 'HEADER') {
headers = full;
} else {
parts[info.which] = full;
}
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.once('end', () => {
const full = Buffer.concat(chunks);
if (info.which === 'HEADER') {
headers = full;
} else {
parts[info.which] = full;
}
});
});
});
imapMessage.once('end', () => {
// attributes is an object containing ascii strings, but parts and
// headers are undecoded binary Buffers (since the data for mime
// parts cannot be decoded to strings without looking up charset data
// in metadata, and this function's job is only to fetch the raw data)
forEachMessageCallback({attributes, headers, parts});
});
imapMessage.once('end', () => {
// attributes is an object containing ascii strings, but parts and
// headers are undecoded binary Buffers (since the data for mime
// parts cannot be decoded to strings without looking up charset data
// in metadata, and this function's job is only to fetch the raw data)
forEachMessageCallback({attributes, headers, parts});
});
})
f.once('error', reject);
f.once('end', resolve);
})
f.once('error', reject);
f.once('end', resolve);
});
}
/**
* @return {Promise} that resolves to requested message
*/
fetchMessage(uid) {
async fetchMessage(uid) {
if (!uid) {
throw new Error("IMAPConnection.fetchMessage requires a message uid.")
}
return new Promise((resolve, reject) => {
let message;
this.fetchEach([uid], {bodies: ['HEADER', 'TEXT']}, (msg) => { message = msg; })
.then(() => resolve(message))
.catch((err) => reject(err))
})
let message;
await this.fetchEach([uid], {bodies: ['HEADER', 'TEXT']}, (msg) => { message = msg; })
return message
}
fetchMessageStream(uid, {fetchOptions, onFetchComplete} = {}) {
async fetchMessageStream(uid, {fetchOptions, onFetchComplete} = {}) {
if (!uid) {
throw new Error("IMAPConnection.fetchStream requires a message uid.")
}
if (!fetchOptions) {
throw new Error("IMAPConnection.fetchStream requires an options object.")
}
return this._conn._createConnectionPromise((resolve, reject) => {
const f = this._conn._imap.fetch(uid, fetchOptions);
f.on('message', (imapMessage) => {
imapMessage.on('body', (stream) => {
resolve(stream)
return this.__withPreparedConnection((imap) => {
return new Promise((resolve, reject) => {
const f = imap.fetch(uid, fetchOptions);
f.on('message', (imapMessage) => {
imapMessage.on('body', (stream) => {
resolve(stream)
})
})
f.once('error', reject)
f.once('end', onFetchComplete || (() => {}));
})
f.once('error', reject)
f.once('end', onFetchComplete || (() => {}));
})
}
@ -130,17 +135,19 @@ class IMAPBox {
* @return {Promise} that resolves to a map of uid -> attributes for every
* message in the range
*/
fetchUIDAttributes(range, fetchOptions = {}) {
return this._conn._createConnectionPromise((resolve, reject) => {
const attributesByUID = {};
const f = this._conn._imap.fetch(range, fetchOptions);
f.on('message', (msg) => {
msg.on('attributes', (attrs) => {
attributesByUID[attrs.uid] = attrs;
})
});
f.once('error', reject);
f.once('end', () => resolve(attributesByUID));
async fetchUIDAttributes(range, fetchOptions = {}) {
return this._withPreparedConnection((imap) => {
return new Promise((resolve, reject) => {
const attributesByUID = {};
const f = imap.fetch(range, fetchOptions);
f.on('message', (msg) => {
msg.on('attributes', (attrs) => {
attributesByUID[attrs.uid] = attrs;
})
});
f.once('error', reject);
f.once('end', () => resolve(attributesByUID));
})
});
}
@ -149,88 +156,56 @@ class IMAPBox {
throw new IMAPConnectionNotReadyError(`IMAPBox::addFlags`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.addFlagsAsync(range, flags)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.addFlagsAsync(range, flags))
}
delFlags(range, flags) {
if (!this._conn._imap) {
throw new IMAPConnectionNotReadyError(`IMAPBox::delFlags`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.delFlagsAsync(range, flags)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.delFlagsAsync(range, flags))
}
moveFromBox(range, folderName) {
if (!this._conn._imap) {
throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.moveAsync(range, folderName)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.moveAsync(range, folderName))
}
setLabels(range, labels) {
if (!this._conn._imap) {
throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.setLabelsAsync(range, labels)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.setLabelsAsync(range, labels))
}
removeLabels(range, labels) {
if (!this._conn._imap) {
throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.delLabelsAsync(range, labels)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.delLabelsAsync(range, labels))
}
append(rawMime, options) {
if (!this._conn._imap) {
throw new IMAPConnectionNotReadyError(`IMAPBox::append`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.appendAsync(rawMime, options)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.appendAsync(rawMime, options))
}
search(criteria) {
if (!this._conn._imap) {
throw new IMAPConnectionNotReadyError(`IMAPBox::search`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.searchAsync(criteria)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.searchAsync(criteria))
}
closeBox({expunge = true} = {}) {
if (!this._conn._imap) {
throw new IMAPConnectionNotReadyError(`IMAPBox::closeBox`)
}
return this._conn._createConnectionPromise((resolve, reject) => {
return this._conn._imap.closeBoxAsync(expunge)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.closeBoxAsync(expunge))
}
}

View file

@ -123,18 +123,6 @@ class IMAPConnection extends EventEmitter {
this._isOpeningBox = false;
}
get account() {
return this._account
}
get logger() {
return this._logger
}
get resolvedSettings() {
return this._resolvedSettings
}
async connect() {
if (!this._connectPromise) {
this._connectPromise = new Promise(async (resolve, reject) => {
@ -150,60 +138,6 @@ class IMAPConnection extends EventEmitter {
return this._connectPromise;
}
async _buildUnderlyingConnection() {
return new Promise((resolve, reject) => {
this._imap = PromiseUtils.promisifyAll(new Imap(this._resolvedSettings));
const socketTimeout = setTimeout(() => {
reject(new IMAPConnectionTimeoutError('Socket timed out'))
}, this._resolvedSettings.socketTimeout)
// Emitted when new mail arrives in the currently open mailbox.
let lastMailEventBox = null;
this._imap.on('mail', () => {
// Fix https://github.com/mscdex/node-imap/issues/585
if (this._isOpeningBox) { return }
if (!this._imap) { return }
if (lastMailEventBox === null || lastMailEventBox === this._imap._box.name) {
// Fix https://github.com/mscdex/node-imap/issues/445
this.emit('mail');
}
lastMailEventBox = this._imap._box.name
});
// Emitted if the UID validity value for the currently open mailbox
// changes during the current session.
this._imap.on('uidvalidity', () => this.emit('uidvalidity'))
// Emitted when message metadata (e.g. flags) changes externally.
this._imap.on('update', () => this.emit('update'))
this._imap.once('ready', () => {
clearTimeout(socketTimeout)
resolve(this)
});
this._imap.once('error', (err) => {
clearTimeout(socketTimeout)
this.end();
reject(convertImapError(err));
});
this._imap.once('end', () => {
clearTimeout(socketTimeout)
this._logger.debug('Underlying IMAP Connection ended');
this._connectPromise = null;
this._imap = null;
});
this._imap.on('alert', (msg) => {
this._logger.info({imap_server_msg: msg}, `IMAP server message`)
});
this._imap.connect();
});
}
end() {
if (this._imap) {
this._imap.end();
@ -213,6 +147,125 @@ class IMAPConnection extends EventEmitter {
this._connectPromise = null;
}
async _buildUnderlyingConnection() {
this._imap = PromiseUtils.promisifyAll(new Imap(this._resolvedSettings));
return this._withPreparedConnection(() => {
return new Promise((resolve) => {
// `mail` event is emitted when new mail arrives in the currently open mailbox.
let lastMailEventBox = null;
this._imap.on('mail', () => {
// Fix https://github.com/mscdex/node-imap/issues/585
if (this._isOpeningBox) { return }
if (!this._imap) { return }
if (lastMailEventBox === null || lastMailEventBox === this._imap._box.name) {
// Fix https://github.com/mscdex/node-imap/issues/445
this.emit('mail');
}
lastMailEventBox = this._imap._box.name
});
// Emitted if the UID validity value for the currently open mailbox
// changes during the current session.
this._imap.on('uidvalidity', () => this.emit('uidvalidity'))
// Emitted when message metadata (e.g. flags) changes externally.
this._imap.on('update', () => this.emit('update'))
this._imap.on('alert', (msg) => {
this._logger.info({imap_server_msg: msg}, `IMAP server message`)
});
this._imap.once('ready', () => {
resolve()
});
this._imap.once('error', () => {
this.end();
});
this._imap.once('end', () => {
this._logger.warn('Underlying IMAP connection has ended')
this.end();
});
this._imap.connect();
});
})
}
/**
* @return {Promise} that resolves/rejects when the Promise returned by the
* passed-in callback resolves or rejects, and additionally will reject when
* the IMAP connection closes, ends or times out.
* This is important for 2 main reasons:
* - node-imap can sometimes hang the current operation after the connection
* has emmitted an `end` event. For this reason, we need to manually reject
* and end() on `end` event.
* - node-imap does not seem to respect the socketTimeout setting, so it won't
* actually time out an operation after the specified timeout has passed.
* For this reason, we have to manually reject when the timeout period has
* passed.
* @param {function} callback - This callback will receive as a single arg
* a node-imap connection instance, and should return a Promise.
*/
async _withPreparedConnection(callback) {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::_withPreparedConnection`)
}
let onEnded = null;
let onErrored = null;
try {
return await new Promise(async (resolve, reject) => {
const socketTimeout = setTimeout(() => {
reject(new IMAPConnectionTimeoutError('Socket timed out'))
}, this._resolvedSettings.socketTimeout)
const wrappedResolve = (result) => {
clearTimeout(socketTimeout)
resolve(result)
}
const wrappedReject = (error) => {
clearTimeout(socketTimeout)
const convertedError = convertImapError(error)
reject(convertedError)
this.end()
}
onEnded = () => {
wrappedReject(new IMAPConnectionEndedError())
};
onErrored = (error) => {
wrappedReject(error);
};
this._imap.on('error', onErrored);
this._imap.on('end', onEnded);
try {
const result = await callback(this._imap)
wrappedResolve(result)
} catch (error) {
wrappedReject(error)
}
})
} finally {
if (this._imap) {
this._imap.removeListener('error', onErrored);
this._imap.removeListener('end', onEnded);
}
}
}
getResolvedSettings() {
return this._resolvedSettings
}
getOpenBoxName() {
return (this._imap && this._imap._box) ? this._imap._box.name : null;
}
serverSupports(capability) {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::serverSupports`)
@ -223,7 +276,7 @@ class IMAPConnection extends EventEmitter {
/**
* @return {Promise} that resolves to instance of IMAPBox
*/
openBox(folderName, {readOnly = false, refetchBoxInfo = false} = {}) {
async openBox(folderName, {readOnly = false, refetchBoxInfo = false} = {}) {
if (!folderName) {
throw new Error('IMAPConnection::openBox - You must provide a folder name')
}
@ -234,17 +287,14 @@ class IMAPConnection extends EventEmitter {
return Promise.resolve(new IMAPBox(this, this._imap._box));
}
this._isOpeningBox = true
return this._createConnectionPromise((resolve, reject) => {
return this._imap.openBoxAsync(folderName, readOnly)
.then((box) => {
this._isOpeningBox = false
resolve(new IMAPBox(this, box))
})
.catch((...args) => reject(...args))
return this._withPreparedConnection(async (imap) => {
const box = await imap.openBoxAsync(folderName, readOnly)
this._isOpeningBox = false
return new IMAPBox(this, box)
})
}
getLatestBoxStatus(folderName) {
async getLatestBoxStatus(folderName) {
if (!folderName) {
throw new Error('IMAPConnection::getLatestBoxStatus - You must provide a folder name')
}
@ -253,62 +303,38 @@ class IMAPConnection extends EventEmitter {
// get the latest stats from the box (e.g. latest uidnext, etc)
return this.openBox(folderName, {refetchBoxInfo: true})
}
return this._createConnectionPromise((resolve, reject) => {
return this._imap.statusAsync(folderName)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.statusAsync(folderName))
}
getBoxes() {
async getBoxes() {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::getBoxes`)
}
return this._createConnectionPromise((resolve, reject) => {
return this._imap.getBoxesAsync()
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.getBoxesAsync())
}
addBox(folderName) {
async addBox(folderName) {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::addBox`)
}
return this._createConnectionPromise((resolve, reject) => {
return this._imap.addBoxAsync(folderName)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.addBoxAsync(folderName))
}
renameBox(oldFolderName, newFolderName) {
async renameBox(oldFolderName, newFolderName) {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::renameBox`)
}
return this._createConnectionPromise((resolve, reject) => {
return this._imap.renameBoxAsync(oldFolderName, newFolderName)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.renameBoxAsync(oldFolderName, newFolderName))
}
delBox(folderName) {
async delBox(folderName) {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::delBox`)
}
return this._createConnectionPromise((resolve, reject) => {
return this._imap.delBoxAsync(folderName)
.then((...args) => resolve(...args))
.catch((...args) => reject(...args))
})
return this._withPreparedConnection((imap) => imap.delBoxAsync(folderName))
}
getOpenBoxName() {
return (this._imap && this._imap._box) ? this._imap._box.name : null;
}
runOperation(operation, ctx) {
async runOperation(operation, ctx) {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::runOperation`)
}
@ -320,57 +346,6 @@ class IMAPConnection extends EventEmitter {
});
}
/*
Equivalent to new Promise, but allows you to easily create promises
which are also rejected when the IMAP connection closes, ends or times out.
This is important because node-imap sometimes just hangs the current
fetch / action forever after emitting an `end` event, or doesn't actually
timeout the socket.
*/
_createConnectionPromise(callback) {
if (!this._imap) {
throw new IMAPConnectionNotReadyError(`IMAPConnection::_createConnectionPromise`)
}
let onEnded = null;
let onErrored = null;
return new Promise((resolve, reject) => {
const socketTimeout = setTimeout(() => {
reject(new IMAPConnectionTimeoutError('Socket timed out'))
}, this._resolvedSettings.socketTimeout)
onEnded = () => {
clearTimeout(socketTimeout)
reject(new IMAPConnectionEndedError());
};
onErrored = (error) => {
clearTimeout(socketTimeout)
this.end()
reject(convertImapError(error));
};
this._imap.once('error', onErrored);
this._imap.once('end', onEnded);
const cbResolve = (...args) => {
clearTimeout(socketTimeout)
resolve(...args)
}
const cbReject = (error) => {
clearTimeout(socketTimeout)
reject(convertImapError(error))
}
return callback(cbResolve, cbReject)
})
.finally(() => {
if (this._imap) {
this._imap.removeListener('error', onErrored);
this._imap.removeListener('end', onEnded);
}
});
}
_processNextOperation() {
if (this._currentOperation) {
return;