From 5274ce3543f45b4948be6b54b116342c42bf31bd Mon Sep 17 00:00:00 2001 From: Evan Morikawa Date: Wed, 21 Sep 2016 16:43:16 -0400 Subject: [PATCH] fix(task-queue): performLocal now operates serially --- spec/stores/task-queue-spec.coffee | 63 ++++++++++++++++++++++++- spec/stores/task-subclass.es6 | 27 +++++++++++ spec/tasks/syncback-draft-task-spec.es6 | 1 + spec/time-override.coffee | 2 +- src/flux/models/contact.coffee | 5 ++ src/flux/stores/account-store.coffee | 5 ++ src/flux/stores/task-queue.coffee | 24 ++++++++-- 7 files changed, 121 insertions(+), 6 deletions(-) diff --git a/spec/stores/task-queue-spec.coffee b/spec/stores/task-queue-spec.coffee index c7b92779d..0080d742e 100644 --- a/spec/stores/task-queue-spec.coffee +++ b/spec/stores/task-queue-spec.coffee @@ -12,6 +12,9 @@ TaskRegistry = require('../../src/task-registry').default KillsTaskA, BlockedByTaskA, BlockingTask, + Task100, + Task200, + Task300, TaskAA, TaskBB} = require('./task-subclass') @@ -35,6 +38,7 @@ describe "TaskQueue", -> @unstartedTask = makeUnstartedTask(new Task()) @processingTask = makeProcessing(new Task()) @retryInFutureTask = makeRetryInFuture(new Task()) + TaskQueue._runLocalPromise = Promise.resolve() afterEach -> # Flush any throttled or debounced updates @@ -99,8 +103,63 @@ describe "TaskQueue", -> expect(TaskQueue._queue.length).toBe(1) it "immediately calls runLocal", -> - TaskQueue.enqueue(@unstartedTask) - expect(@unstartedTask.runLocal).toHaveBeenCalled() + waitsForPromise => + TaskQueue.enqueue(@unstartedTask).then => + expect(@unstartedTask.runLocal).toHaveBeenCalled() + + it "correctly orders two task queues one after another", -> + t1 = new Task100() + t2 = new Task200() + t2b = new Task200() + t3 = new Task300() + spyOn(t1, "runLocal").andCallThrough() + spyOn(t2, "runLocal").andCallThrough() + spyOn(t2b, "runLocal").andCallThrough() + spyOn(t3, "runLocal").andCallThrough() + + TaskQueue.enqueue(t1) + advanceClock(1) # Need to tick past the first Promise.resolve() + expect(t1.runLocal).toHaveBeenCalled() + expect(TaskQueue._queue.length).toBe(0) + + TaskQueue.enqueue(t2) + # Blocked waiting for t1 + expect(t2.runLocal).not.toHaveBeenCalled() + expect(TaskQueue._queue.length).toBe(0) + + advanceClock(11) # Not enough for Task100's performLocal to clear + expect(t1.runLocal).toHaveBeenCalled() + expect(t2.runLocal).not.toHaveBeenCalled() # Still blocked on t1 + expect(TaskQueue._queue.length).toBe(0) + + # This clears Task100's timeout. Note performRemote has a 1000ms + # timeout. + advanceClock(100) # Clears timeouts + advanceClock(1) # Clears remaining Promises + # T1 performLocal is done now! + expect(TaskQueue._queue.length).toBe(1) + expect(TaskQueue._queue[0]).toBe(t1) # T1 on the queue + expect(t2.runLocal).toHaveBeenCalled() #T2 unblocked + + # This clears Task200's timeout. Note performRemote has a 1000ms + # timeout. + advanceClock(200) # Clears timeouts + advanceClock(1) # Clears remaining promises + expect(TaskQueue._queue.length).toBe(2) + expect(TaskQueue._queue[1]).toBe(t2) # T2 on the queue + + # All previous promise should have been resolved, meaning we only + # have to wait 1 tick for the freshly cleared queue to restart. + TaskQueue.enqueue(t3) + advanceClock(1) + expect(t3.runLocal).toHaveBeenCalled() + + advanceClock(300) # Clears t3 performLocal + advanceClock(1) # Clears remaining promises + expect(TaskQueue._queue.length).toBe(3) + expect(TaskQueue._queue[2]).toBe(t3) + + advanceClock(1500) # Clears Task300 off the queue it "notifies the queue should be processed", -> spyOn(TaskQueue, "_processQueue").andCallThrough() diff --git a/spec/stores/task-subclass.es6 b/spec/stores/task-subclass.es6 index 7c3fd5061..24554621d 100644 --- a/spec/stores/task-subclass.es6 +++ b/spec/stores/task-subclass.es6 @@ -56,3 +56,30 @@ export class OKTask extends Task { export class BadTask extends Task { performRemote() { return Promise.resolve('lalal') } } + +export class Task100 extends Task { + performLocal() { + return new Promise((resolve) => setTimeout(resolve, 100)) + } + performRemote() { + return new Promise((resolve) => setTimeout(resolve, 1000)) + } +} + +export class Task200 extends Task { + performLocal() { + return new Promise((resolve) => setTimeout(resolve, 200)) + } + performRemote() { + return new Promise((resolve) => setTimeout(resolve, 1000)) + } +} + +export class Task300 extends Task { + performLocal() { + return new Promise((resolve) => setTimeout(resolve, 300)) + } + performRemote() { + return new Promise((resolve) => setTimeout(resolve, 1000)) + } +} diff --git a/spec/tasks/syncback-draft-task-spec.es6 b/spec/tasks/syncback-draft-task-spec.es6 index f5435d689..59f498ab5 100644 --- a/spec/tasks/syncback-draft-task-spec.es6 +++ b/spec/tasks/syncback-draft-task-spec.es6 @@ -99,6 +99,7 @@ describe('SyncbackDraftTask', function syncbackDraftTask() { spyOn(this.taskC, "runLocal").andReturn(Promise.resolve()); TaskQueue.enqueue(this.taskC); + advanceClock(10) // Note that taskB is gone, taskOther was untouched, and taskC was // added. diff --git a/spec/time-override.coffee b/spec/time-override.coffee index e301c6add..b68cf55db 100644 --- a/spec/time-override.coffee +++ b/spec/time-override.coffee @@ -88,7 +88,7 @@ class TimeOverride @_fakeSetInterval = (callback, ms) => id = ++@intervalCount - action = -> + action = => callback() @intervalTimeouts[id] = @_fakeSetTimeout(action, ms) @intervalTimeouts[id] = @_fakeSetTimeout(action, ms) diff --git a/src/flux/models/contact.coffee b/src/flux/models/contact.coffee index c2d7f70da..60a6cc616 100644 --- a/src/flux/models/contact.coffee +++ b/src/flux/models/contact.coffee @@ -111,6 +111,11 @@ class Contact extends Model account = AccountStore.accountForEmail(@email) return account? + hasSameDomainAsMe: -> + for myEmail in AccountStore.emailAddresses() + return true if Utils.emailsHaveSameDomain(@email, myEmail) + return false + isMePhrase: ({includeAccountLabel, forceAccountLabel} = {}) -> account = AccountStore.accountForEmail(@email) return null unless account diff --git a/src/flux/stores/account-store.coffee b/src/flux/stores/account-store.coffee index 85b527f06..07ad23456 100644 --- a/src/flux/stores/account-store.coffee +++ b/src/flux/stores/account-store.coffee @@ -242,6 +242,11 @@ class AccountStore extends NylasStore return @accountForId(alias.accountId) return null + emailAddresses: -> + addresses = _.pluck((@accounts() ? []), "emailAddress") + addresses = addresses.concat(_.pluck((@aliases() ? [])), "email") + return addresses + # Public: Returns the {Account} for the given account id, or null. accountForId: (id) => @_cachedGetter "accountForId:#{id}", => _.findWhere(@_accounts, {id}) diff --git a/src/flux/stores/task-queue.coffee b/src/flux/stores/task-queue.coffee index 788dcf23f..6abfbc3e8 100644 --- a/src/flux/stores/task-queue.coffee +++ b/src/flux/stores/task-queue.coffee @@ -77,6 +77,8 @@ class TaskQueue @_updatePeriodicallyTimeout = null @_currentSequentialId = Date.now() + @_runLocalPromise = Promise.resolve() + @_restoreQueue() @listenTo Actions.queueTask, @enqueue @@ -137,9 +139,25 @@ class TaskQueue task.sequentialId = ++@_currentSequentialId @_dequeueObsoleteTasks(task) - task.runLocal().then => - @_queue.push(task) - @_updateSoon() + + doRunLocal = => + task.runLocal().then => + @_queue.push(task) + @_updateSoon() + return Promise.resolve() + + # NOTE: runLocal now runs synchronously so when people build + # `performLocal` methods they can assume the entire set is atomic. + # `performLocal` very frequently has numerous reads and sets to the + # database and we don't want simultaneous tasks to make those reads + # and sets not atomic. While users could wrap their entire + # performLocals in Database transaction blocks, it's common to forget + # to do this and it will still block other tasks from accessing the + # database. + if !@_runLocalPromise.isPending() + # Reset to prevent memory leak of chain. + @_runLocalPromise = Promise.resolve() + @_runLocalPromise = @_runLocalPromise.then(doRunLocal) enqueueUndoOfTaskId: (taskId) => task = _.findWhere(@_queue, {id: taskId})