updates to message auditing

This commit is contained in:
Andris Reinman 2019-09-29 15:00:44 +03:00
parent 389d08edc9
commit 83bab195e7
5 changed files with 154 additions and 22 deletions

12
api.js
View file

@ -8,6 +8,7 @@ const UserHandler = require('./lib/user-handler');
const MailboxHandler = require('./lib/mailbox-handler');
const MessageHandler = require('./lib/message-handler');
const StorageHandler = require('./lib/storage-handler');
const AuditHandler = require('./lib/audit-handler');
const ImapNotifier = require('./lib/imap-notifier');
const db = require('./lib/db');
const certs = require('./lib/certs');
@ -41,6 +42,7 @@ let userHandler;
let mailboxHandler;
let messageHandler;
let storageHandler;
let auditHandler;
let notifier;
let loggelf;
@ -464,6 +466,14 @@ module.exports = done => {
loggelf: message => loggelf(message)
});
auditHandler = new AuditHandler({
database: db.database,
users: db.users,
gridfs: db.gridfs,
bucket: 'audit',
loggelf: message => loggelf(message)
});
server.loggelf = message => loggelf(message);
usersRoutes(db, server, userHandler);
@ -480,7 +490,7 @@ module.exports = done => {
authRoutes(db, server, userHandler);
autoreplyRoutes(db, server);
submitRoutes(db, server, messageHandler, userHandler);
auditRoutes(db, server);
auditRoutes(db, server, auditHandler);
domainaliasRoutes(db, server);
dkimRoutes(db, server);

View file

@ -5,7 +5,7 @@ const tools = require('../tools');
const roles = require('../roles');
const ObjectID = require('mongodb').ObjectID;
module.exports = (db, server) => {
module.exports = (db, server, auditHandler) => {
/**
* @api {post} /audit Create new audit
* @apiName PostAudit
@ -88,19 +88,11 @@ module.exports = (db, server) => {
// permissions check
req.validate(roles.can(req.role).updateAny('audit'));
let audit = new ObjectID();
let user = new ObjectID(result.value.user);
let start = result.value.start;
let end = result.value.end;
let now = new Date();
await db.database.collection('tasks').insertOne({
task: 'audit',
locked: false,
lockedUntil: now,
created: now,
status: 'queued',
audit,
let audit = await auditHandler.create({
user,
start,
end

View file

@ -8,6 +8,7 @@ class AuditHandler {
this.options = options || {};
this.database = options.database;
this.users = options.user || options.database;
this.gridfs = options.gridfs || options.database;
this.loggelf = options.loggelf || (() => false);
@ -20,6 +21,92 @@ class AuditHandler {
});
}
async create(options) {
options = options || {};
if (!options.user || !ObjectID.isValid(options.user)) {
let err = new Error('Missing user ID');
err.code = 'InputValidationError';
throw err;
}
let auditData = {
user: typeof options.user === 'string' ? new ObjectID(options.user) : options.user,
start: options.start, // Date or null
end: options.end, // Date or null
'import.status': 'queued'
};
let r = await this.database.collection('audits').insertOne(auditData);
if (!r.insertedId) {
let err = new Error();
err.code = 'InternalDatabaseError';
throw err;
}
auditData._id = r.insertedId;
try {
// NB! this user might not exist anymore, so do not check if any users were updated or not
await this.users.collection('users').updateOne(
{
_id: auditData.user
},
{
$addToSet: {
audit: auditData._id
}
}
);
} catch (err) {
// try to rollback
err.code = err.code = 'InternalDatabaseError';
try {
await this.database.collection('audits').deleteOne({ _id: auditData._id });
} catch (e) {
// ignore
}
throw err;
}
try {
let now = new Date();
await this.database.collection('tasks').insertOne({
task: 'audit',
locked: false,
lockedUntil: now,
created: now,
status: 'queued',
audit: auditData._id,
user: auditData.user,
start: auditData.start,
end: auditData.end
});
} catch (err) {
// try to rollback
err.code = err.code = 'InternalDatabaseError';
try {
await this.database.collection('audits').deleteOne({ _id: auditData._id });
} catch (e) {
// ignore
}
throw err;
}
return auditData._id;
}
/**
* Store message to audit GridFS
*
* @param {ObjectID} audit ID of the audit session
* @param {Mixed} message Either a Buffer, an Array of Buffers or a Stream
* @param {Object} metadata Metadata for the stored message
*/
async store(audit, message, metadata) {
if (!message) {
throw new Error('Missing message content');
@ -36,10 +123,6 @@ class AuditHandler {
metadata.date = metadata.date || new Date();
return new Promise((resolve, reject) => {
if (!Buffer.isBuffer(message) && typeof message.pipe !== 'function') {
return reject(new Error('Invalid message content'));
}
let stream = this.gridstore.openUploadStreamWithId(id, null, {
contentType: 'message/rfc822',
metadata
@ -48,8 +131,22 @@ class AuditHandler {
stream.once('finish', () => resolve(id));
if (Buffer.isBuffer(message)) {
// store as a buffer
return stream.end(message);
message = [message];
}
let writeChunks = async () => {
// write chunk by chunk
for (let chunk of message) {
if (stream.write(chunk) === false) {
await new Promise(resolve => {
stream.once('drain', resolve());
});
}
}
};
if (Array.isArray(message)) {
return writeChunks().catch(err => reject(err));
}
message.on('error', err => {

View file

@ -30,17 +30,26 @@ let run = async (taskData, options) => {
}
let processMessage = async messageData => {
console.log(messageData);
let builder = messageHandler.indexer.rebuild(messageData.mimeTree);
if (!builder || builder.type !== 'stream' || !builder.value) {
return false;
}
let auditMessage = await auditHandler.store(taskData.audit, builder.value, {});
let auditMessage = await auditHandler.store(taskData.audit, builder.value, {
date: messageData.idate,
msgid: messageData.msgid,
header: messageData.mimeTree && messageData.mimeTree.parsedHeader,
ha: messageData.ha,
info: messageData.meta
});
return auditMessage;
};
let copied = 0;
let failed = 0;
let status = 'imported'; //expect to complete successfully
let processMessages = async collection => {
let cursor = await db.users.collection(collection).find(query, {
projection: {
@ -65,6 +74,7 @@ let run = async (taskData, options) => {
messageData._id,
auditMessage
);
copied++;
} catch (err) {
log.error(
'Tasks',
@ -75,6 +85,7 @@ let run = async (taskData, options) => {
'Failed to process message',
err.message
);
failed++;
}
}
await cursor.close();
@ -88,13 +99,34 @@ let run = async (taskData, options) => {
'Failed to fetch stored messages',
err.message
);
err.code = 'InternalDatabaseError';
throw err;
}
};
await processMessages('messages');
await processMessages('archive');
try {
await processMessages('messages');
} catch (err) {
status = 'import failed';
}
try {
await processMessages('archive');
} catch (err) {
status = 'import failed';
}
await db.database.collection('audits').updateOne(
{ _id: taskData.audit },
{
$set: {
'import.status': status,
'import.copied': copied,
'import.failed': failed
}
}
);
log.verbose('Tasks', 'task=audit id=%s user=%s message=%s', taskData._id, taskData.user, `Copied user messages for auditing`);
return true;

View file

@ -95,6 +95,7 @@ module.exports.start = callback => {
auditHandler = new AuditHandler({
database: db.database,
users: db.users,
gridfs: db.gridfs,
bucket: 'audit',
loggelf: message => loggelf(message)