From 669a74bbef70ba64d14f49cdc4845ebe34c7a217 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Thu, 17 Jan 2019 11:48:23 +0200 Subject: [PATCH] use locking when storing large attachments --- lib/api/messages.js | 10 +- lib/attachments/gridstore-storage.js | 216 +++++++++++++++++---------- lib/message-handler.js | 3 +- package.json | 10 +- 4 files changed, 154 insertions(+), 85 deletions(-) diff --git a/lib/api/messages.js b/lib/api/messages.js index c3808719..916383ab 100644 --- a/lib/api/messages.js +++ b/lib/api/messages.js @@ -2297,7 +2297,9 @@ module.exports = (db, server, messageHandler, userHandler) => { } if (!req.params.raw) { - raw = await getCompiledMessage(data); + raw = await getCompiledMessage(data, { + isDraft: !!result.value.draft + }); } if (!raw || !raw.length) { @@ -3759,10 +3761,14 @@ function formatMessageListing(messageData) { return response; } -async function getCompiledMessage(data) { +async function getCompiledMessage(data, options) { + options = options || {}; return new Promise((resolve, reject) => { let compiler = new MailComposer(data); let compiled = compiler.compile(); + if (options.isDraft) { + compiled.keepBcc = true; + } let stream = compiled.createReadStream(); let chunks = []; let chunklen = 0; diff --git a/lib/attachments/gridstore-storage.js b/lib/attachments/gridstore-storage.js index d606ef09..2e46b813 100644 --- a/lib/attachments/gridstore-storage.js +++ b/lib/attachments/gridstore-storage.js @@ -2,7 +2,10 @@ const GridFSBucket = require('mongodb').GridFSBucket; const libbase64 = require('libbase64'); +const RedFour = require('ioredfour'); const errors = require('../errors'); +const log = require('npmlog'); +const crypto = require('crypto'); // Set to false to disable base64 decoding feature const FEATURE_DECODE_ATTACHMENTS = true; @@ -12,6 +15,11 @@ class GridstoreStorage { this.bucketName = (options.options && options.options.bucket) || 'attachments'; this.decodeBase64 = (options.options && options.options.decodeBase64) 
|| false; + this.lock = new RedFour({ + redis: options.redis, + namespace: 'wildduck' + }); + this.gridfs = options.gridfs; this.gridstore = new GridFSBucket(this.gridfs, { bucketName: this.bucketName, @@ -114,6 +122,34 @@ class GridstoreStorage { } } + let instance = crypto.randomBytes(8).toString('hex'); + let lockId = 'gs.' + hash.toString('base64'); + let storeLock; + + let attachmentCallback = (...args) => { + if (storeLock) { + log.silly('GridStore', '[%s] UNLOCK lock=%s status=%s', instance, lockId, storeLock.success ? 'locked' : 'empty'); + if (storeLock.success) { + this.lock.releaseLock(storeLock, () => { + if (returned) { + // might be already finished if retrying after delay + return; + } + callback(...args); + }); + // unset variable to prevent double releasing + storeLock = false; + return; + } + storeLock = false; + } + if (returned) { + // might be already finished if retrying after delay + return; + } + callback(...args); + }; + let tryCount = 0; let tryStore = () => { if (returned) { @@ -136,96 +172,122 @@ class GridstoreStorage { }, (err, result) => { if (err) { - return callback(err); + return attachmentCallback(err); } if (result && result.value) { // already exists - return callback(null, result.value._id); + return attachmentCallback(null, result.value._id); } - // try to insert it - let store = this.gridstore.openUploadStreamWithId(id, null, { - contentType: attachment.contentType, - metadata - }); - - store.once('error', err => { - if (returned) { - return; + let checkLock = done => { + if (storeLock) { + // continue processing, we have a lock + return done(); } - if (err.code === 11000) { - // most probably a race condition, try again - if (tryCount++ < 5) { - if (/attachments\.chunks /.test(err.message)) { - // partial chunks detected. 
might be because of: - // * another process is inserting the same attachment and thus no "files" entry yet - // * previously deleted attachment that has not been properly removed - // load data for an existing chunk to see the age of it - return this.gridfs.collection(this.bucketName + '.chunks').findOne( - { - files_id: hash - }, - { - projection: { - _id: true - } - }, - (err, data) => { - if (err) { - // whatever - return setTimeout(tryStore, 100 + 200 * Math.random()); - } + if (attachment.body.length < 255 * 1024) { + // a single chunk attachment, no need for locking + return done(); + } - if (!data || !data._id) { - // try again, no chunks found - return setTimeout(tryStore, 10); - } - - // check how old is the previous chunk - let timestamp = data._id.getTimestamp(); - if (timestamp && typeof timestamp.getTime === 'function' && timestamp.getTime() >= Date.now() - 15 * 60 * 1000) { - // chunk is newer than 15 minutes, assume race condition and try again after a while - return setTimeout(tryStore, 300 + 200 * Math.random()); - } - - // partial chunks for a probably deleted message detected, try to clean up - setTimeout(() => { - if (returned) { - return; - } - this.cleanupGarbage(id, tryStore); - }, 100 + 200 * Math.random()); - } - ); + // Try to get a lock + // Using locks is required to prevent multiple messages storing the same large attachment at + // the same time. + // NB! 
Setting lock ttl too high has a downside that restarting the process would still keep + // the lock and thus anyone trying to store the message would have to wait + this.lock.waitAcquireLock(lockId, 2 * 60 * 1000 /* Lock expires after 2min if not released */, false, (err, lock) => { + if (err) { + if (returned) { + return; } - return setTimeout(tryStore, 10); + returned = true; + return attachmentCallback(err); } - } - returned = true; - callback(err); - }); - store.once('finish', () => { - if (returned) { - return; - } - returned = true; - return callback(null, id); - }); - - if (!metadata.decoded) { - store.end(attachment.body); - } else { - let decoder = new libbase64.Decoder(); - decoder.pipe(store); - decoder.once('error', err => { - // pass error forward - store.emit('error', err); + storeLock = lock; + log.silly('GridStore', '[%s] LOCK lock=%s status=%s', instance, lockId, storeLock.success ? 'locked' : 'empty'); + return tryStore(); // start over }); - decoder.end(attachment.body); - } + }; + + checkLock(() => { + // try to insert it + let store = this.gridstore.openUploadStreamWithId(id, null, { + contentType: attachment.contentType, + metadata + }); + + store.once('error', err => { + if (returned) { + return; + } + if (err.code === 11000) { + // most probably a race condition, try again + if (tryCount++ < 5) { + if (/\.chunks /.test(err.message)) { + // Partial chunks detected. 
Might be because of: + // * another process is inserting the same attachment and thus no "files" entry yet (should not happen though due to locking) + // * previously deleted attachment that has not been properly removed + + // Load data for an existing chunk to see the age of it + return this.gridfs.collection(this.bucketName + '.chunks').findOne( + { + files_id: hash + }, + { + projection: { + _id: true + } + }, + (err, data) => { + if (err) { + // whatever + return setTimeout(tryStore, 100 + 200 * Math.random()); + } + + if (!data || !data._id) { + // try again, no chunks found + return setTimeout(tryStore, 10); + } + + // Check how old is the previous chunk + let timestamp = data._id.getTimestamp(); + if (timestamp && typeof timestamp.getTime === 'function' && timestamp.getTime() >= Date.now() - 5 * 60 * 1000) { + // chunk is newer than 5 minutes, assume race condition and try again after a while + return setTimeout(tryStore, 300 + 200 * Math.random()); + } + + // partial chunks for a probably deleted message detected, try to clean up + setTimeout(() => { + if (returned) { + return; + } + this.cleanupGarbage(id, tryStore); + }, 100 + 200 * Math.random()); + } + ); + } + return setTimeout(tryStore, 10); + } + } + attachmentCallback(err); + }); + + store.once('finish', () => attachmentCallback(null, id)); + + if (!metadata.decoded) { + store.end(attachment.body); + } else { + let decoder = new libbase64.Decoder(); + decoder.pipe(store); + decoder.once('error', err => { + // pass error forward + store.emit('error', err); + }); + decoder.end(attachment.body); + } + }); } ); }; diff --git a/lib/message-handler.js b/lib/message-handler.js index 291ac38e..e132e0a9 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -29,7 +29,8 @@ class MessageHandler { options.attachmentStorage || new AttachmentStorage({ gridfs: options.gridfs || options.database, - options: options.attachments + options: options.attachments, + redis: this.redis }); this.indexer = new 
Indexer({ diff --git a/package.json b/package.json index a8259581..c35b6932 100644 --- a/package.json +++ b/package.json @@ -15,13 +15,13 @@ "author": "Andris Reinman", "license": "EUPL-1.1+", "devDependencies": { - "ajv": "6.6.2", + "ajv": "6.7.0", "apidoc": "0.17.7", "browserbox": "0.9.1", "chai": "4.2.0", "eslint": "5.12.0", "eslint-config-nodemailer": "1.2.0", - "eslint-config-prettier": "3.3.0", + "eslint-config-prettier": "3.5.0", "grunt": "1.0.3", "grunt-cli": "1.3.2", "grunt-eslint": "21.0.0", @@ -56,15 +56,15 @@ "mailsplit": "4.2.4", "mobileconfig": "2.1.0", "mongo-cursor-pagination": "7.1.0", - "mongodb": "3.1.10", + "mongodb": "3.1.12", "mongodb-extended-json": "1.10.1", "node-forge": "0.7.6", "nodemailer": "5.1.1", "npmlog": "4.1.2", - "openpgp": "4.4.3", + "openpgp": "4.4.5", "pem": "1.13.2", "pwnedpasswords": "1.0.4", - "qrcode": "1.3.2", + "qrcode": "1.3.3", "restify": "7.5.0", "restify-logger": "2.0.1", "seq-index": "1.1.0",