fix(queue): Delay retries up to 30s when tasks request a retry

Summary:
This is a critical patch that fixes two problems with the task queue:

1. Tasks in Status: Retry are retried the next time processQueue is run,
   which could be pretty much immediately. Certain scenarios lead to tasks
   running in a hard loop forever.

2. Returning Task.Status.Retry set the retry flags on the task but did not
   schedule the queue to be processed again. So if only a single item in the
   queue was present, it might never be retried again until the user performed
   another action.

Test Plan: Where did the specs for TaskQueue go? There aren't many... need to write more.

Reviewers: evan

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D2762
This commit is contained in:
Ben Gotow 2016-03-18 13:25:30 -07:00
parent 55c205fe86
commit 31706d8890
3 changed files with 88 additions and 12 deletions

View file

@ -22,10 +22,16 @@ describe "TaskQueue", ->
task.queueState.isProcessing = true
task
makeRetryInFuture = (task) ->
task.queueState.retryAfter = Date.now() + 1000
task.queueState.retryDelay = 1000
task
beforeEach ->
@task = new Task()
@unstartedTask = makeUnstartedTask(new Task())
@processingTask = makeProcessing(new Task())
@retryInFutureTask = makeRetryInFuture(new Task())
afterEach ->
# Flush any throttled or debounced updates
@ -33,7 +39,7 @@ describe "TaskQueue", ->
describe "restoreQueue", ->
it "should fetch the queue from the database, reset flags and start processing", ->
queue = [@processingTask, @unstartedTask]
queue = [@processingTask, @unstartedTask, @retryInFutureTask]
spyOn(DatabaseStore, 'findJSONBlob').andCallFake => Promise.resolve(queue)
spyOn(TaskQueue, '_updateSoon')
@ -41,6 +47,8 @@ describe "TaskQueue", ->
TaskQueue._restoreQueue().then =>
expect(TaskQueue._queue).toEqual(queue)
expect(@processingTask.queueState.isProcessing).toEqual(false)
expect(@retryInFutureTask.queueState.retryAfter).toEqual(undefined)
expect(@retryInFutureTask.queueState.retryDelay).toEqual(undefined)
expect(TaskQueue._updateSoon).toHaveBeenCalled()
describe "findTask", ->
@ -169,12 +177,7 @@ describe "TaskQueue", ->
expect(TaskQueue._queue.length).toBe(2)
expect(TaskQueue._completed.length).toBe(0)
describe "process Task", ->
it "doesn't process processing tasks", ->
spyOn(@processingTask, "runRemote").andCallFake -> Promise.resolve()
TaskQueue._processTask(@processingTask)
expect(@processingTask.runRemote).not.toHaveBeenCalled()
describe "_processQueue", ->
it "doesn't process blocked tasks", ->
class BlockedByTaskA extends Task
isDependentOnTask: (other) -> other instanceof TaskSubclassA
@ -210,14 +213,62 @@ describe "TaskQueue", ->
advanceClock()
blockedTask.runRemote.callCount > 0
describe "_processTask", ->
it "doesn't process processing tasks", ->
spyOn(@processingTask, "runRemote").andCallFake -> Promise.resolve()
TaskQueue._processTask(@processingTask)
expect(@processingTask.runRemote).not.toHaveBeenCalled()
it "sets the processing bit", ->
spyOn(@unstartedTask, "runRemote").andCallFake -> Promise.resolve()
task = new Task()
task.queueState.localComplete = true
TaskQueue._queue = [task]
TaskQueue._processTask(task)
expect(task.queueState.isProcessing).toBe true
describe "when the task returns Task.Status.Retry", ->
beforeEach ->
@retryTaskWith = (qs) =>
task = new Task()
task.performRemote = =>
return Promise.resolve(Task.Status.Retry)
task.queueState.localComplete = true
task.queueState.retryDelay = qs.retryDelay
task.queueState.retryAfter = qs.retryAfter
return task
it "sets retryAfter and retryDelay", ->
task = @retryTaskWith({})
TaskQueue._queue = [task]
TaskQueue._processTask(task)
advanceClock()
expect(task.queueState.retryAfter).toBeDefined()
expect(task.queueState.retryDelay).toEqual(1000 * 1.2)
it "increases retryDelay", ->
task = @retryTaskWith({retryAfter: Date.now() - 1000, retryDelay: 2000})
TaskQueue._queue = [task]
TaskQueue._processTask(task)
advanceClock()
expect(task.queueState.retryAfter).toBeDefined()
expect(task.queueState.retryDelay).toEqual(2000 * 1.2)
it "caps retryDelay", ->
task = @retryTaskWith({retryAfter: Date.now() - 1000, retryDelay: 30000})
TaskQueue._queue = [task]
TaskQueue._processTask(task)
advanceClock()
expect(task.queueState.retryAfter).toBeDefined()
expect(task.queueState.retryDelay).toEqual(30000)
it "calls updateSoon", ->
task = @retryTaskWith({})
TaskQueue._queue = [task]
spyOn(TaskQueue, '_updateSoon')
TaskQueue._processTask(task)
advanceClock()
expect(TaskQueue._updateSoon).toHaveBeenCalled()
describe "handling task runRemote task errors", ->
spyBBRemote = jasmine.createSpy("performRemote")
spyCCRemote = jasmine.createSpy("performRemote")

View file

@ -174,17 +174,32 @@ class TaskQueue
_processQueue: =>
started = 0
if @_processQueueTimeout
clearTimeout(@_processQueueTimeout)
@_processQueueTimeout = null
now = Date.now()
reprocessIn = Number.MAX_VALUE
for task in @_queue by -1
if @_taskIsBlocked(task)
task.queueState.debugStatus = Task.DebugStatus.WaitingOnDependency
continue
else
@_processTask(task)
started += 1
if task.queueState.retryAfter and task.queueState.retryAfter > now
reprocessIn = Math.min(task.queueState.retryAfter - now, reprocessIn)
task.queueState.debugStatus = Task.DebugStatus.WaitingToRetry
continue
@_processTask(task)
started += 1
if started > 0
@trigger()
if reprocessIn isnt Number.MAX_VALUE
@_processQueueTimeout = setTimeout(@_processQueue, reprocessIn + 500)
_processTask: (task) =>
return if task.queueState.isProcessing
@ -194,7 +209,13 @@ class TaskQueue
task.queueState.isProcessing = false
@trigger()
.then (status) =>
@dequeue(task) unless status is Task.Status.Retry
if status is Task.Status.Retry
task.queueState.retryDelay = Math.round(Math.min((task.queueState.retryDelay ? 1000) * 1.2, 30000))
task.queueState.retryAfter = Date.now() + task.queueState.retryDelay
else
@dequeue(task)
@_updateSoon()
.catch (err) =>
@_seenDownstream = {}
@_notifyOfDependentError(task, err)
@ -275,6 +296,9 @@ class TaskQueue
for task in queue
task.queueState ?= {}
task.queueState.isProcessing = false
delete task.queueState['retryAfter']
delete task.queueState['retryDelay']
@_queue = queue
@_updateSoon()

View file

@ -12,6 +12,7 @@ TaskDebugStatus =
DequeuedObsolete: "DEQUEUED (Obsolete)"
DequeuedDependency: "DEQUEUED (Dependency Failure)"
WaitingOnQueue: "WAITING ON QUEUE"
WaitingToRetry: "WAITING TO RETRY"
WaitingOnDependency: "WAITING ON DEPENDENCY"
RunningLocal: "RUNNING LOCAL"
ProcessingRemote: "PROCESSING REMOTE"