Begin cleanup of Send Task

This commit is contained in:
Ben Gotow 2016-01-26 15:39:40 -08:00
parent c7812b9c46
commit c8bd09a260
8 changed files with 114 additions and 367 deletions

View file

@ -219,17 +219,13 @@ describe "TaskQueue", ->
expect(task.queueState.isProcessing).toBe true
describe "handling task runRemote task errors", ->
spyAACallback = jasmine.createSpy("onDependentTaskError")
spyBBRemote = jasmine.createSpy("performRemote")
spyBBCallback = jasmine.createSpy("onDependentTaskError")
spyCCRemote = jasmine.createSpy("performRemote")
spyCCCallback = jasmine.createSpy("onDependentTaskError")
beforeEach ->
testError = new Error("Test Error")
@testError = testError
class TaskAA extends Task
onDependentTaskError: spyAACallback
performRemote: ->
# We reject instead of `throw` because jasmine thinks this
# `throw` is in the context of the test instead of the context
@ -238,22 +234,12 @@ describe "TaskQueue", ->
class TaskBB extends Task
isDependentTask: (other) -> other instanceof TaskAA
onDependentTaskError: spyBBCallback
performRemote: spyBBRemote
class TaskCC extends Task
isDependentTask: (other) -> other instanceof TaskBB
onDependentTaskError: (task, err) ->
spyCCCallback(task, err)
return Task.DO_NOT_DEQUEUE_ME
performRemote: spyCCRemote
@taskAA = new TaskAA
@taskAA.queueState.localComplete = true
@taskBB = new TaskBB
@taskBB.queueState.localComplete = true
@taskCC = new TaskCC
@taskCC.queueState.localComplete = true
spyOn(TaskQueue, 'trigger')
@ -267,30 +253,3 @@ describe "TaskQueue", ->
expect(TaskQueue.dequeue).toHaveBeenCalledWith(@taskAA)
expect(spyAACallback).not.toHaveBeenCalled()
expect(@taskAA.queueState.remoteError.message).toBe "Test Error"
it "calls `onDependentTaskError` on dependent tasks", ->
spyOn(TaskQueue, 'dequeue').andCallThrough()
TaskQueue._queue = [@taskAA, @taskBB, @taskCC]
waitsForPromise =>
TaskQueue._processTask(@taskAA).then =>
expect(TaskQueue.dequeue.calls.length).toBe 2
# NOTE: The recursion goes depth-first. The leafs are called
# first
expect(TaskQueue.dequeue.calls[0].args[0]).toBe @taskBB
expect(TaskQueue.dequeue.calls[1].args[0]).toBe @taskAA
expect(spyAACallback).not.toHaveBeenCalled()
expect(spyBBCallback).toHaveBeenCalledWith(@taskAA, @testError)
expect(@taskAA.queueState.remoteError.message).toBe "Test Error"
expect(@taskBB.queueState.status).toBe Task.Status.Continue
expect(@taskBB.queueState.debugStatus).toBe Task.DebugStatus.DequeuedDependency
it "dequeues all dependent tasks except those that return `Task.DO_NOT_DEQUEUE_ME` from their callbacks", ->
spyOn(TaskQueue, 'dequeue').andCallThrough()
TaskQueue._queue = [@taskAA, @taskBB, @taskCC]
waitsForPromise =>
TaskQueue._processTask(@taskAA).then =>
expect(TaskQueue._queue).toEqual [@taskCC]
expect(spyCCCallback).toHaveBeenCalledWith(@taskBB, @testError)
expect(@taskCC.queueState.status).toBe null
expect(@taskCC.queueState.debugStatus).toBe Task.DebugStatus.JustConstructed

View file

@ -46,12 +46,10 @@ class DraftChangeSet
clearTimeout(@_timer) if @_timer
@_timer = setTimeout(@commit, 30000)
# If force is true, then we'll always run the `_onCommit` callback
# regardless if there are _pending changes or not
commit: ({force, noSyncback}={}) =>
commit: ({noSyncback}={}) =>
@_commitChain = @_commitChain.finally =>
if not force and Object.keys(@_pending).length is 0
if Object.keys(@_pending).length is 0
return Promise.resolve(true)
@_saving = @_pending

View file

@ -512,16 +512,8 @@ class DraftStore
# We do, however, need to ensure that all of the pending changes are
# committed to the Database since we'll look them up again just
# before send.
session.changes.commit(force: true, noSyncback: true).then =>
draft = session.draft()
# We unfortunately can't give the SendDraftTask the raw draft JSON
# data because there may still be pending tasks (like a
# {FileUploadTask}) that will continue to update the draft data.
opts =
threadId: draft.threadId
replyToMessageId: draft.replyToMessageId
task = new SendDraftTask(draftClientId, opts)
session.changes.commit(noSyncback: true).then =>
task = new SendDraftTask(session.draft())
Actions.queueTask(task)
# NOTE: We may be done with the session in this window, but there

View file

@ -203,12 +203,9 @@ class TaskQueue
responses = _.filter responses, (r) -> r?
responses.forEach (resp) =>
if resp.returnValue is Task.DO_NOT_DEQUEUE_ME
return
else
resp.downstreamTask.queueState.status = Task.Status.Continue
resp.downstreamTask.queueState.debugStatus = Task.DebugStatus.DequeuedDependency
@dequeue(resp.downstreamTask)
resp.downstreamTask.queueState.status = Task.Status.Continue
resp.downstreamTask.queueState.debugStatus = Task.DebugStatus.DequeuedDependency
@dequeue(resp.downstreamTask)
# Recursively notifies tasks of dependent errors
_notifyOfDependentError: (failedTask, err) ->

View file

@ -1,162 +0,0 @@
fs = require 'fs'
_ = require 'underscore'
crypto = require 'crypto'
pathUtils = require 'path'
Task = require './task'
{APIError} = require '../errors'
File = require '../models/file'
Message = require '../models/message'
Actions = require '../actions'
AccountStore = require '../stores/account-store'
DatabaseStore = require '../stores/database-store'
{isTempId} = require '../models/utils'
NylasAPI = require '../nylas-api'
Utils = require '../models/utils'
UploadCounter = 0
class FileUploadTask extends Task
constructor: (@filePath, @messageClientId) ->
super
@_startDate = Date.now()
@_startId = UploadCounter
UploadCounter += 1
@progress = null # The progress checking timer.
performLocal: ->
return Promise.reject(new Error("Must pass an absolute path to upload")) unless @filePath?.length
return Promise.reject(new Error("Must be attached to a messageClientId")) unless isTempId(@messageClientId)
Actions.uploadStateChanged @_uploadData("pending")
Promise.resolve()
performRemote: ->
Actions.uploadStateChanged @_uploadData("started")
DatabaseStore.findBy(Message, {clientId: @messageClientId}).then (draft) =>
if not draft
err = new Error("Can't find draft #{@messageClientId} in Database to upload file to")
return Promise.resolve([Task.Status.Failed, err])
@_accountId = draft.accountId
@_makeRequest()
.then @_performRemoteParseFile
.then @_performRemoteAttachFile
.then (file) =>
Actions.uploadStateChanged @_uploadData("completed")
Actions.fileUploaded(file: file, uploadData: @_uploadData("completed"))
return Promise.resolve(Task.Status.Success)
.catch APIError, (err) =>
if err.statusCode in NylasAPI.PermanentErrorCodes
msg = "There was a problem uploading this file. Please try again later."
Actions.uploadStateChanged(@_uploadData("failed"))
Actions.postNotification({message: msg, type: "error"})
return Promise.resolve([Task.Status.Failed, err])
else if err.statusCode is NylasAPI.CancelledErrorCode
Actions.uploadStateChanged(@_uploadData("aborted"))
Actions.fileAborted(@_uploadData("aborted"))
return Promise.resolve(Task.Status.Failed)
else
return Promise.resolve(Task.Status.Retry)
_makeRequest: =>
started = (req) =>
@req = req
@progress = setInterval =>
Actions.uploadStateChanged(@_uploadData("progress"))
, 250
cleanup = =>
clearInterval(@progress)
@req = null
NylasAPI.makeRequest
path: "/files"
accountId: @_accountId
method: "POST"
json: false
formData: @_formData()
started: started
timeout: 20 * 60 * 1000
.finally(cleanup)
_performRemoteParseFile: (rawResponseString) =>
# The Nylas API returns the file json wrapped in an array.
# Since we requested `json:false` the response will come back as
# a raw string.
json = JSON.parse(rawResponseString)
file = (new File).fromJSON(json[0])
Promise.resolve(file)
_performRemoteAttachFile: (file) =>
# The minute we know what file is associated with the upload, we need
# to fire an Action to notify a popout window's FileUploadStore that
# these two objects are linked. We unfortunately can't wait until
# `_attachFileToDraft` resolves, because that will resolve after the
# DB transaction is completed AND all of the callbacks have fired.
# Unfortunately in the callback chain is a render method which means
# that the upload will be left on the page for a split second before
# we know the file has been uploaded.
#
# Associating the upload with the file ahead of time can let the
# Composer know which ones to ignore when de-duping the upload/file
# listing.
Actions.linkFileToUpload(file: file, uploadData: @_uploadData("completed"))
DraftStore = require '../stores/draft-store'
DraftStore.sessionForClientId(@messageClientId).then (session) =>
files = _.clone(session.draft().files) ? []
files.push(file)
session.changes.add({files})
session.changes.commit().then ->
return file
cancel: ->
super
# Note: When you call cancel, we stop the request, which causes
# NylasAPI.makeRequest to reject with an error.
return unless @req
@req.abort()
# Helper Methods
_formData: ->
file: # Must be named `file` as per the Nylas API spec
value: fs.createReadStream(@filePath)
options:
filename: @_uploadData().fileName
# returns:
# messageClientId - The clientId of the message (draft) we're uploading to
# filePath - The full absolute local system file path
# fileSize - The size in bytes
# fileName - The basename of the file
# bytesUploaded - Current number of bytes uploaded
# state - one of "pending" "started" "progress" "completed" "aborted" "failed"
_uploadData: (state) ->
@_memoUploadData ?=
uploadTaskId: @id
startDate: @_startDate
startId: @_startId
messageClientId: @messageClientId
filePath: @filePath
fileSize: @_getFileSize(@filePath)
fileName: pathUtils.basename(@filePath)
@_memoUploadData.bytesUploaded = @_getBytesUploaded()
@_memoUploadData.state = state if state?
return _.extend({}, @_memoUploadData)
_getFileSize: (path) ->
fs.statSync(path)["size"]
_getBytesUploaded: ->
# https://github.com/request/request/issues/941
# http://stackoverflow.com/questions/12098713/upload-progress-request
@req?.req?.connection?._bytesDispatched ? 0
module.exports = FileUploadTask

View file

@ -1,4 +1,6 @@
_ = require 'underscore'
fs = require 'fs'
path = require 'path'
Task = require './task'
Actions = require '../actions'
Message = require '../models/message'
@ -7,162 +9,154 @@ TaskQueue = require '../stores/task-queue'
{APIError} = require '../errors'
SoundRegistry = require '../../sound-registry'
DatabaseStore = require '../stores/database-store'
FileUploadTask = require './file-upload-task'
class NotFoundError extends Error
constructor: -> super
class MultiRequestProgressMonitor
constructor: =>
@_requests = {}
@_expected = {}
add: (filepath, request) =>
@_requests[filepath] = request
@_expected[filepath] = fs.statSync(filepath)["size"] ? 0
remove: (filepath) =>
delete @_requests[filepath]
delete @_expected[filepath]
progress: =>
sent = 0
expected = 0
for filepath, req of @_requests
sent += @req?.req?.connection?._bytesDispatched ? 0
expected += @_expected[filepath]
return sent / expected
module.exports =
class SendDraftTask extends Task
constructor: (@draftClientId, {@threadId, @replyToMessageId}={}) ->
constructor: (@draft, @attachmentPaths) ->
@_progress = new MultiRequestProgressMonitor()
super
label: ->
"Sending draft..."
shouldDequeueOtherTask: (other) ->
other instanceof SendDraftTask and other.draftClientId is @draftClientId
isDependentTask: (other) ->
(other instanceof FileUploadTask and other.messageClientId is @draftClientId)
onDependentTaskError: (task, err) ->
if task instanceof FileUploadTask
msg = "Your message could not be sent because a file failed to upload. Please try re-uploading your file and try again."
@_notifyUserOfError(msg) if msg
other instanceof SendDraftTask and other.draft.clientId is @draft.clientId
performLocal: ->
if not @draftClientId
return Promise.reject(new Error("Attempt to call SendDraftTask.performLocal without @draftClientId."))
# It's possible that between a user requesting the draft to send and
# the queue eventualy getting around to the `performLocal`, the Draft
# object may have been deleted. This could be caused by a user
# accidentally hitting "delete" on the same draft in another popout
# window. If this happens, `performRemote` will fail when we try and
# look up the draft by its clientId.
#
# In this scenario, we don't want to send, but want to restore the
# draft and notify the user to try again. In order to safely do this
# we need to keep a backup to restore.
DatabaseStore.findBy(Message, clientId: @draftClientId).include(Message.attributes.body).then (draftModel) =>
@backupDraft = draftModel.clone()
return Promise.reject(new Error("SendDraftTask must be provided a draft.")) unless @draft
Promise.resolve()
performRemote: ->
@_fetchLatestDraft()
.then(@_makeSendRequest)
.then(@_saveNewMessage)
.then(@_deleteRemoteDraft)
.then(@_notifySuccess)
@_uploadAttachments()
.then(@_sendAndCreateMessage)
.then(@_deleteDraft)
.then(@_onSuccess)
.catch(@_onError)
_fetchLatestDraft: ->
DatabaseStore.findBy(Message, clientId: @draftClientId).include(Message.attributes.body).then (draftModel) =>
@draftAccountId = draftModel.accountId
@draftServerId = draftModel.serverId
@draftVersion = draftModel.version
if not draftModel
throw new NotFoundError("#{@draftClientId} not found")
return draftModel
.catch (err) =>
throw new NotFoundError("#{@draftClientId} not found")
_uploadAttachments: =>
Promise.all @attachmentPaths.map (filepath) =>
NylasAPI.makeRequest
path: "/files"
accountId: @draft.accountId
method: "POST"
json: false
formData:
file: # Must be named `file` as per the Nylas API spec
value: fs.createReadStream(filepath)
options:
filename: path.basename(filepath)
started: (req) =>
@_progress.add(filepath, req)
timeout: 20 * 60 * 1000
.finally: =>
@_progress.remove(filepath)
.then (file) =>
@draft.files.push(file)
_makeSendRequest: (draftModel) =>
_sendAndCreateMessage: =>
NylasAPI.makeRequest
path: "/send"
accountId: @draftAccountId
accountId: @draft.accountId
method: 'POST'
body: draftModel.toJSON()
timeout: 1000 * 60 * 5 # We cannot hang up a send - won't know if it sent
returnsModel: false
.catch (err) =>
tryAgainDraft = draftModel.clone()
# If the message you're "replying to" were deleted
if err.message?.indexOf('Invalid message public id') is 0
tryAgainDraft.replyToMessageId = null
return @_makeSendRequest(tryAgainDraft)
@draft.replyToMessageId = null
return @_sendAndCreateMessage()
# If the thread was deleted
else if err.message?.indexOf('Invalid thread') is 0
tryAgainDraft.threadId = null
tryAgainDraft.replyToMessageId = null
return @_makeSendRequest(tryAgainDraft)
else return Promise.reject(err)
@draft.threadId = null
@draft.replyToMessageId = null
return @_sendAndCreateMessage()
# The JSON returned from the server will be the new Message.
#
# Our old draft may or may not have a serverId. We update the draft with
# whatever the server returned (which includes a serverId).
#
# We then save the model again (keyed by its client_id) to indicate that
# it is no longer a draft, but rather a Message (draft: false) with a
# valid serverId.
_saveNewMessage: (newMessageJSON) =>
@message = new Message().fromJSON(newMessageJSON)
@message.clientId = @draftClientId
@message.draft = false
return DatabaseStore.inTransaction (t) =>
t.persistModel(@message)
else
return Promise.reject(err)
# We DON'T need to delete the local draft because we actually transmute
# it into a {Message} by setting the `draft` flat to `true` in the
# `_saveNewMessage` method.
.then (newMessageJSON) =>
@message = new Message().fromJSON(newMessageJSON)
@message.clientId = @draft.clientId
@message.draft = false
DatabaseStore.inTransaction (t) =>
t.persistModel(@message)
# We DON'T need to delete the local draft because we turn it into a message
# by writing the new message into the database with the same clientId.
#
# We DO, however, need to make sure that the remote draft has been
# cleaned up.
# We DO, need to make sure that the remote draft has been cleaned up.
#
# Not all drafts will have a server component. Only those that have been
# persisted by a {SyncbackDraftTask} will have a `serverId`.
_deleteRemoteDraft: =>
return Promise.resolve() unless @draftServerId
# Return if the draft hasn't been saved server-side (has no `serverId`).
return Promise.resolve() unless @draft.serverId
NylasAPI.makeRequest
path: "/drafts/#{@draftServerId}"
accountId: @draftAccountId
path: "/drafts/#{@draft.serverId}"
accountId: @draft.accountId
method: "DELETE"
body: version: @draftVersion
body:
version: @draft.version
returnsModel: false
.catch APIError, (err) =>
# If the draft failed to delete remotely, we don't really care. It
# shouldn't stop the send draft task from continuing.
console.error("Deleting the draft remotely failed", err)
Promise.resolve()
_notifySuccess: =>
_onSuccess: =>
Actions.sendDraftSuccess
draftClientId: @draftClientId
newMessage: @message
# Play the sending sound
if NylasEnv.config.get("core.sending.sounds")
SoundRegistry.playSound('send')
return Task.Status.Success
# Remove attachments we were waiting to upload
@attachmentPaths.forEach(fs.unlink)
return Promise.resolve(Task.Status.Success)
_onError: (err) =>
msg = "Your message could not be sent at this time. Please try again soon."
if err instanceof NotFoundError
msg = "The draft you are trying to send has been deleted. We have restored your draft. Please try and send again."
DatabaseStore.inTransaction (t) =>
t.persistModel(@backupDraft)
.then =>
return @_permanentError(err, msg)
else if err instanceof APIError
if err.statusCode is 500
return @_permanentError(err, msg)
else if err.statusCode in [400, 404]
NylasEnv.emitError(new Error("Sending a message responded with #{err.statusCode}!"))
return @_permanentError(err, msg)
else if err.statusCode is NylasAPI.TimeoutErrorCode
msg = "We lost internet connection just as we were trying to send your message! Please wait a little bit to see if it went through. If not, check your internet connection and try sending again."
return @_permanentError(err, msg)
else
return Promise.resolve(Task.Status.Retry)
if err instanceof APIError and err.statusCode is NylasAPI.TimeoutErrorCode
msg = "We lost internet connection just as we were trying to send your message! Please wait a little bit to see if it went through. If not, check your internet connection and try sending again."
recoverableStatusCodes = [400, 404, 500, NylasAPI.TimeoutErrorCode]
if err instanceof APIError and err.statusCode in recoverableStatusCodes
return Promise.resolve(Task.Status.Retry)
else
Actions.draftSendingFailed
threadId: @threadId
draftClientId: @draftClientId,
errorMessage: msg
NylasEnv.emitError(err)
return @_permanentError(err, msg)
_permanentError: (err, msg) =>
@_notifyUserOfError(msg)
return Promise.resolve([Task.Status.Failed, err])
_notifyUserOfError: (msg) =>
Actions.draftSendingFailed({
threadId: @threadId
draftClientId: @draftClientId,
errorMessage: msg
})
return Promise.resolve([Task.Status.Failed, err])

View file

@ -31,11 +31,6 @@ class SyncbackDraftTask extends Task
other.draftClientId is @draftClientId and
other.creationDate <= @creationDate
# We want to wait for other SyncbackDraftTasks to run, but we don't want
# to get dequeued if they fail.
onDependentTaskError: ->
return Task.DO_NOT_DEQUEUE_ME
performLocal: ->
# SyncbackDraftTask does not do anything locally. You should persist your changes
# to the local database directly or using a DraftStoreProxy, and then queue a
@ -120,15 +115,14 @@ class SyncbackDraftTask extends Task
DestroyDraftTask = require './destroy-draft'
destroy = new DestroyDraftTask(draftId: existingAccountDraft.id)
promise = TaskQueueStatusStore.waitForPerformLocal(destroy).then =>
@detatchFromRemoteID(existingAccountDraft, acct.id).then (newAccountDraft) =>
Promise.resolve(newAccountDraft)
@cloneIntoAccount(existingAccountDraft, acct.id)
Actions.queueTask(destroy)
return promise
detatchFromRemoteID: (draft, newAccountId = null) ->
cloneIntoAccount: (draft, accountId) ->
return Promise.resolve() unless draft
newDraft = new Message(draft)
newDraft.accountId = newAccountId if newAccountId
newDraft.accountId = accountId
delete newDraft.serverId
delete newDraft.version

View file

@ -145,10 +145,6 @@ class Task
@Status: TaskStatus
@DebugStatus: TaskDebugStatus
# A constant that can be returned by `onDependentTaskError` to prevent
# this task from being dequeued
@DO_NOT_DEQUEUE_ME = "DO_NOT_DEQUEUE_ME"
# Public: Override the constructor to pass initial args to your Task and
# initialize instance variables.
#
@ -455,27 +451,6 @@ class Task
# Returns `true` (is dependent on) or `false` (is not dependent on)
isDependentTask: (other) -> false
# Public: called when a dependency errors out
#
# - `task` An instance of the dependent {Task} that errored.
# - `err` The Error object (if any)
#
# If a dependent task (anything for which {Task::isDependentTask} returns
# true) resolves with `Task.Status.Failed`, then this method will be
# called.
#
# This is an opportunity to cleanup or notify users of the error.
#
# By default, since a dependency failed, **this task will be dequeued**
#
# However, if you return the special `Task.DO_NOT_DEQUEUE_ME` constant,
# this task will not get dequeued and processed in turn.
#
# Returns if you return the `Task.DO_NOT_DEQUEUE_ME` constant, then this
# task will not get dequeued. Any other return value (including `false`)
# will proceed with the default behavior and dequeue this task.
onDependentTaskError: (task, err) ->
# Public: determines which other tasks this one should dequeue.
#
# - `other` An instance of a {Task} you must test to see if it's now