task runner

This commit is contained in:
Andris Reinman 2018-10-11 11:48:12 +03:00
parent f6797583fe
commit 96329aa85a
13 changed files with 364 additions and 281 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
define({ "name": "wildduck", "version": "1.0.0", "description": "WildDuck API docs", "title": "WildDuck API", "url": "https://api.wildduck.email", "sampleUrl": false, "defaultVersion": "0.0.0", "apidoc": "0.3.0", "generator": { "name": "apidoc", "time": "2018-10-08T08:02:16.146Z", "url": "http://apidocjs.com", "version": "0.17.6" } });
define({ "name": "wildduck", "version": "1.0.0", "description": "WildDuck API docs", "title": "WildDuck API", "url": "https://api.wildduck.email", "sampleUrl": false, "defaultVersion": "0.0.0", "apidoc": "0.3.0", "generator": { "name": "apidoc", "time": "2018-10-11T08:48:03.541Z", "url": "http://apidocjs.com", "version": "0.17.6" } });

View file

@ -1 +1 @@
{ "name": "wildduck", "version": "1.0.0", "description": "WildDuck API docs", "title": "WildDuck API", "url": "https://api.wildduck.email", "sampleUrl": false, "defaultVersion": "0.0.0", "apidoc": "0.3.0", "generator": { "name": "apidoc", "time": "2018-10-08T08:02:16.146Z", "url": "http://apidocjs.com", "version": "0.17.6" } }
{ "name": "wildduck", "version": "1.0.0", "description": "WildDuck API docs", "title": "WildDuck API", "url": "https://api.wildduck.email", "sampleUrl": false, "defaultVersion": "0.0.0", "apidoc": "0.3.0", "generator": { "name": "apidoc", "time": "2018-10-11T08:48:03.541Z", "url": "http://apidocjs.com", "version": "0.17.6" } }

View file

@ -393,6 +393,13 @@ indexes:
user: 1
_id: -1
- collection: archived
index:
name: user_messages_archived
key:
user: 1
archived: 1
- collection: archived
index:
name: retention_time

View file

@ -3475,6 +3475,7 @@ module.exports = (db, server, messageHandler) => {
locked: false,
lockedUntil: now,
created: now,
status: 'queued',
user,
start,
end

View file

@ -20,6 +20,9 @@ module.exports = {
JUNK_RETENTION: 30 * 24 * 3600 * 1000,
// how long to keep messages of deleted users before deleting
DELETED_USER_MESSAGE_RETENTION: 14 * 24 * 3600 * 1000,
MAILBOX_COUNTER_TTL: 24 * 3600,
// how much plaintext to store in a full text indexed field
@ -73,5 +76,8 @@ module.exports = {
TASK_STARTUP_INTERVAL: 1 * 60 * 1000,
// if no tasks were found, wait 1 minute
TASK_IDLE_INTERVAL: 1 * 60 * 1000
TASK_IDLE_INTERVAL: 1 * 60 * 1000,
TASK_LOCK_INTERVAL: 1 * 60 * 60 * 1000,
TASK_UPDATE_INTERVAL: 10 * 60 * 1000
};

View file

@ -66,8 +66,12 @@ class MessageHandler {
query.user = options.user;
if (options.specialUse) {
query.specialUse = options.specialUse;
} else {
} else if (options.path) {
query.path = options.path;
} else {
let err = new Error('Mailbox is missing');
err.imapResponse = 'TRYCREATE';
return callback(err);
}
}
}
@ -1079,12 +1083,14 @@ class MessageHandler {
put(messageData, callback) {
let getMailbox = next => {
this.getMailbox({ mailbox: messageData.mailbox }, (err, mailboxData) => {
if (err && !err.imapResponse) {
if (err && err.imapResponse !== 'TRYCREATE') {
return callback(err);
}
if (mailboxData) {
return next(null, mailboxData);
}
this.getMailbox(
{
query: {

116
lib/tasks/restore.js Normal file
View file

@ -0,0 +1,116 @@
'use strict';
const log = require('npmlog');
const db = require('../db');
module.exports = (taskData, options, callback) => {
const messageHandler = options.messageHandler;
let cursor = db.database.collection('archived').find({
user: taskData.user,
archived: {
$gte: taskData.start,
$lte: taskData.end
}
});
db.users.findOne({ _id: taskData.user }, (err, userData) => {
if (err) {
log.error('Tasks', 'task=restore id=%s user=%s error=%s', taskData._id, taskData.user, err.message);
return callback(err);
}
if (!userData) {
// no such user anymore
log.error('Tasks', 'task=restore id=%s user=%s error=%s', taskData._id, taskData.user, 'No such user');
return callback(null, true);
}
db.database
.collection('mailboxes')
.find({
user: taskData.user
})
.toArray((err, mailboxesList) => {
if (err) {
log.error('Tasks', 'task=restore id=%s user=%s error=%s', taskData._id, taskData.user, err.message);
return callback(err);
}
let mailboxes = new Map();
let trashMailbox;
let inboxMailbox;
mailboxes = (mailboxesList || []).forEach(mailboxData => {
mailboxes.set(mailboxData._id.toString(), mailboxData);
if (mailboxData.specialUse === '\\Trash') {
trashMailbox = mailboxData._id;
} else if (mailboxData.path === 'INBOX') {
inboxMailbox = mailboxData._id;
}
});
let processNext = () => {
cursor.next((err, messageData) => {
if (err) {
log.error('Tasks', 'task=restore id=%s user=%s error=%s', taskData._id, taskData.user, err.message);
return callback(err);
}
if (!messageData) {
return cursor.close(() => callback(null, true));
}
// move messages from Trash and non-existing mailboxes to INBOX
if (messageData.mailbox && (messageData.mailbox.equals(trashMailbox) || !mailboxes.has(messageData.mailbox.toString()))) {
messageData.mailbox = inboxMailbox;
}
delete messageData.archived;
delete messageData.exp;
delete messageData.rdate;
log.info(
'Tasks',
'task=restore id=%s user=%s message=%s action=restoring target=%s',
taskData._id,
taskData.user,
messageData._id,
messageData.mailbox
);
messageHandler.put(messageData, (err, response) => {
if (err) {
log.error('Tasks', 'task=restore id=%s user=%s message=%s error=%s', taskData._id, taskData.user, messageData._id, err.message);
return setTimeout(processNext, 5000);
} else if (!response) {
log.error(
'Tasks',
'task=restore id=%s user=%s message=%s error=%s',
taskData._id,
taskData.user,
messageData._id,
'Failed to restore message'
);
return setTimeout(processNext, 1000);
}
log.info(
'Tasks',
'task=restore id=%s user=%s message=%s mailbox=%s uid=%s',
taskData._id,
taskData.user,
messageData._id,
response.mailbox,
response.uid
);
return db.database.collection('archived').deleteOne({ _id: messageData._id }, () => processNext());
});
});
};
processNext();
});
});
};

90
lib/tasks/user-delete.js Normal file
View file

@ -0,0 +1,90 @@
'use strict';
const log = require('npmlog');
const db = require('../db');
const consts = require('../consts');
module.exports = (taskData, options, callback) => {
// keep messages around for a while, delete other stuff
let processMessages = done => {
db.database.collection('messages').updateMany(
{ user: taskData.user },
{
$set: {
exp: true,
rdate: new Date(Date.now() + consts.DELETED_USER_MESSAGE_RETENTION),
userDeleted: true
}
},
err => {
if (err) {
log.error(
'Tasks',
'task=user-delete id=%s user=%s message=%s error=%s',
taskData._id,
taskData.user,
'Failed to update messages',
err.message
);
err.code = 'InternalDatabaseError';
return callback(err);
}
done();
}
);
};
processMessages(() => {
db.database.collection('mailboxes').deleteMany({ user: taskData.user }, err => {
if (err) {
log.error(
'Tasks',
'task=user-delete id=%s user=%s message=%s error=%s',
taskData._id,
taskData.user,
'Failed to delete mailboxes',
err.message
);
err.code = 'InternalDatabaseError';
}
db.users.collection('asps').deleteMany({ user: taskData.user }, err => {
if (err) {
log.error('Tasks', 'task=user-delete id=%s user=%s message=%s error=%s', taskData._id, taskData.user, 'Failed to delete asps', err.message);
err.code = 'InternalDatabaseError';
}
db.users.collection('filters').deleteMany({ user: taskData.user }, err => {
if (err) {
log.error(
'Tasks',
'task=user-delete id=%s user=%s message=%s error=%s',
taskData._id,
taskData.user,
'Failed to delete filters',
err.message
);
err.code = 'InternalDatabaseError';
}
db.users.collection('autoreplies').deleteMany({ user: taskData.user }, err => {
if (err) {
log.error(
'Tasks',
'task=user-delete id=%s user=%s message=%s error=%s',
taskData._id,
taskData.user,
'Failed to delete autoreplies',
err.message
);
err.code = 'InternalDatabaseError';
}
return callback(null, true);
});
});
});
});
});
};

View file

@ -2982,91 +2982,56 @@ class UserHandler {
// clear limits in Redis
this.redis.del('limits:' + user, () => false);
this.database.collection('messages').updateMany(
{ user },
{
$set: {
exp: true,
rdate: new Date(Date.now() + 2 * 24 * 3600 * 1000),
userDeleted: true
}
},
err => {
let tryCount = 0;
let tryDelete = err => {
if (tryCount++ > 10) {
return callback(err);
}
this.users.collection('addresses').deleteMany({ user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete messages for id=%s error=%s', user, err.message);
log.error('USERDEL', 'Failed to delete addresses for id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
return callback(err);
if (tryCount > 4) {
return setTimeout(() => tryDelete(err), 100);
}
}
let tryCount = 0;
let tryDelete = err => {
if (tryCount++ > 10) {
return callback(err);
this.users.collection('users').deleteOne({ _id: user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete user id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
return setTimeout(() => tryDelete(err), 100);
}
this.database.collection('mailboxes').deleteMany({ user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete mailboxes for id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
if (tryCount > 2) {
return setTimeout(() => tryDelete(err), 100);
}
}
// set up a task to delete user messages
let now = new Date();
this.database.collection('tasks').insertOne(
{
task: 'user-delete',
locked: false,
lockedUntil: now,
created: now,
status: 'queued',
user
},
() =>
this.logAuthEvent(
user,
{
action: 'delete user',
result: 'success',
sess: meta.session,
ip: meta.ip
},
() => callback(null, true)
)
this.users.collection('addresses').deleteMany({ user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete addresses for id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
if (tryCount > 4) {
return setTimeout(() => tryDelete(err), 100);
}
}
this.users.collection('users').deleteOne({ _id: user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete user id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
return setTimeout(() => tryDelete(err), 100);
}
this.users.collection('asps').deleteMany({ user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete asps for id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
}
this.users.collection('filters').deleteMany({ user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete filters for id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
}
this.users.collection('autoreplies').deleteMany({ user }, err => {
if (err) {
log.error('USERDEL', 'Failed to delete autoreplies for id=%s error=%s', user, err.message);
err.code = 'InternalDatabaseError';
}
return this.logAuthEvent(
user,
{
action: 'delete user',
result: 'success',
sess: meta.session,
ip: meta.ip
},
() => callback(null, true)
);
});
});
});
});
});
});
};
setImmediate(tryDelete);
}
);
);
});
});
};
setImmediate(tryDelete);
}
checkAddress(username, callback) {

View file

@ -47,7 +47,7 @@
"humanname": "0.2.2",
"iconv-lite": "0.4.24",
"ioredfour": "1.0.2-ioredis-02",
"ioredis": "4.0.1",
"ioredis": "4.0.2",
"isemail": "3.1.3",
"joi": "13.7.0",
"js-yaml": "3.12.0",
@ -58,7 +58,7 @@
"mailsplit": "4.2.3",
"mobileconfig": "2.1.0",
"mongo-cursor-pagination": "7.1.0",
"mongodb": "3.1.6",
"mongodb": "3.1.8",
"mongodb-extended-json": "1.10.0",
"node-forge": "0.7.6",
"nodemailer": "4.6.8",

280
tasks.js
View file

@ -10,20 +10,7 @@ const fs = require('fs');
const MessageHandler = require('./lib/message-handler');
const setupIndexes = yaml.safeLoad(fs.readFileSync(__dirname + '/indexes.yaml', 'utf8'));
let logger = {
info(...args) {
args.shift();
log.info('IMAP', ...args);
},
debug(...args) {
args.shift();
log.silly('IMAP', ...args);
},
error(...args) {
args.shift();
log.error('IMAP', ...args);
}
};
const taskRestore = require('./lib/tasks/restore');
let messageHandler;
let gcTimeout;
@ -66,28 +53,13 @@ module.exports.start = callback => {
let collectionpos = 0;
let ensureCollections = next => {
if (collectionpos >= collections.length) {
logger.info(
{
tnx: 'mongo'
},
'Setup %s collections',
collections.length
);
log.info('Setup', 'Setup %s collections in MongoDB', collections.length);
return next();
}
let collection = collections[collectionpos++];
db[collection.type || 'database'].createCollection(collection.collection, collection.options, err => {
if (err) {
logger.error(
{
err,
tnx: 'mongo'
},
'Failed creating collection %s %s. %s',
collectionpos,
JSON.stringify(collection.collection),
err.message
);
log.error('Setup', 'Failed creating collection %s %s. %s', collectionpos, JSON.stringify(collection.collection), err.message);
}
ensureCollections(next);
@ -98,42 +70,18 @@ module.exports.start = callback => {
let indexpos = 0;
let ensureIndexes = next => {
if (indexpos >= indexes.length) {
logger.info(
{
tnx: 'mongo'
},
'Setup indexes for %s collections',
indexes.length
);
log.info('Setup', 'Setup indexes for %s collections', indexes.length);
return next();
}
let index = indexes[indexpos++];
db[index.type || 'database'].collection(index.collection).createIndexes([index.index], (err, r) => {
if (err) {
logger.error(
{
err,
tnx: 'mongo'
},
'Failed creating index %s %s. %s',
indexpos,
JSON.stringify(index.collection + '.' + index.index.name),
err.message
);
log.error('Setup', 'Failed creating index %s %s. %s', indexpos, JSON.stringify(index.collection + '.' + index.index.name), err.message);
} else if (r.numIndexesAfter !== r.numIndexesBefore) {
logger.debug(
{
tnx: 'mongo'
},
'Created index %s %s',
indexpos,
JSON.stringify(index.collection + '.' + index.index.name)
);
log.verbose('Setup', 'Created index %s %s', indexpos, JSON.stringify(index.collection + '.' + index.index.name));
} else {
logger.debug(
{
tnx: 'mongo'
},
log.verbose(
'Setup',
'Skipped index %s %s: %s',
indexpos,
JSON.stringify(index.collection + '.' + index.index.name),
@ -147,14 +95,7 @@ module.exports.start = callback => {
gcLock.acquireLock('db_indexes', 1 * 60 * 1000, (err, lock) => {
if (err) {
logger.error(
{
tnx: 'gc',
err
},
'Failed to acquire lock error=%s',
err.message
);
log.error('GC', 'Failed to acquire lock error=%s', err.message);
return start();
} else if (!lock.success) {
return start();
@ -166,14 +107,7 @@ module.exports.start = callback => {
setTimeout(() => {
gcLock.releaseLock(lock, err => {
if (err) {
logger.error(
{
tnx: 'gc',
err
},
'Failed to release lock error=%s',
err.message
);
log.error('GC', 'Failed to release lock error=%s', err.message);
}
});
}, 60 * 1000);
@ -190,47 +124,23 @@ function clearExpiredMessages() {
// First, acquire the lock. This prevents multiple connected clients for deleting the same messages
gcLock.acquireLock('gc_expired', Math.round(consts.GC_INTERVAL * 1.2) /* Lock expires if not released */, (err, lock) => {
if (err) {
logger.error(
{
tnx: 'gc',
err
},
'Failed to acquire lock error=%s',
err.message
);
log.error('GC', 'Failed to acquire lock error=%s', err.message);
gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL);
gcTimeout.unref();
return;
} else if (!lock.success) {
logger.debug(
{
tnx: 'gc'
},
'Lock already acquired'
);
log.verbose('GC', 'Lock already acquired');
gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL);
gcTimeout.unref();
return;
}
logger.debug(
{
tnx: 'gc'
},
'Got lock for garbage collector'
);
log.verbose('GC', 'Got lock for garbage collector');
let done = () => {
gcLock.releaseLock(lock, err => {
if (err) {
logger.error(
{
tnx: 'gc',
err
},
'Failed to release lock error=%s',
err.message
);
log.error('GC', 'Failed to release lock error=%s', err.message);
}
gcTimeout = setTimeout(clearExpiredMessages, consts.GC_INTERVAL);
gcTimeout.unref();
@ -251,12 +161,7 @@ function clearExpiredMessages() {
};
let archiveExpiredMessages = next => {
logger.debug(
{
tnx: 'gc'
},
'Archiving expired messages'
);
log.verbose('GC', 'Archiving expired messages');
// find and delete all messages that are expired
// NB! scattered query, searches over all mailboxes and thus over all shards
@ -271,13 +176,7 @@ function clearExpiredMessages() {
let clear = () =>
cursor.close(() => {
if (deleted) {
logger.debug(
{
tnx: 'gc'
},
'Deleted %s messages',
deleted
);
log.verbose('GC', 'Deleted %s messages', deleted);
}
return deleteOrphaned(next);
});
@ -304,25 +203,10 @@ function clearExpiredMessages() {
},
err => {
if (err) {
logger.error(
{
tnx: 'gc',
err
},
'Failed to delete expired message id=%s. %s',
messageData._id,
err.message
);
log.error('GC', 'Failed to delete expired message id=%s. %s', messageData._id, err.message);
return cursor.close(() => done(err));
}
logger.debug(
{
tnx: 'gc',
err
},
'Deleted expired message id=%s',
messageData._id
);
log.verbose('GC', 'Deleted expired message id=%s', messageData._id);
deleted++;
if (consts.GC_DELAY_DELETE) {
setTimeout(processNext, consts.GC_DELAY_DELETE);
@ -338,12 +222,7 @@ function clearExpiredMessages() {
};
let purgeExpiredMessages = next => {
logger.debug(
{
tnx: 'gc'
},
'Purging archived messages'
);
log.verbose('GC', 'Purging archived messages');
// find and delete all messages that are expired
// NB! scattered query, searches over all mailboxes and thus over all shards
@ -370,13 +249,7 @@ function clearExpiredMessages() {
let clear = () =>
cursor.close(() => {
if (deleted) {
logger.debug(
{
tnx: 'gc'
},
'Purged %s messages',
deleted
);
log.verbose('GC', 'Purged %s messages', deleted);
}
return deleteOrphaned(next);
});
@ -398,25 +271,11 @@ function clearExpiredMessages() {
db.database.collection('archived').deleteOne({ _id: messageData._id }, err => {
if (err) {
//failed to delete
logger.error(
{
tnx: 'gc',
err
},
'Failed to delete archived message id=%s. %s',
messageData._id,
err.message
);
log.error('GC', 'Failed to delete archived message id=%s. %s', messageData._id, err.message);
return cursor.close(() => done(err));
}
logger.debug(
{
tnx: 'gc'
},
'Deleted archived message id=%s',
messageData._id
);
log.verbose('GC', 'Deleted archived message id=%s', messageData._id);
let attachmentIds = Object.keys(messageData.mimeTree.attachmentMap || {}).map(key => messageData.mimeTree.attachmentMap[key]);
@ -472,19 +331,13 @@ function runTasks() {
},
{
$set: {
locked: false
locked: false,
status: 'queued'
}
},
err => {
if (err) {
logger.error(
{
err,
tnx: 'mongo'
},
'Failed releasing expired tasks. error=%s',
err.message
);
log.error('Tasks', 'Failed releasing expired tasks. error=%s', err.message);
// back off processing tasks for 5 minutes
taskTimeout = setTimeout(runTasks, consts.TASK_STARTUP_INTERVAL);
@ -501,7 +354,8 @@ function runTasks() {
{
$set: {
locked: true,
lockedUntil: new Date(Date.now() + 1 * 3600 * 1000)
lockedUntil: new Date(Date.now() + consts.TASK_LOCK_INTERVAL),
status: 'processing'
}
},
{
@ -509,14 +363,7 @@ function runTasks() {
},
(err, r) => {
if (err) {
logger.error(
{
err,
tnx: 'mongo'
},
'Failed releasing expired tasks. error=%s',
err.message
);
log.error('Tasks', 'Failed releasing expired tasks. error=%s', err.message);
// back off processing tasks for 5 minutes
taskTimeout = setTimeout(runTasks, consts.TASK_STARTUP_INTERVAL);
@ -532,18 +379,44 @@ function runTasks() {
let taskData = r.value;
// keep lock alive
let keepAliveTimer;
let processed = false;
let keepAlive = () => {
clearTimeout(keepAliveTimer);
keepAliveTimer = setTimeout(() => {
if (processed) {
return;
}
db.database.collection('tasks').updateOne(
{
_id: taskData._id,
locked: true
},
{
$set: {
lockedUntil: new Date(Date.now() + consts.TASK_LOCK_INTERVAL),
status: 'processing'
}
},
(err, r) => {
if (!err && !processed && r.matchedCount) {
keepAlive();
}
}
);
}, consts.TASK_UPDATE_INTERVAL);
keepAliveTimer.unref();
};
keepAlive();
// we have a task to process
processTask(taskData, (err, release) => {
clearTimeout(keepAliveTimer);
processed = true;
if (err) {
logger.error(
{
err,
tnx: 'mongo'
},
'Failed processing task id=%s error=%s',
taskData._id,
err.message
);
log.error('Tasks', 'Failed processing task id=%s error=%s', taskData._id, err.message);
// back off processing tasks for 5 minutes
taskTimeout = setTimeout(runTasks, consts.TASK_STARTUP_INTERVAL);
@ -558,13 +431,15 @@ function runTasks() {
nextTask()
);
} else {
// requeue
db.database.collection('tasks').updateOne(
{
_id: taskData._id
},
{
$set: {
locked: false
locked: false,
status: 'queued'
}
},
nextTask()
@ -580,8 +455,25 @@ function runTasks() {
}
function processTask(taskData, callback) {
console.log(taskData);
log.verbose('Tasks', 'task=%s', JSON.stringify(taskData));
// release task by returning true
return callback(null, true);
switch (taskData.task) {
case 'restore':
return taskRestore(
taskData,
{
messageHandler
},
err => {
if (err) {
return callback(err);
}
// release
callback(null, true);
}
);
default:
// release task by returning true
return callback(null, true);
}
}