From 5fd751ea178a9a6bf324f43fb75830a0c3db516f Mon Sep 17 00:00:00 2001 From: Ben Gotow Date: Sun, 27 Sep 2015 23:42:43 -0700 Subject: [PATCH] fix(change-mail-task): Limit the parallelism of API requests via change mail task No you can't make 1,100 API requests at the same time. Thanks. New specs to make sure there aren't any regressions in this behavior. --- spec-nylas/tasks/change-mail-task-spec.coffee | 56 ++++++++++++++++++- src/flux/tasks/change-mail-task.coffee | 36 +++++++++++- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/spec-nylas/tasks/change-mail-task-spec.coffee b/spec-nylas/tasks/change-mail-task-spec.coffee index 74044b86e..a786cc1aa 100644 --- a/spec-nylas/tasks/change-mail-task-spec.coffee +++ b/spec-nylas/tasks/change-mail-task-spec.coffee @@ -354,13 +354,67 @@ describe "ChangeMailTask", -> new Promise (resolve, reject) -> #noop spyOn(@task, '_removeLock') runs -> - @task.performRequests(Message, [@threadAMesage1]) + @task.performRequests(Thread, [@threadAMesage1]) waitsFor -> NylasAPI.makeRequest.callCount is 1 runs -> NylasAPI.makeRequest.calls[0].args[0].beforeProcessing({}) expect(@task._removeLock).toHaveBeenCalledWith(@threadAMesage1) + it "should make no more than 10 requests at once", -> + resolves = [] + spyOn(@task, '_removeLock') + spyOn(NylasAPI, 'makeRequest').andCallFake -> + new Promise (resolve, reject) -> resolves.push(resolve) + + threads = [] + threads.push new Thread(id: "#{idx}", subject: idx) for idx in [0..100] + @task._restoreValues = _.map threads, (t) -> {some: 'data'} + @task.performRequests(Thread, threads) + advanceClock() + expect(resolves.length).toEqual(5) + advanceClock() + expect(resolves.length).toEqual(5) + resolves[0]() + resolves[1]() + advanceClock() + expect(resolves.length).toEqual(7) + resolves[idx]() for idx in [2...7] + advanceClock() + expect(resolves.length).toEqual(12) + + + it "should stop making requests after non-404 network errors", -> + resolves = [] + rejects = [] + spyOn(@task, '_removeLock') + spyOn(NylasAPI, 'makeRequest').andCallFake -> + new Promise (resolve, reject) -> + resolves.push(resolve) + rejects.push(reject) + + threads = [] + threads.push new Thread(id: "#{idx}", subject: idx) for idx in [0..100] + @task._restoreValues = _.map threads, (t) -> {some: 'data'} + @task.performRequests(Thread, threads) + advanceClock() + expect(resolves.length).toEqual(5) + resolves[idx]() for idx in [0...4] + advanceClock() + expect(resolves.length).toEqual(9) + + # simulate request failure + reject = rejects[rejects.length - 1] + reject(new APIError(statusCode: 400)) + advanceClock() + + # simulate more requests succeeding + resolves[idx]() for idx in [5...9] + advanceClock() + + # check that no more requests have been queued + expect(resolves.length).toEqual(9) + describe "optimistic object locking", -> beforeEach -> @task = new ChangeMailTask() diff --git a/src/flux/tasks/change-mail-task.coffee b/src/flux/tasks/change-mail-task.coffee index 5a11a569f..1f5b5a121 100644 --- a/src/flux/tasks/change-mail-task.coffee +++ b/src/flux/tasks/change-mail-task.coffee @@ -7,6 +7,40 @@ DatabaseStore = require '../stores/database-store' AccountStore = require '../stores/account-store' {APIError} = require '../errors' +# MapLimit is a small helper method that implements a promise version of +# Async.mapLimit. It runs the provided fn on each item in the `input` array, +# but only runs `numberInParallel` copies of `fn` at a time, resolving +# with an output array, or rejecting with an error if any execution of +# `fn` returns an error. +mapLimit = (input, numberInParallel, fn) -> + new Promise (resolve, reject) -> + idx = 0 + inflight = 0 + output = [] + outputError = null + + startNext = -> + startIdx = idx + idx += 1 + inflight += 1 + fn(input[startIdx]) + .then (result) => + output[startIdx] = result + return if outputError + + inflight -= 1 + if idx < input.length + startNext() + else if inflight is 0 + resolve(output) + + .catch (err) => + outputError = err + reject(outputError) + + numberInParallel = Math.min(numberInParallel, input.length) + startNext() for n in [0...numberInParallel] + # The ChangeMailTask is a base class for all tasks that modify sets of threads or # messages. Subclasses implement `_changesToModel` and `_requestBodyForModel` to # define the specific transforms they provide, and override `performLocal` to @@ -130,7 +164,7 @@ class ChangeMailTask extends Task return Promise.resolve(Task.Status.Retry) performRequests: (klass, models) -> - Promise.map models, (model) => + mapLimit models, 5, (model) => # Don't bother making a web request if performLocal didn't modify this model return Promise.resolve() unless @_restoreValues[model.id]