2019-09-25 21:39:48 +08:00
|
|
|
'use strict';
|
|
|
|
|
|
|
|
const ObjectID = require('mongodb').ObjectID;
|
|
|
|
const GridFSBucket = require('mongodb').GridFSBucket;
|
2020-03-19 19:21:05 +08:00
|
|
|
const log = require('npmlog');
|
2019-09-25 21:39:48 +08:00
|
|
|
|
|
|
|
/**
 * Manages audit sessions: creates audit records and their import tasks,
 * stores and retrieves audited messages in a GridFS bucket, records
 * delivery status updates and removes expired audits.
 */
class AuditHandler {
    /**
     * @param {Object} options Handler configuration
     * @param {Object} options.database Database handle used for the `audits` and `tasks` collections
     * @param {Object} [options.users] Users database handle, defaults to `options.database`.
     *   The legacy `options.user` key is still honoured.
     * @param {Object} [options.gridfs] Database handle that backs the GridFS bucket, defaults to `options.database`
     * @param {Function} [options.loggelf] Logging callback, defaults to a no-op that returns `false`
     * @param {String} [options.bucket='audit'] GridFS bucket name
     * @param {Number|String} [options.writeConcern=1] Write concern (`w`) for the GridFS bucket
     */
    constructor(options) {
        this.options = options || {};

        // Read everything from this.options so that a missing options argument
        // does not fail with a TypeError on property access.
        this.database = this.options.database;
        this.users = this.options.user || this.options.users || this.options.database;
        this.gridfs = this.options.gridfs || this.options.database;

        this.loggelf = this.options.loggelf || (() => false);

        this.bucketName = this.options.bucket || 'audit';
        this.gridstore = new GridFSBucket(this.gridfs, {
            bucketName: this.bucketName,
            chunkSizeBytes: 255 * 1024,
            writeConcern: { w: this.options.writeConcern || 1 }
        });
    }

    /**
     * Creates a new audit record and queues the matching `audit` task.
     * If the task can not be queued, the audit record is removed again (best effort).
     *
     * @param {Object} options Audit settings
     * @param {ObjectID|String} options.user ID of the audited user
     * @param {Date} [options.start] Start of the audited period, Date or null
     * @param {Date} [options.end] End of the audited period, Date or null
     * @param {Date} [options.expires] Expiration time of the audit
     * @param {String} [options.notes] Free-form notes
     * @param {Object} [options.meta] Additional metadata, defaults to an empty object
     * @returns {Promise<ObjectID>} ID of the created audit record
     * @throws {Error} with code `InputValidationError` if the user ID is missing or invalid,
     *   or `InternalDatabaseError` if a database operation fails
     */
    async create(options) {
        options = options || {};

        if (!options.user || !ObjectID.isValid(options.user)) {
            const err = new Error('Missing user ID');
            err.code = 'InputValidationError';
            throw err;
        }

        const auditData = {
            user: typeof options.user === 'string' ? new ObjectID(options.user) : options.user,
            start: options.start, // Date or null
            end: options.end, // Date or null
            expires: options.expires, // Date
            deleted: false, // Boolean
            notes: options.notes, // String
            meta: options.meta || {}, // Object

            import: {
                status: 'queued',
                failed: 0,
                copied: 0
            },

            audited: 0,
            lastAuditedMessage: null
        };

        let r;
        try {
            r = await this.database.collection('audits').insertOne(auditData);
        } catch (err) {
            err.code = 'InternalDatabaseError';
            throw err;
        }

        if (!r.insertedId) {
            const err = new Error('Failed to create audit entry');
            err.code = 'InternalDatabaseError';
            throw err;
        }

        auditData._id = r.insertedId;

        try {
            const now = new Date();
            await this.database.collection('tasks').insertOne({
                task: 'audit',
                locked: false,
                lockedUntil: now,
                created: now,
                status: 'queued',
                audit: auditData._id,
                user: auditData.user,
                start: auditData.start,
                end: auditData.end
            });
        } catch (err) {
            err.code = 'InternalDatabaseError';

            // try to rollback; a failed rollback must not mask the original error
            try {
                await this.database.collection('audits').deleteOne({ _id: auditData._id });
            } catch (e) {
                log.error('Audit', 'Failed to roll back audit %s. %s', auditData._id, e.message);
            }

            throw err;
        }

        return auditData._id;
    }

    /**
     * Store message to audit GridFS
     *
     * Note that the provided metadata object is updated in place: `audit` and
     * `date` are filled in when they are not already set.
     *
     * @param {ObjectID} audit ID of the audit session
     * @param {Mixed} message Either a String, a Buffer, an Array of Buffers or a Stream
     * @param {Object} metadata Metadata for the stored message
     * @param {Boolean} [skipCounters] If truthy, the audit record counters are not updated
     * @returns {Promise<ObjectID>} ID of the stored GridFS file
     * @throws {Error} if message content is missing or the upload fails
     */
    async store(audit, message, metadata, skipCounters) {
        if (!message) {
            throw new Error('Missing message content');
        }

        if (typeof message === 'string') {
            message = Buffer.from(message);
        }

        const id = new ObjectID();

        metadata = metadata || {};
        metadata.audit = metadata.audit || audit;
        metadata.date = metadata.date || new Date();

        const result = await new Promise((resolve, reject) => {
            const stream = this.gridstore.openUploadStreamWithId(id, null, {
                contentType: 'message/rfc822',
                metadata
            });

            stream.once('finish', () => resolve(id));
            // Without this listener an upload failure would neither reject the
            // promise nor be handled as an 'error' event.
            stream.once('error', reject);

            if (Buffer.isBuffer(message)) {
                message = [message];
            }

            const writeChunks = async () => {
                // write chunk by chunk, waiting for 'drain' when the stream signals backpressure
                for (const chunk of message) {
                    if (stream.write(chunk) === false) {
                        await new Promise(resolve => {
                            stream.once('drain', resolve);
                        });
                    }
                }
                stream.end();
            };

            if (Array.isArray(message)) {
                return writeChunks().catch(err => reject(err));
            }

            // source stream errors are forwarded to the upload stream
            message.on('error', err => {
                stream.emit('error', err);
            });

            message.pipe(stream);
        });

        if (result && !skipCounters) {
            await this.database.collection('audits').updateOne({ _id: metadata.audit }, { $inc: { audited: 1 }, $set: { lastAuditedMessage: new Date() } });
        }

        return result;
    }

    /**
     * Retrieve message from audit GridFS
     *
     * @param {ObjectID} id ID of the stored GridFS file
     * @returns {Promise<Object|Boolean>} Download stream for the file, or `false` if no such file exists
     * @throws {Error} if the download stream can not be opened
     */
    async retrieve(id) {
        const row = await this.gridfs.collection(`${this.bucketName}.files`).findOne({ _id: id });
        if (!row) {
            return false;
        }

        const stream = this.gridstore.openDownloadStream(id);
        if (!stream) {
            throw new Error('Failed to open stream');
        }

        return stream;
    }

    /**
     * Appends a delivery entry to every stored message that matches the queue ID
     *
     * @param {String} queueId Queue ID to match against `metadata.info.queueId`
     * @param {Mixed} seq Sequence value stored with the entry
     * @param {Mixed} status Delivery status stored with the entry
     * @param {Mixed} info Additional information stored with the entry
     * @returns {Promise<void>}
     */
    async updateDeliveryStatus(queueId, seq, status, info) {
        await this.gridfs.collection(`${this.bucketName}.files`).updateMany(
            { 'metadata.info.queueId': queueId },
            {
                $push: {
                    'metadata.info.delivery': {
                        seq,
                        status,
                        time: new Date(),
                        info
                    }
                }
            }
        );
    }

    /**
     * Deletes all stored messages of an audit and marks the audit record as deleted.
     * Failures to delete individual messages are logged and skipped.
     *
     * @param {Object} auditData Audit record, only `_id` is used
     * @returns {Promise<Object>} `{ audit, messages }` where `messages` is the number of deleted messages
     */
    async removeAudit(auditData) {
        const cursor = await this.gridfs.collection(`${this.bucketName}.files`).find({
            'metadata.audit': auditData._id
        });

        let messages = 0;
        try {
            let messageData;
            while ((messageData = await cursor.next())) {
                try {
                    await this.gridstore.delete(messageData._id);
                    messages++;
                } catch (err) {
                    log.error('Audit', 'Failed to delete message %s. %s', messageData._id, err.message);
                }
            }
        } finally {
            // release the cursor even if iterating it fails
            await cursor.close();
        }

        await this.database.collection('audits').updateOne({ _id: auditData._id }, { $set: { deleted: true, deletedTime: new Date() } });
        log.info('Audit', 'Deleted audit %s (%s messages)', auditData._id, messages);

        return {
            audit: auditData._id,
            messages
        };
    }

    /**
     * Removes every audit that has expired and is not yet marked as deleted.
     * A failure to remove one audit is logged and does not stop the others.
     *
     * @returns {Promise<void>}
     */
    async cleanExpired() {
        const expiredAudits = await this.database
            .collection('audits')
            .find({
                // `deleted` is a separate field condition, not part of the `expires` operator document
                expires: { $lt: new Date() },
                deleted: false
            })
            .toArray();

        for (const auditData of expiredAudits) {
            try {
                await this.removeAudit(auditData);
            } catch (err) {
                log.error('Audit', 'Failed to delete expired audit %s. %s', auditData._id, err.message);
            }
        }
    }
}
|
|
|
|
|
|
|
|
// Expose the AuditHandler class as this module's CommonJS export
module.exports = AuditHandler;
|