mirror of
https://github.com/Foundry376/Mailspring.git
synced 2025-09-04 19:54:32 +08:00
[client-sync] Interrupt long-running syncback tasks
Summary: Interrupt retryable syncback tasks that are taking too long so that we can return control to the sync loop. The sync loop will retry the task later. This diff adds a `forceReject` param to `interrupt()` so that we can return control immediately instead of waiting for the current operation to finish (for instance, the syncback task could be stuck in an imap operation, and a normal interrupt would still have to wait for that to finish before returning control to the callee) Part of T7978 Test Plan: specs Reviewers: evan, spang, mark, juan Reviewed By: juan Differential Revision: https://phab.nylas.com/D4269
This commit is contained in:
parent
2d1a3714d8
commit
c11a7ff830
4 changed files with 115 additions and 4 deletions
|
@ -0,0 +1,51 @@
|
|||
import {Errors} from 'isomorphic-core'
|
||||
import {createLogger} from '../../../src/shared/logger'
|
||||
import SyncbackTask from '../../../src/local-sync-worker/syncback-tasks/syncback-task'
|
||||
|
||||
let syncbackTask;
|
||||
const TIMEOUT_DELAY = 10;
|
||||
let fakeSetTimeout;
|
||||
|
||||
describe("SyncbackTask", () => {
|
||||
beforeEach(() => {
|
||||
global.Logger = createLogger()
|
||||
const account = {id: 'account1'}
|
||||
const syncbackRequest = {
|
||||
status: 'NEW',
|
||||
}
|
||||
syncbackTask = new SyncbackTask(account, syncbackRequest)
|
||||
fakeSetTimeout = window.setTimeout
|
||||
window.setTimeout = window.originalSetTimeout
|
||||
})
|
||||
afterEach(() => {
|
||||
window.setTimeout = fakeSetTimeout;
|
||||
})
|
||||
describe("when it takes too long", () => {
|
||||
beforeEach(() => {
|
||||
syncbackTask._run = function* hello() {
|
||||
yield new Promise((resolve) => {
|
||||
setTimeout(resolve, TIMEOUT_DELAY + 5)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
it("is stopped if retryable", async () => {
|
||||
syncbackTask._syncbackRequest.status = "INPROGRESS-RETRYABLE"
|
||||
let error;
|
||||
try {
|
||||
await syncbackTask.run({timeoutDelay: TIMEOUT_DELAY})
|
||||
} catch (err) {
|
||||
error = err
|
||||
}
|
||||
expect(error).toBeDefined()
|
||||
expect(error instanceof Errors.RetryableError).toEqual(true)
|
||||
expect(/interrupted/i.test(error.toString())).toEqual(true)
|
||||
})
|
||||
|
||||
it("is not stopped if not retryable", async () => {
|
||||
syncbackTask._syncbackRequest.status = "INPROGRESS-NOTRETRYABLE"
|
||||
// If this does end up being stopped, it'll throw an error.
|
||||
await syncbackTask.run({timeoutDelay: TIMEOUT_DELAY})
|
||||
})
|
||||
})
|
||||
})
|
21
packages/client-sync/spec/shared/interruptible-spec.es6
Normal file
21
packages/client-sync/spec/shared/interruptible-spec.es6
Normal file
|
@ -0,0 +1,21 @@
|
|||
import Interruptible from '../../src/shared/interruptible'
|
||||
|
||||
describe("Interruptible", () => {
|
||||
describe("when interrupted with forceReject", () => {
|
||||
it("the run method rejects immediately", async () => {
|
||||
function* neverResolves() {
|
||||
yield new Promise(() => {})
|
||||
}
|
||||
const interruptible = new Interruptible()
|
||||
const promise = interruptible.run(neverResolves)
|
||||
interruptible.interrupt({forceReject: true})
|
||||
try {
|
||||
await promise;
|
||||
} catch (err) {
|
||||
expect(/interrupted/i.test(err.toString())).toEqual(true)
|
||||
}
|
||||
// The promse never resolves, so if it doesn't reject,
|
||||
// this test will timeout.
|
||||
})
|
||||
})
|
||||
})
|
|
@ -1,7 +1,13 @@
|
|||
import Interruptible from '../../shared/interruptible'
|
||||
|
||||
// TODO: Choose a more appropriate timeout once we've gathered some metrics
|
||||
const TIMEOUT_DELAY = 5 * 60 * 1000;
|
||||
|
||||
class SyncbackTask {
|
||||
constructor(account, syncbackRequest) {
|
||||
this._account = account;
|
||||
this._syncbackRequest = syncbackRequest;
|
||||
this._interruptible = new Interruptible()
|
||||
if (!this._account) {
|
||||
throw new Error("SyncbackTask requires an account")
|
||||
}
|
||||
|
@ -27,8 +33,21 @@ class SyncbackTask {
|
|||
throw new Error("Must implement `affectsImapMessageUIDs`")
|
||||
}
|
||||
|
||||
run() {
|
||||
throw new Error("Must implement a run method")
|
||||
stop = () => {
|
||||
// If we can't retry the task, we don't want to interrupt it.
|
||||
if (this._syncbackRequest.status !== "INPROGRESS-NOTRETRYABLE") {
|
||||
this._interruptible.interrupt({forceReject: true})
|
||||
}
|
||||
}
|
||||
|
||||
async * _run() { // eslint-disable-line
|
||||
throw new Error("Must implement a _run method")
|
||||
}
|
||||
|
||||
async run({timeoutDelay = TIMEOUT_DELAY} = {}) {
|
||||
const timeout = setTimeout(this.stop, timeoutDelay)
|
||||
await this._interruptible.run(this._run)
|
||||
clearTimeout(timeout)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
const {EventEmitter} = require('events')
|
||||
const {Errors} = require('isomorphic-core')
|
||||
|
||||
/**
|
||||
* Interruptible objects allow you to run and interrupt functions by using
|
||||
|
@ -37,14 +38,22 @@ class Interruptible extends EventEmitter {
|
|||
super()
|
||||
this._interrupt = false
|
||||
this._running = false
|
||||
this._rejectWithinRun = null
|
||||
}
|
||||
|
||||
interrupt() {
|
||||
interrupt({forceReject = false} = {}) {
|
||||
if (!this._running) { return Promise.resolve() }
|
||||
|
||||
// Start listening before the interrupt, so we don't miss the 'interrupted' event
|
||||
const promise = new Promise((resolve) => this.once('interrupted', resolve))
|
||||
this._interrupt = true
|
||||
|
||||
if (forceReject && this._rejectWithinRun) {
|
||||
// This will reject the `interruptible.run()` call and immediately return
|
||||
// control to the code path that is awaiting it.
|
||||
this._rejectWithinRun(new Errors.RetryableError('Forcefully interrupted'))
|
||||
}
|
||||
|
||||
return promise
|
||||
}
|
||||
|
||||
|
@ -115,7 +124,18 @@ class Interruptible extends EventEmitter {
|
|||
async run(generatorFunc, ctx, ...fnArgs) {
|
||||
this._running = true
|
||||
const generatorObj = generatorFunc.call(ctx, ...fnArgs)
|
||||
await this._runGenerator(generatorObj)
|
||||
await new Promise(async (resolve, reject) => {
|
||||
this._rejectWithinRun = (rejectValue) => {
|
||||
reject(rejectValue)
|
||||
}
|
||||
try {
|
||||
await this._runGenerator(generatorObj)
|
||||
} catch (err) {
|
||||
reject(err)
|
||||
return;
|
||||
}
|
||||
resolve()
|
||||
})
|
||||
this._interrupt = false
|
||||
this._running = false
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue