[local-sync] Implement /thread/search endpoint for generic IMAP

Summary: See title

Test Plan: Run locally

Reviewers: juan, evan

Reviewed By: juan, evan

Maniphest Tasks: T7281

Differential Revision: https://phab.nylas.com/D3498
This commit is contained in:
Mark Hahnenberg 2016-12-12 17:35:25 -08:00
parent 138c79bf71
commit cfc8d3e315
3 changed files with 172 additions and 31 deletions

View file

@ -262,18 +262,18 @@ class IMAPConnection extends EventEmitter {
}
const {operation, resolve, reject} = this._currentOperation;
const result = operation.run(this._db, this);
if (result.constructor.name !== "Promise") {
const resultPromise = operation.run(this._db, this);
if (resultPromise.constructor.name !== "Promise") {
reject(new Error(`Expected ${operation.constructor.name} to return promise.`))
}
result.then(() => {
resultPromise.then((maybeResult) => {
this._currentOperation = null;
this._logger.info({
operation_type: operation.constructor.name,
operation_description: operation.description(),
}, `Finished sync operation`)
resolve();
resolve(maybeResult);
this.processNextOperation();
})
.catch((err) => {

View file

@ -1,4 +1,5 @@
const Joi = require('joi');
const stream = require('stream');
const _ = require('underscore');
const Serialization = require('../serialization');
const {createSyncbackRequest} = require('../route-helpers')
@ -69,8 +70,13 @@ module.exports = (server) => {
const account = request.auth.credentials;
const db = await request.getAccountDatabase();
const client = searchClientForAccount(account);
const threads = await client.searchThreads(db, request.query.q, request.query.limit);
reply(`${JSON.stringify(threads)}\n`);
const source = await client.searchThreads(db, request.query.q, request.query.limit);
const outputStream = stream.Readable();
outputStream._read = () => { return };
const disposable = source.subscribe((str) => outputStream.push(str));
request.on("disconnect", () => disposable.dispose());
reply(outputStream);
},
});

View file

@ -1,9 +1,36 @@
const request = require('request');
const _ = require('underscore');
const Rx = require('rx');
const {IMAPConnection} = require('isomorphic-core')
const getThreadsForMessages = (db, messages, limit) => {
const {Message, Folder, Label, Thread} = db;
const threadIds = _.uniq(messages.map((m) => m.threadId));
return Thread.findAll({
where: {id: threadIds},
include: [
{model: Folder},
{model: Label},
{
model: Message,
as: 'messages',
attributes: _.without(Object.keys(Message.attributes), 'body'),
include: [
{model: Folder},
],
},
],
limit: limit,
order: [['lastMessageReceivedDate', 'DESC']],
});
};
class GmailSearchClient {
constructor(accountToken) {
this.accountToken = accountToken;
constructor(account) {
const credentials = account.decryptedCredentials();
this.accountToken = account.bearerToken(credentials.xoauth2);
this.account = account;
this._logger = global.Logger.forAccount(this.account);
}
// Note that the Gmail API returns message IDs in hex format. So for
@ -53,7 +80,7 @@ class GmailSearchClient {
if (numTries >= maxTries) {
// If we've been through the loop 10 times, it means we got a request
// a crazy-high offset --- raise an error.
console.error('Too many results:', results.length);
this._logger.error('Too many results:', results.length);
reject(new Error('Too many results'));
return;
}
@ -112,39 +139,147 @@ class GmailSearchClient {
return [];
}
const {Message, Folder, Label, Thread} = db;
const {Message} = db;
const messages = await Message.findAll({
where: {gMsgId: {$in: messageIds}},
});
const threadIds = _.uniq(messages.map((m) => m.threadId));
const threads = await Thread.findAll({
where: {id: threadIds},
include: [
{model: Folder},
{model: Label},
{
model: Message,
as: 'messages',
attributes: _.without(Object.keys(Message.attributes), 'body'),
include: [
{model: Folder},
],
},
],
limit: limit,
order: [['lastMessageReceivedDate', 'DESC']],
const stringifiedThreads = getThreadsForMessages(db, messages, limit)
.then((threads) => `${JSON.stringify(threads)}\n`);
return Rx.Observable.fromPromise(stringifiedThreads);
}
}
class SearchFolder {
constructor(folder, criteria) {
this.folder = folder;
this.criteria = criteria;
}
description() {
return 'IMAP folder search';
}
run(db, imap) {
return imap.openBox(this.folder.name).then((box) => {
return box.search(this.criteria);
});
}
}
class ImapSearchClient {
constructor(account) {
this.account = account;
this._conn = null;
this._logger = global.Logger.forAccount(this.account);
}
async ensureConnection() {
if (this._conn) {
return await this._conn.connect();
}
const settings = this.account.connectionSettings;
const credentials = this.account.decryptedCredentials();
if (!settings || !settings.imap_host) {
throw new Error("ensureConnection: There are no IMAP connection settings for this account.");
}
if (!credentials) {
throw new Error("ensureConnection: There are no IMAP connection credentials for this account.");
}
const conn = new IMAPConnection({
db: this._db,
settings: Object.assign({}, settings, credentials),
logger: this._logger,
});
this._conn = conn;
return await this._conn.connect();
}
closeConnection() {
if (this._conn) {
this._conn.end();
}
}
async _search(db, query) {
await this.ensureConnection();
// We want to start the search with the 'inbox', 'sent' and 'archive'
// folders, if they exist.
const {Folder} = db;
let folders = await Folder.findAll({
where: {
accountId: this.account.id,
role: ['inbox', 'sent', 'archive'],
},
});
const accountFolders = await Folder.findAll({
where: {
accountId: this.account.id,
id: {$notIn: folders.map((f) => f.id)},
},
});
folders = folders.concat(accountFolders);
const criteria = [['TEXT', query]];
return Rx.Observable.create((observer) => {
const chain = folders.reduce((acc, folder) => {
return acc.then((uids) => {
if (uids.length > 0) {
observer.onNext(uids);
}
return this._searchFolder(folder, criteria);
});
}, Promise.resolve([]));
chain.then((uids) => {
if (uids.length > 0) {
observer.onNext(uids);
}
observer.onCompleted();
}).finally(() => this.closeConnection());
});
}
_searchFolder(folder, criteria) {
return this._conn.runOperation(new SearchFolder(folder, criteria)).catch((error) => {
this._logger.error(`Search error: ${error}`);
return Promise.resolve([]);
});
}
async searchThreads(db, query, limit) {
const {Message} = db;
return (await this._search(db, query)).flatMap((uids) => {
return Message.findAll({
where: {
accountId: this.account.id,
folderImapUID: uids,
},
});
}).flatMap((messages) => {
return getThreadsForMessages(db, messages, limit);
}).flatMap((threads) => {
if (threads.length > 0) {
return `${JSON.stringify(threads)}\n`;
}
return '\n';
});
return threads;
}
}
module.exports.searchClientForAccount = (account) => {
switch (account.provider) {
case 'gmail': {
const credentials = account.decryptedCredentials();
const accountToken = account.bearerToken(credentials.xoauth2);
return new GmailSearchClient(accountToken);
return new GmailSearchClient(account);
}
case 'imap': {
return new ImapSearchClient(account);
}
default: {
throw new Error(`Unsupported provider for search endpoint: ${account.provider}`);