import {
  Rx,
  ObservableListDataSource,
  DatabaseStore,
  Message,
  QueryResultSet,
  QuerySubscription,
} from 'nylas-exports';

const _observableForThreadMessages = (id, initialModels) => {
  const subscription = new QuerySubscription(DatabaseStore.findAll(Message, {threadId: id}), {
    initialModels: initialModels,
    emitResultSet: true,
  });
  return Rx.Observable.fromNamedQuerySubscription(`message-${id}`, subscription);
};

const _flatMapJoiningMessages = ($threadsResultSet) => {
  // DatabaseView leverages `QuerySubscription` for threads /and/ for the
  // messages on each thread, which are passed to out as `thread.__messages`.
  let $messagesResultSets = {};

  // 2. when we receive a set of threads, we check to see if we have message
  //    observables for each thread. If threads have been added to the result set,
  //    we make a single database query and load /all/ the message metadata for
  //    the new threads at once. (This is a performance optimization -it's about
  //    ~80msec faster than making 100 queries for 100 new thread ids separately.)
  return $threadsResultSet.flatMapLatest((threadsResultSet) => {
    const missingIds = threadsResultSet.ids().filter(id => !$messagesResultSets[id]);
    let promise = null;
    if (missingIds.length === 0) {
      promise = Promise.resolve([threadsResultSet, []]);
    } else {
      promise = DatabaseStore.findAll(Message, {threadId: missingIds}).then((messages) => {
        return Promise.resolve([threadsResultSet, messages]);
      });
    }
    return Rx.Observable.fromPromise(promise);
  })

  // 3. when that finishes, we group the loaded messsages by threadId and create
  //    the missing observables. Creating a query subscription would normally load
  //    an initial result set. To avoid that, we just hand new subscriptions the
  //    results we loaded in #2.
  .flatMapLatest(([threadsResultSet, messagesForNewThreads]) => {
    const messagesGrouped = {};
    for (const message of messagesForNewThreads) {
      if (messagesGrouped[message.threadId] == null) { messagesGrouped[message.threadId] = []; }
      messagesGrouped[message.threadId].push(message);
    }

    const oldSets = $messagesResultSets;
    $messagesResultSets = {};

    const sets = threadsResultSet.ids().map(id => {
      $messagesResultSets[id] = oldSets[id] || _observableForThreadMessages(id, messagesGrouped[id]);
      return $messagesResultSets[id];
    });
    sets.unshift(Rx.Observable.from([threadsResultSet]));

    // 4. We use `combineLatest` to merge the message observables into a single
    //    stream (like Promise.all).  When /any/ of them emit a new result set, we
    //    trigger.
    return Rx.Observable.combineLatest(sets);
  })

  .flatMapLatest(([threadsResultSet, ...messagesResultSets]) => {
    const threadsWithMessages = {};
    threadsResultSet.models().forEach((thread, idx) => {
      const clone = new thread.constructor(thread);
      clone.__messages = messagesResultSets[idx] ? messagesResultSets[idx].models() : [];
      clone.__messages = clone.__messages.filter((m) => !m.isHidden())
      threadsWithMessages[clone.id] = clone;
    });

    return Rx.Observable.from([
      QueryResultSet.setByApplyingModels(threadsResultSet, threadsWithMessages),
    ]);
  });
};


class ThreadListDataSource extends ObservableListDataSource {
  constructor(subscription) {
    let $resultSetObservable = Rx.Observable.fromNamedQuerySubscription('thread-list', subscription);
    $resultSetObservable = _flatMapJoiningMessages($resultSetObservable);
    super($resultSetObservable, subscription.replaceRange.bind(subscription));
  }
}

export default ThreadListDataSource;