mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-09-07 21:24:24 +08:00
Update IMAPConnection api + error handling fixes + misc
- `IMAPConnection::openBox` now returns a Promise that resolves to an IMAPBox, and IMAPBox contains all of the `fetch` operations. This makes the dependency between fetch operations and the currently open mailbox explicit rather than implicit and by forcing the operations to be called on a box instance and hopefully prevent errors. It will also throw an error if the constraint is no longer satisfied. - `fetch` operations now return an observable stream of messages (or Promise for single value), while preserving the same logic of the original implementation. You can use `.toPromise()` on the observable to get a Promise that resolves when the observable stream has completely drained. - Fixes error handling in a few places and renames some variables
This commit is contained in:
parent
63f48ae817
commit
dd350a5081
5 changed files with 219 additions and 172 deletions
|
@ -96,7 +96,7 @@ module.exports = (server) => {
|
|||
const {settings, email, provider, name} = request.payload;
|
||||
|
||||
if (provider === 'imap') {
|
||||
connectionChecks.push(new IMAPConnection(dbStub, settings).connect())
|
||||
connectionChecks.push(IMAPConnection.connect(dbStub, settings))
|
||||
}
|
||||
|
||||
Promise.all(connectionChecks).then(() => {
|
||||
|
@ -188,7 +188,7 @@ module.exports = (server) => {
|
|||
}
|
||||
|
||||
Promise.all([
|
||||
new IMAPConnection({}, Object.assign({}, settings, credentials)).connect(),
|
||||
IMAPConnection.connect({}, Object.assign({}, settings, credentials)),
|
||||
])
|
||||
.then(() =>
|
||||
buildAccountWith({name: profile.name, email: profile.email, settings, credentials})
|
||||
|
|
|
@ -60,23 +60,22 @@ module.exports = (server) => {
|
|||
const {headers: {accept}} = request
|
||||
const {params: {id}} = request
|
||||
const account = request.auth.credentials
|
||||
const query = db.Message.findOne({where: {id}})
|
||||
|
||||
if (accept === 'message/rfc822') {
|
||||
query.then((message) => {
|
||||
// TODO message not found
|
||||
message.fetchRaw({account, db})
|
||||
db.Message.findOne({where: {id}})
|
||||
.then((message) => {
|
||||
if (!message) {
|
||||
return reply.notFound(`Message ${id} not found`)
|
||||
}
|
||||
if (accept === 'message/rfc822') {
|
||||
return message.fetchRaw({account, db})
|
||||
.then((rawMessage) => reply(rawMessage))
|
||||
.catch((error) => {
|
||||
console.log('Error fetching raw message: ', error)
|
||||
reply(error)
|
||||
})
|
||||
})
|
||||
} else {
|
||||
query.then((message) => {
|
||||
reply(Serialization.jsonStringify(message));
|
||||
})
|
||||
}
|
||||
}
|
||||
return reply(Serialization.jsonStringify(message));
|
||||
})
|
||||
.catch((error) => {
|
||||
console.log('Error fetching message: ', error)
|
||||
reply(error)
|
||||
})
|
||||
})
|
||||
},
|
||||
})
|
||||
|
|
|
@ -1,7 +1,106 @@
|
|||
const Rx = require('rx')
|
||||
const Imap = require('imap');
|
||||
const EventEmitter = require('events');
|
||||
const xoauth2 = require("xoauth2");
|
||||
const _ = require('underscore');
|
||||
const xoauth2 = require("xoauth2");
|
||||
const EventEmitter = require('events');
|
||||
|
||||
|
||||
class IMAPBox {
|
||||
|
||||
constructor(imapConn, box) {
|
||||
this._imap = imapConn
|
||||
this._box = box
|
||||
return new Proxy(this, {
|
||||
get(target, name) {
|
||||
const prop = Reflect.get(target, name)
|
||||
if (!prop) {
|
||||
return Reflect.get(target._box, name)
|
||||
}
|
||||
if (_.isFunction(prop) && target._imap._box.name !== target._box.name) {
|
||||
return () => Promise.reject(
|
||||
new Error(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`)
|
||||
)
|
||||
}
|
||||
return prop
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {array|string} range - can be a single message identifier,
|
||||
* a message identifier range (e.g. '2504:2507' or '*' or '2504:*'),
|
||||
* an array of message identifiers, or an array of message identifier ranges.
|
||||
* @return {Observable} that will feed each message as it becomes ready
|
||||
*/
|
||||
fetch(range) {
|
||||
if (range.length === 0) {
|
||||
return Rx.Observable.empty()
|
||||
}
|
||||
return Rx.Observable.create((observer) => {
|
||||
const f = this._imap.fetch(range, {
|
||||
bodies: ['HEADER', 'TEXT'],
|
||||
});
|
||||
f.on('message', (imapMessage) => {
|
||||
let attributes = null;
|
||||
let body = null;
|
||||
let headers = null;
|
||||
imapMessage.on('attributes', (attrs) => {
|
||||
attributes = attrs;
|
||||
});
|
||||
imapMessage.on('body', (stream, info) => {
|
||||
const chunks = [];
|
||||
|
||||
stream.on('data', (chunk) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
stream.once('end', () => {
|
||||
const full = Buffer.concat(chunks).toString('utf8');
|
||||
if (info.which === 'HEADER') {
|
||||
headers = full;
|
||||
}
|
||||
if (info.which === 'TEXT') {
|
||||
body = full;
|
||||
}
|
||||
});
|
||||
});
|
||||
imapMessage.once('end', () => {
|
||||
observer.onNext({attributes, headers, body});
|
||||
});
|
||||
})
|
||||
f.once('error', (error) => observer.onError(error))
|
||||
f.once('end', () => observer.onCompleted())
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {Promise} that resolves to requested message
|
||||
*/
|
||||
fetchMessage(uid) {
|
||||
return this.fetch([uid]).toPromise()
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {array|string} range - can be a single message identifier,
|
||||
* a message identifier range (e.g. '2504:2507' or '*' or '2504:*'),
|
||||
* an array of message identifiers, or an array of message identifier ranges.
|
||||
* @return {Promise} that resolves to a map of uid -> attributes for every
|
||||
* message in the range
|
||||
*/
|
||||
fetchUIDAttributes(range) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const attributesByUID = {};
|
||||
const f = this._imap.fetch(range, {});
|
||||
f.on('message', (msg) => {
|
||||
msg.on('attributes', (attrs) => {
|
||||
attributesByUID[attrs.uid] = attrs;
|
||||
})
|
||||
});
|
||||
f.once('error', reject);
|
||||
f.once('end', () => resolve(attributesByUID));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
const Capabilities = {
|
||||
Gmail: 'X-GM-EXT-1',
|
||||
|
@ -12,22 +111,45 @@ const Capabilities = {
|
|||
Sort: 'SORT',
|
||||
}
|
||||
|
||||
const EnsureConnected = [
|
||||
'openBox',
|
||||
'getBoxes',
|
||||
'serverSupports',
|
||||
'runOperation',
|
||||
'processNextOperation',
|
||||
'end',
|
||||
]
|
||||
|
||||
class IMAPConnection extends EventEmitter {
|
||||
|
||||
static connect(...args) {
|
||||
return new IMAPConnection(...args).connect()
|
||||
}
|
||||
|
||||
constructor(db, settings) {
|
||||
super();
|
||||
|
||||
this._db = db;
|
||||
this._queue = [];
|
||||
this._current = null;
|
||||
this._currentOperation = null;
|
||||
this._settings = settings;
|
||||
this._imap = null
|
||||
|
||||
return new Proxy(this, {
|
||||
get(target, name) {
|
||||
if (EnsureConnected.includes(name) && !target._imap) {
|
||||
return () => Promise.reject(
|
||||
new Error(`IMAPConnection::${name} - You need to call connect() first.`)
|
||||
)
|
||||
}
|
||||
return Reflect.get(target, name)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
connect() {
|
||||
if (!this._connectPromise) {
|
||||
this._connectPromise = this._resolveIMAPSettings().then((settings) =>
|
||||
this._buildUnderlyingConnection(settings)
|
||||
)
|
||||
this._connectPromise = this._resolveIMAPSettings()
|
||||
.then((settings) => this._buildUnderlyingConnection(settings))
|
||||
}
|
||||
return this._connectPromise;
|
||||
}
|
||||
|
@ -92,7 +214,7 @@ class IMAPConnection extends EventEmitter {
|
|||
|
||||
// Emitted when message metadata (e.g. flags) changes externally.
|
||||
this._imap.on('update', () => this.emit('update'))
|
||||
this._imap.once('ready', resolve);
|
||||
this._imap.once('ready', () => resolve(this));
|
||||
this._imap.once('error', reject);
|
||||
this._imap.connect();
|
||||
});
|
||||
|
@ -103,137 +225,52 @@ class IMAPConnection extends EventEmitter {
|
|||
this._imap.end();
|
||||
}
|
||||
|
||||
serverSupports(cap) {
|
||||
if (!this._imap) {
|
||||
throw new Error("IMAPConnection.serverSupports: You need to call connect() first.")
|
||||
}
|
||||
this._imap.serverSupports(cap);
|
||||
serverSupports(capability) {
|
||||
this._imap.serverSupports(capability);
|
||||
}
|
||||
|
||||
openBox(box) {
|
||||
if (!this._imap) {
|
||||
throw new Error("IMAPConnection.openBox: You need to call connect() first.")
|
||||
}
|
||||
return this._imap.openBoxAsync(box, true);
|
||||
/**
|
||||
* @return {Promise} that resolves to instance of IMAPBox
|
||||
*/
|
||||
openBox(categoryName, {readOnly = true} = {}) {
|
||||
return this._imap.openBoxAsync(categoryName, readOnly)
|
||||
.then((box) => new IMAPBox(this._imap, box))
|
||||
}
|
||||
|
||||
getBoxes() {
|
||||
if (!this._imap) {
|
||||
throw new Error("IMAPConnection.getBoxes: You need to call connect() first.")
|
||||
}
|
||||
return this._imap.getBoxesAsync();
|
||||
}
|
||||
|
||||
fetch(range, messageCallback) {
|
||||
if (!this._imap) {
|
||||
throw new Error("IMAPConnection.fetch: You need to call connect() first.")
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
const f = this._imap.fetch(range, {
|
||||
bodies: ['HEADER', 'TEXT'],
|
||||
});
|
||||
f.on('message', (msg) =>
|
||||
this._receiveMessage(msg, messageCallback)
|
||||
)
|
||||
f.once('error', reject);
|
||||
f.once('end', resolve);
|
||||
});
|
||||
}
|
||||
|
||||
fetchMessages(uids, messageCallback) {
|
||||
if (!this._imap) {
|
||||
throw new Error("IMAPConnection.fetchMessages: You need to call connect() first.")
|
||||
}
|
||||
if (uids.length === 0) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
return this.fetch(uids, messageCallback);
|
||||
}
|
||||
|
||||
fetchUIDAttributes(range) {
|
||||
if (!this._imap) {
|
||||
throw new Error("IMAPConnection.fetchUIDAttributes: You need to call connect() first.")
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
const latestUIDAttributes = {};
|
||||
const f = this._imap.fetch(range, {});
|
||||
f.on('message', (msg) => {
|
||||
msg.on('attributes', (attrs) => {
|
||||
latestUIDAttributes[attrs.uid] = attrs;
|
||||
})
|
||||
});
|
||||
f.once('error', reject);
|
||||
f.once('end', () => {
|
||||
resolve(latestUIDAttributes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
_receiveMessage(msg, callback) {
|
||||
let attributes = null;
|
||||
let body = null;
|
||||
let headers = null;
|
||||
|
||||
msg.on('attributes', (attrs) => {
|
||||
attributes = attrs;
|
||||
});
|
||||
msg.on('body', (stream, info) => {
|
||||
const chunks = [];
|
||||
|
||||
stream.on('data', (chunk) => {
|
||||
chunks.push(chunk);
|
||||
});
|
||||
stream.once('end', () => {
|
||||
const full = Buffer.concat(chunks).toString('utf8');
|
||||
if (info.which === 'HEADER') {
|
||||
headers = full;
|
||||
}
|
||||
if (info.which === 'TEXT') {
|
||||
body = full;
|
||||
}
|
||||
});
|
||||
});
|
||||
msg.once('end', () => {
|
||||
callback(attributes, headers, body);
|
||||
});
|
||||
return this._imap.getBoxesAsync()
|
||||
}
|
||||
|
||||
runOperation(operation) {
|
||||
if (!this._imap) {
|
||||
throw new Error("IMAPConnection.runOperation: You need to call connect() first.")
|
||||
}
|
||||
return new Promise((resolve, reject) => {
|
||||
this._queue.push({operation, resolve, reject});
|
||||
if (this._imap.state === 'authenticated' && !this._current) {
|
||||
if (this._imap.state === 'authenticated' && !this._currentOperation) {
|
||||
this.processNextOperation();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
processNextOperation() {
|
||||
if (this._current) { return; }
|
||||
|
||||
this._current = this._queue.shift();
|
||||
|
||||
if (!this._current) {
|
||||
if (this._currentOperation) { return; }
|
||||
this._currentOperation = this._queue.shift();
|
||||
if (!this._currentOperation) {
|
||||
this.emit('queue-empty');
|
||||
return;
|
||||
}
|
||||
|
||||
const {operation, resolve, reject} = this._current;
|
||||
|
||||
const {operation, resolve, reject} = this._currentOperation;
|
||||
console.log(`Starting task ${operation.description()}`)
|
||||
const result = operation.run(this._db, this);
|
||||
if (result instanceof Promise === false) {
|
||||
throw new Error(`Expected ${operation.constructor.name} to return promise.`);
|
||||
}
|
||||
result.catch((err) => {
|
||||
this._current = null;
|
||||
this._currentOperation = null;
|
||||
console.error(err);
|
||||
reject();
|
||||
})
|
||||
.then(() => {
|
||||
this._current = null;
|
||||
this._currentOperation = null;
|
||||
console.log(`Finished task ${operation.description()}`)
|
||||
resolve();
|
||||
})
|
||||
|
@ -242,7 +279,6 @@ class IMAPConnection extends EventEmitter {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
IMAPConnection.Capabilities = Capabilities;
|
||||
|
||||
module.exports = IMAPConnection
|
||||
|
|
|
@ -40,23 +40,21 @@ module.exports = (sequelize, Sequelize) => {
|
|||
},
|
||||
instanceMethods: {
|
||||
fetchRaw({account, db}) {
|
||||
return this.getCategory()
|
||||
.then((category) => {
|
||||
const settings = Object.assign({}, account.connectionSettings, account.decryptedCredentials())
|
||||
const conn = new IMAPConnection(db, settings)
|
||||
return conn.connect()
|
||||
.then(() => conn.openBox(category.name))
|
||||
.then(() => {
|
||||
return new Promise((resolve) => {
|
||||
conn.fetchMessages([this.CategoryUID], (attributes, headers, body) => {
|
||||
resolve(`${headers}${body}`)
|
||||
})
|
||||
})
|
||||
})
|
||||
.then((raw) => {
|
||||
conn.end()
|
||||
return raw
|
||||
const settings = Object.assign({}, account.connectionSettings, account.decryptedCredentials())
|
||||
return Promise.props({
|
||||
category: this.getCategory(),
|
||||
connection: IMAPConnection.connect(db, settings),
|
||||
})
|
||||
.then(({category, connection}) => {
|
||||
return connection.openBox(category.name)
|
||||
.then((imapBox) => imapBox.fetchMessage(this.CategoryUID))
|
||||
.then((message) => {
|
||||
if (message) {
|
||||
return Promise.resolve(`${message.headers}${message.body}`)
|
||||
}
|
||||
return Promise.reject(new Error(`Unable to fetch raw message for Message ${this.id}`))
|
||||
})
|
||||
.finally(() => connection.end())
|
||||
})
|
||||
},
|
||||
toJSON: function toJSON() {
|
||||
|
|
|
@ -7,6 +7,9 @@ const MessageFlagAttributes = ['id', 'CategoryUID', 'unread', 'starred']
|
|||
|
||||
class FetchMessagesInCategory {
|
||||
constructor(category, options) {
|
||||
this._imap = null
|
||||
this._box = null
|
||||
this._db = null
|
||||
this._category = category;
|
||||
this._options = options;
|
||||
if (!this._category) {
|
||||
|
@ -73,7 +76,7 @@ class FetchMessagesInCategory {
|
|||
console.log(` -- found ${changedMessages.length} flag changes`)
|
||||
|
||||
return Promise.props({
|
||||
creates: this._imap.fetchMessages(createdUIDs, this._processMessage.bind(this)),
|
||||
creates: this._fetchMessagesAndQueueForProcessing(createdUIDs),
|
||||
updates: this._db.sequelize.transaction((transaction) =>
|
||||
Promise.all(changedMessages.map(m => m.save({
|
||||
fields: MessageFlagAttributes,
|
||||
|
@ -109,9 +112,14 @@ class FetchMessagesInCategory {
|
|||
);
|
||||
}
|
||||
|
||||
_processMessage(attributes, headers, body) {
|
||||
const {Message, accountId} = this._db;
|
||||
_fetchMessagesAndQueueForProcessing(range) {
|
||||
const messagesObservable = this._box.fetch(range)
|
||||
messagesObservable.subscribe(this._processMessage.bind(this))
|
||||
return messagesObservable.toPromise()
|
||||
}
|
||||
|
||||
_processMessage({attributes, headers, body}) {
|
||||
const {Message, accountId} = this._db;
|
||||
const hash = Message.hashForHeaders(headers);
|
||||
const values = {
|
||||
hash: hash,
|
||||
|
@ -134,16 +142,16 @@ class FetchMessagesInCategory {
|
|||
}
|
||||
|
||||
_openMailboxAndEnsureValidity() {
|
||||
return this._imap.openBox(this._category.name, true).then((box) => {
|
||||
this._box = box;
|
||||
|
||||
return this._imap.openBox(this._category.name)
|
||||
.then((box) => {
|
||||
if (box.persistentUIDs === false) {
|
||||
throw new Error("Mailbox does not support persistentUIDs.")
|
||||
return Promise.reject(new Error("Mailbox does not support persistentUIDs."))
|
||||
}
|
||||
if (box.uidvalidity !== this._category.syncState.uidvalidity) {
|
||||
return this._recoverFromUIDInvalidity();
|
||||
return this._recoverFromUIDInvalidity()
|
||||
.then(() => Promise.resolve(box))
|
||||
}
|
||||
return Promise.resolve();
|
||||
return Promise.resolve(box);
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -167,14 +175,14 @@ class FetchMessagesInCategory {
|
|||
range = `${savedSyncState.uidnext}:*`
|
||||
}
|
||||
|
||||
return this._imap.fetch(range, this._processMessage.bind(this)).then(() => {
|
||||
return this._fetchMessagesAndQueueForProcessing(range).then(() => {
|
||||
console.log(` - finished fetching unseen messages`);
|
||||
return this.updateCategorySyncState({
|
||||
uidnext: boxSyncState.uidnext,
|
||||
uidvalidity: boxSyncState.uidvalidity,
|
||||
timeFetchedUnseen: Date.now(),
|
||||
});
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
_shouldRunDeepScan() {
|
||||
|
@ -184,21 +192,25 @@ class FetchMessagesInCategory {
|
|||
|
||||
_runDeepScan(range) {
|
||||
const {Message} = this._db;
|
||||
return this._imap.fetchUIDAttributes(range).then((remoteUIDAttributes) =>
|
||||
return this._box.fetchUIDAttributes(range)
|
||||
.then((remoteUIDAttributes) =>
|
||||
Message.findAll({
|
||||
where: {CategoryId: this._category.id},
|
||||
attributes: MessageFlagAttributes,
|
||||
}).then((localMessageAttributes) =>
|
||||
})
|
||||
.then((localMessageAttributes) => (
|
||||
Promise.props({
|
||||
upserts: this._createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes),
|
||||
deletes: this._removeDeletedMessages(remoteUIDAttributes, localMessageAttributes),
|
||||
})
|
||||
).then(() => {
|
||||
))
|
||||
.then(() => {
|
||||
console.log(` - finished fetching changes to messages ${range}`);
|
||||
return this.updateCategorySyncState({
|
||||
highestmodseq: this._box.highestmodseq,
|
||||
timeDeepScan: Date.now(),
|
||||
timeShallowScan: Date.now(),
|
||||
});
|
||||
})
|
||||
})
|
||||
);
|
||||
}
|
||||
|
@ -206,7 +218,6 @@ class FetchMessagesInCategory {
|
|||
_fetchChangesToMessages() {
|
||||
const {highestmodseq} = this._category.syncState;
|
||||
const nextHighestmodseq = this._box.highestmodseq;
|
||||
|
||||
const range = `${this._getLowerBoundUID(this._options.limit)}:*`;
|
||||
|
||||
console.log(` - fetching changes to messages ${range}`)
|
||||
|
@ -216,30 +227,33 @@ class FetchMessagesInCategory {
|
|||
}
|
||||
|
||||
let shallowFetch = null;
|
||||
|
||||
if (this._imap.serverSupports(Capabilities.Condstore)) {
|
||||
if (nextHighestmodseq === highestmodseq) {
|
||||
console.log(" --- highestmodseq matches, nothing more to fetch")
|
||||
return Promise.resolve();
|
||||
}
|
||||
shallowFetch = this._imap.fetchUIDAttributes(range, {changedsince: highestmodseq});
|
||||
shallowFetch = this._box.fetchUIDAttributes(range, {changedsince: highestmodseq});
|
||||
} else {
|
||||
shallowFetch = this._imap.fetchUIDAttributes(`${this._getLowerBoundUID(1000)}:*`);
|
||||
shallowFetch = this._box.fetchUIDAttributes(`${this._getLowerBoundUID(1000)}:*`);
|
||||
}
|
||||
|
||||
return shallowFetch.then((remoteUIDAttributes) =>
|
||||
return shallowFetch
|
||||
.then((remoteUIDAttributes) => (
|
||||
this._db.Message.findAll({
|
||||
where: {CategoryId: this._category.id},
|
||||
attributes: MessageFlagAttributes,
|
||||
}).then((localMessageAttributes) =>
|
||||
})
|
||||
.then((localMessageAttributes) => (
|
||||
this._createAndUpdateMessages(remoteUIDAttributes, localMessageAttributes)
|
||||
).then(() => {
|
||||
))
|
||||
.then(() => {
|
||||
console.log(` - finished fetching changes to messages ${range}`);
|
||||
return this.updateCategorySyncState({
|
||||
highestmodseq: nextHighestmodseq,
|
||||
timeShallowScan: Date.now(),
|
||||
});
|
||||
})
|
||||
})
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
updateCategorySyncState(newState) {
|
||||
|
@ -255,11 +269,11 @@ class FetchMessagesInCategory {
|
|||
this._imap = imap;
|
||||
|
||||
return this._openMailboxAndEnsureValidity()
|
||||
.then(() =>
|
||||
this._fetchUnseenMessages()
|
||||
).then(() =>
|
||||
this._fetchChangesToMessages()
|
||||
)
|
||||
.then((box) => {
|
||||
this._box = box
|
||||
return this._fetchUnseenMessages()
|
||||
.then(() => this._fetchChangesToMessages())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue