Add sync worker error handling

- Handles sync errors in a single place. For now, if error is not a
socket error, will treat as a permanent error, save the error to the
account object, and prevent any other syncing until the error is cleared
from the account object
- Adds a NylasError class that can be extended and serialized. Adds it
to global namespace on all packages and replaces all uses of regular
Error
This commit is contained in:
Juan Tejada 2016-06-27 16:01:21 -07:00
parent cf421cbb2d
commit ed749e0f51
14 changed files with 95 additions and 101 deletions

View file

@ -2,13 +2,7 @@
"parser": "babel-eslint",
"extends": "airbnb",
"globals": {
"NylasEnv": false,
"$n": false,
"waitsForPromise": false,
"advanceClock": false,
"TEST_ACCOUNT_ID": false,
"TEST_ACCOUNT_NAME": false,
"TEST_ACCOUNT_EMAIL": false
"NylasError": false,
},
"env": {
"browser": true,

View file

@ -9,6 +9,7 @@ const fs = require('fs');
const path = require('path');
global.Promise = require('bluebird');
global.NylasError = require('nylas-core').NylasError;
const server = new Hapi.Server();
server.connection({ port: process.env.PORT || 5100 });

View file

@ -35,7 +35,7 @@ class DatabaseConnector {
_sequelizeForAccount(accountId) {
if (!accountId) {
return Promise.reject(new Error(`You need to pass an accountId to init the database!`))
return Promise.reject(new NylasError(`You need to pass an accountId to init the database!`))
}
const sequelize = new Sequelize(accountId, '', '', {
storage: path.join(STORAGE_DIR, `a-${accountId}.sqlite`),

View file

@ -1,9 +1,9 @@
const Sequelize = require('sequelize');
module.exports = {
JSONType: (fieldName) => ({
JSONType: (fieldName, {defaultValue = '{}'} = {}) => ({
type: Sequelize.STRING,
defaultValue: '{}',
defaultValue,
get: function get() {
return JSON.parse(this.getDataValue(fieldName))
},
@ -11,9 +11,9 @@ module.exports = {
this.setDataValue(fieldName, JSON.stringify(val));
},
}),
JSONARRAYType: (fieldName) => ({
JSONARRAYType: (fieldName, {defaultValue = '[]'} = {}) => ({
type: Sequelize.STRING,
defaultValue: '[]',
defaultValue,
get: function get() {
return JSON.parse(this.getDataValue(fieldName))
},

View file

@ -18,7 +18,7 @@ class IMAPBox {
}
if (_.isFunction(prop) && target._imap._box.name !== target._box.name) {
return () => Promise.reject(
new Error(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`)
new NylasError(`IMAPBox::${name} - Can't operate on a mailbox that is no longer open on the current IMAPConnection.`)
)
}
return prop
@ -116,7 +116,6 @@ const EnsureConnected = [
'getBoxes',
'serverSupports',
'runOperation',
'processNextOperation',
'end',
]
@ -138,7 +137,7 @@ class IMAPConnection extends EventEmitter {
get(target, name) {
if (EnsureConnected.includes(name) && !target._imap) {
return () => Promise.reject(
new Error(`IMAPConnection::${name} - You need to call connect() first.`)
new NylasError(`IMAPConnection::${name} - You need to call connect() first.`)
)
}
return Reflect.get(target, name)
@ -166,7 +165,7 @@ class IMAPConnection extends EventEmitter {
if (this._settings.refresh_token) {
const xoauthFields = ['client_id', 'client_secret', 'imap_username', 'refresh_token'];
if (Object.keys(_.pick(this._settings, xoauthFields)).length !== 4) {
return Promise.reject(new Error(`IMAPConnection: Expected ${xoauthFields.join(',')} when given refresh_token`))
return Promise.reject(new NylasError(`IMAPConnection: Expected ${xoauthFields.join(',')} when given refresh_token`))
}
return new Promise((resolve, reject) => {
xoauth2.createXOAuth2Generator({
@ -263,22 +262,20 @@ class IMAPConnection extends EventEmitter {
console.log(`Starting task ${operation.description()}`)
const result = operation.run(this._db, this);
if (result instanceof Promise === false) {
reject(new Error(`Expected ${operation.constructor.name} to return promise.`))
reject(new NylasError(`Expected ${operation.constructor.name} to return promise.`))
}
result
.then(() => {
this._currentOperation = null;
console.log(`Finished task ${operation.description()}`)
resolve();
this.processNextOperation();
})
.catch((err) => {
this._currentOperation = null;
console.log(`Task errored: ${operation.description()}`)
reject(err);
})
.finally(() => {
this.processNextOperation();
});
}
}
IMAPConnection.Capabilities = Capabilities;

View file

@ -1,4 +1,5 @@
global.Promise = require('bluebird');
global.NylasError = require('./nylas-error');
module.exports = {
Provider: {
@ -10,5 +11,6 @@ module.exports = {
IMAPConnection: require('./imap-connection'),
SyncPolicy: require('./sync-policy'),
SchedulerUtils: require('./scheduler-utils'),
ExtendableError: require('./extendable-error'),
Config: require(`./config/${process.env.ENV || 'development'}`),
NylasError,
}

View file

@ -1,5 +1,6 @@
const crypto = require('crypto');
const IMAPConnection = require('../../imap-connection')
const NylasError = require('../../nylas-error')
const {JSONType, JSONARRAYType} = require('../../database-types');
@ -52,7 +53,7 @@ module.exports = (sequelize, Sequelize) => {
if (message) {
return Promise.resolve(`${message.headers}${message.body}`)
}
return Promise.reject(new Error(`Unable to fetch raw message for Message ${this.id}`))
return Promise.reject(new NylasError(`Unable to fetch raw message for Message ${this.id}`))
})
.finally(() => connection.end())
})

View file

@ -1,5 +1,5 @@
const crypto = require('crypto');
const {JSONType, JSONARRAYType} = require('../../database-types');
const {JSONType} = require('../../database-types');
const {DB_ENCRYPTION_ALGORITHM, DB_ENCRYPTION_PASSWORD} = process.env;
@ -11,7 +11,7 @@ module.exports = (sequelize, Sequelize) => {
connectionSettings: JSONType('connectionSettings'),
connectionCredentials: Sequelize.STRING,
syncPolicy: JSONType('syncPolicy'),
syncErrors: JSONARRAYType('syncErrors'),
syncError: JSONType('syncError', {defaultValue: null}),
}, {
classMethods: {
associate: ({AccountToken}) => {
@ -25,17 +25,17 @@ module.exports = (sequelize, Sequelize) => {
email_address: this.emailAddress,
connection_settings: this.connectionSettings,
sync_policy: this.syncPolicy,
sync_errors: this.syncErrors,
sync_error: this.syncError,
}
},
errored: function errored() {
return this.syncErrors.length > 0
return this.syncError != null
},
setCredentials: function setCredentials(json) {
if (!(json instanceof Object)) {
throw new Error("Call setCredentials with JSON!")
throw new NylasError("Call setCredentials with JSON!")
}
const cipher = crypto.createCipher(DB_ENCRYPTION_ALGORITHM, DB_ENCRYPTION_PASSWORD)
let crypted = cipher.update(JSON.stringify(json), 'utf8', 'hex')

View file

@ -1,4 +1,4 @@
class ExtendableError extends Error {
class NylasError extends Error {
constructor(message) {
super(message);
this.name = this.constructor.name;
@ -15,4 +15,4 @@ class ExtendableError extends Error {
}
}
module.exports = ExtendableError
module.exports = NylasError

View file

@ -1,10 +1,11 @@
const Hapi = require('hapi');
const HapiWebSocket = require('hapi-plugin-websocket');
const Inert = require('inert');
const {DatabaseConnector, PubsubConnector, SchedulerUtils} = require(`nylas-core`);
const {DatabaseConnector, PubsubConnector, SchedulerUtils, NylasError} = require(`nylas-core`);
const {forEachAccountList} = SchedulerUtils;
global.Promise = require('bluebird');
global.NylasError = NylasError;
const server = new Hapi.Server();
server.connection({ port: process.env.PORT / 1 + 1 || 5101 });

View file

@ -1,7 +1,8 @@
const {DatabaseConnector} = require(`nylas-core`)
const {DatabaseConnector, NylasError} = require(`nylas-core`)
const {processors} = require('./processors')
global.Promise = require('bluebird');
global.NylasError = NylasError;
// List of the attributes of Message that the processor should be allowed to change.
// The message may move between folders, get starred, etc. while it's being

View file

@ -1,6 +1,6 @@
global.Promise = require('bluebird');
const {DatabaseConnector} = require(`nylas-core`)
const {DatabaseConnector, NylasError} = require(`nylas-core`)
const SyncProcessManager = require('./sync-process-manager');
const manager = new SyncProcessManager();
@ -18,4 +18,5 @@ DatabaseConnector.forShared().then((db) => {
});
});
global.NylasError = NylasError;
global.manager = manager;

View file

@ -13,7 +13,7 @@ class FetchMessagesInCategory {
this._category = category;
this._options = options;
if (!this._category) {
throw new Error("FetchMessagesInCategory requires a category")
throw new NylasError("FetchMessagesInCategory requires a category")
}
}
@ -145,7 +145,7 @@ class FetchMessagesInCategory {
return this._imap.openBox(this._category.name)
.then((box) => {
if (box.persistentUIDs === false) {
return Promise.reject(new Error("Mailbox does not support persistentUIDs."))
return Promise.reject(new NylasError("Mailbox does not support persistentUIDs."))
}
if (box.uidvalidity !== this._category.syncState.uidvalidity) {
return this._recoverFromUIDInvalidity()

View file

@ -4,19 +4,12 @@ const {
IMAPConnection,
PubsubConnector,
DatabaseConnector,
ExtendableError,
} = require('nylas-core');
const FetchCategoryList = require('./imap/fetch-category-list')
const FetchMessagesInCategory = require('./imap/fetch-messages-in-category')
const SyncbackTaskFactory = require('./syncback-task-factory')
class SyncAllCategoriesError extends ExtendableError {
constructor(message, failures) {
super(message)
this.failures = failures
}
}
class SyncWorker {
constructor(account, db) {
@ -38,7 +31,12 @@ class SyncWorker {
cleanup() {
this._destroyed = true;
this._listener.dispose();
this.closeConnection()
}
closeConnection() {
this._conn.end();
this._conn = null
}
onAccountChanged() {
@ -57,14 +55,18 @@ class SyncWorker {
if (afterSync === 'idle') {
return this.getInboxCategory()
.then((inboxCategory) => this._conn.openBox(inboxCategory.name))
.then(() => console.log("SyncWorker: - Idling on inbox category"))
.then(() => console.log('SyncWorker: - Idling on inbox category'))
.catch((error) => {
this.closeConnection()
console.error('SyncWorker: - Unhandled error while attempting to idle on Inbox after sync: ', error)
})
} else if (afterSync === 'close') {
console.log("SyncWorker: - Closing connection");
this._conn.end();
this._conn = null;
return Promise.resolve()
console.log('SyncWorker: - Closing connection');
} else {
console.warn(`SyncWorker: - Unknown afterSync behavior: ${afterSync}. Closing connection`)
}
return Promise.reject(new Error(`onSyncDidComplete: Unknown afterSync behavior: ${afterSync}`))
this.closeConnection()
return Promise.resolve()
}
onConnectionIdleUpdate() {
@ -79,30 +81,28 @@ class SyncWorker {
if (this._conn) {
return this._conn.connect();
}
return new Promise((resolve, reject) => {
const settings = this._account.connectionSettings;
const credentials = this._account.decryptedCredentials();
const settings = this._account.connectionSettings;
const credentials = this._account.decryptedCredentials();
if (!settings || !settings.imap_host) {
return reject(new Error("ensureConnection: There are no IMAP connection settings for this account."))
}
if (!credentials) {
return reject(new Error("ensureConnection: There are no IMAP connection credentials for this account."))
}
if (!settings || !settings.imap_host) {
return Promise.reject(new NylasError("ensureConnection: There are no IMAP connection settings for this account."))
}
if (!credentials) {
return Promise.reject(new NylasError("ensureConnection: There are no IMAP connection credentials for this account."))
}
const conn = new IMAPConnection(this._db, Object.assign({}, settings, credentials));
conn.on('mail', () => {
this.onConnectionIdleUpdate();
})
conn.on('update', () => {
this.onConnectionIdleUpdate();
})
conn.on('queue-empty', () => {
});
this._conn = conn;
return resolve(this._conn.connect());
const conn = new IMAPConnection(this._db, Object.assign({}, settings, credentials));
conn.on('mail', () => {
this.onConnectionIdleUpdate();
})
conn.on('update', () => {
this.onConnectionIdleUpdate();
})
conn.on('queue-empty', () => {
});
this._conn = conn;
return this._conn.connect();
}
fetchCategoryList() {
@ -113,7 +113,6 @@ class SyncWorker {
return Promise.resolve();
// TODO
const {SyncbackRequest, accountId, Account} = this._db;
return Account.find({where: {id: accountId}}).then((account) => {
return Promise.each(SyncbackRequest.findAll().then((reqs = []) =>
reqs.map((request) => {
@ -144,52 +143,49 @@ class SyncWorker {
}
}
// TODO Don't accumulate errors, just bail on the first error and clear
// the queue and the connection
const failures = []
return Promise.all(categoriesToSync.map((cat) =>
this._conn.runOperation(new FetchMessagesInCategory(cat, folderSyncOptions))
.catch((error) => failures.push({error, category: cat.name}))
))
.then(() => {
if (failures.length > 0) {
const error = new SyncAllCategoriesError(
`Failed to sync all categories for ${this._account.emailAddress}`, failures
)
return Promise.reject(error)
}
return Promise.resolve()
})
});
}
performSync() {
return this.fetchCategoryList()
.then(() => this.syncbackMessageActions())
.then(() => this.syncAllCategories())
}
syncNow() {
clearTimeout(this._syncTimer);
if (this._account.errored()) {
console.log(`SyncWorker: Account ${this._account.emailAddress} is in error state - Skipping sync`)
return
}
this.ensureConnection()
.then(this.fetchCategoryList.bind(this))
.then(this.syncbackMessageActions.bind(this))
.then(this.syncAllCategories.bind(this))
.catch((error) => {
// TODO
// Distinguish between temporary and critical errors
// Update account sync state for critical errors
// Handle connection errors separately
console.log('----------------------------------')
console.log('Erroring where you are supposed to')
console.log(error)
console.log('----------------------------------')
})
.then(() => this.performSync())
.then(() => this.onSyncDidComplete())
.catch((error) => this.onSyncError(error))
.finally(() => {
this._lastSyncTime = Date.now()
this.onSyncDidComplete()
.catch((error) => (
console.error('SyncWorker.syncNow: Unhandled error while cleaning up after sync: ', error)
))
.finally(() => this.scheduleNextSync())
});
this.scheduleNextSync()
})
}
onSyncError(error) {
console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} `, error)
this.closeConnection()
if (error.source === 'socket') {
// Continue to retry if it was a network error
return Promise.resolve()
}
this._account.syncError = error
return this._account.save()
}
scheduleNextSync() {
if (this._account.errored()) { return }
SchedulerUtils.checkIfAccountIsActive(this._account.id).then((active) => {
const {intervals} = this._account.syncPolicy;
const interval = active ? intervals.active : intervals.inactive;