From 62fab52f7bf8ecb6632b91dc6d619f33ef4ee8ae Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Mon, 7 Dec 2015 16:52:46 -0800 Subject: [PATCH] feat(observables): Implementation of observables to replace some stores Summary: Add concept of "final" to Query, clean up internals Tiny bug fixes RxJs Observables! WIP Test Plan: Run tests Reviewers: evan, juan Reviewed By: juan Differential Revision: https://phab.nylas.com/D2319 --- .../events/lib/event-header.cjsx | 2 +- .../notifications/lib/activity-sidebar.cjsx | 1 - .../worker-sync/lib/nylas-sync-worker.coffee | 4 +- .../lib/refreshing-json-cache.coffee | 6 +- .../spec/nylas-sync-worker-spec.coffee | 4 +- package.json | 1 + spec/models/query-spec.coffee | 12 +- .../query-subscription-pool-spec.coffee | 36 +++ spec/models/query-subscription-spec.coffee | 256 ++++++++++++++++++ spec/stores/message-store-spec.coffee | 1 - spec/stores/task-queue-spec.coffee | 2 +- spec/stores/thread-counts-store-spec.coffee | 51 +--- src/flux/models/json-blob.coffee | 28 ++ .../models/query-subscription-pool.coffee | 55 ++++ src/flux/models/query-subscription.coffee | 210 ++++++++++++++ src/flux/models/query.coffee | 81 +++--- src/flux/stores/category-store.coffee | 74 ++--- src/flux/stores/contact-ranking-store.coffee | 27 +- .../database-setup-query-builder.coffee | 5 - src/flux/stores/database-store.coffee | 31 +-- src/flux/stores/focused-contacts-store.coffee | 28 +- .../stores/nylas-sync-status-store.coffee | 17 +- .../stores/task-queue-status-store.coffee | 10 +- src/flux/stores/task-queue.coffee | 9 +- src/flux/stores/thread-counts-store.coffee | 49 ++-- src/global/nylas-exports.coffee | 2 + src/global/nylas-observables.coffee | 70 +++++ 27 files changed, 823 insertions(+), 249 deletions(-) create mode 100644 spec/models/query-subscription-pool-spec.coffee create mode 100644 spec/models/query-subscription-spec.coffee create mode 100644 src/flux/models/json-blob.coffee create mode 100644 src/flux/models/query-subscription-pool.coffee create mode 100644 src/flux/models/query-subscription.coffee create mode 100644 src/global/nylas-observables.coffee diff --git a/internal_packages/events/lib/event-header.cjsx b/internal_packages/events/lib/event-header.cjsx index 3186a257a..7e76955e2 100644 --- a/internal_packages/events/lib/event-header.cjsx +++ b/internal_packages/events/lib/event-header.cjsx @@ -30,7 +30,7 @@ class EventHeader extends React.Component componentDidMount: => @_unlisten = DatabaseStore.listen (change) => - if change.objectClass is Event.name + if @state.event and change.objectClass is Event.name updated = _.find change.objects, (o) => o.id is @state.event.id @setState({event: updated}) if updated @_onChange() diff --git a/internal_packages/notifications/lib/activity-sidebar.cjsx b/internal_packages/notifications/lib/activity-sidebar.cjsx index ac2466d87..49b4274f3 100644 --- a/internal_packages/notifications/lib/activity-sidebar.cjsx +++ b/internal_packages/notifications/lib/activity-sidebar.cjsx @@ -31,7 +31,6 @@ class ActivitySidebar extends React.Component componentWillUnmount: => unlisten() for unlisten in @_unlisteners - @_workerUnlisten() if @_workerUnlisten render: => items = [@_renderNotificationActivityItems(), @_renderTaskActivityItems()] diff --git a/internal_packages/worker-sync/lib/nylas-sync-worker.coffee b/internal_packages/worker-sync/lib/nylas-sync-worker.coffee index 9692445d6..8a39265b1 100644 --- a/internal_packages/worker-sync/lib/nylas-sync-worker.coffee +++ b/internal_packages/worker-sync/lib/nylas-sync-worker.coffee @@ -51,7 +51,7 @@ class NylasSyncWorker @_unlisten = Actions.retryInitialSync.listen(@_onRetryInitialSync, @) @_state = null - DatabaseStore.findJSONObject("NylasSyncWorker:#{@_account.id}").then (json) => + DatabaseStore.findJSONBlob("NylasSyncWorker:#{@_account.id}").then (json) => @_state = json ? {} for model, modelState of @_state modelState.busy = false @@ -209,7 +209,7 @@ class NylasSyncWorker writeState: -> @_writeState ?= _.debounce => - DatabaseStore.persistJSONObject("NylasSyncWorker:#{@_account.id}", @_state) + DatabaseStore.persistJSONBlob("NylasSyncWorker:#{@_account.id}", @_state) ,100 @_writeState() diff --git a/internal_packages/worker-sync/lib/refreshing-json-cache.coffee b/internal_packages/worker-sync/lib/refreshing-json-cache.coffee index 8930e4e51..2c800a5f6 100644 --- a/internal_packages/worker-sync/lib/refreshing-json-cache.coffee +++ b/internal_packages/worker-sync/lib/refreshing-json-cache.coffee @@ -12,7 +12,7 @@ class RefreshingJSONCache @end() # Look up existing data from db - DatabaseStore.findJSONObject(@key).then (json) => + DatabaseStore.findJSONBlob(@key).then (json) => # Refresh immediately if json is missing or version is outdated. Otherwise, # compute next refresh time and schedule @@ -24,7 +24,7 @@ class RefreshingJSONCache reset: -> # Clear db value, turn off any scheduled actions - DatabaseStore.persistJSONObject(@key, {}) + DatabaseStore.persistJSONBlob(@key, {}) @end() end: -> @@ -39,7 +39,7 @@ class RefreshingJSONCache # Call fetch data function, save it to the database @fetchData (newValue) => - DatabaseStore.persistJSONObject(@key, { + DatabaseStore.persistJSONBlob(@key, { version: @version time: Date.now() value: newValue diff --git a/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee b/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee index 2bac6085e..567a27e06 100644 --- a/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee +++ b/internal_packages/worker-sync/spec/nylas-sync-worker-spec.coffee @@ -16,8 +16,8 @@ describe "NylasSyncWorker", -> getThreads: (account, params, requestOptions) => @apiRequests.push({account, model:'threads', params, requestOptions}) - spyOn(DatabaseStore, 'persistJSONObject').andReturn(Promise.resolve()) - spyOn(DatabaseStore, 'findJSONObject').andCallFake (key) => + spyOn(DatabaseStore, 'persistJSONBlob').andReturn(Promise.resolve()) + spyOn(DatabaseStore, 'findJSONBlob').andCallFake (key) => if key is "NylasSyncWorker:#{TEST_ACCOUNT_ID}" return Promise.resolve _.extend {}, { "contacts": diff --git a/package.json b/package.json index 49f82eb7b..f02c0019e 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ "request": "^2.53", "request-progress": "^0.3", "runas": "^3.1", + "rx-lite": "^4.0.7", "sanitize-html": "1.9.0", "scoped-property-store": "^0.16.2", "season": "^5.1", diff --git a/spec/models/query-spec.coffee b/spec/models/query-spec.coffee index e246dfe20..759c06374 100644 --- a/spec/models/query-spec.coffee +++ b/spec/models/query-spec.coffee @@ -95,11 +95,13 @@ describe "ModelQuery", -> @q.includeAll() expect(@q._includeJoinedData).toEqual([Message.attributes.body]) - describe "formatResult", -> + describe "response formatting", -> it "should always return a Number for counts", -> q = new ModelQuery(Message, @db) q.where({accountId: 'abcd'}).count() - expect(q.formatResult([{count:"12"}])).toBe(12) + + raw = [{count:"12"}] + expect(q.formatResultObjects(q.inflateResult(raw))).toBe(12) describe "sql", -> beforeEach -> @@ -109,6 +111,12 @@ describe "ModelQuery", -> scenario.builder(q) expect(q.sql().trim()).toBe(scenario.sql.trim()) + it "should finalize the query so no further changes can be made", -> + q = new ModelQuery(Account, @db) + spyOn(q, 'finalize') + q.sql() + expect(q.finalize).toHaveBeenCalled() + it "should correctly generate queries with multiple where clauses", -> @runScenario Account, builder: (q) -> q.where({emailAddress: 'ben@nylas.com'}).where({id: 2}) diff --git a/spec/models/query-subscription-pool-spec.coffee b/spec/models/query-subscription-pool-spec.coffee new file mode 100644 index 000000000..743e486ca --- /dev/null +++ b/spec/models/query-subscription-pool-spec.coffee @@ -0,0 +1,36 @@ +QuerySubscriptionPool = require '../../src/flux/models/query-subscription-pool' +DatabaseStore = require '../../src/flux/stores/database-store' +Label = require '../../src/flux/models/label' + +describe "QuerySubscriptionPool", -> + beforeEach -> + @query = DatabaseStore.findAll(Label) + QuerySubscriptionPool._subscriptions = [] + + describe "add", -> + it "should add a new subscription with the callback", -> + callback = jasmine.createSpy('callback') + QuerySubscriptionPool.add(@query, {}, callback) + expect(QuerySubscriptionPool._subscriptions.length).toBe(1) + subscription = QuerySubscriptionPool._subscriptions[0] + expect(subscription.hasCallback(callback)).toBe(true) + + it "should yield database changes to the subscription", -> + callback = jasmine.createSpy('callback') + QuerySubscriptionPool.add(@query, {}, callback) + subscription = QuerySubscriptionPool._subscriptions[0] + spyOn(subscription, 'applyChangeRecord') + + record = {objectType: 'whateves'} + QuerySubscriptionPool._onChange(record) + expect(subscription.applyChangeRecord).toHaveBeenCalledWith(record) + + describe "unsubscribe", -> + it "should return an unsubscribe method", -> + expect(QuerySubscriptionPool.add(@query, {}, -> ) instanceof Function).toBe(true) + + it "should remove the subscription", -> + unsub = QuerySubscriptionPool.add(@query, {}, -> ) + expect(QuerySubscriptionPool._subscriptions.length).toBe(1) + unsub() + expect(QuerySubscriptionPool._subscriptions.length).toBe(0) diff --git a/spec/models/query-subscription-spec.coffee b/spec/models/query-subscription-spec.coffee new file mode 100644 index 000000000..886da544d --- /dev/null +++ b/spec/models/query-subscription-spec.coffee @@ -0,0 +1,256 @@ +DatabaseStore = require '../../src/flux/stores/database-store' +QuerySubscription = require '../../src/flux/models/query-subscription' +Thread = require '../../src/flux/models/thread' +Label = require '../../src/flux/models/label' +Utils = require '../../src/flux/models/utils' + +describe "QuerySubscription", -> + describe "constructor", -> + it "should throw an error if the query is a count query", -> + query = DatabaseStore.findAll(Label).count() + expect( => new QuerySubscription(query)).toThrow() + + it "should throw an error if a query is not provided", -> + expect( => new QuerySubscription({})).toThrow() + + it "should fetch an initial result set", -> + spyOn(QuerySubscription.prototype, '_refetchResultSet') + sub = new QuerySubscription(DatabaseStore.findAll(Label)) + expect(QuerySubscription.prototype._refetchResultSet).toHaveBeenCalled() + + describe "applyChangeRecord", -> + spyOn(Utils, 'generateTempId').andCallFake => "" + + scenarios = [{ + name: "query with full set of objects (4)" + query: DatabaseStore.findAll(Thread) + .where(Thread.attributes.accountId.equal('a')) + .limit(4) + .offset(2) + lastResultSet: [ + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4) + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1), + ] + tests: [{ + name: 'Item saved which belongs in the set' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '5', lastMessageReceivedTimestamp: 3.5)] + type: 'persist' + newResultSet:[ + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4), + new Thread(accountId: 'a', id: '5', lastMessageReceivedTimestamp: 3.5), + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + ] + refetchRequired: false + },{ + name: 'Item saved which does not match query clauses' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'b', id: '5', lastMessageReceivedTimestamp: 5)] + type: 'persist' + newResultSet: 'unchanged' + refetchRequired: false + },{ + name: 'Item saved which does not lie in the range after sorting' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'b', id: '5', lastMessageReceivedTimestamp: -2)] + type: 'persist' + newResultSet: 'unchanged' + refetchRequired: false + },{ + name: 'Item in set saved' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4, subject: 'hello')] + type: 'persist' + newResultSet:[ + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4, subject: 'hello') + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1), + ] + refetchRequired: false + },{ + name: 'Item in set saved, sort order changed (within range only)' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5)] + type: 'persist' + newResultSet:[ + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5), + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1), + ] + refetchRequired: false + },{ + name: 'Item in set saved, sort order changed and sorted to edge of set (impacting last)' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 6)] + type: 'persist' + refetchRequired: true + },{ + name: 'Item in set saved, sort order changed and sorted to edge of set (impacting first)' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: -1)] + type: 'persist' + refetchRequired: true + },{ + name: 'Item in set saved, no longer matches query clauses' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'b', id: '4', lastMessageReceivedTimestamp: 4)] + type: 'persist' + newResultSet: [ + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1), + ] + refetchRequired: true + },{ + name: 'Item in set deleted' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '4')] + type: 'unpersist' + newResultSet: [ + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1), + ] + refetchRequired: true + },{ + name: 'Item not in set deleted' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '5')] + type: 'unpersist' + newResultSet: 'unchanged' + refetchRequired: false + }] + + },{ + name: "query with fewer than LIMIT objects" + query: DatabaseStore.findAll(Thread) + .where(Thread.attributes.accountId.equal('a')) + .limit(4) + .offset(2) + lastResultSet: [ + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4) + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2) + ] + tests: [{ + name: 'Item in set saved, no longer matches query clauses' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'b', id: '4', lastMessageReceivedTimestamp: 4)] + type: 'persist' + newResultSet: [ + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + ] + refetchRequired: false + },{ + name: 'Item in set deleted' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '4')] + type: 'unpersist' + newResultSet: [ + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3), + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2), + ] + refetchRequired: false + }] + },{ + name: "query with ASC sort order" + query: DatabaseStore.findAll(Thread) + .where(Thread.attributes.accountId.equal('a')) + .limit(4) + .offset(2) + .order(Thread.attributes.lastMessageReceivedTimestamp.ascending()) + lastResultSet: [ + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1) + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2) + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3) + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4) + ] + tests: [{ + name: 'Item in set saved, sort order changed' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5)] + type: 'persist' + newResultSet:[ + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1) + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5) + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2) + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4) + ] + refetchRequired: false + }] + + },{ + name: "query with multiple sort orders" + query: DatabaseStore.findAll(Thread) + .where(Thread.attributes.accountId.equal('a')) + .limit(4) + .offset(2) + .order([ + Thread.attributes.lastMessageReceivedTimestamp.ascending(), + Thread.attributes.unread.descending() + ]) + lastResultSet: [ + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1, unread: true) + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 1, unread: false) + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1, unread: false) + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 2, unread: true) + ] + tests: [{ + name: 'Item in set saved, secondary sort order changed' + change: + objectClass: Thread.name + objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1, unread: true)] + type: 'persist' + newResultSet:[ + new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1, unread: true) + new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1, unread: true) + new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 1, unread: false) + new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 2, unread: true) + ] + refetchRequired: false + }] + }] + + jasmine.unspy(Utils, 'generateTempId') + + describe "scenarios", -> + scenarios.forEach (scenario) => + scenario.tests.forEach (test) => + it "with #{scenario.name}, should correctly apply #{test.name}", -> + @q = new QuerySubscription(scenario.query, -> ) + @q._lastResultSet = scenario.lastResultSet + spyOn(@q, '_invokeCallbacks') + spyOn(@q, '_refetchResultSet') + @q.applyChangeRecord(test.change) + + if test.newResultSet is 'unchanged' + expect(@q._invokeCallbacks).not.toHaveBeenCalled() + expect(@q._lastResultSet).toEqual(scenario.lastResultSet) + + else if test.newResultSet + expect(@q._invokeCallbacks).toHaveBeenCalled() + expect(@q._lastResultSet).toEqual(test.newResultSet) + + if test.refetchRequired + expect(@q._refetchResultSet).toHaveBeenCalled() + else + expect(@q._refetchResultSet).not.toHaveBeenCalled() diff --git a/spec/stores/message-store-spec.coffee b/spec/stores/message-store-spec.coffee index abca8f927..da7037618 100644 --- a/spec/stores/message-store-spec.coffee +++ b/spec/stores/message-store-spec.coffee @@ -72,7 +72,6 @@ describe "MessageStore", -> spyOn(DatabaseStore, 'findAll').andCallFake -> include: -> @ - waitForAnimations: -> @ where: -> @ then: (callback) -> callback([testMessage1, testMessage2]) diff --git a/spec/stores/task-queue-spec.coffee b/spec/stores/task-queue-spec.coffee index 73c7fc74d..d5b17c0bb 100644 --- a/spec/stores/task-queue-spec.coffee +++ b/spec/stores/task-queue-spec.coffee @@ -34,7 +34,7 @@ describe "TaskQueue", -> describe "restoreQueue", -> it "should fetch the queue from the database, reset flags and start processing", -> queue = [@processingTask, @unstartedTask] - spyOn(DatabaseStore, 'findJSONObject').andCallFake => Promise.resolve(queue) + spyOn(DatabaseStore, 'findJSONBlob').andCallFake => Promise.resolve(queue) spyOn(TaskQueue, '_processQueue') waitsForPromise => diff --git a/spec/stores/thread-counts-store-spec.coffee b/spec/stores/thread-counts-store-spec.coffee index 96a031e20..a8e6fc03b 100644 --- a/spec/stores/thread-counts-store-spec.coffee +++ b/spec/stores/thread-counts-store-spec.coffee @@ -52,58 +52,13 @@ describe "ThreadCountsStore", -> ThreadCountsStore._onCountsChanged(payload) expect(WindowBridge.runInWorkWindow).toHaveBeenCalledWith('ThreadCountsStore', '_onCountsChanged', [payload]) - describe "when a folder or label is persisted", -> - beforeEach -> - @lExisting = new Label(id: "l1", name: "inbox", displayName: "Inbox") - ThreadCountsStore._categories = [@lExisting] - - @lCreated = new Label(id: "lNew", displayName: "Hi there!") - @lUpdated = @lExisting.clone() - @lUpdated.displayName = "Inbox Edited" - - spyOn(ThreadCountsStore, '_fetchCountsMissing') - - describe "in the work window", -> - beforeEach -> - spyOn(NylasEnv, 'isWorkWindow').andReturn(true) - - it "should add or update it in it's local categories cache", -> - ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lCreated]}) - expect(ThreadCountsStore._categories).toEqual([@lExisting, @lCreated]) - - ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lUpdated]}) - expect(ThreadCountsStore._categories).toEqual([@lUpdated, @lCreated]) - - ThreadCountsStore._categories = [] - - ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lCreated, @lUpdated]}) - expect(ThreadCountsStore._categories).toEqual([@lCreated, @lUpdated]) - - it "should run _fetchCountsMissing", -> - ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lUpdated]}) - expect(ThreadCountsStore._fetchCountsMissing).toHaveBeenCalled() - - describe "in other windows", -> - beforeEach -> - spyOn(NylasEnv, 'isWorkWindow').andReturn(false) - - it "should do nothing", -> - ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lCreated]}) - expect(ThreadCountsStore._categories).toEqual([@lExisting]) - expect(ThreadCountsStore._fetchCountsMissing).not.toHaveBeenCalled() - describe "when counts are persisted", -> it "should update it's _counts cache and trigger", -> newCounts = { 'abc': 1 } spyOn(ThreadCountsStore, 'trigger') - ThreadCountsStore._onDatabaseChanged({ - objectClass: 'JSONObject', - objects: [ - {key: ThreadCountsStore.JSONObjectKey, json: newCounts} - ] - }) + ThreadCountsStore._onCountsBlobRead(newCounts) expect(ThreadCountsStore._counts).toEqual(newCounts) expect(ThreadCountsStore.trigger).toHaveBeenCalled() @@ -218,9 +173,9 @@ describe "ThreadCountsStore", -> }) it "should persist the new counts to the database", -> - spyOn(DatabaseStore, 'persistJSONObject') + spyOn(DatabaseStore, 'persistJSONBlob') ThreadCountsStore._saveCounts() - expect(DatabaseStore.persistJSONObject).toHaveBeenCalledWith(ThreadCountsStore.JSONObjectKey, ThreadCountsStore._counts) + expect(DatabaseStore.persistJSONBlob).toHaveBeenCalledWith(ThreadCountsStore.JSONBlobKey, ThreadCountsStore._counts) describe "CategoryDatabaseMutationObserver", -> beforeEach -> diff --git a/src/flux/models/json-blob.coffee b/src/flux/models/json-blob.coffee new file mode 100644 index 000000000..12d3eee62 --- /dev/null +++ b/src/flux/models/json-blob.coffee @@ -0,0 +1,28 @@ +_ = require 'underscore' +Model = require './model' +Attributes = require '../attributes' + +class JSONBlob extends Model + @attributes: + 'id': Attributes.String + queryable: true + modelKey: 'id' + + 'clientId': Attributes.String + queryable: true + modelKey: 'clientId' + jsonKey: 'client_id' + + 'serverId': Attributes.ServerId + modelKey: 'serverId' + jsonKey: 'server_id' + + 'json': Attributes.Object + modelKey: 'json' + jsonKey: 'json' + + Object.defineProperty @prototype, "key", + get: -> @id + set: (val) -> @id = val + +module.exports = JSONBlob diff --git a/src/flux/models/query-subscription-pool.coffee b/src/flux/models/query-subscription-pool.coffee new file mode 100644 index 000000000..8a6d8eace --- /dev/null +++ b/src/flux/models/query-subscription-pool.coffee @@ -0,0 +1,55 @@ +_ = require 'underscore' +DatabaseChangeRecord = require '../stores/database-change-record' +QuerySubscription = require './query-subscription' + +### +Public: The QuerySubscriptionPool maintains a list of all of the query +subscriptions in the app. In the future, this class will monitor performance, +merge equivalent subscriptions, etc. +### +class QuerySubscriptionPool + constructor: -> + @_subscriptions = [] + + add: (query, options, callback) => + @_setup() if @_subscriptions.length is 0 + + callback._registrationPoint = @_formatRegistrationPoint((new Error).stack) + + subscription = new QuerySubscription(query, options) + subscription.addCallback(callback) + @_subscriptions.push(subscription) + + return => + subscription.removeCallback(callback) + @_subscriptions = _.without(@_subscriptions, subscription) + + printSubscriptions: => + @_subscriptions.forEach (sub) -> + console.log(sub._query.sql()) + console.group() + sub._callbacks.forEach (callback) -> + console.log("#{callback._registrationPoint}") + console.groupEnd() + + _formatRegistrationPoint: (stack) -> + stack = stack.split('\n') + ii = 0 + seenRx = false + while ii < stack.length + hasRx = stack[ii].indexOf('rx.lite') isnt -1 + seenRx ||= hasRx + break if seenRx is true and not hasRx + ii += 1 + + return stack[ii..(ii + 4)].join('\n') + + _setup: => + DatabaseStore = require '../stores/database-store' + DatabaseStore.listen @_onChange + + _onChange: (record) => + for subscription in @_subscriptions + subscription.applyChangeRecord(record) + +module.exports = new QuerySubscriptionPool() diff --git a/src/flux/models/query-subscription.coffee b/src/flux/models/query-subscription.coffee new file mode 100644 index 000000000..252e9dadc --- /dev/null +++ b/src/flux/models/query-subscription.coffee @@ -0,0 +1,210 @@ +_ = require 'underscore' +DatabaseChangeRecord = require '../stores/database-change-record' + +class QuerySubscription + constructor: (@_query, @_options) -> + ModelQuery = require './query' + + if not @_query or not (@_query instanceof ModelQuery) + throw new Error("QuerySubscription: Must be constructed with a ModelQuery. Got #{@_query}") + + if @_query._count + throw new Error("QuerySubscriptionPool::add - You cannot listen to count queries.") + + @_query.finalize() + @_limit = @_query.range().limit ? Infinity + @_offset = @_query.range().offset ? 0 + + @_callbacks = [] + @_version = 0 + @_versionFetchInProgress = false + @_lastResultSet = null + @_refetchResultSet() + + addCallback: (callback) => + unless callback instanceof Function + throw new Error("QuerySubscription:addCallback - expects a function, received #{callback}") + @_callbacks.push(callback) + + hasCallback: (callback) => + @_callbacks.indexOf(callback) isnt -1 + + removeCallback: (callback) => + unless callback instanceof Function + throw new Error("QuerySubscription:removeCallback - expects a function, received #{callback}") + @_callbacks = _.without(@_callbacks, callback) + + applyChangeRecord: (record) => + return unless record.objectClass is @_query.objectClass() + return unless record.objects.length > 0 + return @_invalidatePendingResultSet() unless @_lastResultSet + + @_lastResultSet = [].concat(@_lastResultSet) + + if record.type is 'unpersist' + status = @_optimisticallyRemoveModels(record.objects) + else if record.type is 'persist' + status = @_optimisticallyUpdateModels(record.objects) + else + throw new Error("QuerySubscription: Unknown change record type: #{record.type}") + + if status.setModified + @_invokeCallbacks() + if status.setFetchRequired + @_refetchResultSet() + + _refetchResultSet: => + @_version += 1 + + return if @_versionFetchInProgress + @_versionFetchInProgress = true + fetchVersion = @_version + + DatabaseStore = require '../stores/database-store' + DatabaseStore.run(@_query, {format: false}).then (result) => + @_versionFetchInProgress = false + if @_version is fetchVersion + @_lastResultSet = result + @_invokeCallbacks() + else + @_refetchResultSet() + + _invalidatePendingResultSet: => + @_version += 1 + + _resortResultSet: => + sortDescriptors = @_query.orderSortDescriptors() + @_lastResultSet.sort (a, b) -> + for descriptor in sortDescriptors + if descriptor.direction is 'ASC' + direction = 1 + else if descriptor.direction is 'DESC' + direction = -1 + else + throw new Error("QuerySubscription: Unknown sort order: #{descriptor.direction}") + aValue = a[descriptor.attr.modelKey] + bValue = b[descriptor.attr.modelKey] + return -1 * direction if aValue < bValue + return 1 * direction if aValue > bValue + return 0 + + _optimisticallyRemoveModels: (items) => + status = + setModified: false + setFetchRequired: false + + lastLength = @_lastResultSet.length + + for item in items + idx = _.findIndex @_lastResultSet, ({id}) -> id is item.id + if idx isnt -1 + @_lastResultSet.splice(idx, 1) + status.setModified = true + + # Removing items is an issue if we previosly had LIMIT items. This + # means there are likely more items to display in the place of the one + # we're removing and we need to re-fetch + if lastLength is @_limit + status.setFetchRequired = true + + status + + _optimisticallyUpdateModels: (items) => + status = + setModified: false + setFetchRequired: false + + sortNecessary = false + + # Pull attributes of the query + sortDescriptors = @_query.orderSortDescriptors() + + oldSetInfo = + length: @_lastResultSet.length + startItem: @_lastResultSet[0] + endItem: @_lastResultSet[@_limit - 1] + + for item in items + idx = _.findIndex @_lastResultSet, ({id}) -> id is item.id + itemIsInSet = idx isnt -1 + itemShouldBeInSet = item.matches(@_query.matchers()) + + if itemIsInSet and not itemShouldBeInSet + # remove the item + @_lastResultSet.splice(idx, 1) + status.setModified = true + + else if itemShouldBeInSet and not itemIsInSet + # insert the item, re-sort if a sort order is defined + if sortDescriptors.length > 0 + sortNecessary = true + @_lastResultSet.push(item) + status.setModified = true + + else if itemIsInSet + # update the item in the set, re-sort if a sort attribute's value has changed + if @_itemSortOrderHasChanged(@_lastResultSet[idx], item) + sortNecessary = true + @_lastResultSet[idx] = item + status.setModified = true + + if sortNecessary + @_resortResultSet() + + if sortNecessary and @_itemOnEdgeHasChanged(oldSetInfo) + status.setFetchRequired = true + + # If items have been added, truncate the result set to the requested length + if @_lastResultSet.length > @_limit + @_lastResultSet.length = @_limit + + hadMaxItems = oldSetInfo.length is @_limit + hasLostItems = @_lastResultSet.length < oldSetInfo.length + + if hadMaxItems and hasLostItems + # Ex: We asked for 20 items and had 20 items. Now we have 19 items. + # We need to pull a nw item to fill slot #20. + status.setFetchRequired = true + + status + + _itemOnEdgeHasChanged: (oldSetInfo) -> + hasPrecedingItems = @_offset > 0 + hasChangedStartItem = oldSetInfo.startItem isnt @_lastResultSet[0] + + if hasPrecedingItems and hasChangedStartItem + # We've changed the identity of the item at index zero. We have no way + # of knowing if it would still sort at this position, or if another item + # from earlier in the range should be at index zero. + # Full re-fetch is necessary. + return true + + hasTrailingItems = @_lastResultSet.length is @_limit + hasChangedEndItem = oldSetInfo.endItem isnt @_lastResultSet[@_limit - 1] + + if hasTrailingItems and hasChangedEndItem + # We've changed he last item in the set, and the set is at it's LIMIT length. + # We have no way of knowing if the item should still be at this position + # since we can't see the next item. + # Full re-fetch is necessary. + return true + + _itemSortOrderHasChanged: (old, updated) -> + for descriptor in @_query.orderSortDescriptors() + oldSortValue = old[descriptor.attr.modelKey] + updatedSortValue = updated[descriptor.attr.modelKey] + + # http://stackoverflow.com/questions/4587060/determining-date-equality-in-javascript + if not (oldSortValue >= updatedSortValue && oldSortValue <= updatedSortValue) + return true + + return false + + _invokeCallbacks: => + set = [].concat(@_lastResultSet) + resultForSet = @_query.formatResultObjects(set) + + @_callbacks.forEach (callback) => + callback(resultForSet) + +module.exports = QuerySubscription diff --git a/src/flux/models/query.coffee b/src/flux/models/query.coffee index 4ac2e944a..9e87c2434 100644 --- a/src/flux/models/query.coffee +++ b/src/flux/models/query.coffee @@ -43,8 +43,8 @@ class ModelQuery @_database || = require '../stores/database-store' @_matchers = [] @_orders = [] - @_singular = false - @_waitForAnimations = false + @_range = {} + @_returnOne = false @_includeJoinedData = [] @_count = false @ @@ -56,6 +56,8 @@ class ModelQuery # This method is chainable. # where: (matchers) -> + @_assertNotFinalized() + if matchers instanceof Matcher @_matchers.push(matchers) else if matchers instanceof Array @@ -78,6 +80,7 @@ class ModelQuery # This method is chainable. # include: (attr) -> + @_assertNotFinalized() if attr instanceof AttributeJoinedData is false throw new Error("query.include() must be called with a joined data attribute") @_includeJoinedData.push(attr) @@ -88,6 +91,7 @@ class ModelQuery # This method is chainable. # includeAll: -> + @_assertNotFinalized() for key, attr of @_klass.attributes @include(attr) if attr instanceof AttributeJoinedData @ @@ -99,6 +103,7 @@ class ModelQuery # This method is chainable. # order: (orders) -> + @_assertNotFinalized() orders = [orders] unless orders instanceof Array @_orders = @_orders.concat(orders) @ @@ -109,7 +114,8 @@ class ModelQuery # This method is chainable. # one: -> - @_singular = true + @_assertNotFinalized() + @_returnOne = true @ # Public: Limit the number of query results. @@ -119,8 +125,8 @@ class ModelQuery # This method is chainable. # limit: (limit) -> - throw new Error("Cannot use limit > 2 with one()") if @_singular and limit > 1 - @_range ?= {} + @_assertNotFinalized() + throw new Error("Cannot use limit > 2 with one()") if @_returnOne and limit > 1 @_range.limit = limit @ @@ -131,7 +137,7 @@ class ModelQuery # This method is chainable. # offset: (offset) -> - @_range ?= {} + @_assertNotFinalized() @_range.offset = offset @ @@ -141,20 +147,10 @@ class ModelQuery # This method is chainable. # count: -> + @_assertNotFinalized() @_count = true @ - ## - # Public: Set the `waitForAnimations` flag - instead of waiting for animations and other important user - # interactions to complete, the query result will be processed immediately. Use with care: forcing - # immediate evaluation can cause glitches in animations. - # - # This method is chainable. - # - waitForAnimations: -> - @_waitForAnimations = true - @ - ### Query Execution ### @@ -173,7 +169,7 @@ class ModelQuery run: -> @_database.run(@) - formatResult: (result) -> + inflateResult: (result) -> return null unless result if @_count @@ -189,15 +185,20 @@ class ModelQuery object[attr.modelKey] = value object catch jsonError - throw new Error("Query could not parse the database result. Query: #{@sql()}, Row Data: #{row[0]}, Error: #{jsonError.toString()}") - return objects[0] if @_singular + throw new Error("Query could not parse the database result. Query: #{@sql()}, Error: #{jsonError.toString()}") return objects + formatResultObjects: (objects) -> + return objects[0] if @_returnOne + return objects + # Query SQL Building # Returns a {String} with the SQL generated for the query. # sql: -> + @finalize() + if @_count result = "COUNT(*) as count" else @@ -206,19 +207,14 @@ class ModelQuery result += ", #{attr.selectSQL(@_klass)} " order = if @_count then "" else @_orderClause() - if @_singular - limit = "LIMIT 1" - else if @_range?.limit + if @_range.limit? limit = "LIMIT #{@_range.limit}" else limit = "" - if @_range?.offset + if @_range.offset? limit += " OFFSET #{@_range.offset}" "SELECT #{result} FROM `#{@_klass.name}` #{@_whereClause()} #{order} #{limit}" - executeOptions: -> - waitForAnimations: @_waitForAnimations - _whereClause: -> joins = [] @_matchers.forEach (c) => @@ -240,10 +236,6 @@ class ModelQuery sql _orderClause: -> - if @_orders.length == 0 - natural = @_klass.naturalSortOrder() - @_orders.push(natural) if natural - return "" unless @_orders.length sql = " ORDER BY " @@ -251,12 +243,39 @@ class ModelQuery sql += sort.orderBySQL(@_klass) sql + # Private: Marks the object as final, preventing any changes to the where + # clauses, orders, etc. + finalize: -> + if @_orders.length is 0 + natural = @_klass.naturalSortOrder() + @_orders.push(natural) if natural + if @_returnOne and not @_range.limit + @limit(1) + @_finalized = true + @ + + # Private: Throws an exception if the query has been frozen. + _assertNotFinalized: -> + if @_finalized + throw new Error("ModelQuery: You cannot modify a query after calling `then` or `listen`") + # Introspection # (These are here to make specs easy) + matchers: -> + @_matchers + matcherValueForModelKey: (key) -> matcher = _.find @_matchers, (m) -> m.attr.modelKey = key matcher?.val + range: -> + @_range + + orderSortDescriptors: -> + @_orders + + objectClass: -> + @_klass.name module.exports = ModelQuery diff --git a/src/flux/stores/category-store.coffee b/src/flux/stores/category-store.coffee index 8fba60b95..c80f6132a 100644 --- a/src/flux/stores/category-store.coffee +++ b/src/flux/stores/category-store.coffee @@ -5,6 +5,7 @@ NylasAPI = require '../nylas-api' NylasStore = require 'nylas-store' DatabaseStore = require './database-store' AccountStore = require './account-store' +Rx = require 'rx-lite' class CategoryStore extends NylasStore constructor: -> @@ -13,11 +14,8 @@ class CategoryStore extends NylasStore @_userCategories = [] @_hiddenCategories = [] - @listenTo DatabaseStore, @_onDBChanged - @listenTo AccountStore, @_refreshCacheFromDB - NylasEnv.config.observe 'core.workspace.showImportant', => @_refreshCacheFromDB() - - @_refreshCacheFromDB() + NylasEnv.config.observe 'core.workspace.showImportant', => @_buildQuerySubscription() + @_buildQuerySubscription() # We look for a few standard categories and display them in the Mailboxes # portion of the left sidebar. Note that these may not all be present on @@ -131,54 +129,38 @@ class CategoryStore extends NylasStore getUserCategories: -> @_userCategories - _onDBChanged: (change) -> - categoryClass = @categoryClass() - return unless categoryClass + _buildQuerySubscription: => + {Categories} = require 'nylas-observables' + @_queryUnlisten?.dispose() + @_queryUnlisten = Categories.forCurrentAccount().sort().subscribe(@_onCategoriesChanged) - if change and change.objectClass is categoryClass.name - @_refreshCacheFromDB() + _onCategoriesChanged: (categories) => + return unless categories - _refreshCacheFromDB: -> - categoryClass = @categoryClass() - account = AccountStore.current() - return unless categoryClass + @_categoryCache = {} + for category in categories + @_categoryCache[category.id] = category - DatabaseStore.findAll(categoryClass).where(categoryClass.attributes.accountId.equal(account.id)).then (categories=[]) => - categories = categories.sort (catA, catB) -> - nameA = catA.displayName - nameB = catB.displayName + # Compute user categories + @_userCategories = _.compact _.reject categories, (cat) => + cat.name in @StandardCategoryNames or cat.name in @HiddenCategoryNames - # Categories that begin with [, like [Mailbox]/For Later - # should appear at the bottom, because they're likely autogenerated. - nameA = "ZZZ"+nameA if nameA[0] is '[' - nameB = "ZZZ"+nameB if nameB[0] is '[' + # Compute hidden categories + @_hiddenCategories = _.filter categories, (cat) => + cat.name in @HiddenCategoryNames - nameA.localeCompare(nameB) + # Compute standard categories + # Single pass to create lookup table, single pass to get ordered array + byStandardName = {} + for key, val of @_categoryCache + byStandardName[val.name] = val - @_categoryCache = {} - for category in categories - @_categoryCache[category.id] = category + if not NylasEnv.config.get('core.workspace.showImportant') + delete byStandardName['important'] - # Compute user categories - @_userCategories = _.compact _.reject categories, (cat) => - cat.name in @StandardCategoryNames or cat.name in @HiddenCategoryNames + @_standardCategories = _.compact @StandardCategoryNames.map (name) => + byStandardName[name] - # Compute hidden categories - @_hiddenCategories = _.filter categories, (cat) => - cat.name in @HiddenCategoryNames - - # Compute standard categories - # Single pass to create lookup table, single pass to get ordered array - byStandardName = {} - for key, val of @_categoryCache - byStandardName[val.name] = val - - if not NylasEnv.config.get('core.workspace.showImportant') - delete byStandardName['important'] - - @_standardCategories = _.compact @StandardCategoryNames.map (name) => - byStandardName[name] - - @trigger() + @trigger() module.exports = new CategoryStore() diff --git a/src/flux/stores/contact-ranking-store.coffee b/src/flux/stores/contact-ranking-store.coffee index f7079e677..29e14cb5e 100644 --- a/src/flux/stores/contact-ranking-store.coffee +++ b/src/flux/stores/contact-ranking-store.coffee @@ -1,4 +1,4 @@ - +Rx = require 'rx-lite' NylasStore = require 'nylas-store' DatabaseStore = require './database-store' AccountStore = require './account-store' @@ -6,34 +6,21 @@ AccountStore = require './account-store' class ContactRankingStore extends NylasStore constructor: -> - @listenTo DatabaseStore, @_onDatabaseChanged - @listenTo AccountStore, @_onAccountChanged @_value = null @_accountId = null - @_refresh() - _onDatabaseChanged: (change) => - if change.objectClass is 'JSONObject' and change.objects[0].key is "ContactRankingsFor#{@_accountId}" - @_value = change.objects[0].json.value + {Accounts} = require 'nylas-observables' + Accounts.forCurrentId().flatMapLatest (accountId) => + query = DatabaseStore.findJSONBlob("ContactRankingsFor#{accountId}") + return Rx.Observable.fromQuery(query) + .subscribe (json) => + @_value = if json? then json.value else null @trigger() - _onAccountChanged: => - @_refresh() - @reset() - @trigger() - value: -> @_value reset: -> @_value = null - _refresh: => - return if @_accountId is AccountStore.current()?.id - @_accountId = AccountStore.current()?.id - DatabaseStore.findJSONObject("ContactRankingsFor#{@_accountId}").then (json) => - @_value = if json? then json.value else null - @trigger() - - module.exports = new ContactRankingStore() diff --git a/src/flux/stores/database-setup-query-builder.coffee b/src/flux/stores/database-setup-query-builder.coffee index 164a37400..b619e27cd 100644 --- a/src/flux/stores/database-setup-query-builder.coffee +++ b/src/flux/stores/database-setup-query-builder.coffee @@ -12,11 +12,6 @@ class DatabaseSetupQueryBuilder setupQueries: -> queries = [] - - # Add table for storing generic JSON blobs - queries.push("CREATE TABLE IF NOT EXISTS `JSONObject` (key TEXT PRIMARY KEY, data BLOB)") - queries.push("CREATE UNIQUE INDEX IF NOT EXISTS `JSONObject_id` ON `JSONObject` (`key`)") - for key, klass of DatabaseObjectRegistry.classMap() queries = queries.concat @setupQueriesForTable(klass) return queries diff --git a/src/flux/stores/database-store.coffee b/src/flux/stores/database-store.coffee index 0995b19da..7b897aa07 100644 --- a/src/flux/stores/database-store.coffee +++ b/src/flux/stores/database-store.coffee @@ -34,6 +34,10 @@ DEBUG_MISSING_ACCOUNT_ID = false BEGIN_TRANSACTION = 'BEGIN TRANSACTION' COMMIT = 'COMMIT' +class JSONBlobQuery extends ModelQuery + formatResultObjects: (objects) => + return objects[0]?.json || null + ### Public: N1 is built on top of a custom database layer modeled after ActiveRecord. For many parts of the application, the database is the source @@ -395,14 +399,12 @@ class DatabaseStore extends NylasStore # # Returns a {Promise} that # - resolves with the result of the database query. - run: (modelQuery) => - {waitForAnimations} = modelQuery.executeOptions() + # + run: (modelQuery, options = {format: true}) => @_query(modelQuery.sql(), []).then (result) => - if waitForAnimations - PriorityUICoordinator.settle.then => - Promise.resolve(modelQuery.formatResult(result)) - else - Promise.resolve(modelQuery.formatResult(result)) + result = modelQuery.inflateResult(result) + result = modelQuery.formatResultObjects(result) unless options.format is false + Promise.resolve(result) # Public: Asynchronously writes `model` to the cache and triggers a change event. # @@ -485,16 +487,13 @@ class DatabaseStore extends NylasStore @_runMutationHooks('afterDatabaseChange', metadata, data) @_accumulateAndTrigger(metadata) - persistJSONObject: (key, json) -> - jsonString = serializeRegisteredObjects(json) - @_query("REPLACE INTO `JSONObject` (`key`,`data`) VALUES (?,?)", [key, jsonString]).then => - @trigger(new DatabaseChangeRecord({objectClass: 'JSONObject', objects: [{key: key, json: json}], type: 'persist'})) + persistJSONBlob: (id, json) -> + JSONBlob = require '../models/json-blob' + @persistModel(new JSONBlob({id, json})) - findJSONObject: (key) -> - @_query("SELECT `data` FROM `JSONObject` WHERE key = ? LIMIT 1", [key]).then (results) => - return Promise.resolve(null) unless results[0] - data = deserializeRegisteredObjects(results[0].data) - Promise.resolve(data) + findJSONBlob: (id) -> + JSONBlob = require '../models/json-blob' + new JSONBlobQuery(JSONBlob, @).where({id}).one() # Private: Mutation hooks allow you to observe changes to the database and # add additional functionality before and after the REPLACE / INSERT queries. diff --git a/src/flux/stores/focused-contacts-store.coffee b/src/flux/stores/focused-contacts-store.coffee index 6e2e32a38..eae34ae9a 100644 --- a/src/flux/stores/focused-contacts-store.coffee +++ b/src/flux/stores/focused-contacts-store.coffee @@ -1,4 +1,5 @@ _ = require 'underscore' +Rx = require 'rx-lite' Utils = require '../models/utils' Actions = require '../actions' @@ -12,7 +13,6 @@ FocusedContentStore = require './focused-content-store' # A store that handles the focuses collections of and individual contacts class FocusedContactsStore extends NylasStore constructor: -> - @listenTo DatabaseStore, @_onDatabaseChanged @listenTo MessageStore, @_onMessageStoreChanged @listenTo Actions.focusContact, @_onFocusContact @_clearCurrentParticipants() @@ -24,14 +24,6 @@ class FocusedContactsStore extends NylasStore # We need to wait now for the MessageStore to grab all of the # appropriate messages for the given thread. - _onDatabaseChanged: (change) => - return unless @_currentFocusedContact - return unless change and change.objectClass is 'contact' - current = _.find change.objects, (c) => c.email is @_currentFocusedContact.email - if current - @_currentFocusedContact = current - @trigger() - _onMessageStoreChanged: => threadId = if MessageStore.itemsLoading() then null else MessageStore.threadId() @@ -61,20 +53,26 @@ class FocusedContactsStore extends NylasStore _clearCurrentParticipants: -> @_contactScores = {} @_currentContacts = [] + @_unsubFocusedContact?.dispose() + @_unsubFocusedContact = null @_currentFocusedContact = null @_currentThread = null _onFocusContact: (contact) => - if not contact - @_currentFocusedContact = null - @trigger() - else - DatabaseStore.findBy(Contact, { + @_unsubFocusedContact?.dispose() + @_unsubFocusedContact = null + + if contact + query = DatabaseStore.findBy(Contact, { email: contact.email, accountId: @_currentThread.accountId - }).then (match) => + }) + @_unsubFocusedContact = Rx.Observable.fromQuery(query).subscribe (match) => @_currentFocusedContact = match ? contact @trigger() + else + @_currentFocusedContact = null + @trigger() # We score everyone to determine who's the most relevant to display in # the sidebar. diff --git a/src/flux/stores/nylas-sync-status-store.coffee b/src/flux/stores/nylas-sync-status-store.coffee index f4cfa2f3c..51fbc037b 100644 --- a/src/flux/stores/nylas-sync-status-store.coffee +++ b/src/flux/stores/nylas-sync-status-store.coffee @@ -1,4 +1,5 @@ _ = require 'underscore' +Rx = require 'rx-lite' AccountStore = require './account-store' DatabaseStore = require './database-store' NylasStore = require 'nylas-store' @@ -7,25 +8,17 @@ class NylasSyncStatusStore extends NylasStore constructor: -> @_statesByAccount = {} + @_subscriptions = {} @listenTo AccountStore, @_onAccountsChanged - @listenTo DatabaseStore, @_onChange @_onAccountsChanged() _onAccountsChanged: => - promises = [] AccountStore.items().forEach (item) => - return if @_statesByAccount[item.id] - promises.push DatabaseStore.findJSONObject("NylasSyncWorker:#{item.id}").then (json) => + query = DatabaseStore.findJSONBlob("NylasSyncWorker:#{item.id}") + @_subscriptions[item.id] ?= Rx.Observable.fromQuery(query).subscribe (json) => @_statesByAccount[item.id] = json ? {} - Promise.all(promises).then => - @trigger() - - _onChange: (change) => - if change.objectClass is 'JSONObject' and change.objects[0].key.indexOf('NylasSyncWorker') is 0 - [worker, accountId] = change.objects[0].key.split(':') - @_statesByAccount[accountId] = change.objects[0].json - @trigger() + @trigger() state: => @_statesByAccount diff --git a/src/flux/stores/task-queue-status-store.coffee b/src/flux/stores/task-queue-status-store.coffee index efb167714..a76b53e4b 100644 --- a/src/flux/stores/task-queue-status-store.coffee +++ b/src/flux/stores/task-queue-status-store.coffee @@ -1,4 +1,5 @@ _ = require 'underscore' +Rx = require 'rx-lite' NylasStore = require 'nylas-store' DatabaseStore = require './database-store' AccountStore = require './account-store' @@ -13,15 +14,10 @@ class TaskQueueStatusStore extends NylasStore @_queue = [] @_waitingLocals = [] @_waitingRemotes = [] - @listenTo DatabaseStore, @_onChange - DatabaseStore.findJSONObject(TaskQueue.JSONObjectStorageKey).then (json) => + query = DatabaseStore.findJSONBlob(TaskQueue.JSONBlobStorageKey) + Rx.Observable.fromQuery(query).subscribe (json) => @_queue = json || [] - @trigger() - - _onChange: (change) => - if change.objectClass is 'JSONObject' and change.objects[0].key is 'task-queue' - @_queue = change.objects[0].json @_waitingLocals = @_waitingLocals.filter ({taskId, resolve}) => task = _.findWhere(@_queue, {id: taskId}) if not task or task.queueState.localComplete diff --git a/src/flux/stores/task-queue.coffee b/src/flux/stores/task-queue.coffee index ac2be222b..3eb5d6a5c 100644 --- a/src/flux/stores/task-queue.coffee +++ b/src/flux/stores/task-queue.coffee @@ -14,10 +14,10 @@ DatabaseStore = require './database-store' {APIError, TimeoutError} = require '../errors' -JSONObjectStorageKey = 'task-queue' +JSONBlobStorageKey = 'task-queue' if not NylasEnv.isWorkWindow() and not NylasEnv.inSpecMode() - module.exports = {JSONObjectStorageKey} + module.exports = {JSONBlobStorageKey} return ### @@ -262,7 +262,7 @@ class TaskQueue return _.findWhere(@_queue, id: taskOrId) _restoreQueue: => - DatabaseStore.findJSONObject(JSONObjectStorageKey).then (queue = []) => + DatabaseStore.findJSONBlob(JSONBlobStorageKey).then (queue = []) => # We need to set the processing bit back to false so it gets # re-retried upon inflation for task in queue @@ -273,7 +273,7 @@ class TaskQueue _updateSoon: => @_updateSoonThrottled ?= _.throttle => - DatabaseStore.persistJSONObject(JSONObjectStorageKey, @_queue ? []) + DatabaseStore.persistJSONBlob(JSONBlobStorageKey, @_queue ? []) _.defer => @_processQueue() @trigger() @@ -281,3 +281,4 @@ class TaskQueue @_updateSoonThrottled() module.exports = new TaskQueue() +module.exports.JSONBlobStorageKey = JSONBlobStorageKey diff --git a/src/flux/stores/thread-counts-store.coffee b/src/flux/stores/thread-counts-store.coffee index f6879728a..b66c1a6d0 100644 --- a/src/flux/stores/thread-counts-store.coffee +++ b/src/flux/stores/thread-counts-store.coffee @@ -1,4 +1,5 @@ Reflux = require 'reflux' +Rx = require 'rx-lite' _ = require 'underscore' NylasStore = require 'nylas-store' CategoryStore = require './category-store' @@ -10,7 +11,7 @@ Folder = require '../models/folder' Label = require '../models/label' WindowBridge = require '../../window-bridge' -JSONObjectKey = 'UnreadCounts-V2' +JSONBlobKey = 'UnreadCounts-V2' class CategoryDatabaseMutationObserver constructor: (@_countsDidChange) -> @@ -55,26 +56,29 @@ class CategoryDatabaseMutationObserver class ThreadCountsStore extends NylasStore CategoryDatabaseMutationObserver: CategoryDatabaseMutationObserver - JSONObjectKey: JSONObjectKey + JSONBlobKey: JSONBlobKey constructor: -> @_counts = {} @_deltas = {} - @_categories = [] @_saveCountsSoon ?= _.throttle(@_saveCounts, 1000) - @listenTo DatabaseStore, @_onDatabaseChanged - DatabaseStore.findJSONObject(JSONObjectKey).then (json) => - @_counts = json ? {} - @trigger() - @_observer = new CategoryDatabaseMutationObserver(@_onCountsChanged) DatabaseStore.addMutationHook(@_observer) if NylasEnv.isWorkWindow() - @_loadCategories().then => + DatabaseStore.findJSONBlob(JSONBlobKey).then(@_onCountsBlobRead) + Rx.Observable.combineLatest( + Rx.Observable.fromQuery(DatabaseStore.findAll(Label)), + Rx.Observable.fromQuery(DatabaseStore.findAll(Folder)) + ).subscribe ([labels, folders]) => + @_categories = [].concat(labels, folders) @_fetchCountsMissing() + else + query = DatabaseStore.findJSONBlob(JSONBlobKey) + Rx.Observable.fromQuery(query).subscribe(@_onCountsBlobRead) + unreadCountForCategoryId: (catId) => return null if @_counts[catId] is undefined @_counts[catId] + (@_deltas[catId] || 0) @@ -82,21 +86,6 @@ class ThreadCountsStore extends NylasStore unreadCounts: => @_counts - _onDatabaseChanged: (change) => - if NylasEnv.isWorkWindow() - if change.objectClass in [Folder.name, Label.name] - for obj in change.objects - objIdx = _.findIndex @_categories, (cat) -> cat.id is obj.id - if objIdx isnt -1 - @_categories[objIdx] = obj - else - @_categories.push(obj) - @_fetchCountsMissing() - - else if change.objectClass is 'JSONObject' and change.objects[0].key is JSONObjectKey - @_counts = change.objects[0].json ? {} - @trigger() - _onCountsChanged: (metadata) => if not NylasEnv.isWorkWindow() WindowBridge.runInWorkWindow("ThreadCountsStore", "_onCountsChanged", [metadata]) @@ -107,13 +96,9 @@ class ThreadCountsStore extends NylasStore @_deltas[catId] += unread @_saveCountsSoon() - _loadCategories: => - Promise.props({ - folders: DatabaseStore.findAll(Folder) - labels: DatabaseStore.findAll(Label) - }).then ({folders, labels}) => - @_categories = [].concat(folders, labels) - Promise.resolve() + _onCountsBlobRead: (json) => + @_counts = json ? {} + @trigger() # Fetch a count, populate it in the cache, and then call ourselves to # populate the next missing count. @@ -147,7 +132,7 @@ class ThreadCountsStore extends NylasStore @_counts[key] += count delete @_deltas[key] - DatabaseStore.persistJSONObject(JSONObjectKey, @_counts) + DatabaseStore.persistJSONBlob(JSONBlobKey, @_counts) @trigger() _fetchCountForCategory: (cat) => diff --git a/src/global/nylas-exports.coffee b/src/global/nylas-exports.coffee index 4572c9d2b..e64fc79aa 100644 --- a/src/global/nylas-exports.coffee +++ b/src/global/nylas-exports.coffee @@ -55,6 +55,7 @@ class NylasExports @load "SearchView", 'flux/stores/search-view' @load "DatabaseView", 'flux/stores/database-view' @load "DatabaseStore", 'flux/stores/database-store' + @load "QuerySubscriptionPool", 'flux/models/query-subscription-pool' # Database Objects # These need to be required immeidatley to populated the @@ -70,6 +71,7 @@ class NylasExports @require "Category", 'flux/models/category' @require "Calendar", 'flux/models/calendar' @require "Metadata", 'flux/models/metadata' + @require "JSONBlob", 'flux/models/json-blob' @require "DatabaseObjectRegistry", "database-object-registry" @require "MailViewFilter", 'mail-view-filter' diff --git a/src/global/nylas-observables.coffee b/src/global/nylas-observables.coffee new file mode 100644 index 000000000..4eb11337a --- /dev/null +++ b/src/global/nylas-observables.coffee @@ -0,0 +1,70 @@ +Rx = require 'rx-lite' +_ = require 'underscore' +QuerySubscriptionPool = require '../flux/models/query-subscription-pool' +AccountStore = require '../flux/stores/account-store' +DatabaseStore = require '../flux/stores/database-store' + +AccountOperators = {} + +AccountObservables = + forCurrentId: -> + observable = Rx.Observable + .fromStore(AccountStore) + .map -> AccountStore.current()?.id + .distinctUntilChanged() + _.extend(observable, AccountOperators) + observable + +CategoryOperators = + sort: -> + @.map (categories) -> + return categories.sort (catA, catB) -> + nameA = catA.displayName + nameB = catB.displayName + + # Categories that begin with [, like [Mailbox]/For Later + # should appear at the bottom, because they're likely autogenerated. + nameA = "ZZZ"+nameA if nameA[0] is '[' + nameB = "ZZZ"+nameB if nameB[0] is '[' + + nameA.localeCompare(nameB) + +CategoryObservables = + forCurrentAccount: -> + observable = Rx.Observable.fromStore(AccountStore).flatMapLatest -> + return CategoryObservables.forAccount(AccountStore.current()) + _.extend(observable, CategoryOperators) + observable + + forAllAccounts: => + observable = Rx.Observable.fromQuery(DatabaseStore.findAll(categoryClass)) + _.extend(observable, CategoryOperators) + observable + + forAccount: (account) => + if account + categoryClass = account.categoryClass() + observable = Rx.Observable.fromQuery(DatabaseStore.findAll(categoryClass).where(categoryClass.attributes.accountId.equal(account.id))) + else + observable = Rx.Observable.from([]) + _.extend(observable, CategoryOperators) + observable + +module.exports = + Categories: CategoryObservables + Accounts: AccountObservables + +# Attach a few global helpers + +Rx.Observable.fromStore = (store) => + return Rx.Observable.create (observer) => + unsubscribe = store.listen => + observer.onNext(store) + observer.onNext(store) + return Rx.Disposable.create(unsubscribe) + +Rx.Observable.fromQuery = (query, options) => + return Rx.Observable.create (observer) => + unsubscribe = QuerySubscriptionPool.add query, options, (result) => + observer.onNext(result) + return Rx.Disposable.create(unsubscribe)