Mailspring/app/internal_packages/thread-list/lib/thread-list-data-source.es6

98 lines
3.8 KiB
Plaintext
Raw Normal View History

import {
Rx,
ObservableListDataSource,
DatabaseStore,
Message,
QueryResultSet,
QuerySubscription,
} from 'mailspring-exports';
const _observableForThreadMessages = (id, initialModels) => {
2017-09-27 02:33:08 +08:00
const subscription = new QuerySubscription(DatabaseStore.findAll(Message, { threadId: id }), {
initialModels: initialModels,
emitResultSet: true,
});
return Rx.Observable.fromNamedQuerySubscription(`message-${id}`, subscription);
};
2017-09-27 02:33:08 +08:00
const _flatMapJoiningMessages = $threadsResultSet => {
// DatabaseView leverages `QuerySubscription` for threads /and/ for the
// messages on each thread, which are passed to out as `thread.__messages`.
let $messagesResultSets = {};
// 2. when we receive a set of threads, we check to see if we have message
// observables for each thread. If threads have been added to the result set,
// we make a single database query and load /all/ the message metadata for
// the new threads at once. (This is a performance optimization -it's about
// ~80msec faster than making 100 queries for 100 new thread ids separately.)
2017-09-27 02:33:08 +08:00
return (
$threadsResultSet
.flatMapLatest(threadsResultSet => {
const missingIds = threadsResultSet.ids().filter(id => !$messagesResultSets[id]);
let promise = null;
if (missingIds.length === 0) {
promise = Promise.resolve([threadsResultSet, []]);
} else {
promise = DatabaseStore.findAll(Message, { threadId: missingIds }).then(messages => {
return Promise.resolve([threadsResultSet, messages]);
});
}
return Rx.Observable.fromPromise(promise);
})
// 3. when that finishes, we group the loaded messsages by threadId and create
// the missing observables. Creating a query subscription would normally load
// an initial result set. To avoid that, we just hand new subscriptions the
// results we loaded in #2.
.flatMapLatest(([threadsResultSet, messagesForNewThreads]) => {
const messagesGrouped = {};
for (const message of messagesForNewThreads) {
if (messagesGrouped[message.threadId] == null) {
messagesGrouped[message.threadId] = [];
}
messagesGrouped[message.threadId].push(message);
}
2017-09-27 02:33:08 +08:00
const oldSets = $messagesResultSets;
$messagesResultSets = {};
2017-09-27 02:33:08 +08:00
const sets = threadsResultSet.ids().map(id => {
$messagesResultSets[id] =
oldSets[id] || _observableForThreadMessages(id, messagesGrouped[id]);
return $messagesResultSets[id];
});
sets.unshift(Rx.Observable.from([threadsResultSet]));
2017-09-27 02:33:08 +08:00
// 4. We use `combineLatest` to merge the message observables into a single
// stream (like Promise.all). When /any/ of them emit a new result set, we
// trigger.
return Rx.Observable.combineLatest(sets);
})
.flatMapLatest(([threadsResultSet, ...messagesResultSets]) => {
const threadsWithMessages = {};
threadsResultSet.models().forEach((thread, idx) => {
const clone = new thread.constructor(thread);
clone.__messages = messagesResultSets[idx] ? messagesResultSets[idx].models() : [];
clone.__messages = clone.__messages.filter(m => !m.isHidden());
threadsWithMessages[clone.id] = clone;
});
2017-09-27 02:33:08 +08:00
return Rx.Observable.from([
QueryResultSet.setByApplyingModels(threadsResultSet, threadsWithMessages),
]);
})
);
};
class ThreadListDataSource extends ObservableListDataSource {
constructor(subscription) {
2017-09-27 02:33:08 +08:00
let $resultSetObservable = Rx.Observable.fromNamedQuerySubscription(
'thread-list',
subscription
);
$resultSetObservable = _flatMapJoiningMessages($resultSetObservable);
super($resultSetObservable, subscription.replaceRange.bind(subscription));
}
}
export default ThreadListDataSource;