feat(observables): Implementation of observables to replace some stores
Summary:
Add concept of "final" to Query, clean up internals. Tiny bug fixes. RxJS Observables! WIP

Test Plan: Run tests
Reviewers: evan, juan
Reviewed By: juan
Differential Revision: https://phab.nylas.com/D2319
This commit is contained in:
parent 58c853ef76
commit 30c58e90a6
27 changed files with 823 additions and 249 deletions
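The recurring pattern in this diff: stores stop filtering DatabaseStore change records by hand and instead subscribe to query observables built on the new QuerySubscriptionPool. A minimal before/after sketch of that shape, using the `Rx.Observable.fromQuery` helper added in `src/global/nylas-observables.coffee` at the end of this diff (the surrounding store is hypothetical):

    # Before: manual change-record filtering inside a store constructor
    @listenTo DatabaseStore, (change) =>
      return unless change.objectClass is Label.name
      @_refreshCacheFromDB()

    # After: declare the query once and receive fresh result sets
    query = DatabaseStore.findAll(Label)
    @_disposable = Rx.Observable.fromQuery(query).subscribe (labels) =>
      @_labels = labels
      @trigger()

    # Later, when the store is torn down:
    @_disposable.dispose()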
@@ -30,7 +30,7 @@ class EventHeader extends React.Component
  componentDidMount: =>
    @_unlisten = DatabaseStore.listen (change) =>
      if change.objectClass is Event.name
      if @state.event and change.objectClass is Event.name
        updated = _.find change.objects, (o) => o.id is @state.event.id
        @setState({event: updated}) if updated
    @_onChange()
@@ -31,7 +31,6 @@ class ActivitySidebar extends React.Component
  componentWillUnmount: =>
    unlisten() for unlisten in @_unlisteners
    @_workerUnlisten() if @_workerUnlisten

  render: =>
    items = [@_renderNotificationActivityItems(), @_renderTaskActivityItems()]
@@ -51,7 +51,7 @@ class NylasSyncWorker
    @_unlisten = Actions.retryInitialSync.listen(@_onRetryInitialSync, @)

    @_state = null
    DatabaseStore.findJSONObject("NylasSyncWorker:#{@_account.id}").then (json) =>
    DatabaseStore.findJSONBlob("NylasSyncWorker:#{@_account.id}").then (json) =>
      @_state = json ? {}
      for model, modelState of @_state
        modelState.busy = false

@@ -209,7 +209,7 @@ class NylasSyncWorker
  writeState: ->
    @_writeState ?= _.debounce =>
      DatabaseStore.persistJSONObject("NylasSyncWorker:#{@_account.id}", @_state)
      DatabaseStore.persistJSONBlob("NylasSyncWorker:#{@_account.id}", @_state)
    ,100
    @_writeState()
@@ -12,7 +12,7 @@ class RefreshingJSONCache
    @end()

    # Look up existing data from db
    DatabaseStore.findJSONObject(@key).then (json) =>
    DatabaseStore.findJSONBlob(@key).then (json) =>

      # Refresh immediately if json is missing or version is outdated. Otherwise,
      # compute next refresh time and schedule

@@ -24,7 +24,7 @@ class RefreshingJSONCache
  reset: ->
    # Clear db value, turn off any scheduled actions
    DatabaseStore.persistJSONObject(@key, {})
    DatabaseStore.persistJSONBlob(@key, {})
    @end()

  end: ->

@@ -39,7 +39,7 @@ class RefreshingJSONCache
    # Call fetch data function, save it to the database
    @fetchData (newValue) =>
      DatabaseStore.persistJSONObject(@key, {
      DatabaseStore.persistJSONBlob(@key, {
        version: @version
        time: Date.now()
        value: newValue
@@ -16,8 +16,8 @@ describe "NylasSyncWorker", ->
      getThreads: (account, params, requestOptions) =>
        @apiRequests.push({account, model:'threads', params, requestOptions})

    spyOn(DatabaseStore, 'persistJSONObject').andReturn(Promise.resolve())
    spyOn(DatabaseStore, 'findJSONObject').andCallFake (key) =>
    spyOn(DatabaseStore, 'persistJSONBlob').andReturn(Promise.resolve())
    spyOn(DatabaseStore, 'findJSONBlob').andCallFake (key) =>
      if key is "NylasSyncWorker:#{TEST_ACCOUNT_ID}"
        return Promise.resolve _.extend {}, {
          "contacts":
@@ -57,6 +57,7 @@
    "request": "^2.53",
    "request-progress": "^0.3",
    "runas": "^3.1",
    "rx-lite": "^4.0.7",
    "sanitize-html": "1.9.0",
    "scoped-property-store": "^0.16.2",
    "season": "^5.1",
@@ -95,11 +95,13 @@ describe "ModelQuery", ->
      @q.includeAll()
      expect(@q._includeJoinedData).toEqual([Message.attributes.body])

  describe "formatResult", ->
  describe "response formatting", ->
    it "should always return a Number for counts", ->
      q = new ModelQuery(Message, @db)
      q.where({accountId: 'abcd'}).count()
      expect(q.formatResult([{count:"12"}])).toBe(12)

      raw = [{count:"12"}]
      expect(q.formatResultObjects(q.inflateResult(raw))).toBe(12)

  describe "sql", ->
    beforeEach ->

@@ -109,6 +111,12 @@ describe "ModelQuery", ->
        scenario.builder(q)
        expect(q.sql().trim()).toBe(scenario.sql.trim())

    it "should finalize the query so no further changes can be made", ->
      q = new ModelQuery(Account, @db)
      spyOn(q, 'finalize')
      q.sql()
      expect(q.finalize).toHaveBeenCalled()

    it "should correctly generate queries with multiple where clauses", ->
      @runScenario Account,
        builder: (q) -> q.where({emailAddress: 'ben@nylas.com'}).where({id: 2})
spec/models/query-subscription-pool-spec.coffee (new file, 36 lines)

@@ -0,0 +1,36 @@
QuerySubscriptionPool = require '../../src/flux/models/query-subscription-pool'
DatabaseStore = require '../../src/flux/stores/database-store'
Label = require '../../src/flux/models/label'

describe "QuerySubscriptionPool", ->
  beforeEach ->
    @query = DatabaseStore.findAll(Label)
    QuerySubscriptionPool._subscriptions = []

  describe "add", ->
    it "should add a new subscription with the callback", ->
      callback = jasmine.createSpy('callback')
      QuerySubscriptionPool.add(@query, {}, callback)
      expect(QuerySubscriptionPool._subscriptions.length).toBe(1)
      subscription = QuerySubscriptionPool._subscriptions[0]
      expect(subscription.hasCallback(callback)).toBe(true)

    it "should yield database changes to the subscription", ->
      callback = jasmine.createSpy('callback')
      QuerySubscriptionPool.add(@query, {}, callback)
      subscription = QuerySubscriptionPool._subscriptions[0]
      spyOn(subscription, 'applyChangeRecord')

      record = {objectType: 'whateves'}
      QuerySubscriptionPool._onChange(record)
      expect(subscription.applyChangeRecord).toHaveBeenCalledWith(record)

  describe "unsubscribe", ->
    it "should return an unsubscribe method", ->
      expect(QuerySubscriptionPool.add(@query, {}, -> ) instanceof Function).toBe(true)

    it "should remove the subscription", ->
      unsub = QuerySubscriptionPool.add(@query, {}, -> )
      expect(QuerySubscriptionPool._subscriptions.length).toBe(1)
      unsub()
      expect(QuerySubscriptionPool._subscriptions.length).toBe(0)
spec/models/query-subscription-spec.coffee (new file, 256 lines)

@@ -0,0 +1,256 @@
DatabaseStore = require '../../src/flux/stores/database-store'
QuerySubscription = require '../../src/flux/models/query-subscription'
Thread = require '../../src/flux/models/thread'
Label = require '../../src/flux/models/label'
Utils = require '../../src/flux/models/utils'

describe "QuerySubscription", ->
  describe "constructor", ->
    it "should throw an error if the query is a count query", ->
      query = DatabaseStore.findAll(Label).count()
      expect( => new QuerySubscription(query)).toThrow()

    it "should throw an error if a query is not provided", ->
      expect( => new QuerySubscription({})).toThrow()

    it "should fetch an initial result set", ->
      spyOn(QuerySubscription.prototype, '_refetchResultSet')
      sub = new QuerySubscription(DatabaseStore.findAll(Label))
      expect(QuerySubscription.prototype._refetchResultSet).toHaveBeenCalled()

  describe "applyChangeRecord", ->
    spyOn(Utils, 'generateTempId').andCallFake => ""

    scenarios = [{
      name: "query with full set of objects (4)"
      query: DatabaseStore.findAll(Thread)
             .where(Thread.attributes.accountId.equal('a'))
             .limit(4)
             .offset(2)
      lastResultSet: [
        new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4)
        new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
        new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
        new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1),
      ]
      tests: [{
        name: 'Item saved which belongs in the set'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '5', lastMessageReceivedTimestamp: 3.5)]
          type: 'persist'
        newResultSet:[
          new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4),
          new Thread(accountId: 'a', id: '5', lastMessageReceivedTimestamp: 3.5),
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
        ]
        refetchRequired: false
      },{
        name: 'Item saved which does not match query clauses'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'b', id: '5', lastMessageReceivedTimestamp: 5)]
          type: 'persist'
        newResultSet: 'unchanged'
        refetchRequired: false
      },{
        name: 'Item saved which does not lie in the range after sorting'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'b', id: '5', lastMessageReceivedTimestamp: -2)]
          type: 'persist'
        newResultSet: 'unchanged'
        refetchRequired: false
      },{
        name: 'Item in set saved'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4, subject: 'hello')]
          type: 'persist'
        newResultSet:[
          new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4, subject: 'hello')
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
          new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1),
        ]
        refetchRequired: false
      },{
        name: 'Item in set saved, sort order changed (within range only)'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5)]
          type: 'persist'
        newResultSet:[
          new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4),
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5),
          new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1),
        ]
        refetchRequired: false
      },{
        name: 'Item in set saved, sort order changed and sorted to edge of set (impacting last)'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 6)]
          type: 'persist'
        refetchRequired: true
      },{
        name: 'Item in set saved, sort order changed and sorted to edge of set (impacting first)'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: -1)]
          type: 'persist'
        refetchRequired: true
      },{
        name: 'Item in set saved, no longer matches query clauses'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'b', id: '4', lastMessageReceivedTimestamp: 4)]
          type: 'persist'
        newResultSet: [
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
          new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1),
        ]
        refetchRequired: true
      },{
        name: 'Item in set deleted'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '4')]
          type: 'unpersist'
        newResultSet: [
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
          new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1),
        ]
        refetchRequired: true
      },{
        name: 'Item not in set deleted'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '5')]
          type: 'unpersist'
        newResultSet: 'unchanged'
        refetchRequired: false
      }]

    },{
      name: "query with fewer than LIMIT objects"
      query: DatabaseStore.findAll(Thread)
             .where(Thread.attributes.accountId.equal('a'))
             .limit(4)
             .offset(2)
      lastResultSet: [
        new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4)
        new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
        new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2)
      ]
      tests: [{
        name: 'Item in set saved, no longer matches query clauses'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'b', id: '4', lastMessageReceivedTimestamp: 4)]
          type: 'persist'
        newResultSet: [
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
        ]
        refetchRequired: false
      },{
        name: 'Item in set deleted'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '4')]
          type: 'unpersist'
        newResultSet: [
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3),
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2),
        ]
        refetchRequired: false
      }]
    },{
      name: "query with ASC sort order"
      query: DatabaseStore.findAll(Thread)
             .where(Thread.attributes.accountId.equal('a'))
             .limit(4)
             .offset(2)
             .order(Thread.attributes.lastMessageReceivedTimestamp.ascending())
      lastResultSet: [
        new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1)
        new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2)
        new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 3)
        new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4)
      ]
      tests: [{
        name: 'Item in set saved, sort order changed'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5)]
          type: 'persist'
        newResultSet:[
          new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1)
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1.5)
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 2)
          new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 4)
        ]
        refetchRequired: false
      }]

    },{
      name: "query with multiple sort orders"
      query: DatabaseStore.findAll(Thread)
             .where(Thread.attributes.accountId.equal('a'))
             .limit(4)
             .offset(2)
             .order([
               Thread.attributes.lastMessageReceivedTimestamp.ascending(),
               Thread.attributes.unread.descending()
             ])
      lastResultSet: [
        new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1, unread: true)
        new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 1, unread: false)
        new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1, unread: false)
        new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 2, unread: true)
      ]
      tests: [{
        name: 'Item in set saved, secondary sort order changed'
        change:
          objectClass: Thread.name
          objects: [new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1, unread: true)]
          type: 'persist'
        newResultSet:[
          new Thread(accountId: 'a', id: '1', lastMessageReceivedTimestamp: 1, unread: true)
          new Thread(accountId: 'a', id: '3', lastMessageReceivedTimestamp: 1, unread: true)
          new Thread(accountId: 'a', id: '2', lastMessageReceivedTimestamp: 1, unread: false)
          new Thread(accountId: 'a', id: '4', lastMessageReceivedTimestamp: 2, unread: true)
        ]
        refetchRequired: false
      }]
    }]

    jasmine.unspy(Utils, 'generateTempId')

    describe "scenarios", ->
      scenarios.forEach (scenario) =>
        scenario.tests.forEach (test) =>
          it "with #{scenario.name}, should correctly apply #{test.name}", ->
            @q = new QuerySubscription(scenario.query, -> )
            @q._lastResultSet = scenario.lastResultSet
            spyOn(@q, '_invokeCallbacks')
            spyOn(@q, '_refetchResultSet')
            @q.applyChangeRecord(test.change)

            if test.newResultSet is 'unchanged'
              expect(@q._invokeCallbacks).not.toHaveBeenCalled()
              expect(@q._lastResultSet).toEqual(scenario.lastResultSet)

            else if test.newResultSet
              expect(@q._invokeCallbacks).toHaveBeenCalled()
              expect(@q._lastResultSet).toEqual(test.newResultSet)

            if test.refetchRequired
              expect(@q._refetchResultSet).toHaveBeenCalled()
            else
              expect(@q._refetchResultSet).not.toHaveBeenCalled()
@@ -72,7 +72,6 @@ describe "MessageStore", ->

      spyOn(DatabaseStore, 'findAll').andCallFake ->
        include: -> @
        waitForAnimations: -> @
        where: -> @
        then: (callback) -> callback([testMessage1, testMessage2])
@@ -34,7 +34,7 @@ describe "TaskQueue", ->
  describe "restoreQueue", ->
    it "should fetch the queue from the database, reset flags and start processing", ->
      queue = [@processingTask, @unstartedTask]
      spyOn(DatabaseStore, 'findJSONObject').andCallFake => Promise.resolve(queue)
      spyOn(DatabaseStore, 'findJSONBlob').andCallFake => Promise.resolve(queue)
      spyOn(TaskQueue, '_processQueue')

      waitsForPromise =>
@@ -52,58 +52,13 @@ describe "ThreadCountsStore", ->
      ThreadCountsStore._onCountsChanged(payload)
      expect(WindowBridge.runInWorkWindow).toHaveBeenCalledWith('ThreadCountsStore', '_onCountsChanged', [payload])

  describe "when a folder or label is persisted", ->
    beforeEach ->
      @lExisting = new Label(id: "l1", name: "inbox", displayName: "Inbox")
      ThreadCountsStore._categories = [@lExisting]

      @lCreated = new Label(id: "lNew", displayName: "Hi there!")
      @lUpdated = @lExisting.clone()
      @lUpdated.displayName = "Inbox Edited"

      spyOn(ThreadCountsStore, '_fetchCountsMissing')

    describe "in the work window", ->
      beforeEach ->
        spyOn(NylasEnv, 'isWorkWindow').andReturn(true)

      it "should add or update it in it's local categories cache", ->
        ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lCreated]})
        expect(ThreadCountsStore._categories).toEqual([@lExisting, @lCreated])

        ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lUpdated]})
        expect(ThreadCountsStore._categories).toEqual([@lUpdated, @lCreated])

        ThreadCountsStore._categories = []

        ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lCreated, @lUpdated]})
        expect(ThreadCountsStore._categories).toEqual([@lCreated, @lUpdated])

      it "should run _fetchCountsMissing", ->
        ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lUpdated]})
        expect(ThreadCountsStore._fetchCountsMissing).toHaveBeenCalled()

    describe "in other windows", ->
      beforeEach ->
        spyOn(NylasEnv, 'isWorkWindow').andReturn(false)

      it "should do nothing", ->
        ThreadCountsStore._onDatabaseChanged({objectClass: Label.name, objects: [@lCreated]})
        expect(ThreadCountsStore._categories).toEqual([@lExisting])
        expect(ThreadCountsStore._fetchCountsMissing).not.toHaveBeenCalled()

  describe "when counts are persisted", ->
    it "should update it's _counts cache and trigger", ->
      newCounts = {
        'abc': 1
      }
      spyOn(ThreadCountsStore, 'trigger')
      ThreadCountsStore._onDatabaseChanged({
        objectClass: 'JSONObject',
        objects: [
          {key: ThreadCountsStore.JSONObjectKey, json: newCounts}
        ]
      })
      ThreadCountsStore._onCountsBlobRead(newCounts)
      expect(ThreadCountsStore._counts).toEqual(newCounts)
      expect(ThreadCountsStore.trigger).toHaveBeenCalled()

@@ -218,9 +173,9 @@ describe "ThreadCountsStore", ->
      })

    it "should persist the new counts to the database", ->
      spyOn(DatabaseStore, 'persistJSONObject')
      spyOn(DatabaseStore, 'persistJSONBlob')
      ThreadCountsStore._saveCounts()
      expect(DatabaseStore.persistJSONObject).toHaveBeenCalledWith(ThreadCountsStore.JSONObjectKey, ThreadCountsStore._counts)
      expect(DatabaseStore.persistJSONBlob).toHaveBeenCalledWith(ThreadCountsStore.JSONBlobKey, ThreadCountsStore._counts)

  describe "CategoryDatabaseMutationObserver", ->
    beforeEach ->
src/flux/models/json-blob.coffee (new file, 28 lines)

@@ -0,0 +1,28 @@
_ = require 'underscore'
Model = require './model'
Attributes = require '../attributes'

class JSONBlob extends Model
  @attributes:
    'id': Attributes.String
      queryable: true
      modelKey: 'id'

    'clientId': Attributes.String
      queryable: true
      modelKey: 'clientId'
      jsonKey: 'client_id'

    'serverId': Attributes.ServerId
      modelKey: 'serverId'
      jsonKey: 'server_id'

    'json': Attributes.Object
      modelKey: 'json'
      jsonKey: 'json'

  Object.defineProperty @prototype, "key",
    get: -> @id
    set: (val) -> @id = val

module.exports = JSONBlob
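For context, a sketch of how the new JSONBlob model is meant to be used through the DatabaseStore helpers that replace the old JSON table later in this diff (`persistJSONBlob` / `findJSONBlob`); the blob key here is illustrative:

    DatabaseStore = require '../stores/database-store'

    # Write an arbitrary JSON value under a string key
    DatabaseStore.persistJSONBlob('MyPackage:prefs', {collapsed: true})

    # Read it back; findJSONBlob returns a query, so it is thenable and observable
    DatabaseStore.findJSONBlob('MyPackage:prefs').then (json) ->
      console.log(json?.collapsed)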
src/flux/models/query-subscription-pool.coffee (new file, 55 lines)

@@ -0,0 +1,55 @@
_ = require 'underscore'
DatabaseChangeRecord = require '../stores/database-change-record'
QuerySubscription = require './query-subscription'

###
Public: The QuerySubscriptionPool maintains a list of all of the query
subscriptions in the app. In the future, this class will monitor performance,
merge equivalent subscriptions, etc.
###
class QuerySubscriptionPool
  constructor: ->
    @_subscriptions = []

  add: (query, options, callback) =>
    @_setup() if @_subscriptions.length is 0

    callback._registrationPoint = @_formatRegistrationPoint((new Error).stack)

    subscription = new QuerySubscription(query, options)
    subscription.addCallback(callback)
    @_subscriptions.push(subscription)

    return =>
      subscription.removeCallback(callback)
      @_subscriptions = _.without(@_subscriptions, subscription)

  printSubscriptions: =>
    @_subscriptions.forEach (sub) ->
      console.log(sub._query.sql())
      console.group()
      sub._callbacks.forEach (callback) ->
        console.log("#{callback._registrationPoint}")
      console.groupEnd()

  _formatRegistrationPoint: (stack) ->
    stack = stack.split('\n')
    ii = 0
    seenRx = false
    while ii < stack.length
      hasRx = stack[ii].indexOf('rx.lite') isnt -1
      seenRx ||= hasRx
      break if seenRx is true and not hasRx
      ii += 1

    return stack[ii..(ii + 4)].join('\n')

  _setup: =>
    DatabaseStore = require '../stores/database-store'
    DatabaseStore.listen @_onChange

  _onChange: (record) =>
    for subscription in @_subscriptions
      subscription.applyChangeRecord(record)

module.exports = new QuerySubscriptionPool()
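A sketch of direct pool usage, mirroring the spec above: `add` returns an unsubscribe function, which is what `Rx.Observable.fromQuery` later wraps in a Disposable (require paths are from the consumer's point of view and may differ):

    QuerySubscriptionPool = require './query-subscription-pool'
    DatabaseStore = require '../stores/database-store'
    Label = require './label'

    unsubscribe = QuerySubscriptionPool.add DatabaseStore.findAll(Label), {}, (labels) ->
      console.log("Now have #{labels.length} labels")

    # Stop receiving result sets when the consumer goes away
    unsubscribe()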
src/flux/models/query-subscription.coffee (new file, 210 lines)

@@ -0,0 +1,210 @@
_ = require 'underscore'
DatabaseChangeRecord = require '../stores/database-change-record'

class QuerySubscription
  constructor: (@_query, @_options) ->
    ModelQuery = require './query'

    if not @_query or not (@_query instanceof ModelQuery)
      throw new Error("QuerySubscription: Must be constructed with a ModelQuery. Got #{@_query}")

    if @_query._count
      throw new Error("QuerySubscriptionPool::add - You cannot listen to count queries.")

    @_query.finalize()
    @_limit = @_query.range().limit ? Infinity
    @_offset = @_query.range().offset ? 0

    @_callbacks = []
    @_version = 0
    @_versionFetchInProgress = false
    @_lastResultSet = null
    @_refetchResultSet()

  addCallback: (callback) =>
    unless callback instanceof Function
      throw new Error("QuerySubscription:addCallback - expects a function, received #{callback}")
    @_callbacks.push(callback)

  hasCallback: (callback) =>
    @_callbacks.indexOf(callback) isnt -1

  removeCallback: (callback) =>
    unless callback instanceof Function
      throw new Error("QuerySubscription:removeCallback - expects a function, received #{callback}")
    @_callbacks = _.without(@_callbacks, callback)

  applyChangeRecord: (record) =>
    return unless record.objectClass is @_query.objectClass()
    return unless record.objects.length > 0
    return @_invalidatePendingResultSet() unless @_lastResultSet

    @_lastResultSet = [].concat(@_lastResultSet)

    if record.type is 'unpersist'
      status = @_optimisticallyRemoveModels(record.objects)
    else if record.type is 'persist'
      status = @_optimisticallyUpdateModels(record.objects)
    else
      throw new Error("QuerySubscription: Unknown change record type: #{record.type}")

    if status.setModified
      @_invokeCallbacks()
    if status.setFetchRequired
      @_refetchResultSet()

  _refetchResultSet: =>
    @_version += 1

    return if @_versionFetchInProgress
    @_versionFetchInProgress = true
    fetchVersion = @_version

    DatabaseStore = require '../stores/database-store'
    DatabaseStore.run(@_query, {format: false}).then (result) =>
      @_versionFetchInProgress = false
      if @_version is fetchVersion
        @_lastResultSet = result
        @_invokeCallbacks()
      else
        @_refetchResultSet()

  _invalidatePendingResultSet: =>
    @_version += 1

  _resortResultSet: =>
    sortDescriptors = @_query.orderSortDescriptors()
    @_lastResultSet.sort (a, b) ->
      for descriptor in sortDescriptors
        if descriptor.direction is 'ASC'
          direction = 1
        else if descriptor.direction is 'DESC'
          direction = -1
        else
          throw new Error("QuerySubscription: Unknown sort order: #{descriptor.direction}")
        aValue = a[descriptor.attr.modelKey]
        bValue = b[descriptor.attr.modelKey]
        return -1 * direction if aValue < bValue
        return 1 * direction if aValue > bValue
      return 0

  _optimisticallyRemoveModels: (items) =>
    status =
      setModified: false
      setFetchRequired: false

    lastLength = @_lastResultSet.length

    for item in items
      idx = _.findIndex @_lastResultSet, ({id}) -> id is item.id
      if idx isnt -1
        @_lastResultSet.splice(idx, 1)
        status.setModified = true

    # Removing items is an issue if we previously had LIMIT items. This
    # means there are likely more items to display in the place of the one
    # we're removing and we need to re-fetch
    if lastLength is @_limit
      status.setFetchRequired = true

    status

  _optimisticallyUpdateModels: (items) =>
    status =
      setModified: false
      setFetchRequired: false

    sortNecessary = false

    # Pull attributes of the query
    sortDescriptors = @_query.orderSortDescriptors()

    oldSetInfo =
      length: @_lastResultSet.length
      startItem: @_lastResultSet[0]
      endItem: @_lastResultSet[@_limit - 1]

    for item in items
      idx = _.findIndex @_lastResultSet, ({id}) -> id is item.id
      itemIsInSet = idx isnt -1
      itemShouldBeInSet = item.matches(@_query.matchers())

      if itemIsInSet and not itemShouldBeInSet
        # remove the item
        @_lastResultSet.splice(idx, 1)
        status.setModified = true

      else if itemShouldBeInSet and not itemIsInSet
        # insert the item, re-sort if a sort order is defined
        if sortDescriptors.length > 0
          sortNecessary = true
        @_lastResultSet.push(item)
        status.setModified = true

      else if itemIsInSet
        # update the item in the set, re-sort if a sort attribute's value has changed
        if @_itemSortOrderHasChanged(@_lastResultSet[idx], item)
          sortNecessary = true
        @_lastResultSet[idx] = item
        status.setModified = true

    if sortNecessary
      @_resortResultSet()

    if sortNecessary and @_itemOnEdgeHasChanged(oldSetInfo)
      status.setFetchRequired = true

    # If items have been added, truncate the result set to the requested length
    if @_lastResultSet.length > @_limit
      @_lastResultSet.length = @_limit

    hadMaxItems = oldSetInfo.length is @_limit
    hasLostItems = @_lastResultSet.length < oldSetInfo.length

    if hadMaxItems and hasLostItems
      # Ex: We asked for 20 items and had 20 items. Now we have 19 items.
      # We need to pull a new item to fill slot #20.
      status.setFetchRequired = true

    status

  _itemOnEdgeHasChanged: (oldSetInfo) ->
    hasPrecedingItems = @_offset > 0
    hasChangedStartItem = oldSetInfo.startItem isnt @_lastResultSet[0]

    if hasPrecedingItems and hasChangedStartItem
      # We've changed the identity of the item at index zero. We have no way
      # of knowing if it would still sort at this position, or if another item
      # from earlier in the range should be at index zero.
      # Full re-fetch is necessary.
      return true

    hasTrailingItems = @_lastResultSet.length is @_limit
    hasChangedEndItem = oldSetInfo.endItem isnt @_lastResultSet[@_limit - 1]

    if hasTrailingItems and hasChangedEndItem
      # We've changed the last item in the set, and the set is at its LIMIT length.
      # We have no way of knowing if the item should still be at this position
      # since we can't see the next item.
      # Full re-fetch is necessary.
      return true

  _itemSortOrderHasChanged: (old, updated) ->
    for descriptor in @_query.orderSortDescriptors()
      oldSortValue = old[descriptor.attr.modelKey]
      updatedSortValue = updated[descriptor.attr.modelKey]

      # http://stackoverflow.com/questions/4587060/determining-date-equality-in-javascript
      if not (oldSortValue >= updatedSortValue && oldSortValue <= updatedSortValue)
        return true

    return false

  _invokeCallbacks: =>
    set = [].concat(@_lastResultSet)
    resultForSet = @_query.formatResultObjects(set)

    @_callbacks.forEach (callback) =>
      callback(resultForSet)

module.exports = QuerySubscription
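A sketch of constructing a subscription directly, as the spec above does; callbacks receive the formatted result set each time the optimistic update or a re-fetch changes it (the query shown is illustrative):

    QuerySubscription = require './query-subscription'
    DatabaseStore = require '../stores/database-store'
    Thread = require './thread'

    query = DatabaseStore.findAll(Thread).limit(50)
    subscription = new QuerySubscription(query)
    subscription.addCallback (threads) ->
      console.log("result set now has #{threads.length} threads")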
@@ -43,8 +43,8 @@ class ModelQuery
    @_database ||= require '../stores/database-store'
    @_matchers = []
    @_orders = []
    @_singular = false
    @_waitForAnimations = false
    @_range = {}
    @_returnOne = false
    @_includeJoinedData = []
    @_count = false
    @

@@ -56,6 +56,8 @@ class ModelQuery
  # This method is chainable.
  #
  where: (matchers) ->
    @_assertNotFinalized()

    if matchers instanceof Matcher
      @_matchers.push(matchers)
    else if matchers instanceof Array

@@ -78,6 +80,7 @@ class ModelQuery
  # This method is chainable.
  #
  include: (attr) ->
    @_assertNotFinalized()
    if attr instanceof AttributeJoinedData is false
      throw new Error("query.include() must be called with a joined data attribute")
    @_includeJoinedData.push(attr)

@@ -88,6 +91,7 @@ class ModelQuery
  # This method is chainable.
  #
  includeAll: ->
    @_assertNotFinalized()
    for key, attr of @_klass.attributes
      @include(attr) if attr instanceof AttributeJoinedData
    @

@@ -99,6 +103,7 @@ class ModelQuery
  # This method is chainable.
  #
  order: (orders) ->
    @_assertNotFinalized()
    orders = [orders] unless orders instanceof Array
    @_orders = @_orders.concat(orders)
    @

@@ -109,7 +114,8 @@ class ModelQuery
  # This method is chainable.
  #
  one: ->
    @_singular = true
    @_assertNotFinalized()
    @_returnOne = true
    @

  # Public: Limit the number of query results.

@@ -119,8 +125,8 @@ class ModelQuery
  # This method is chainable.
  #
  limit: (limit) ->
    throw new Error("Cannot use limit > 2 with one()") if @_singular and limit > 1
    @_range ?= {}
    @_assertNotFinalized()
    throw new Error("Cannot use limit > 2 with one()") if @_returnOne and limit > 1
    @_range.limit = limit
    @

@@ -131,7 +137,7 @@ class ModelQuery
  # This method is chainable.
  #
  offset: (offset) ->
    @_range ?= {}
    @_assertNotFinalized()
    @_range.offset = offset
    @

@@ -141,20 +147,10 @@ class ModelQuery
  # This method is chainable.
  #
  count: ->
    @_assertNotFinalized()
    @_count = true
    @

  ##
  # Public: Set the `waitForAnimations` flag - instead of waiting for animations and other important user
  # interactions to complete, the query result will be processed immediately. Use with care: forcing
  # immediate evaluation can cause glitches in animations.
  #
  # This method is chainable.
  #
  waitForAnimations: ->
    @_waitForAnimations = true
    @

  ###
  Query Execution
  ###

@@ -173,7 +169,7 @@ class ModelQuery
  run: ->
    @_database.run(@)

  formatResult: (result) ->
  inflateResult: (result) ->
    return null unless result

    if @_count

@@ -189,15 +185,20 @@ class ModelQuery
          object[attr.modelKey] = value
        object
      catch jsonError
        throw new Error("Query could not parse the database result. Query: #{@sql()}, Row Data: #{row[0]}, Error: #{jsonError.toString()}")
    return objects[0] if @_singular
        throw new Error("Query could not parse the database result. Query: #{@sql()}, Error: #{jsonError.toString()}")
    return objects

  formatResultObjects: (objects) ->
    return objects[0] if @_returnOne
    return objects

  # Query SQL Building

  # Returns a {String} with the SQL generated for the query.
  #
  sql: ->
    @finalize()

    if @_count
      result = "COUNT(*) as count"
    else

@@ -206,19 +207,14 @@ class ModelQuery
      result += ", #{attr.selectSQL(@_klass)} "

    order = if @_count then "" else @_orderClause()
    if @_singular
      limit = "LIMIT 1"
    else if @_range?.limit
    if @_range.limit?
      limit = "LIMIT #{@_range.limit}"
    else
      limit = ""
    if @_range?.offset
    if @_range.offset?
      limit += " OFFSET #{@_range.offset}"
    "SELECT #{result} FROM `#{@_klass.name}` #{@_whereClause()} #{order} #{limit}"

  executeOptions: ->
    waitForAnimations: @_waitForAnimations

  _whereClause: ->
    joins = []
    @_matchers.forEach (c) =>

@@ -240,10 +236,6 @@ class ModelQuery
    sql

  _orderClause: ->
    if @_orders.length == 0
      natural = @_klass.naturalSortOrder()
      @_orders.push(natural) if natural

    return "" unless @_orders.length

    sql = " ORDER BY "

@@ -251,12 +243,39 @@ class ModelQuery
      sql += sort.orderBySQL(@_klass)
    sql

  # Private: Marks the object as final, preventing any changes to the where
  # clauses, orders, etc.
  finalize: ->
    if @_orders.length is 0
      natural = @_klass.naturalSortOrder()
      @_orders.push(natural) if natural
    if @_returnOne and not @_range.limit
      @limit(1)
    @_finalized = true
    @

  # Private: Throws an exception if the query has been frozen.
  _assertNotFinalized: ->
    if @_finalized
      throw new Error("ModelQuery: You cannot modify a query after calling `then` or `listen`")

  # Introspection
  # (These are here to make specs easy)

  matchers: ->
    @_matchers

  matcherValueForModelKey: (key) ->
    matcher = _.find @_matchers, (m) -> m.attr.modelKey = key
    matcher?.val

  range: ->
    @_range

  orderSortDescriptors: ->
    @_orders

  objectClass: ->
    @_klass.name

module.exports = ModelQuery
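The practical effect of the new "final" concept: once a query has produced SQL or been handed to a subscription, further builder calls throw. A small sketch under the behavior shown in this file and its spec:

    q = DatabaseStore.findAll(Account).where({emailAddress: 'ben@nylas.com'})
    q.sql()            # finalizes the query and returns the SELECT statement
    q.where({id: 2})   # now throws:
    # "ModelQuery: You cannot modify a query after calling `then` or `listen`"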
@@ -5,6 +5,7 @@ NylasAPI = require '../nylas-api'
NylasStore = require 'nylas-store'
DatabaseStore = require './database-store'
AccountStore = require './account-store'
Rx = require 'rx-lite'

class CategoryStore extends NylasStore
  constructor: ->

@@ -13,11 +14,8 @@ class CategoryStore extends NylasStore
    @_userCategories = []
    @_hiddenCategories = []

    @listenTo DatabaseStore, @_onDBChanged
    @listenTo AccountStore, @_refreshCacheFromDB
    NylasEnv.config.observe 'core.workspace.showImportant', => @_refreshCacheFromDB()

    @_refreshCacheFromDB()
    NylasEnv.config.observe 'core.workspace.showImportant', => @_buildQuerySubscription()
    @_buildQuerySubscription()

  # We look for a few standard categories and display them in the Mailboxes
  # portion of the left sidebar. Note that these may not all be present on

@@ -131,54 +129,38 @@ class CategoryStore extends NylasStore
  getUserCategories: ->
    @_userCategories

  _onDBChanged: (change) ->
    categoryClass = @categoryClass()
    return unless categoryClass
  _buildQuerySubscription: =>
    {Categories} = require 'nylas-observables'
    @_queryUnlisten?.dispose()
    @_queryUnlisten = Categories.forCurrentAccount().sort().subscribe(@_onCategoriesChanged)

    if change and change.objectClass is categoryClass.name
      @_refreshCacheFromDB()
  _onCategoriesChanged: (categories) =>
    return unless categories

  _refreshCacheFromDB: ->
    categoryClass = @categoryClass()
    account = AccountStore.current()
    return unless categoryClass
    @_categoryCache = {}
    for category in categories
      @_categoryCache[category.id] = category

    DatabaseStore.findAll(categoryClass).where(categoryClass.attributes.accountId.equal(account.id)).then (categories=[]) =>
      categories = categories.sort (catA, catB) ->
        nameA = catA.displayName
        nameB = catB.displayName
    # Compute user categories
    @_userCategories = _.compact _.reject categories, (cat) =>
      cat.name in @StandardCategoryNames or cat.name in @HiddenCategoryNames

        # Categories that begin with [, like [Mailbox]/For Later
        # should appear at the bottom, because they're likely autogenerated.
        nameA = "ZZZ"+nameA if nameA[0] is '['
        nameB = "ZZZ"+nameB if nameB[0] is '['
    # Compute hidden categories
    @_hiddenCategories = _.filter categories, (cat) =>
      cat.name in @HiddenCategoryNames

        nameA.localeCompare(nameB)
    # Compute standard categories
    # Single pass to create lookup table, single pass to get ordered array
    byStandardName = {}
    for key, val of @_categoryCache
      byStandardName[val.name] = val

      @_categoryCache = {}
      for category in categories
        @_categoryCache[category.id] = category
    if not NylasEnv.config.get('core.workspace.showImportant')
      delete byStandardName['important']

      # Compute user categories
      @_userCategories = _.compact _.reject categories, (cat) =>
        cat.name in @StandardCategoryNames or cat.name in @HiddenCategoryNames
    @_standardCategories = _.compact @StandardCategoryNames.map (name) =>
      byStandardName[name]

      # Compute hidden categories
      @_hiddenCategories = _.filter categories, (cat) =>
        cat.name in @HiddenCategoryNames

      # Compute standard categories
      # Single pass to create lookup table, single pass to get ordered array
      byStandardName = {}
      for key, val of @_categoryCache
        byStandardName[val.name] = val

      if not NylasEnv.config.get('core.workspace.showImportant')
        delete byStandardName['important']

      @_standardCategories = _.compact @StandardCategoryNames.map (name) =>
        byStandardName[name]

      @trigger()
    @trigger()

module.exports = new CategoryStore()
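The store now leans on the `Categories` observable defined in `nylas-observables` (added at the end of this diff). The same pattern works from any consumer; a hedged sketch:

    {Categories} = require 'nylas-observables'

    disposable = Categories.forCurrentAccount().sort().subscribe (categories) ->
      console.log("#{categories.length} categories for the current account")

    # When rebuilding the subscription or tearing the consumer down:
    disposable.dispose()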
@@ -1,4 +1,4 @@
Rx = require 'rx-lite'
NylasStore = require 'nylas-store'
DatabaseStore = require './database-store'
AccountStore = require './account-store'

@@ -6,34 +6,21 @@ AccountStore = require './account-store'
class ContactRankingStore extends NylasStore

  constructor: ->
    @listenTo DatabaseStore, @_onDatabaseChanged
    @listenTo AccountStore, @_onAccountChanged
    @_value = null
    @_accountId = null
    @_refresh()

  _onDatabaseChanged: (change) =>
    if change.objectClass is 'JSONObject' and change.objects[0].key is "ContactRankingsFor#{@_accountId}"
      @_value = change.objects[0].json.value
    {Accounts} = require 'nylas-observables'
    Accounts.forCurrentId().flatMapLatest (accountId) =>
      query = DatabaseStore.findJSONBlob("ContactRankingsFor#{accountId}")
      return Rx.Observable.fromQuery(query)
    .subscribe (json) =>
      @_value = if json? then json.value else null
      @trigger()

  _onAccountChanged: =>
    @_refresh()
    @reset()
    @trigger()

  value: ->
    @_value

  reset: ->
    @_value = null

  _refresh: =>
    return if @_accountId is AccountStore.current()?.id
    @_accountId = AccountStore.current()?.id
    DatabaseStore.findJSONObject("ContactRankingsFor#{@_accountId}").then (json) =>
      @_value = if json? then json.value else null
      @trigger()


module.exports = new ContactRankingStore()
@@ -12,11 +12,6 @@ class DatabaseSetupQueryBuilder

  setupQueries: ->
    queries = []

    # Add table for storing generic JSON blobs
    queries.push("CREATE TABLE IF NOT EXISTS `JSONObject` (key TEXT PRIMARY KEY, data BLOB)")
    queries.push("CREATE UNIQUE INDEX IF NOT EXISTS `JSONObject_id` ON `JSONObject` (`key`)")

    for key, klass of DatabaseObjectRegistry.classMap()
      queries = queries.concat @setupQueriesForTable(klass)
    return queries
@@ -34,6 +34,10 @@ DEBUG_MISSING_ACCOUNT_ID = false
BEGIN_TRANSACTION = 'BEGIN TRANSACTION'
COMMIT = 'COMMIT'

class JSONBlobQuery extends ModelQuery
  formatResultObjects: (objects) =>
    return objects[0]?.json || null

###
Public: N1 is built on top of a custom database layer modeled after
ActiveRecord. For many parts of the application, the database is the source

@@ -395,14 +399,12 @@ class DatabaseStore extends NylasStore
  #
  # Returns a {Promise} that
  # - resolves with the result of the database query.
  run: (modelQuery) =>
    {waitForAnimations} = modelQuery.executeOptions()
  #
  run: (modelQuery, options = {format: true}) =>
    @_query(modelQuery.sql(), []).then (result) =>
      if waitForAnimations
        PriorityUICoordinator.settle.then =>
          Promise.resolve(modelQuery.formatResult(result))
      else
        Promise.resolve(modelQuery.formatResult(result))
      result = modelQuery.inflateResult(result)
      result = modelQuery.formatResultObjects(result) unless options.format is false
      Promise.resolve(result)

  # Public: Asynchronously writes `model` to the cache and triggers a change event.
  #

@@ -485,16 +487,13 @@ class DatabaseStore extends NylasStore
    @_runMutationHooks('afterDatabaseChange', metadata, data)
    @_accumulateAndTrigger(metadata)

  persistJSONObject: (key, json) ->
    jsonString = serializeRegisteredObjects(json)
    @_query("REPLACE INTO `JSONObject` (`key`,`data`) VALUES (?,?)", [key, jsonString]).then =>
      @trigger(new DatabaseChangeRecord({objectClass: 'JSONObject', objects: [{key: key, json: json}], type: 'persist'}))
  persistJSONBlob: (id, json) ->
    JSONBlob = require '../models/json-blob'
    @persistModel(new JSONBlob({id, json}))

  findJSONObject: (key) ->
    @_query("SELECT `data` FROM `JSONObject` WHERE key = ? LIMIT 1", [key]).then (results) =>
      return Promise.resolve(null) unless results[0]
      data = deserializeRegisteredObjects(results[0].data)
      Promise.resolve(data)
  findJSONBlob: (id) ->
    JSONBlob = require '../models/json-blob'
    new JSONBlobQuery(JSONBlob, @).where({id}).one()

  # Private: Mutation hooks allow you to observe changes to the database and
  # add additional functionality before and after the REPLACE / INSERT queries.
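Two behavioral changes worth noting here: `run` now takes an options object, and `{format: false}` skips `formatResultObjects`, which is how QuerySubscription keeps raw model arrays; and the dedicated JSON table is replaced by ordinary JSONBlob models, so blob reads flow through the same change records as every other model. A sketch (the blob key reuses the 'UnreadCounts-V2' key from the thread-counts store below):

    # Raw, unformatted models (the shape QuerySubscription relies on internally)
    DatabaseStore.run(DatabaseStore.findAll(Thread).limit(10), {format: false}).then (threads) ->
      console.log(threads.length)

    # A JSON blob read that can also be observed, since findJSONBlob returns a query
    Rx.Observable.fromQuery(DatabaseStore.findJSONBlob('UnreadCounts-V2')).subscribe (json) ->
      console.log(json)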
@@ -1,4 +1,5 @@
_ = require 'underscore'
Rx = require 'rx-lite'

Utils = require '../models/utils'
Actions = require '../actions'

@@ -12,7 +13,6 @@ FocusedContentStore = require './focused-content-store'
# A store that handles the focuses collections of and individual contacts
class FocusedContactsStore extends NylasStore
  constructor: ->
    @listenTo DatabaseStore, @_onDatabaseChanged
    @listenTo MessageStore, @_onMessageStoreChanged
    @listenTo Actions.focusContact, @_onFocusContact
    @_clearCurrentParticipants()

@@ -24,14 +24,6 @@ class FocusedContactsStore extends NylasStore
    # We need to wait now for the MessageStore to grab all of the
    # appropriate messages for the given thread.

  _onDatabaseChanged: (change) =>
    return unless @_currentFocusedContact
    return unless change and change.objectClass is 'contact'
    current = _.find change.objects, (c) => c.email is @_currentFocusedContact.email
    if current
      @_currentFocusedContact = current
      @trigger()

  _onMessageStoreChanged: =>
    threadId = if MessageStore.itemsLoading() then null else MessageStore.threadId()

@@ -61,20 +53,26 @@ class FocusedContactsStore extends NylasStore
  _clearCurrentParticipants: ->
    @_contactScores = {}
    @_currentContacts = []
    @_unsubFocusedContact?.dispose()
    @_unsubFocusedContact = null
    @_currentFocusedContact = null
    @_currentThread = null

  _onFocusContact: (contact) =>
    if not contact
      @_currentFocusedContact = null
      @trigger()
    else
      DatabaseStore.findBy(Contact, {
    @_unsubFocusedContact?.dispose()
    @_unsubFocusedContact = null

    if contact
      query = DatabaseStore.findBy(Contact, {
        email: contact.email,
        accountId: @_currentThread.accountId
      }).then (match) =>
      })
      @_unsubFocusedContact = Rx.Observable.fromQuery(query).subscribe (match) =>
        @_currentFocusedContact = match ? contact
        @trigger()
    else
      @_currentFocusedContact = null
      @trigger()

  # We score everyone to determine who's the most relevant to display in
  # the sidebar.
@@ -1,4 +1,5 @@
_ = require 'underscore'
Rx = require 'rx-lite'
AccountStore = require './account-store'
DatabaseStore = require './database-store'
NylasStore = require 'nylas-store'

@@ -7,25 +8,17 @@ class NylasSyncStatusStore extends NylasStore

  constructor: ->
    @_statesByAccount = {}
    @_subscriptions = {}

    @listenTo AccountStore, @_onAccountsChanged
    @listenTo DatabaseStore, @_onChange
    @_onAccountsChanged()

  _onAccountsChanged: =>
    promises = []
    AccountStore.items().forEach (item) =>
      return if @_statesByAccount[item.id]
      promises.push DatabaseStore.findJSONObject("NylasSyncWorker:#{item.id}").then (json) =>
      query = DatabaseStore.findJSONBlob("NylasSyncWorker:#{item.id}")
      @_subscriptions[item.id] ?= Rx.Observable.fromQuery(query).subscribe (json) =>
        @_statesByAccount[item.id] = json ? {}
    Promise.all(promises).then =>
      @trigger()

  _onChange: (change) =>
    if change.objectClass is 'JSONObject' and change.objects[0].key.indexOf('NylasSyncWorker') is 0
      [worker, accountId] = change.objects[0].key.split(':')
      @_statesByAccount[accountId] = change.objects[0].json
      @trigger()
        @trigger()

  state: =>
    @_statesByAccount
@@ -1,4 +1,5 @@
_ = require 'underscore'
Rx = require 'rx-lite'
NylasStore = require 'nylas-store'
DatabaseStore = require './database-store'
AccountStore = require './account-store'

@@ -13,15 +14,10 @@ class TaskQueueStatusStore extends NylasStore
    @_queue = []
    @_waitingLocals = []
    @_waitingRemotes = []
    @listenTo DatabaseStore, @_onChange

    DatabaseStore.findJSONObject(TaskQueue.JSONObjectStorageKey).then (json) =>
    query = DatabaseStore.findJSONBlob(TaskQueue.JSONBlobStorageKey)
    Rx.Observable.fromQuery(query).subscribe (json) =>
      @_queue = json || []
      @trigger()

  _onChange: (change) =>
    if change.objectClass is 'JSONObject' and change.objects[0].key is 'task-queue'
      @_queue = change.objects[0].json
      @_waitingLocals = @_waitingLocals.filter ({taskId, resolve}) =>
        task = _.findWhere(@_queue, {id: taskId})
        if not task or task.queueState.localComplete
@@ -14,10 +14,10 @@ DatabaseStore = require './database-store'
{APIError,
 TimeoutError} = require '../errors'

JSONObjectStorageKey = 'task-queue'
JSONBlobStorageKey = 'task-queue'

if not NylasEnv.isWorkWindow() and not NylasEnv.inSpecMode()
  module.exports = {JSONObjectStorageKey}
  module.exports = {JSONBlobStorageKey}
  return

###

@@ -262,7 +262,7 @@ class TaskQueue
    return _.findWhere(@_queue, id: taskOrId)

  _restoreQueue: =>
    DatabaseStore.findJSONObject(JSONObjectStorageKey).then (queue = []) =>
    DatabaseStore.findJSONBlob(JSONBlobStorageKey).then (queue = []) =>
      # We need to set the processing bit back to false so it gets
      # re-retried upon inflation
      for task in queue

@@ -273,7 +273,7 @@ class TaskQueue

  _updateSoon: =>
    @_updateSoonThrottled ?= _.throttle =>
      DatabaseStore.persistJSONObject(JSONObjectStorageKey, @_queue ? [])
      DatabaseStore.persistJSONBlob(JSONBlobStorageKey, @_queue ? [])
      _.defer =>
        @_processQueue()
        @trigger()

@@ -281,3 +281,4 @@ class TaskQueue
    @_updateSoonThrottled()

module.exports = new TaskQueue()
module.exports.JSONBlobStorageKey = JSONBlobStorageKey
@@ -1,4 +1,5 @@
Reflux = require 'reflux'
Rx = require 'rx-lite'
_ = require 'underscore'
NylasStore = require 'nylas-store'
CategoryStore = require './category-store'

@@ -10,7 +11,7 @@ Folder = require '../models/folder'
Label = require '../models/label'
WindowBridge = require '../../window-bridge'

JSONObjectKey = 'UnreadCounts-V2'
JSONBlobKey = 'UnreadCounts-V2'

class CategoryDatabaseMutationObserver
  constructor: (@_countsDidChange) ->

@@ -55,26 +56,29 @@ class CategoryDatabaseMutationObserver

class ThreadCountsStore extends NylasStore
  CategoryDatabaseMutationObserver: CategoryDatabaseMutationObserver
  JSONObjectKey: JSONObjectKey
  JSONBlobKey: JSONBlobKey

  constructor: ->
    @_counts = {}
    @_deltas = {}
    @_categories = []
    @_saveCountsSoon ?= _.throttle(@_saveCounts, 1000)

    @listenTo DatabaseStore, @_onDatabaseChanged
    DatabaseStore.findJSONObject(JSONObjectKey).then (json) =>
      @_counts = json ? {}
      @trigger()

    @_observer = new CategoryDatabaseMutationObserver(@_onCountsChanged)
    DatabaseStore.addMutationHook(@_observer)

    if NylasEnv.isWorkWindow()
      @_loadCategories().then =>
      DatabaseStore.findJSONBlob(JSONBlobKey).then(@_onCountsBlobRead)
      Rx.Observable.combineLatest(
        Rx.Observable.fromQuery(DatabaseStore.findAll(Label)),
        Rx.Observable.fromQuery(DatabaseStore.findAll(Folder))
      ).subscribe ([labels, folders]) =>
        @_categories = [].concat(labels, folders)
        @_fetchCountsMissing()

    else
      query = DatabaseStore.findJSONBlob(JSONBlobKey)
      Rx.Observable.fromQuery(query).subscribe(@_onCountsBlobRead)

  unreadCountForCategoryId: (catId) =>
    return null if @_counts[catId] is undefined
    @_counts[catId] + (@_deltas[catId] || 0)

@@ -82,21 +86,6 @@ class ThreadCountsStore extends NylasStore
  unreadCounts: =>
    @_counts

  _onDatabaseChanged: (change) =>
    if NylasEnv.isWorkWindow()
      if change.objectClass in [Folder.name, Label.name]
        for obj in change.objects
          objIdx = _.findIndex @_categories, (cat) -> cat.id is obj.id
          if objIdx isnt -1
            @_categories[objIdx] = obj
          else
            @_categories.push(obj)
        @_fetchCountsMissing()

    else if change.objectClass is 'JSONObject' and change.objects[0].key is JSONObjectKey
      @_counts = change.objects[0].json ? {}
      @trigger()

  _onCountsChanged: (metadata) =>
    if not NylasEnv.isWorkWindow()
      WindowBridge.runInWorkWindow("ThreadCountsStore", "_onCountsChanged", [metadata])

@@ -107,13 +96,9 @@ class ThreadCountsStore extends NylasStore
      @_deltas[catId] += unread
    @_saveCountsSoon()

  _loadCategories: =>
    Promise.props({
      folders: DatabaseStore.findAll(Folder)
      labels: DatabaseStore.findAll(Label)
    }).then ({folders, labels}) =>
      @_categories = [].concat(folders, labels)
      Promise.resolve()
  _onCountsBlobRead: (json) =>
    @_counts = json ? {}
    @trigger()

  # Fetch a count, populate it in the cache, and then call ourselves to
  # populate the next missing count.

@@ -147,7 +132,7 @@ class ThreadCountsStore extends NylasStore
      @_counts[key] += count
      delete @_deltas[key]

    DatabaseStore.persistJSONObject(JSONObjectKey, @_counts)
    DatabaseStore.persistJSONBlob(JSONBlobKey, @_counts)
    @trigger()

  _fetchCountForCategory: (cat) =>
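In the work window the store now keeps its category list fresh by combining two query observables. The same `combineLatest` shape, taken from the constructor above, can be reused anywhere two result sets need to be merged:

    Rx.Observable.combineLatest(
      Rx.Observable.fromQuery(DatabaseStore.findAll(Label)),
      Rx.Observable.fromQuery(DatabaseStore.findAll(Folder))
    ).subscribe ([labels, folders]) ->
      categories = [].concat(labels, folders)
      console.log("#{categories.length} categories in total")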
@@ -55,6 +55,7 @@ class NylasExports
  @load "SearchView", 'flux/stores/search-view'
  @load "DatabaseView", 'flux/stores/database-view'
  @load "DatabaseStore", 'flux/stores/database-store'
  @load "QuerySubscriptionPool", 'flux/models/query-subscription-pool'

  # Database Objects
  # These need to be required immediately to populate the

@@ -70,6 +71,7 @@ class NylasExports
  @require "Category", 'flux/models/category'
  @require "Calendar", 'flux/models/calendar'
  @require "Metadata", 'flux/models/metadata'
  @require "JSONBlob", 'flux/models/json-blob'
  @require "DatabaseObjectRegistry", "database-object-registry"
  @require "MailViewFilter", 'mail-view-filter'
src/global/nylas-observables.coffee (new file, 70 lines)

@@ -0,0 +1,70 @@
Rx = require 'rx-lite'
_ = require 'underscore'
QuerySubscriptionPool = require '../flux/models/query-subscription-pool'
AccountStore = require '../flux/stores/account-store'
DatabaseStore = require '../flux/stores/database-store'

AccountOperators = {}

AccountObservables =
  forCurrentId: ->
    observable = Rx.Observable
      .fromStore(AccountStore)
      .map -> AccountStore.current()?.id
      .distinctUntilChanged()
    _.extend(observable, AccountOperators)
    observable

CategoryOperators =
  sort: ->
    @.map (categories) ->
      return categories.sort (catA, catB) ->
        nameA = catA.displayName
        nameB = catB.displayName

        # Categories that begin with [, like [Mailbox]/For Later
        # should appear at the bottom, because they're likely autogenerated.
        nameA = "ZZZ"+nameA if nameA[0] is '['
        nameB = "ZZZ"+nameB if nameB[0] is '['

        nameA.localeCompare(nameB)

CategoryObservables =
  forCurrentAccount: ->
    observable = Rx.Observable.fromStore(AccountStore).flatMapLatest ->
      return CategoryObservables.forAccount(AccountStore.current())
    _.extend(observable, CategoryOperators)
    observable

  forAllAccounts: =>
    observable = Rx.Observable.fromQuery(DatabaseStore.findAll(categoryClass))
    _.extend(observable, CategoryOperators)
    observable

  forAccount: (account) =>
    if account
      categoryClass = account.categoryClass()
      observable = Rx.Observable.fromQuery(DatabaseStore.findAll(categoryClass).where(categoryClass.attributes.accountId.equal(account.id)))
    else
      observable = Rx.Observable.from([])
    _.extend(observable, CategoryOperators)
    observable

module.exports =
  Categories: CategoryObservables
  Accounts: AccountObservables

# Attach a few global helpers

Rx.Observable.fromStore = (store) =>
  return Rx.Observable.create (observer) =>
    unsubscribe = store.listen =>
      observer.onNext(store)
    observer.onNext(store)
    return Rx.Disposable.create(unsubscribe)

Rx.Observable.fromQuery = (query, options) =>
  return Rx.Observable.create (observer) =>
    unsubscribe = QuerySubscriptionPool.add query, options, (result) =>
      observer.onNext(result)
    return Rx.Disposable.create(unsubscribe)
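A sketch of composing these helpers from a consumer: `fromStore` turns any Reflux-style store into an observable, and `flatMapLatest` switches the inner query subscription whenever the current account changes. This mirrors what ContactRankingStore now does above; require paths are the consumer's and the blob key is from that store:

    Rx = require 'rx-lite'
    {Accounts} = require 'nylas-observables'
    DatabaseStore = require '../flux/stores/database-store'

    disposable = Accounts.forCurrentId()
      .flatMapLatest (accountId) ->
        Rx.Observable.fromQuery(DatabaseStore.findJSONBlob("ContactRankingsFor#{accountId}"))
      .subscribe (json) ->
        console.log(json?.value)

    # Dispose when the consumer goes away
    disposable.dispose()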