wildduck/lib/tasks/audit.js
2021-06-20 13:40:04 +03:00

194 lines
5.8 KiB
JavaScript

'use strict';
const log = require('npmlog');
const db = require('../db');
let run = async (task, data, options) => {
const { auditHandler, messageHandler } = options;
let query = {
user: data.user
};
if (data.start) {
let start = data.start;
if (['number', 'string'].includes(typeof start)) {
start = new Date(start);
}
query.idate = query.idate || {};
query.idate.$gte = start;
}
if (data.end) {
let end = data.end;
if (['number', 'string'].includes(typeof end)) {
end = new Date(end);
}
query.idate = query.idate || {};
query.idate.$lte = end;
}
let mailboxes = new Map(
(await db.database.collection('mailboxes').find({ user: data.user }).toArray()).map(mailboxData => [mailboxData._id.toString(), mailboxData])
);
let processMessage = async messageData => {
let builder = messageHandler.indexer.rebuild(messageData.mimeTree);
if (!builder || builder.type !== 'stream' || !builder.value) {
return false;
}
let mailboxData = messageData.mailbox ? mailboxes.get(messageData.mailbox.toString()) : false;
let auditMessage = await auditHandler.store(
data.audit,
builder.value,
{
date: messageData.idate,
msgid: messageData.msgid,
header: messageData.mimeTree && messageData.mimeTree.parsedHeader,
ha: messageData.ha,
mailbox: messageData.mailbox,
mailboxPath: mailboxData ? mailboxData.path : false,
info: messageData.meta,
draft: messageData.draft,
imported: true
},
true
);
return auditMessage;
};
let copied = 0;
let failed = 0;
let status = 'imported'; //expect to complete successfully
let processMessages = async collection => {
let cursor = await db.database.collection(collection).find(query, {
noCursorTimeout: true,
projection: {
_id: true,
user: true,
mimeTree: true,
meta: true,
mailbox: true,
idate: true,
msgid: true,
ha: true,
draft: true
}
});
let messageData;
try {
while ((messageData = await cursor.next())) {
try {
let auditMessage = await processMessage(messageData);
log.verbose(
'Tasks',
'task=audit id=%s user=%s coll=%s message=%s src=%s dst=%s',
task._id,
data.user,
collection,
'Stored message to audit base',
messageData._id,
auditMessage
);
copied++;
try {
await db.database.collection('audits').updateOne(
{ _id: data.audit },
{
$inc: {
'import.copied': 1,
audited: 1
},
$set: {
lastAuditedMessage: new Date()
}
}
);
} catch (e) {
//ignore
}
} catch (err) {
log.error(
'Tasks',
'task=audit id=%s user=%s coll=%s message=%s error=%s',
task._id,
data.user,
collection,
'Failed to process message',
err.message
);
failed++;
try {
await db.database.collection('audits').updateOne(
{ _id: data.audit },
{
$inc: {
'import.failed': 1
}
}
);
} catch (e) {
//ignore
}
}
}
await cursor.close();
} catch (err) {
log.error(
'Tasks',
'task=audit id=%s user=%s coll=%s message=%s error=%s',
task._id,
data.user,
collection,
'Failed to fetch stored messages',
err.message
);
err.code = 'InternalDatabaseError';
throw err;
}
};
await db.database.collection('audits').updateOne(
{ _id: data.audit },
{
$set: {
'import.status': 'importing'
}
}
);
try {
await processMessages('messages');
} catch (err) {
status = 'failed';
}
try {
await processMessages('archive');
} catch (err) {
status = 'failed';
}
await db.database.collection('audits').updateOne(
{ _id: data.audit },
{
$set: {
'import.status': status
}
}
);
log.verbose('Tasks', 'task=audit id=%s user=%s message=%s copied=%s failed=%s', task._id, data.user, `Copied user messages for auditing`, copied, failed);
return true;
};
module.exports = (task, data, options, callback) => {
run(task, data, options)
.then(response => callback(null, response))
.catch(callback);
};