mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-10-07 19:57:56 +08:00
fix(task-queue): performLocal now operates serially
This commit is contained in:
parent
ec5092c2f5
commit
5274ce3543
7 changed files with 121 additions and 6 deletions
|
@ -12,6 +12,9 @@ TaskRegistry = require('../../src/task-registry').default
|
||||||
KillsTaskA,
|
KillsTaskA,
|
||||||
BlockedByTaskA,
|
BlockedByTaskA,
|
||||||
BlockingTask,
|
BlockingTask,
|
||||||
|
Task100,
|
||||||
|
Task200,
|
||||||
|
Task300,
|
||||||
TaskAA,
|
TaskAA,
|
||||||
TaskBB} = require('./task-subclass')
|
TaskBB} = require('./task-subclass')
|
||||||
|
|
||||||
|
@ -35,6 +38,7 @@ describe "TaskQueue", ->
|
||||||
@unstartedTask = makeUnstartedTask(new Task())
|
@unstartedTask = makeUnstartedTask(new Task())
|
||||||
@processingTask = makeProcessing(new Task())
|
@processingTask = makeProcessing(new Task())
|
||||||
@retryInFutureTask = makeRetryInFuture(new Task())
|
@retryInFutureTask = makeRetryInFuture(new Task())
|
||||||
|
TaskQueue._runLocalPromise = Promise.resolve()
|
||||||
|
|
||||||
afterEach ->
|
afterEach ->
|
||||||
# Flush any throttled or debounced updates
|
# Flush any throttled or debounced updates
|
||||||
|
@ -99,8 +103,63 @@ describe "TaskQueue", ->
|
||||||
expect(TaskQueue._queue.length).toBe(1)
|
expect(TaskQueue._queue.length).toBe(1)
|
||||||
|
|
||||||
it "immediately calls runLocal", ->
|
it "immediately calls runLocal", ->
|
||||||
TaskQueue.enqueue(@unstartedTask)
|
waitsForPromise =>
|
||||||
expect(@unstartedTask.runLocal).toHaveBeenCalled()
|
TaskQueue.enqueue(@unstartedTask).then =>
|
||||||
|
expect(@unstartedTask.runLocal).toHaveBeenCalled()
|
||||||
|
|
||||||
|
it "correctly orders two task queues one after another", ->
|
||||||
|
t1 = new Task100()
|
||||||
|
t2 = new Task200()
|
||||||
|
t2b = new Task200()
|
||||||
|
t3 = new Task300()
|
||||||
|
spyOn(t1, "runLocal").andCallThrough()
|
||||||
|
spyOn(t2, "runLocal").andCallThrough()
|
||||||
|
spyOn(t2b, "runLocal").andCallThrough()
|
||||||
|
spyOn(t3, "runLocal").andCallThrough()
|
||||||
|
|
||||||
|
TaskQueue.enqueue(t1)
|
||||||
|
advanceClock(1) # Need to tick past the first Promise.resolve()
|
||||||
|
expect(t1.runLocal).toHaveBeenCalled()
|
||||||
|
expect(TaskQueue._queue.length).toBe(0)
|
||||||
|
|
||||||
|
TaskQueue.enqueue(t2)
|
||||||
|
# Blocked waiting for t1
|
||||||
|
expect(t2.runLocal).not.toHaveBeenCalled()
|
||||||
|
expect(TaskQueue._queue.length).toBe(0)
|
||||||
|
|
||||||
|
advanceClock(11) # Not enough for Task100's performLocal to clear
|
||||||
|
expect(t1.runLocal).toHaveBeenCalled()
|
||||||
|
expect(t2.runLocal).not.toHaveBeenCalled() # Still blocked on t1
|
||||||
|
expect(TaskQueue._queue.length).toBe(0)
|
||||||
|
|
||||||
|
# This clears Task100's timeout. Note performRemote has a 1000ms
|
||||||
|
# timeout.
|
||||||
|
advanceClock(100) # Clears timeouts
|
||||||
|
advanceClock(1) # Clears remaining Promises
|
||||||
|
# T1 performLocal is done now!
|
||||||
|
expect(TaskQueue._queue.length).toBe(1)
|
||||||
|
expect(TaskQueue._queue[0]).toBe(t1) # T1 on the queue
|
||||||
|
expect(t2.runLocal).toHaveBeenCalled() #T2 unblocked
|
||||||
|
|
||||||
|
# This clears Task200's timeout. Note performRemote has a 1000ms
|
||||||
|
# timeout.
|
||||||
|
advanceClock(200) # Clears timeouts
|
||||||
|
advanceClock(1) # Clears remaining promises
|
||||||
|
expect(TaskQueue._queue.length).toBe(2)
|
||||||
|
expect(TaskQueue._queue[1]).toBe(t2) # T2 on the queue
|
||||||
|
|
||||||
|
# All previous promise should have been resolved, meaning we only
|
||||||
|
# have to wait 1 tick for the freshly cleared queue to restart.
|
||||||
|
TaskQueue.enqueue(t3)
|
||||||
|
advanceClock(1)
|
||||||
|
expect(t3.runLocal).toHaveBeenCalled()
|
||||||
|
|
||||||
|
advanceClock(300) # Clears t3 performLocal
|
||||||
|
advanceClock(1) # Clears remaining promises
|
||||||
|
expect(TaskQueue._queue.length).toBe(3)
|
||||||
|
expect(TaskQueue._queue[2]).toBe(t3)
|
||||||
|
|
||||||
|
advanceClock(1500) # Clears Task300 off the queue
|
||||||
|
|
||||||
it "notifies the queue should be processed", ->
|
it "notifies the queue should be processed", ->
|
||||||
spyOn(TaskQueue, "_processQueue").andCallThrough()
|
spyOn(TaskQueue, "_processQueue").andCallThrough()
|
||||||
|
|
|
@ -56,3 +56,30 @@ export class OKTask extends Task {
|
||||||
export class BadTask extends Task {
|
export class BadTask extends Task {
|
||||||
performRemote() { return Promise.resolve('lalal') }
|
performRemote() { return Promise.resolve('lalal') }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export class Task100 extends Task {
|
||||||
|
performLocal() {
|
||||||
|
return new Promise((resolve) => setTimeout(resolve, 100))
|
||||||
|
}
|
||||||
|
performRemote() {
|
||||||
|
return new Promise((resolve) => setTimeout(resolve, 1000))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Task200 extends Task {
|
||||||
|
performLocal() {
|
||||||
|
return new Promise((resolve) => setTimeout(resolve, 200))
|
||||||
|
}
|
||||||
|
performRemote() {
|
||||||
|
return new Promise((resolve) => setTimeout(resolve, 1000))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Task300 extends Task {
|
||||||
|
performLocal() {
|
||||||
|
return new Promise((resolve) => setTimeout(resolve, 300))
|
||||||
|
}
|
||||||
|
performRemote() {
|
||||||
|
return new Promise((resolve) => setTimeout(resolve, 1000))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -99,6 +99,7 @@ describe('SyncbackDraftTask', function syncbackDraftTask() {
|
||||||
spyOn(this.taskC, "runLocal").andReturn(Promise.resolve());
|
spyOn(this.taskC, "runLocal").andReturn(Promise.resolve());
|
||||||
|
|
||||||
TaskQueue.enqueue(this.taskC);
|
TaskQueue.enqueue(this.taskC);
|
||||||
|
advanceClock(10)
|
||||||
|
|
||||||
// Note that taskB is gone, taskOther was untouched, and taskC was
|
// Note that taskB is gone, taskOther was untouched, and taskC was
|
||||||
// added.
|
// added.
|
||||||
|
|
|
@ -88,7 +88,7 @@ class TimeOverride
|
||||||
|
|
||||||
@_fakeSetInterval = (callback, ms) =>
|
@_fakeSetInterval = (callback, ms) =>
|
||||||
id = ++@intervalCount
|
id = ++@intervalCount
|
||||||
action = ->
|
action = =>
|
||||||
callback()
|
callback()
|
||||||
@intervalTimeouts[id] = @_fakeSetTimeout(action, ms)
|
@intervalTimeouts[id] = @_fakeSetTimeout(action, ms)
|
||||||
@intervalTimeouts[id] = @_fakeSetTimeout(action, ms)
|
@intervalTimeouts[id] = @_fakeSetTimeout(action, ms)
|
||||||
|
|
|
@ -111,6 +111,11 @@ class Contact extends Model
|
||||||
account = AccountStore.accountForEmail(@email)
|
account = AccountStore.accountForEmail(@email)
|
||||||
return account?
|
return account?
|
||||||
|
|
||||||
|
hasSameDomainAsMe: ->
|
||||||
|
for myEmail in AccountStore.emailAddresses()
|
||||||
|
return true if Utils.emailsHaveSameDomain(@email, myEmail)
|
||||||
|
return false
|
||||||
|
|
||||||
isMePhrase: ({includeAccountLabel, forceAccountLabel} = {}) ->
|
isMePhrase: ({includeAccountLabel, forceAccountLabel} = {}) ->
|
||||||
account = AccountStore.accountForEmail(@email)
|
account = AccountStore.accountForEmail(@email)
|
||||||
return null unless account
|
return null unless account
|
||||||
|
|
|
@ -242,6 +242,11 @@ class AccountStore extends NylasStore
|
||||||
return @accountForId(alias.accountId)
|
return @accountForId(alias.accountId)
|
||||||
return null
|
return null
|
||||||
|
|
||||||
|
emailAddresses: ->
|
||||||
|
addresses = _.pluck((@accounts() ? []), "emailAddress")
|
||||||
|
addresses = addresses.concat(_.pluck((@aliases() ? [])), "email")
|
||||||
|
return addresses
|
||||||
|
|
||||||
# Public: Returns the {Account} for the given account id, or null.
|
# Public: Returns the {Account} for the given account id, or null.
|
||||||
accountForId: (id) =>
|
accountForId: (id) =>
|
||||||
@_cachedGetter "accountForId:#{id}", => _.findWhere(@_accounts, {id})
|
@_cachedGetter "accountForId:#{id}", => _.findWhere(@_accounts, {id})
|
||||||
|
|
|
@ -77,6 +77,8 @@ class TaskQueue
|
||||||
@_updatePeriodicallyTimeout = null
|
@_updatePeriodicallyTimeout = null
|
||||||
@_currentSequentialId = Date.now()
|
@_currentSequentialId = Date.now()
|
||||||
|
|
||||||
|
@_runLocalPromise = Promise.resolve()
|
||||||
|
|
||||||
@_restoreQueue()
|
@_restoreQueue()
|
||||||
|
|
||||||
@listenTo Actions.queueTask, @enqueue
|
@listenTo Actions.queueTask, @enqueue
|
||||||
|
@ -137,9 +139,25 @@ class TaskQueue
|
||||||
task.sequentialId = ++@_currentSequentialId
|
task.sequentialId = ++@_currentSequentialId
|
||||||
|
|
||||||
@_dequeueObsoleteTasks(task)
|
@_dequeueObsoleteTasks(task)
|
||||||
task.runLocal().then =>
|
|
||||||
@_queue.push(task)
|
doRunLocal = =>
|
||||||
@_updateSoon()
|
task.runLocal().then =>
|
||||||
|
@_queue.push(task)
|
||||||
|
@_updateSoon()
|
||||||
|
return Promise.resolve()
|
||||||
|
|
||||||
|
# NOTE: runLocal now runs synchronously so when people build
|
||||||
|
# `performLocal` methods they can assume the entire set is atomic.
|
||||||
|
# `performLocal` very frequently has numerous reads and sets to the
|
||||||
|
# database and we don't want simultaneous tasks to make those reads
|
||||||
|
# and sets not atomic. While users could wrap their entire
|
||||||
|
# performLocals in Database transaction blocks, it's common to forget
|
||||||
|
# to do this and it will still block other tasks from accessing the
|
||||||
|
# database.
|
||||||
|
if !@_runLocalPromise.isPending()
|
||||||
|
# Reset to prevent memory leak of chain.
|
||||||
|
@_runLocalPromise = Promise.resolve()
|
||||||
|
@_runLocalPromise = @_runLocalPromise.then(doRunLocal)
|
||||||
|
|
||||||
enqueueUndoOfTaskId: (taskId) =>
|
enqueueUndoOfTaskId: (taskId) =>
|
||||||
task = _.findWhere(@_queue, {id: taskId})
|
task = _.findWhere(@_queue, {id: taskId})
|
||||||
|
|
Loading…
Add table
Reference in a new issue