diff --git a/packages/isomorphic-core/src/imap-connection.js b/packages/isomorphic-core/src/imap-connection.js index f1ff3fffe..0a3286ea6 100644 --- a/packages/isomorphic-core/src/imap-connection.js +++ b/packages/isomorphic-core/src/imap-connection.js @@ -262,18 +262,18 @@ class IMAPConnection extends EventEmitter { } const {operation, resolve, reject} = this._currentOperation; - const result = operation.run(this._db, this); - if (result.constructor.name !== "Promise") { + const resultPromise = operation.run(this._db, this); + if (resultPromise.constructor.name !== "Promise") { reject(new Error(`Expected ${operation.constructor.name} to return promise.`)) } - result.then(() => { + resultPromise.then((maybeResult) => { this._currentOperation = null; this._logger.info({ operation_type: operation.constructor.name, operation_description: operation.description(), }, `Finished sync operation`) - resolve(); + resolve(maybeResult); this.processNextOperation(); }) .catch((err) => { diff --git a/packages/local-sync/src/local-api/routes/threads.js b/packages/local-sync/src/local-api/routes/threads.js index 0daed2f4c..af793715f 100644 --- a/packages/local-sync/src/local-api/routes/threads.js +++ b/packages/local-sync/src/local-api/routes/threads.js @@ -1,4 +1,5 @@ const Joi = require('joi'); +const stream = require('stream'); const _ = require('underscore'); const Serialization = require('../serialization'); const {createSyncbackRequest} = require('../route-helpers') @@ -69,8 +70,13 @@ module.exports = (server) => { const account = request.auth.credentials; const db = await request.getAccountDatabase(); const client = searchClientForAccount(account); - const threads = await client.searchThreads(db, request.query.q, request.query.limit); - reply(`${JSON.stringify(threads)}\n`); + const source = await client.searchThreads(db, request.query.q, request.query.limit); + + const outputStream = stream.Readable(); + outputStream._read = () => { return }; + const disposable = source.subscribe((str) => outputStream.push(str)); + request.on("disconnect", () => disposable.dispose()); + reply(outputStream); }, }); diff --git a/packages/local-sync/src/local-api/search.js b/packages/local-sync/src/local-api/search.js index 004e88713..3d2e1cfb9 100644 --- a/packages/local-sync/src/local-api/search.js +++ b/packages/local-sync/src/local-api/search.js @@ -1,9 +1,36 @@ const request = require('request'); const _ = require('underscore'); +const Rx = require('rx'); +const {IMAPConnection} = require('isomorphic-core') + +const getThreadsForMessages = (db, messages, limit) => { + const {Message, Folder, Label, Thread} = db; + const threadIds = _.uniq(messages.map((m) => m.threadId)); + return Thread.findAll({ + where: {id: threadIds}, + include: [ + {model: Folder}, + {model: Label}, + { + model: Message, + as: 'messages', + attributes: _.without(Object.keys(Message.attributes), 'body'), + include: [ + {model: Folder}, + ], + }, + ], + limit: limit, + order: [['lastMessageReceivedDate', 'DESC']], + }); +}; class GmailSearchClient { - constructor(accountToken) { - this.accountToken = accountToken; + constructor(account) { + const credentials = account.decryptedCredentials(); + this.accountToken = account.bearerToken(credentials.xoauth2); + this.account = account; + this._logger = global.Logger.forAccount(this.account); } // Note that the Gmail API returns message IDs in hex format. So for @@ -53,7 +80,7 @@ class GmailSearchClient { if (numTries >= maxTries) { // If we've been through the loop 10 times, it means we got a request // a crazy-high offset --- raise an error. - console.error('Too many results:', results.length); + this._logger.error('Too many results:', results.length); reject(new Error('Too many results')); return; } @@ -112,39 +139,147 @@ class GmailSearchClient { return []; } - const {Message, Folder, Label, Thread} = db; + const {Message} = db; const messages = await Message.findAll({ where: {gMsgId: {$in: messageIds}}, }); - const threadIds = _.uniq(messages.map((m) => m.threadId)); - const threads = await Thread.findAll({ - where: {id: threadIds}, - include: [ - {model: Folder}, - {model: Label}, - { - model: Message, - as: 'messages', - attributes: _.without(Object.keys(Message.attributes), 'body'), - include: [ - {model: Folder}, - ], - }, - ], - limit: limit, - order: [['lastMessageReceivedDate', 'DESC']], + const stringifiedThreads = getThreadsForMessages(db, messages, limit) + .then((threads) => `${JSON.stringify(threads)}\n`); + return Rx.Observable.fromPromise(stringifiedThreads); + } +} + +class SearchFolder { + constructor(folder, criteria) { + this.folder = folder; + this.criteria = criteria; + } + + description() { + return 'IMAP folder search'; + } + + run(db, imap) { + return imap.openBox(this.folder.name).then((box) => { + return box.search(this.criteria); + }); + } +} + +class ImapSearchClient { + constructor(account) { + this.account = account; + this._conn = null; + this._logger = global.Logger.forAccount(this.account); + } + + async ensureConnection() { + if (this._conn) { + return await this._conn.connect(); + } + const settings = this.account.connectionSettings; + const credentials = this.account.decryptedCredentials(); + + if (!settings || !settings.imap_host) { + throw new Error("ensureConnection: There are no IMAP connection settings for this account."); + } + if (!credentials) { + throw new Error("ensureConnection: There are no IMAP connection credentials for this account."); + } + + const conn = new IMAPConnection({ + db: this._db, + settings: Object.assign({}, settings, credentials), + logger: this._logger, + }); + + this._conn = conn; + return await this._conn.connect(); + } + + closeConnection() { + if (this._conn) { + this._conn.end(); + } + } + + async _search(db, query) { + await this.ensureConnection(); + + // We want to start the search with the 'inbox', 'sent' and 'archive' + // folders, if they exist. + const {Folder} = db; + let folders = await Folder.findAll({ + where: { + accountId: this.account.id, + role: ['inbox', 'sent', 'archive'], + }, + }); + + const accountFolders = await Folder.findAll({ + where: { + accountId: this.account.id, + id: {$notIn: folders.map((f) => f.id)}, + }, + }); + + folders = folders.concat(accountFolders); + + const criteria = [['TEXT', query]]; + return Rx.Observable.create((observer) => { + const chain = folders.reduce((acc, folder) => { + return acc.then((uids) => { + if (uids.length > 0) { + observer.onNext(uids); + } + return this._searchFolder(folder, criteria); + }); + }, Promise.resolve([])); + + chain.then((uids) => { + if (uids.length > 0) { + observer.onNext(uids); + } + observer.onCompleted(); + }).finally(() => this.closeConnection()); + }); + } + + _searchFolder(folder, criteria) { + return this._conn.runOperation(new SearchFolder(folder, criteria)).catch((error) => { + this._logger.error(`Search error: ${error}`); + return Promise.resolve([]); + }); + } + + async searchThreads(db, query, limit) { + const {Message} = db; + return (await this._search(db, query)).flatMap((uids) => { + return Message.findAll({ + where: { + accountId: this.account.id, + folderImapUID: uids, + }, + }); + }).flatMap((messages) => { + return getThreadsForMessages(db, messages, limit); + }).flatMap((threads) => { + if (threads.length > 0) { + return `${JSON.stringify(threads)}\n`; + } + return '\n'; }); - return threads; } } module.exports.searchClientForAccount = (account) => { switch (account.provider) { case 'gmail': { - const credentials = account.decryptedCredentials(); - const accountToken = account.bearerToken(credentials.xoauth2); - return new GmailSearchClient(accountToken); + return new GmailSearchClient(account); + } + case 'imap': { + return new ImapSearchClient(account); } default: { throw new Error(`Unsupported provider for search endpoint: ${account.provider}`);