use locking when storing large attachments

This commit is contained in:
Andris Reinman 2019-01-17 11:48:23 +02:00
parent a61feb0eba
commit 669a74bbef
4 changed files with 154 additions and 85 deletions

View file

@ -2297,7 +2297,9 @@ module.exports = (db, server, messageHandler, userHandler) => {
}
if (!req.params.raw) {
raw = await getCompiledMessage(data);
raw = await getCompiledMessage(data, {
isDraft: !!result.value.draft
});
}
if (!raw || !raw.length) {
@ -3759,10 +3761,14 @@ function formatMessageListing(messageData) {
return response;
}
async function getCompiledMessage(data) {
async function getCompiledMessage(data, options) {
options = options || {};
return new Promise((resolve, reject) => {
let compiler = new MailComposer(data);
let compiled = compiler.compile();
if (options.isDraft) {
compiled.keepBcc = true;
}
let stream = compiled.createReadStream();
let chunks = [];
let chunklen = 0;

View file

@ -2,7 +2,10 @@
const GridFSBucket = require('mongodb').GridFSBucket;
const libbase64 = require('libbase64');
const RedFour = require('ioredfour');
const errors = require('../errors');
const log = require('npmlog');
const crypto = require('crypto');
// Set to false to disable base64 decoding feature
const FEATURE_DECODE_ATTACHMENTS = true;
@ -12,6 +15,11 @@ class GridstoreStorage {
this.bucketName = (options.options && options.options.bucket) || 'attachments';
this.decodeBase64 = (options.options && options.options.decodeBase64) || false;
this.lock = new RedFour({
redis: options.redis,
namespace: 'wildduck'
});
this.gridfs = options.gridfs;
this.gridstore = new GridFSBucket(this.gridfs, {
bucketName: this.bucketName,
@ -114,6 +122,34 @@ class GridstoreStorage {
}
}
let instance = crypto.randomBytes(8).toString('hex');
let lockId = 'gs.' + hash.toString('base64');
let storeLock;
let attachmentCallback = (...args) => {
if (storeLock) {
log.silly('GridStore', '[%s] UNLOCK lock=%s status=%s', instance, lockId, storeLock.success ? 'locked' : 'empty');
if (storeLock.success) {
this.lock.releaseLock(storeLock, () => {
if (returned) {
// might be already finished if retrying after delay
return;
}
callback(...args);
});
// unset variable to prevent double releasing
storeLock = false;
return;
}
storeLock = false;
}
if (returned) {
// might be already finished if retrying after delay
return;
}
callback(...args);
};
let tryCount = 0;
let tryStore = () => {
if (returned) {
@ -136,96 +172,122 @@ class GridstoreStorage {
},
(err, result) => {
if (err) {
return callback(err);
return attachmentCallback(err);
}
if (result && result.value) {
// already exists
return callback(null, result.value._id);
return attachmentCallback(null, result.value._id);
}
// try to insert it
let store = this.gridstore.openUploadStreamWithId(id, null, {
contentType: attachment.contentType,
metadata
});
store.once('error', err => {
if (returned) {
return;
let checkLock = done => {
if (storeLock) {
// continue processing, we have a lock
return done();
}
if (err.code === 11000) {
// most probably a race condition, try again
if (tryCount++ < 5) {
if (/attachments\.chunks /.test(err.message)) {
// partial chunks detected. might be because of:
// * another process is inserting the same attachment and thus no "files" entry yet
// * previously deleted attachment that has not been properly removed
// load data for an existing chunk to see the age of it
return this.gridfs.collection(this.bucketName + '.chunks').findOne(
{
files_id: hash
},
{
projection: {
_id: true
}
},
(err, data) => {
if (err) {
// whatever
return setTimeout(tryStore, 100 + 200 * Math.random());
}
if (attachment.body.length < 255 * 1024) {
// a single chunk attachment, no need for locking
return done();
}
if (!data || !data._id) {
// try again, no chunks found
return setTimeout(tryStore, 10);
}
// check how old is the previous chunk
let timestamp = data._id.getTimestamp();
if (timestamp && typeof timestamp.getTime === 'function' && timestamp.getTime() >= Date.now() - 15 * 60 * 1000) {
// chunk is newer than 15 minutes, assume race condition and try again after a while
return setTimeout(tryStore, 300 + 200 * Math.random());
}
// partial chunks for a probably deleted message detected, try to clean up
setTimeout(() => {
if (returned) {
return;
}
this.cleanupGarbage(id, tryStore);
}, 100 + 200 * Math.random());
}
);
// Try to get a lock
// Using locks is required to prevent multiple messages storing the same large attachment at
// the same time.
// NB! Setting lock ttl too high has a downside that restarting the process would still keep
// the lock and thus anyone trying to store the message would have to wait
this.lock.waitAcquireLock(lockId, 2 * 60 * 1000 /* Lock expires after 2min if not released */, false, (err, lock) => {
if (err) {
if (returned) {
return;
}
return setTimeout(tryStore, 10);
returned = true;
return attachmentCallback(err);
}
}
returned = true;
callback(err);
});
store.once('finish', () => {
if (returned) {
return;
}
returned = true;
return callback(null, id);
});
if (!metadata.decoded) {
store.end(attachment.body);
} else {
let decoder = new libbase64.Decoder();
decoder.pipe(store);
decoder.once('error', err => {
// pass error forward
store.emit('error', err);
storeLock = lock;
log.silly('GridStore', '[%s] LOCK lock=%s status=%s', instance, lockId, storeLock.success ? 'locked' : 'empty');
return tryStore(); // start over from the beginning
});
decoder.end(attachment.body);
}
};
checkLock(() => {
// try to insert it
let store = this.gridstore.openUploadStreamWithId(id, null, {
contentType: attachment.contentType,
metadata
});
store.once('error', err => {
if (returned) {
return;
}
if (err.code === 11000) {
// most probably a race condition, try again
if (tryCount++ < 5) {
if (/\.chunks /.test(err.message)) {
// Partial chunks detected. Might be because of:
// * another process is inserting the same attachment and thus no "files" entry yet (should not happen though due to locking)
// * previously deleted attachment that has not been properly removed
// Load data for an existing chunk to see the age of it
return this.gridfs.collection(this.bucketName + '.chunks').findOne(
{
files_id: hash
},
{
projection: {
_id: true
}
},
(err, data) => {
if (err) {
// whatever
return setTimeout(tryStore, 100 + 200 * Math.random());
}
if (!data || !data._id) {
// try again, no chunks found
return setTimeout(tryStore, 10);
}
// Check how old is the previous chunk
let timestamp = data._id.getTimestamp();
if (timestamp && typeof timestamp.getTime === 'function' && timestamp.getTime() >= Date.now() - 5 * 60 * 1000) {
// chunk is newer than 5 minutes, assume race condition and try again after a while
return setTimeout(tryStore, 300 + 200 * Math.random());
}
// partial chunks for a probably deleted message detected, try to clean up
setTimeout(() => {
if (returned) {
return;
}
this.cleanupGarbage(id, tryStore);
}, 100 + 200 * Math.random());
}
);
}
return setTimeout(tryStore, 10);
}
}
attachmentCallback(err);
});
store.once('finish', () => attachmentCallback(null, id));
if (!metadata.decoded) {
store.end(attachment.body);
} else {
let decoder = new libbase64.Decoder();
decoder.pipe(store);
decoder.once('error', err => {
// pass error forward
store.emit('error', err);
});
decoder.end(attachment.body);
}
});
}
);
};

View file

@ -29,7 +29,8 @@ class MessageHandler {
options.attachmentStorage ||
new AttachmentStorage({
gridfs: options.gridfs || options.database,
options: options.attachments
options: options.attachments,
redis: this.redis
});
this.indexer = new Indexer({

View file

@ -15,13 +15,13 @@
"author": "Andris Reinman",
"license": "EUPL-1.1+",
"devDependencies": {
"ajv": "6.6.2",
"ajv": "6.7.0",
"apidoc": "0.17.7",
"browserbox": "0.9.1",
"chai": "4.2.0",
"eslint": "5.12.0",
"eslint-config-nodemailer": "1.2.0",
"eslint-config-prettier": "3.3.0",
"eslint-config-prettier": "3.5.0",
"grunt": "1.0.3",
"grunt-cli": "1.3.2",
"grunt-eslint": "21.0.0",
@ -56,15 +56,15 @@
"mailsplit": "4.2.4",
"mobileconfig": "2.1.0",
"mongo-cursor-pagination": "7.1.0",
"mongodb": "3.1.10",
"mongodb": "3.1.12",
"mongodb-extended-json": "1.10.1",
"node-forge": "0.7.6",
"nodemailer": "5.1.1",
"npmlog": "4.1.2",
"openpgp": "4.4.3",
"openpgp": "4.4.5",
"pem": "1.13.2",
"pwnedpasswords": "1.0.4",
"qrcode": "1.3.2",
"qrcode": "1.3.3",
"restify": "7.5.0",
"restify-logger": "2.0.1",
"seq-index": "1.1.0",