mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-03-01 18:44:01 +08:00
Remove use of Rx.toPromise(), which wasn't behaving as expected
This commit is contained in:
parent
b44272621d
commit
ac627f1580
5 changed files with 254 additions and 186 deletions
156
packages/nylas-core/imap-box.js
Normal file
156
packages/nylas-core/imap-box.js
Normal file
|
@ -0,0 +1,156 @@
|
|||
const _ = require('underscore');
|
||||
|
||||
const {
|
||||
IMAPConnectionNotReadyError,
|
||||
} = require('./imap-errors');
|
||||
|
||||
class IMAPBox {
|
||||
constructor(imapConn, box) {
|
||||
this._conn = imapConn
|
||||
this._box = box
|
||||
|
||||
return new Proxy(this, {
|
||||
get(target, name) {
|
||||
const prop = Reflect.get(target, name)
|
||||
if (!prop) {
|
||||
return Reflect.get(target._box, name)
|
||||
}
|
||||
if (_.isFunction(prop) && target._conn._imap._box.name !== target._box.name) {
|
||||
return () => Promise.reject(
|
||||
new Error(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`)
|
||||
)
|
||||
}
|
||||
return prop
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {array|string} range - can be a single message identifier,
|
||||
* a message identifier range (e.g. '2504:2507' or '*' or '2504:*'),
|
||||
* an array of message identifiers, or an array of message identifier ranges.
|
||||
* @return {Observable} that will feed each message as it becomes ready
|
||||
*/
|
||||
fetchEach(range, options, forEachMessageCallback) {
|
||||
if (!options) {
|
||||
throw new Error("IMAPBox.fetch now requires an options object.")
|
||||
}
|
||||
if (range.length === 0) {
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
return this._conn.createConnectionPromise((resolve, reject) => {
|
||||
const f = this._conn._imap.fetch(range, options);
|
||||
f.on('message', (imapMessage) => {
|
||||
const parts = {};
|
||||
let headers = null;
|
||||
let attributes = null;
|
||||
imapMessage.on('attributes', (attrs) => {
|
||||
attributes = attrs;
|
||||
});
|
||||
imapMessage.on('body', (stream, info) => {
|
||||
const chunks = [];
|
||||
|
||||
stream.on('data', (chunk) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
stream.once('end', () => {
|
||||
const full = Buffer.concat(chunks).toString('utf8');
|
||||
if (info.which === 'HEADER') {
|
||||
headers = full;
|
||||
} else {
|
||||
parts[info.which] = full;
|
||||
}
|
||||
});
|
||||
});
|
||||
imapMessage.once('end', () => {
|
||||
forEachMessageCallback({attributes, headers, parts});
|
||||
});
|
||||
})
|
||||
f.once('error', reject);
|
||||
f.once('end', resolve);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {Promise} that resolves to requested message
|
||||
*/
|
||||
fetchMessage(uid) {
|
||||
if (!uid) {
|
||||
throw new Error("IMAPConnection.fetchMessage requires a message uid.")
|
||||
}
|
||||
return this.fetchEach([uid], {
|
||||
bodies: ['HEADER', 'TEXT'],
|
||||
})
|
||||
}
|
||||
|
||||
fetchMessageStream(uid, options) {
|
||||
if (!uid) {
|
||||
throw new Error("IMAPConnection.fetchStream requires a message uid.")
|
||||
}
|
||||
if (!options) {
|
||||
throw new Error("IMAPConnection.fetchStream requires an options object.")
|
||||
}
|
||||
return this._conn.createConnectionPromise((resolve, reject) => {
|
||||
const f = this._conn._imap.fetch(uid, options);
|
||||
f.on('message', (imapMessage) => {
|
||||
imapMessage.on('body', (stream) => {
|
||||
resolve(stream)
|
||||
})
|
||||
})
|
||||
f.once('error', reject)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {array|string} range - can be a single message identifier,
|
||||
* a message identifier range (e.g. '2504:2507' or '*' or '2504:*'),
|
||||
* an array of message identifiers, or an array of message identifier ranges.
|
||||
* @return {Promise} that resolves to a map of uid -> attributes for every
|
||||
* message in the range
|
||||
*/
|
||||
fetchUIDAttributes(range) {
|
||||
return this._conn.createConnectionPromise((resolve, reject) => {
|
||||
const attributesByUID = {};
|
||||
const f = this._conn._imap.fetch(range, {});
|
||||
f.on('message', (msg) => {
|
||||
msg.on('attributes', (attrs) => {
|
||||
attributesByUID[attrs.uid] = attrs;
|
||||
})
|
||||
});
|
||||
f.once('error', reject);
|
||||
f.once('end', () => resolve(attributesByUID));
|
||||
});
|
||||
}
|
||||
|
||||
addFlags(range, flags) {
|
||||
if (!this._conn._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::addFlags`)
|
||||
}
|
||||
return this._conn._imap.addFlagsAsync(range, flags)
|
||||
}
|
||||
|
||||
delFlags(range, flags) {
|
||||
if (!this._conn._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::delFlags`)
|
||||
}
|
||||
return this._conn._imap.delFlagsAsync(range, flags)
|
||||
}
|
||||
|
||||
moveFromBox(range, folderName) {
|
||||
if (!this._conn._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`)
|
||||
}
|
||||
return this._conn._imap.moveAsync(range, folderName)
|
||||
}
|
||||
|
||||
closeBox({expunge = true} = {}) {
|
||||
if (!this._conn._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::closeBox`)
|
||||
}
|
||||
return this._conn._imap.closeBoxAsync(expunge)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = IMAPBox;
|
|
@ -1,163 +1,13 @@
|
|||
const Rx = require('rx')
|
||||
const Imap = require('imap');
|
||||
const _ = require('underscore');
|
||||
const xoauth2 = require('xoauth2');
|
||||
const EventEmitter = require('events');
|
||||
|
||||
class IMAPConnectionNotReadyError extends Error {
|
||||
constructor(funcName) {
|
||||
super(`${funcName} - You must call connect() first.`);
|
||||
|
||||
// hack so that the error matches the ones used by node-imap
|
||||
this.source = 'socket';
|
||||
}
|
||||
}
|
||||
|
||||
class IMAPBox {
|
||||
|
||||
constructor(imapConn, box) {
|
||||
this._imap = imapConn
|
||||
this._box = box
|
||||
return new Proxy(this, {
|
||||
get(target, name) {
|
||||
const prop = Reflect.get(target, name)
|
||||
if (!prop) {
|
||||
return Reflect.get(target._box, name)
|
||||
}
|
||||
if (_.isFunction(prop) && target._imap._box.name !== target._box.name) {
|
||||
return () => Promise.reject(
|
||||
new Error(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`)
|
||||
)
|
||||
}
|
||||
return prop
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {array|string} range - can be a single message identifier,
|
||||
* a message identifier range (e.g. '2504:2507' or '*' or '2504:*'),
|
||||
* an array of message identifiers, or an array of message identifier ranges.
|
||||
* @return {Observable} that will feed each message as it becomes ready
|
||||
*/
|
||||
fetch(range, options) {
|
||||
if (!options) {
|
||||
throw new Error("IMAPBox.fetch now requires an options object.")
|
||||
}
|
||||
if (range.length === 0) {
|
||||
return Rx.Observable.empty()
|
||||
}
|
||||
return Rx.Observable.create((observer) => {
|
||||
const f = this._imap.fetch(range, options);
|
||||
f.on('message', (imapMessage) => {
|
||||
const parts = {};
|
||||
let headers = null;
|
||||
let attributes = null;
|
||||
imapMessage.on('attributes', (attrs) => {
|
||||
attributes = attrs;
|
||||
});
|
||||
imapMessage.on('body', (stream, info) => {
|
||||
const chunks = [];
|
||||
|
||||
stream.on('data', (chunk) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
stream.once('end', () => {
|
||||
const full = Buffer.concat(chunks).toString('utf8');
|
||||
if (info.which === 'HEADER') {
|
||||
headers = full;
|
||||
} else {
|
||||
parts[info.which] = full;
|
||||
}
|
||||
});
|
||||
});
|
||||
imapMessage.once('end', () => {
|
||||
observer.onNext({attributes, headers, parts});
|
||||
});
|
||||
})
|
||||
f.once('error', (error) => observer.onError(error))
|
||||
f.once('end', () => observer.onCompleted())
|
||||
})
|
||||
}
|
||||
|
||||
fetchStream({uid, options}) {
|
||||
if (!uid) {
|
||||
throw new Error("IMAPConnection.fetchStream requires a message uid.")
|
||||
}
|
||||
if (!options) {
|
||||
throw new Error("IMAPConnection.fetchStream requires an options object.")
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
const f = this._imap.fetch(uid, options);
|
||||
f.on('message', (imapMessage) => {
|
||||
imapMessage.on('body', (stream) => {
|
||||
resolve(stream)
|
||||
})
|
||||
})
|
||||
f.once('error', reject)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {Promise} that resolves to requested message
|
||||
*/
|
||||
fetchMessage(uid) {
|
||||
return this.fetch([uid], {
|
||||
bodies: ['HEADER', 'TEXT'],
|
||||
}).toPromise()
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {array|string} range - can be a single message identifier,
|
||||
* a message identifier range (e.g. '2504:2507' or '*' or '2504:*'),
|
||||
* an array of message identifiers, or an array of message identifier ranges.
|
||||
* @return {Promise} that resolves to a map of uid -> attributes for every
|
||||
* message in the range
|
||||
*/
|
||||
fetchUIDAttributes(range) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const attributesByUID = {};
|
||||
const f = this._imap.fetch(range, {});
|
||||
f.on('message', (msg) => {
|
||||
msg.on('attributes', (attrs) => {
|
||||
attributesByUID[attrs.uid] = attrs;
|
||||
})
|
||||
});
|
||||
f.once('error', reject);
|
||||
f.once('end', () => resolve(attributesByUID));
|
||||
});
|
||||
}
|
||||
|
||||
addFlags(range, flags) {
|
||||
if (!this._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::addFlags`)
|
||||
}
|
||||
return this._imap.addFlagsAsync(range, flags)
|
||||
}
|
||||
|
||||
delFlags(range, flags) {
|
||||
if (!this._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::delFlags`)
|
||||
}
|
||||
return this._imap.delFlagsAsync(range, flags)
|
||||
}
|
||||
|
||||
moveFromBox(range, folderName) {
|
||||
if (!this._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`)
|
||||
}
|
||||
return this._imap.moveAsync(range, folderName)
|
||||
}
|
||||
|
||||
closeBox({expunge = true} = {}) {
|
||||
if (!this._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPBox::closeBox`)
|
||||
}
|
||||
return this._imap.closeBoxAsync(expunge)
|
||||
}
|
||||
}
|
||||
|
||||
const IMAPBox = require('./imap-box');
|
||||
const {
|
||||
IMAPConnectionNotReadyError,
|
||||
IMAPConnectionEndedError,
|
||||
} = require('./imap-errors');
|
||||
|
||||
const Capabilities = {
|
||||
Gmail: 'X-GM-EXT-1',
|
||||
|
@ -195,8 +45,9 @@ class IMAPConnection extends EventEmitter {
|
|||
|
||||
connect() {
|
||||
if (!this._connectPromise) {
|
||||
this._connectPromise = this._resolveIMAPSettings()
|
||||
.then((settings) => this._buildUnderlyingConnection(settings))
|
||||
this._connectPromise = this._resolveIMAPSettings().then((settings) =>
|
||||
this._buildUnderlyingConnection(settings)
|
||||
);
|
||||
}
|
||||
return this._connectPromise;
|
||||
}
|
||||
|
@ -237,16 +88,6 @@ class IMAPConnection extends EventEmitter {
|
|||
return new Promise((resolve, reject) => {
|
||||
this._imap = Promise.promisifyAll(new Imap(settings));
|
||||
|
||||
this._imap.once('end', () => {
|
||||
this._logger.info('Underlying IMAP Connection ended');
|
||||
this._connectPromise = null;
|
||||
this._imap = null;
|
||||
});
|
||||
|
||||
this._imap.on('alert', (msg) => {
|
||||
this._logger.info({imap_server_msg: msg}, `IMAP server message`)
|
||||
})
|
||||
|
||||
// Emitted when new mail arrives in the currently open mailbox.
|
||||
// Fix https://github.com/mscdex/node-imap/issues/445
|
||||
let lastMailEventBox = null;
|
||||
|
@ -263,8 +104,26 @@ class IMAPConnection extends EventEmitter {
|
|||
|
||||
// Emitted when message metadata (e.g. flags) changes externally.
|
||||
this._imap.on('update', () => this.emit('update'))
|
||||
this._imap.once('ready', () => resolve(this));
|
||||
this._imap.once('error', reject);
|
||||
|
||||
this._imap.once('ready', () => {
|
||||
resolve(this)
|
||||
});
|
||||
|
||||
this._imap.once('error', (err) => {
|
||||
this.end();
|
||||
reject(err);
|
||||
});
|
||||
|
||||
this._imap.once('end', () => {
|
||||
this._logger.info('Underlying IMAP Connection ended');
|
||||
this._connectPromise = null;
|
||||
this._imap = null;
|
||||
});
|
||||
|
||||
this._imap.on('alert', (msg) => {
|
||||
this._logger.info({imap_server_msg: msg}, `IMAP server message`)
|
||||
});
|
||||
|
||||
this._imap.connect();
|
||||
});
|
||||
}
|
||||
|
@ -293,7 +152,7 @@ class IMAPConnection extends EventEmitter {
|
|||
throw new IMAPConnectionNotReadyError(`IMAPConnection::openBox`)
|
||||
}
|
||||
return this._imap.openBoxAsync(folderName, readOnly).then((box) =>
|
||||
new IMAPBox(this._imap, box)
|
||||
new IMAPBox(this, box)
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -337,6 +196,45 @@ class IMAPConnection extends EventEmitter {
|
|||
});
|
||||
}
|
||||
|
||||
/*
|
||||
Equivalent to new Promise, but allows you to easily create promises
|
||||
which are also rejected when the IMAP connection is closed or ends.
|
||||
This is important because node-imap sometimes just hangs the current
|
||||
fetch / action forever after emitting an `end` event.
|
||||
*/
|
||||
createConnectionPromise(callback) {
|
||||
if (!this._imap) {
|
||||
throw new IMAPConnectionNotReadyError(`IMAPConnection::createConnectionPromise`)
|
||||
}
|
||||
|
||||
let onEnded = null;
|
||||
let onErrored = null;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
let returned = false;
|
||||
onEnded = () => {
|
||||
returned = true;
|
||||
reject(new IMAPConnectionEndedError());
|
||||
};
|
||||
onErrored = (error) => {
|
||||
returned = true;
|
||||
reject(error || new Error("Unspecified IMAP error."));
|
||||
};
|
||||
|
||||
this._imap.once('error', onEnded);
|
||||
this._imap.once('end', onErrored);
|
||||
|
||||
const cresolve = (...args) => (!returned ? resolve(...args) : null)
|
||||
const creject = (...args) => (!returned ? reject(...args) : null)
|
||||
return callback(cresolve, creject)
|
||||
}).finally(() => {
|
||||
if (this._imap) {
|
||||
this._imap.removeListener('error', onEnded);
|
||||
this._imap.removeListener('end', onErrored);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
processNextOperation() {
|
||||
if (this._currentOperation) {
|
||||
return;
|
||||
|
|
21
packages/nylas-core/imap-errors.js
Normal file
21
packages/nylas-core/imap-errors.js
Normal file
|
@ -0,0 +1,21 @@
|
|||
|
||||
// "Source" is a hack so that the error matches the ones used by node-imap
|
||||
|
||||
class IMAPConnectionNotReadyError extends Error {
|
||||
constructor(funcName) {
|
||||
super(`${funcName} - You must call connect() first.`);
|
||||
this.source = 'socket';
|
||||
}
|
||||
}
|
||||
|
||||
class IMAPConnectionEndedError extends Error {
|
||||
constructor(msg = "The IMAP Connection was ended.") {
|
||||
super(msg);
|
||||
this.source = 'socket';
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
IMAPConnectionNotReadyError,
|
||||
IMAPConnectionEndedError,
|
||||
};
|
|
@ -24,12 +24,9 @@ module.exports = (sequelize, Sequelize) => {
|
|||
.then(({message, connection}) => {
|
||||
return message.getFolder()
|
||||
.then((folder) => connection.openBox(folder.name))
|
||||
.then((imapBox) => imapBox.fetchStream({
|
||||
uid: message.folderImapUID,
|
||||
options: {
|
||||
bodies: [this.partId],
|
||||
struct: true,
|
||||
},
|
||||
.then((imapBox) => imapBox.fetchMessageStream(message.folderImapUID, {
|
||||
bodies: [this.partId],
|
||||
struct: true,
|
||||
}))
|
||||
.then((stream) => {
|
||||
if (stream) {
|
||||
|
|
|
@ -166,8 +166,7 @@ class FetchMessagesInFolder {
|
|||
_fetchMessagesAndQueueForProcessing(range) {
|
||||
const uidsByPart = {};
|
||||
|
||||
const $structs = this._box.fetch(range, {struct: true})
|
||||
$structs.subscribe(({attributes}) => {
|
||||
return this._box.fetchEach(range, {struct: true}, ({attributes}) => {
|
||||
const desiredParts = this._getDesiredMIMEParts(attributes.struct);
|
||||
if (desiredParts.length === 0) {
|
||||
return;
|
||||
|
@ -175,13 +174,13 @@ class FetchMessagesInFolder {
|
|||
const key = JSON.stringify(desiredParts);
|
||||
uidsByPart[key] = uidsByPart[key] || [];
|
||||
uidsByPart[key].push(attributes.uid);
|
||||
});
|
||||
|
||||
return $structs.toPromise().then(() => {
|
||||
})
|
||||
.then(() => {
|
||||
return Promise.each(Object.keys(uidsByPart), (key) => {
|
||||
const uids = uidsByPart[key];
|
||||
const desiredParts = JSON.parse(key);
|
||||
const bodies = ['HEADER'].concat(desiredParts.map(p => p.id));
|
||||
|
||||
this._logger.info({
|
||||
key,
|
||||
num_messages: uids.length,
|
||||
|
@ -190,17 +189,14 @@ class FetchMessagesInFolder {
|
|||
// note: the order of UIDs in the array doesn't matter, Gmail always
|
||||
// returns them in ascending (oldest => newest) order.
|
||||
|
||||
const $body = this._box.fetch(uids, {bodies, struct: true})
|
||||
$body.subscribe((msg) => {
|
||||
return this._box.fetchEach(uids, {bodies, struct: true}, (msg) => {
|
||||
msg.body = {};
|
||||
for (const {id, mimetype} of desiredParts) {
|
||||
msg.body[mimetype] = msg.parts[id];
|
||||
}
|
||||
this._processMessage(msg);
|
||||
});
|
||||
|
||||
return $body.toPromise();
|
||||
})
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue