Mailspring/internal_packages/thread-list/lib/thread-list-data-source.es6

92 lines
3.6 KiB
Text
Raw Normal View History

import Rx from 'rx-lite';
import {
ObservableListDataSource,
DatabaseStore,
Message,
QueryResultSet,
QuerySubscription,
} from 'nylas-exports';
/**
 * Builds the observable that tracks the messages of a single thread.
 *
 * A `QuerySubscription` is created over `DatabaseStore.findAll(Message,
 * {threadId: id})` with `emitResultSet: true`, and wrapped with
 * `Rx.Observable.fromNamedQuerySubscription` under the name `message-<id>`.
 *
 * @param {string} id - id of the thread whose messages should be observed.
 * @param {Array} [initialModels] - already-loaded messages handed to the
 *   subscription as its `initialModels` option (presumably so it can skip its
 *   own initial load - confirm against `QuerySubscription`).
 * @returns {Rx.Observable} whatever `fromNamedQuerySubscription` returns for
 *   the new subscription.
 */
const _observableForThreadMessages = (id, initialModels) => {
  const messagesQuery = DatabaseStore.findAll(Message, {threadId: id});
  const subscription = new QuerySubscription(messagesQuery, {
    initialModels,
    emitResultSet: true,
  });
  const subscriptionName = `message-${id}`;
  return Rx.Observable.fromNamedQuerySubscription(subscriptionName, subscription);
};
/**
 * Joins every thread in a stream of thread result sets with its messages.
 *
 * For each result set produced by the pipeline, the returned observable emits
 * a result set built with `QueryResultSet.setByApplyingModels` whose models
 * are clones of the threads, each carrying a `__messages` array holding that
 * thread's non-hidden messages.
 *
 * @param {Rx.Observable} $threadsResultSet - stream of thread result sets;
 *   each emitted value must provide `ids()` and `models()`.
 * @returns {Rx.Observable} stream of thread result sets whose models have
 *   `__messages` attached.
 */
const _flatMapJoiningMessages = ($threadsResultSet) => {
  // 1. Message observables keyed by thread id. The map is rebuilt for every
  // thread result set: threads that are still present keep the observable they
  // already had, threads that are gone are dropped from the map.
  let $messagesResultSets = {};

  // 2. When we receive a set of threads, find the ones that have no message
  // observable yet and load /all/ of their messages with a single database
  // query. (Per the original author this is a performance optimization - about
  // ~80msec faster than making 100 queries for 100 new thread ids separately.)
  const loadMessagesForNewThreads = (threadsResultSet) => {
    const missingIds = threadsResultSet.ids().filter((id) => !$messagesResultSets[id]);
    if (missingIds.length === 0) {
      return Rx.Observable.fromPromise(Promise.resolve([threadsResultSet, []]));
    }
    const loaded = DatabaseStore.findAll(Message, {threadId: missingIds}).then((messages) => {
      return Promise.resolve([threadsResultSet, messages]);
    });
    return Rx.Observable.fromPromise(loaded);
  };

  // 3. When that finishes, group the loaded messages by threadId and create
  // the missing observables. Creating a query subscription would normally load
  // an initial result set. To avoid that, new subscriptions are handed the
  // messages loaded in step 2.
  const observeMessages = ([threadsResultSet, messagesForNewThreads]) => {
    const messagesByThreadId = {};
    for (const message of messagesForNewThreads) {
      const {threadId} = message;
      if (messagesByThreadId[threadId] == null) {
        messagesByThreadId[threadId] = [];
      }
      messagesByThreadId[threadId].push(message);
    }

    const previous = $messagesResultSets;
    $messagesResultSets = {};
    const messageObservables = threadsResultSet.ids().map((id) => {
      const observable = previous[id] || _observableForThreadMessages(id, messagesByThreadId[id]);
      $messagesResultSets[id] = observable;
      return observable;
    });

    // 4. `combineLatest` merges the thread result set and the message
    // observables into a single stream (like Promise.all). When /any/ of them
    // emits a new result set, the combined stream triggers. The thread result
    // set goes first so the next stage can split it off from the message sets.
    return Rx.Observable.combineLatest([
      Rx.Observable.from([threadsResultSet]),
      ...messageObservables,
    ]);
  };

  // 5. Clone each thread, attach its non-hidden messages as `__messages`, and
  // apply the clones to the thread result set. Message result sets are matched
  // to threads by position.
  const attachMessages = ([threadsResultSet, ...messagesResultSets]) => {
    const threadsWithMessages = {};
    threadsResultSet.models().forEach((thread, idx) => {
      const clone = new thread.constructor(thread);
      const messagesResultSet = messagesResultSets[idx];
      const messages = messagesResultSet ? messagesResultSet.models() : [];
      clone.__messages = messages.filter((message) => !message.isHidden());
      threadsWithMessages[clone.id] = clone;
    });
    const joined = QueryResultSet.setByApplyingModels(threadsResultSet, threadsWithMessages);
    return Rx.Observable.from([joined]);
  };

  return $threadsResultSet
    .flatMapLatest(loadMessagesForNewThreads)
    .flatMapLatest(observeMessages)
    .flatMapLatest(attachMessages);
};
/**
 * List data source for the thread list: an `ObservableListDataSource` fed by
 * the given thread subscription, with every emitted result set passed through
 * `_flatMapJoiningMessages` first.
 */
class ThreadListDataSource extends ObservableListDataSource {
  /**
   * @param {QuerySubscription} subscription - subscription producing the
   *   thread result sets; its `replaceRange` method, bound to the
   *   subscription, is passed to the superclass as the second argument.
   */
  constructor(subscription) {
    const $threads = Rx.Observable.fromNamedQuerySubscription('thread-list', subscription);
    const $threadsWithMessages = _flatMapJoiningMessages($threads);
    super($threadsWithMessages, subscription.replaceRange.bind(subscription));
  }
}
export default ThreadListDataSource;