// Mailspring/packages/nylas-sync/sync-worker.js

// Shared infrastructure from nylas-core used by the sync worker.
const {
  SchedulerUtils,
  IMAPConnection,
  PubsubConnector,
  DatabaseConnector,
  MessageTypes,
} = require('nylas-core');
const {
  jsonError,
} = require('./sync-utils');
// Workers give an account back to the pool after holding it this long
// (compared against Date.now() deltas in SyncWorker.scheduleNextSync).
const {CLAIM_DURATION} = SchedulerUtils;
// IMAP operations run through IMAPConnection#runOperation.
const FetchFolderList = require('./imap/fetch-category-list');
const FetchMessagesInFolder = require('./imap/fetch-messages-in-category');
const SyncbackTaskFactory = require('./syncback-task-factory');

/**
 * Runs the sync loop for a single account.
 *
 * Each sync cycle (see `syncNow`) connects to IMAP, runs queued syncback
 * requests, refreshes the folder list, fetches messages for every folder and
 * then schedules the next cycle. The worker also listens on the account's
 * pubsub channel so that account updates and new syncback requests can
 * trigger a sync. Once the worker has held the account for longer than
 * CLAIM_DURATION it cleans up after itself and calls `onExpired`.
 */
class SyncWorker {
  /**
   * Starts the initial sync and subscribes to the account's pubsub channel.
   *
   * @param {Object} account - account model; the worker reads `id`,
   *   `emailAddress`, `provider`, `syncPolicy`, `connectionSettings`,
   *   `lastSyncCompletions` and calls `errored()`, `decryptedCredentials()`,
   *   `update()` and `save()` on it.
   * @param {Object} db - per-account database handle exposing `Folder` and
   *   `SyncbackRequest` models.
   * @param {Function} onExpired - called with no arguments after the worker
   *   has exceeded CLAIM_DURATION and cleaned itself up.
   */
  constructor(account, db, onExpired) {
    this._db = db;
    this._conn = null;
    this._account = account;
    this._startTime = Date.now();
    this._lastSyncTime = null;
    this._onExpired = onExpired;

    this._syncTimer = null;
    this._expirationTimer = null;
    this._destroyed = false;

    this.syncNow({reason: 'Initial'});

    this._onMessage = this._onMessage.bind(this);
    this._listener = PubsubConnector.observeAccount(account.id).subscribe(this._onMessage);
  }

  /**
   * Permanently stops the worker: cancels any pending scheduled sync, drops
   * the pubsub subscription and ends the IMAP connection. After this call
   * `syncNow` and `scheduleNextSync` are no-ops.
   */
  cleanup() {
    this._destroyed = true;
    // Without this a timer armed before cleanup would still fire and start
    // another sync on a worker that has already been released.
    clearTimeout(this._syncTimer);
    this._syncTimer = null;
    if (this._listener) {
      this._listener.dispose();
    }
    this.closeConnection();
  }

  /** Ends the current IMAP connection, if one was ever created. */
  closeConnection() {
    if (this._conn) {
      this._conn.end();
    }
  }

  /**
   * Pubsub handler for the account channel.
   *
   * @param {string} msg - JSON-encoded message with a `type` field.
   * @throws {SyntaxError} if `msg` is not valid JSON.
   * @throws {Error} if the message type is not one this worker handles.
   */
  _onMessage(msg) {
    const {type} = JSON.parse(msg);
    switch (type) {
      case MessageTypes.ACCOUNT_UPDATED:
        this._onAccountUpdated();
        break;
      case MessageTypes.SYNCBACK_REQUESTED:
        this.syncNow({reason: 'Syncback Action Queued'});
        break;
      default:
        throw new Error(`Invalid message: ${msg}`);
    }
  }

  /**
   * Reloads the account from the shared database and syncs with the fresh
   * copy. Ignored unless the worker is currently waiting for a scheduled sync.
   *
   * @returns {Promise} settles once the reload (and sync kick-off) is done;
   *   never rejects - failures are logged.
   */
  _onAccountUpdated() {
    if (!this.isWaitingForNextSync()) {
      return Promise.resolve();
    }
    return Promise.resolve()
      .then(() => this._getAccount())
      .then((account) => {
        if (this._destroyed) {
          return;
        }
        if (!account) {
          // Keep the last known account rather than replacing it with null,
          // which would make every later `this._account.*` access throw.
          console.error(`SyncWorker: Account ${this._account.id} could not be reloaded after update - keeping current copy`);
          return;
        }
        this._account = account;
        this.syncNow({reason: 'Account Modification'});
      })
      .catch((error) => {
        console.error(`SyncWorker: Error reloading account ${this._account.id} after update`, error);
      });
  }

  /**
   * Handler for the IMAP connection's 'mail' and 'update' events. Starts a
   * sync only when the worker is waiting for its next scheduled sync.
   */
  _onConnectionIdleUpdate() {
    if (!this.isWaitingForNextSync()) {
      return;
    }
    this.syncNow({reason: 'IMAP IDLE Fired'});
  }

  /**
   * @returns {Promise<Object>} the account row with this worker's account id,
   *   looked up in the shared database.
   */
  _getAccount() {
    return DatabaseConnector.forShared().then(({Account}) =>
      Account.find({where: {id: this._account.id}})
    );
  }

  /**
   * @returns {Promise<Object>} a folder whose role is 'all' or 'inbox'.
   */
  _getIdleFolder() {
    return this._db.Folder.find({where: {role: ['all', 'inbox']}});
  }

  /**
   * Connects the existing IMAP connection, or creates one from the account's
   * connection settings and decrypted credentials and connects that.
   *
   * @returns {Promise} result of `IMAPConnection#connect()`; rejects with an
   *   Error when the account has no IMAP settings or no credentials.
   */
  ensureConnection() {
    if (this._conn) {
      return this._conn.connect();
    }

    const settings = this._account.connectionSettings;
    if (!settings || !settings.imap_host) {
      return Promise.reject(new Error("ensureConnection: There are no IMAP connection settings for this account."));
    }
    const credentials = this._account.decryptedCredentials();
    if (!credentials) {
      return Promise.reject(new Error("ensureConnection: There are no IMAP connection credentials for this account."));
    }

    const conn = new IMAPConnection(this._db, Object.assign({}, settings, credentials));
    conn.on('mail', () => {
      this._onConnectionIdleUpdate();
    });
    conn.on('update', () => {
      this._onConnectionIdleUpdate();
    });
    // NOTE(review): empty listener kept from the original; its purpose is not
    // visible in this file.
    conn.on('queue-empty', () => {
    });

    this._conn = conn;
    return this._conn.connect();
  }

  /**
   * Loads up to 100 syncback requests with status "NEW", turns each into a
   * task and hands them to `runSyncbackTask` via the promise's `.each`.
   *
   * @returns {Promise} settles when every task has been run.
   */
  syncbackMessageActions() {
    const query = {where: {status: "NEW"}, limit: 100};
    return this._db.SyncbackRequest.findAll(query)
      .map((req) => SyncbackTaskFactory.create(this._account, req))
      .each((task) => this.runSyncbackTask(task));
  }

  /**
   * Runs one syncback task on the IMAP connection and records the outcome on
   * its request: status "SUCCEEDED", or status "FAILED" plus the error. The
   * request is saved in both cases.
   *
   * @param {Object} task - task created by SyncbackTaskFactory.
   * @returns {Promise} settles after the request has been saved.
   */
  runSyncbackTask(task) {
    const syncbackRequest = task.syncbackRequestObject();
    return this._conn.runOperation(task)
      .then(() => {
        syncbackRequest.status = "SUCCEEDED";
      })
      .catch((error) => {
        syncbackRequest.error = error;
        syncbackRequest.status = "FAILED";
      })
      .finally(() => syncbackRequest.save());
  }

  /**
   * Queues a FetchMessagesInFolder operation for every folder, ordered so
   * that well-known roles come first (inbox, all, drafts, sent, spam, trash)
   * and folders with any other role come last.
   *
   * @returns {Promise<Array>} resolves when all fetch operations finished.
   */
  syncAllCategories() {
    const {Folder} = this._db;
    const {folderSyncOptions} = this._account.syncPolicy;
    const priority = ['inbox', 'all', 'drafts', 'sent', 'spam', 'trash'];
    const rankOf = (folder) => {
      const index = priority.indexOf(folder.role);
      return index === -1 ? priority.length : index;
    };

    return Folder.findAll().then((categories) => {
      const categoriesToSync = [...categories].sort((a, b) => rankOf(a) - rankOf(b));
      return Promise.all(categoriesToSync.map((cat) =>
        this._conn.runOperation(new FetchMessagesInFolder(cat, folderSyncOptions))
      ));
    });
  }

  /**
   * Runs one full sync cycle: connect, clear the stored sync error, run
   * syncback requests, refresh the folder list, sync all folders, then
   * `onSyncDidComplete`. Any failure goes to `onSyncError`. Whatever the
   * outcome, the cycle ends by recording the time and scheduling the next
   * sync.
   *
   * Skipped entirely when the worker has been cleaned up, or when the account
   * is in an error state and SYNC_AFTER_ERRORS is not set in the environment.
   *
   * NOTE(review): nothing here prevents a second cycle from starting while
   * one is still running (for example via a syncback message).
   *
   * @param {Object} [options]
   * @param {string} [options.reason] - shown in the log line only.
   * @returns {Promise} settles when the cycle is over; never rejects.
   */
  syncNow({reason} = {}) {
    if (this._destroyed) {
      return Promise.resolve();
    }
    clearTimeout(this._syncTimer);
    this._syncTimer = null;

    if (!process.env.SYNC_AFTER_ERRORS && this._account.errored()) {
      console.log(`SyncWorker: Account ${this._account.emailAddress} (${this._account.id}) is in error state - Skipping sync`);
      return Promise.resolve();
    }
    console.log(`SyncWorker: Account ${this._account.emailAddress} (${this._account.id}) sync started (${reason})`);

    // Starting from a resolved promise routes a synchronous throw from
    // ensureConnection() into onSyncError instead of out of the caller
    // (the constructor or a timer callback).
    return Promise.resolve()
      .then(() => this.ensureConnection())
      .then(() => this._account.update({syncError: null}))
      .then(() => this.syncbackMessageActions())
      .then(() => this._conn.runOperation(new FetchFolderList(this._account.provider)))
      .then(() => this.syncAllCategories())
      .then(() => this.onSyncDidComplete())
      .catch((error) => this.onSyncError(error))
      .catch((error) => {
        // onSyncError itself failed (e.g. the account could not be saved).
        console.error(`SyncWorker: Failed to record sync error for account ${this._account.id}`, error);
      })
      .finally(() => {
        this._lastSyncTime = Date.now();
        this.scheduleNextSync();
      });
  }

  /**
   * Logs a sync failure and closes the connection. Errors whose `source`
   * mentions 'socket' or 'timeout' are treated as network errors and only
   * retried; every other error (including errors with no `source` at all)
   * is stored on the account as `syncError` and saved.
   *
   * @param {Error} error - the failure; `source` is optional.
   * @returns {Promise} resolves once the account has been saved, or
   *   immediately for network errors.
   */
  onSyncError(error) {
    console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} (${this._account.id})`, error);
    this.closeConnection();

    // Plain Error objects (such as the ones thrown in this file) carry no
    // `source`; treat that as "not a network error" instead of crashing.
    const source = error && typeof error.source === 'string' ? error.source : '';
    if (source.includes('socket') || source.includes('timeout')) {
      // Continue to retry if it was a network error
      return Promise.resolve();
    }

    this._account.syncError = jsonError(error);
    return this._account.save();
  }

  /**
   * Bookkeeping after a successful cycle: stamps `firstSyncCompletedAt` on
   * the first success, prepends the current time to `lastSyncCompletions`
   * (dropping trailing entries older than 30 minutes), saves the account and
   * then applies the account's `afterSync` policy - 'idle' opens the idle
   * folder on the connection, 'close' closes the connection.
   *
   * @returns {Promise} settles when the after-sync action is done; rejects
   *   for an unknown `afterSync` value or when no idle folder exists.
   */
  onSyncDidComplete() {
    const {afterSync} = this._account.syncPolicy;
    const now = Date.now();

    if (!this._account.firstSyncCompletedAt) {
      this._account.firstSyncCompletedAt = now;
    }

    const syncGraphTimeLength = 60 * 30; // 30 minutes, should be the same as SyncGraph.config.timeLength
    // A fresh account may not have any recorded completions yet.
    const lastSyncCompletions = [now, ...(this._account.lastSyncCompletions || [])];
    while (now - lastSyncCompletions[lastSyncCompletions.length - 1] > 1000 * syncGraphTimeLength) {
      lastSyncCompletions.pop();
    }
    this._account.lastSyncCompletions = lastSyncCompletions;

    // A failed bookkeeping save is logged but does not fail the cycle.
    const saved = Promise.resolve()
      .then(() => this._account.save())
      .catch((error) => {
        console.error(`SyncWorker: Failed to save sync completion for account ${this._account.id}`, error);
      });

    return saved.then(() => {
      console.log('Syncworker: Completed sync cycle');

      if (afterSync === 'idle') {
        return this._getIdleFolder()
          .then((idleFolder) => {
            if (!idleFolder) {
              throw new Error('SyncWorker.onSyncDidComplete: No inbox or all folder to idle on');
            }
            return this._conn.openBox(idleFolder.name);
          })
          .then(() => console.log('SyncWorker: - Idling on inbox category'));
      }

      if (afterSync === 'close') {
        console.log('SyncWorker: - Closing connection');
        this.closeConnection();
        return Promise.resolve();
      }

      throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`);
    });
  }

  /**
   * @returns {boolean} true while a scheduled sync timer is armed.
   */
  isWaitingForNextSync() {
    return this._syncTimer != null;
  }

  /**
   * Arms the timer for the next sync, or retires the worker.
   *
   * When the worker has held the account for longer than CLAIM_DURATION it
   * cleans up and calls `onExpired`. Otherwise the next sync is scheduled
   * `intervals.active` or `intervals.inactive` (from the sync policy) after
   * the last sync time, depending on `checkIfAccountIsActive`. If that check
   * fails the account is treated as inactive so the worker keeps syncing.
   * No timer is armed when the chosen interval is falsy.
   *
   * @returns {Promise} settles once the timer decision was made; never
   *   rejects.
   */
  scheduleNextSync() {
    if (this._destroyed) {
      return Promise.resolve();
    }

    if (Date.now() - this._startTime > CLAIM_DURATION) {
      console.log("SyncWorker: - Has held account for more than CLAIM_DURATION, returning to pool.");
      this.cleanup();
      this._onExpired();
      return Promise.resolve();
    }

    return Promise.resolve()
      .then(() => SchedulerUtils.checkIfAccountIsActive(this._account.id))
      .catch((error) => {
        console.error(`SyncWorker: Could not determine whether account ${this._account.id} is active - assuming inactive`, error);
        return false;
      })
      .then((active) => {
        // cleanup() may have run while the activity check was in flight.
        if (this._destroyed) {
          return;
        }
        const {intervals} = this._account.syncPolicy;
        const interval = active ? intervals.active : intervals.inactive;
        if (!interval) {
          return;
        }

        const target = this._lastSyncTime + interval;
        console.log(`SyncWorker: Account ${active ? 'active' : 'inactive'}. Next sync scheduled for ${new Date(target).toLocaleString()}`);

        // Overlapping sync cycles can each reach this point; make sure only
        // one timer is ever armed.
        clearTimeout(this._syncTimer);
        this._syncTimer = setTimeout(() => {
          this.syncNow({reason: 'Scheduled'});
        }, target - Date.now());
      })
      .catch((error) => {
        console.error(`SyncWorker: Failed to schedule next sync for account ${this._account.id}`, error);
      });
  }
}
module.exports = SyncWorker;