fix(change-mail-task): Limit the parallelism of API requests via change mail task

No you can't make 1,100 API requests at the same time. Thanks. New specs to make sure there aren't any regressions in this behavior.
This commit is contained in:
Ben Gotow 2015-09-27 23:42:43 -07:00
parent 1278ca2677
commit 5fd751ea17
2 changed files with 90 additions and 2 deletions

View file

@ -354,13 +354,67 @@ describe "ChangeMailTask", ->
new Promise (resolve, reject) -> #noop
spyOn(@task, '_removeLock')
runs ->
@task.performRequests(Message, [@threadAMesage1])
@task.performRequests(Thread, [@threadAMesage1])
waitsFor ->
NylasAPI.makeRequest.callCount is 1
runs ->
NylasAPI.makeRequest.calls[0].args[0].beforeProcessing({})
expect(@task._removeLock).toHaveBeenCalledWith(@threadAMesage1)
it "should make no more than 10 requests at once", ->
resolves = []
spyOn(@task, '_removeLock')
spyOn(NylasAPI, 'makeRequest').andCallFake ->
new Promise (resolve, reject) -> resolves.push(resolve)
threads = []
threads.push new Thread(id: "#{idx}", subject: idx) for idx in [0..100]
@task._restoreValues = _.map threads, (t) -> {some: 'data'}
@task.performRequests(Thread, threads)
advanceClock()
expect(resolves.length).toEqual(5)
advanceClock()
expect(resolves.length).toEqual(5)
resolves[0]()
resolves[1]()
advanceClock()
expect(resolves.length).toEqual(7)
resolves[idx]() for idx in [2...7]
advanceClock()
expect(resolves.length).toEqual(12)
it "should stop making requests after non-404 network errors", ->
resolves = []
rejects = []
spyOn(@task, '_removeLock')
spyOn(NylasAPI, 'makeRequest').andCallFake ->
new Promise (resolve, reject) ->
resolves.push(resolve)
rejects.push(reject)
threads = []
threads.push new Thread(id: "#{idx}", subject: idx) for idx in [0..100]
@task._restoreValues = _.map threads, (t) -> {some: 'data'}
@task.performRequests(Thread, threads)
advanceClock()
expect(resolves.length).toEqual(5)
resolves[idx]() for idx in [0...4]
advanceClock()
expect(resolves.length).toEqual(9)
# simulate request failure
reject = rejects[rejects.length - 1]
reject(new APIError(statusCode: 400))
advanceClock()
# simulate more requests succeeding
resolves[idx]() for idx in [5...9]
advanceClock()
# check that no more requests have been queued
expect(resolves.length).toEqual(9)
describe "optimistic object locking", ->
beforeEach ->
@task = new ChangeMailTask()

View file

@ -7,6 +7,40 @@ DatabaseStore = require '../stores/database-store'
AccountStore = require '../stores/account-store'
{APIError} = require '../errors'
# MapLimit is a small helper method that implements a promise version of
# Async.mapLimit. It runs the provided fn on each item in the `input` array,
# but only runs `numberInParallel` copies of `fn` at a time, resolving
# with an output array, or rejecting with an error if any execution of
# `fn` returns an error.
mapLimit = (input, numberInParallel, fn) ->
new Promise (resolve, reject) ->
idx = 0
inflight = 0
output = []
outputError = null
startNext = ->
startIdx = idx
idx += 1
inflight += 1
fn(input[startIdx])
.then (result) =>
output[startIdx] = result
return if outputError
inflight -= 1
if idx < input.length
startNext()
else if inflight is 0
resolve(output)
.catch (err) =>
outputError = err
reject(outputError)
numberInParallel = Math.min(numberInParallel, input.length)
startNext() for n in [0...numberInParallel]
# The ChangeMailTask is a base class for all tasks that modify sets of threads or
# messages. Subclasses implement `_changesToModel` and `_requestBodyForModel` to
# define the specific transforms they provide, and override `performLocal` to
@ -130,7 +164,7 @@ class ChangeMailTask extends Task
return Promise.resolve(Task.Status.Retry)
performRequests: (klass, models) ->
Promise.map models, (model) =>
mapLimit models, 5, (model) =>
# Don't bother making a web request if performLocal didn't modify this model
return Promise.resolve() unless @_restoreValues[model.id]