From 83bab195e7ad82fa58ed81ec3b52999fbe4856aa Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Sun, 29 Sep 2019 15:00:44 +0300 Subject: [PATCH] updates to message auditing --- api.js | 12 ++++- lib/api/audit.js | 12 +---- lib/audit-handler.js | 109 ++++++++++++++++++++++++++++++++++++++++--- lib/tasks/audit.js | 42 +++++++++++++++-- tasks.js | 1 + 5 files changed, 154 insertions(+), 22 deletions(-) diff --git a/api.js b/api.js index 39a009ff..a3f78d51 100644 --- a/api.js +++ b/api.js @@ -8,6 +8,7 @@ const UserHandler = require('./lib/user-handler'); const MailboxHandler = require('./lib/mailbox-handler'); const MessageHandler = require('./lib/message-handler'); const StorageHandler = require('./lib/storage-handler'); +const AuditHandler = require('./lib/audit-handler'); const ImapNotifier = require('./lib/imap-notifier'); const db = require('./lib/db'); const certs = require('./lib/certs'); @@ -41,6 +42,7 @@ let userHandler; let mailboxHandler; let messageHandler; let storageHandler; +let auditHandler; let notifier; let loggelf; @@ -464,6 +466,14 @@ module.exports = done => { loggelf: message => loggelf(message) }); + auditHandler = new AuditHandler({ + database: db.database, + users: db.users, + gridfs: db.gridfs, + bucket: 'audit', + loggelf: message => loggelf(message) + }); + server.loggelf = message => loggelf(message); usersRoutes(db, server, userHandler); @@ -480,7 +490,7 @@ module.exports = done => { authRoutes(db, server, userHandler); autoreplyRoutes(db, server); submitRoutes(db, server, messageHandler, userHandler); - auditRoutes(db, server); + auditRoutes(db, server, auditHandler); domainaliasRoutes(db, server); dkimRoutes(db, server); diff --git a/lib/api/audit.js b/lib/api/audit.js index 63979e80..abfde2dc 100644 --- a/lib/api/audit.js +++ b/lib/api/audit.js @@ -5,7 +5,7 @@ const tools = require('../tools'); const roles = require('../roles'); const ObjectID = require('mongodb').ObjectID; -module.exports = (db, server) => { +module.exports = (db, server, auditHandler) => { /** * @api {post} /audit Create new audit * @apiName PostAudit @@ -88,19 +88,11 @@ module.exports = (db, server) => { // permissions check req.validate(roles.can(req.role).updateAny('audit')); - let audit = new ObjectID(); let user = new ObjectID(result.value.user); let start = result.value.start; let end = result.value.end; - let now = new Date(); - await db.database.collection('tasks').insertOne({ - task: 'audit', - locked: false, - lockedUntil: now, - created: now, - status: 'queued', - audit, + let audit = await auditHandler.create({ user, start, end diff --git a/lib/audit-handler.js b/lib/audit-handler.js index b046a0f6..9d1aeda8 100644 --- a/lib/audit-handler.js +++ b/lib/audit-handler.js @@ -8,6 +8,7 @@ class AuditHandler { this.options = options || {}; this.database = options.database; + this.users = options.user || options.database; this.gridfs = options.gridfs || options.database; this.loggelf = options.loggelf || (() => false); @@ -20,6 +21,92 @@ class AuditHandler { }); } + async create(options) { + options = options || {}; + + if (!options.user || !ObjectID.isValid(options.user)) { + let err = new Error('Missing user ID'); + err.code = 'InputValidationError'; + throw err; + } + + let auditData = { + user: typeof options.user === 'string' ? new ObjectID(options.user) : options.user, + start: options.start, // Date or null + end: options.end, // Date or null + 'import.status': 'queued' + }; + + let r = await this.database.collection('audits').insertOne(auditData); + if (!r.insertedId) { + let err = new Error(); + err.code = 'InternalDatabaseError'; + throw err; + } + + auditData._id = r.insertedId; + + try { + // NB! this user might not exist anymore, so do not check if any users were updated or not + await this.users.collection('users').updateOne( + { + _id: auditData.user + }, + { + $addToSet: { + audit: auditData._id + } + } + ); + } catch (err) { + // try to rollback + err.code = err.code = 'InternalDatabaseError'; + + try { + await this.database.collection('audits').deleteOne({ _id: auditData._id }); + } catch (e) { + // ignore + } + + throw err; + } + + try { + let now = new Date(); + await this.database.collection('tasks').insertOne({ + task: 'audit', + locked: false, + lockedUntil: now, + created: now, + status: 'queued', + audit: auditData._id, + user: auditData.user, + start: auditData.start, + end: auditData.end + }); + } catch (err) { + // try to rollback + err.code = err.code = 'InternalDatabaseError'; + + try { + await this.database.collection('audits').deleteOne({ _id: auditData._id }); + } catch (e) { + // ignore + } + + throw err; + } + + return auditData._id; + } + + /** + * Store message to audit GridFS + * + * @param {ObjectID} audit ID of the audit session + * @param {Mixed} message Either a Buffer, an Array of Buffers or a Stream + * @param {Object} metadata Metadata for the stored message + */ async store(audit, message, metadata) { if (!message) { throw new Error('Missing message content'); @@ -36,10 +123,6 @@ class AuditHandler { metadata.date = metadata.date || new Date(); return new Promise((resolve, reject) => { - if (!Buffer.isBuffer(message) && typeof message.pipe !== 'function') { - return reject(new Error('Invalid message content')); - } - let stream = this.gridstore.openUploadStreamWithId(id, null, { contentType: 'message/rfc822', metadata @@ -48,8 +131,22 @@ class AuditHandler { stream.once('finish', () => resolve(id)); if (Buffer.isBuffer(message)) { - // store as a buffer - return stream.end(message); + message = [message]; + } + + let writeChunks = async () => { + // write chunk by chunk + for (let chunk of message) { + if (stream.write(chunk) === false) { + await new Promise(resolve => { + stream.once('drain', resolve()); + }); + } + } + }; + + if (Array.isArray(message)) { + return writeChunks().catch(err => reject(err)); } message.on('error', err => { diff --git a/lib/tasks/audit.js b/lib/tasks/audit.js index 1b5ab61e..696a07fe 100644 --- a/lib/tasks/audit.js +++ b/lib/tasks/audit.js @@ -30,17 +30,26 @@ let run = async (taskData, options) => { } let processMessage = async messageData => { - console.log(messageData); - let builder = messageHandler.indexer.rebuild(messageData.mimeTree); if (!builder || builder.type !== 'stream' || !builder.value) { return false; } - let auditMessage = await auditHandler.store(taskData.audit, builder.value, {}); + let auditMessage = await auditHandler.store(taskData.audit, builder.value, { + date: messageData.idate, + msgid: messageData.msgid, + header: messageData.mimeTree && messageData.mimeTree.parsedHeader, + ha: messageData.ha, + info: messageData.meta + }); + return auditMessage; }; + let copied = 0; + let failed = 0; + let status = 'imported'; //expect to complete successfully + let processMessages = async collection => { let cursor = await db.users.collection(collection).find(query, { projection: { @@ -65,6 +74,7 @@ let run = async (taskData, options) => { messageData._id, auditMessage ); + copied++; } catch (err) { log.error( 'Tasks', @@ -75,6 +85,7 @@ let run = async (taskData, options) => { 'Failed to process message', err.message ); + failed++; } } await cursor.close(); @@ -88,13 +99,34 @@ let run = async (taskData, options) => { 'Failed to fetch stored messages', err.message ); + err.code = 'InternalDatabaseError'; throw err; } }; - await processMessages('messages'); - await processMessages('archive'); + try { + await processMessages('messages'); + } catch (err) { + status = 'import failed'; + } + + try { + await processMessages('archive'); + } catch (err) { + status = 'import failed'; + } + + await db.database.collection('audits').updateOne( + { _id: taskData.audit }, + { + $set: { + 'import.status': status, + 'import.copied': copied, + 'import.failed': failed + } + } + ); log.verbose('Tasks', 'task=audit id=%s user=%s message=%s', taskData._id, taskData.user, `Copied user messages for auditing`); return true; diff --git a/tasks.js b/tasks.js index 4e1c4747..caaffb4f 100644 --- a/tasks.js +++ b/tasks.js @@ -95,6 +95,7 @@ module.exports.start = callback => { auditHandler = new AuditHandler({ database: db.database, + users: db.users, gridfs: db.gridfs, bucket: 'audit', loggelf: message => loggelf(message)