diff --git a/spec-nylas/tasks/change-mail-task-spec.coffee b/spec-nylas/tasks/change-mail-task-spec.coffee index 74044b86e..a786cc1aa 100644 --- a/spec-nylas/tasks/change-mail-task-spec.coffee +++ b/spec-nylas/tasks/change-mail-task-spec.coffee @@ -354,13 +354,67 @@ describe "ChangeMailTask", -> new Promise (resolve, reject) -> #noop spyOn(@task, '_removeLock') runs -> - @task.performRequests(Message, [@threadAMesage1]) + @task.performRequests(Thread, [@threadAMesage1]) waitsFor -> NylasAPI.makeRequest.callCount is 1 runs -> NylasAPI.makeRequest.calls[0].args[0].beforeProcessing({}) expect(@task._removeLock).toHaveBeenCalledWith(@threadAMesage1) + it "should make no more than 10 requests at once", -> + resolves = [] + spyOn(@task, '_removeLock') + spyOn(NylasAPI, 'makeRequest').andCallFake -> + new Promise (resolve, reject) -> resolves.push(resolve) + + threads = [] + threads.push new Thread(id: "#{idx}", subject: idx) for idx in [0..100] + @task._restoreValues = _.map threads, (t) -> {some: 'data'} + @task.performRequests(Thread, threads) + advanceClock() + expect(resolves.length).toEqual(5) + advanceClock() + expect(resolves.length).toEqual(5) + resolves[0]() + resolves[1]() + advanceClock() + expect(resolves.length).toEqual(7) + resolves[idx]() for idx in [2...7] + advanceClock() + expect(resolves.length).toEqual(12) + + + it "should stop making requests after non-404 network errors", -> + resolves = [] + rejects = [] + spyOn(@task, '_removeLock') + spyOn(NylasAPI, 'makeRequest').andCallFake -> + new Promise (resolve, reject) -> + resolves.push(resolve) + rejects.push(reject) + + threads = [] + threads.push new Thread(id: "#{idx}", subject: idx) for idx in [0..100] + @task._restoreValues = _.map threads, (t) -> {some: 'data'} + @task.performRequests(Thread, threads) + advanceClock() + expect(resolves.length).toEqual(5) + resolves[idx]() for idx in [0...4] + advanceClock() + expect(resolves.length).toEqual(9) + + # simulate request failure + reject = rejects[rejects.length - 1] + reject(new APIError(statusCode: 400)) + advanceClock() + + # simulate more requests succeeding + resolves[idx]() for idx in [5...9] + advanceClock() + + # check that no more requests have been queued + expect(resolves.length).toEqual(9) + describe "optimistic object locking", -> beforeEach -> @task = new ChangeMailTask() diff --git a/src/flux/tasks/change-mail-task.coffee b/src/flux/tasks/change-mail-task.coffee index 5a11a569f..1f5b5a121 100644 --- a/src/flux/tasks/change-mail-task.coffee +++ b/src/flux/tasks/change-mail-task.coffee @@ -7,6 +7,40 @@ DatabaseStore = require '../stores/database-store' AccountStore = require '../stores/account-store' {APIError} = require '../errors' +# MapLimit is a small helper method that implements a promise version of +# Async.mapLimit. It runs the provided fn on each item in the `input` array, +# but only runs `numberInParallel` copies of `fn` at a time, resolving +# with an output array, or rejecting with an error if any execution of +# `fn` returns an error. +mapLimit = (input, numberInParallel, fn) -> + new Promise (resolve, reject) -> + idx = 0 + inflight = 0 + output = [] + outputError = null + + startNext = -> + startIdx = idx + idx += 1 + inflight += 1 + fn(input[startIdx]) + .then (result) => + output[startIdx] = result + return if outputError + + inflight -= 1 + if idx < input.length + startNext() + else if inflight is 0 + resolve(output) + + .catch (err) => + outputError = err + reject(outputError) + + numberInParallel = Math.min(numberInParallel, input.length) + startNext() for n in [0...numberInParallel] + # The ChangeMailTask is a base class for all tasks that modify sets of threads or # messages. Subclasses implement `_changesToModel` and `_requestBodyForModel` to # define the specific transforms they provide, and override `performLocal` to @@ -130,7 +164,7 @@ class ChangeMailTask extends Task return Promise.resolve(Task.Status.Retry) performRequests: (klass, models) -> - Promise.map models, (model) => + mapLimit models, 5, (model) => # Don't bother making a web request if performLocal didn't modify this model return Promise.resolve() unless @_restoreValues[model.id]