From 177fbb7d6054ad864eaf35a9513fcd9f9197fd91 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Mon, 7 Aug 2017 11:29:29 +0300 Subject: [PATCH] allow using other storage mechanisms for attachments than gridstore --- README.md | 2 +- api.js | 3 +- config/default.toml | 4 + imap.js | 61 +------- indexes.yaml | 6 + lib/attachment-storage.js | 147 +++--------------- lib/attachments/gridstore-storage.js | 217 +++++++++++++++++++++++++++ lib/message-handler.js | 3 +- lmtp.js | 5 +- pop3.js | 3 +- 10 files changed, 260 insertions(+), 191 deletions(-) create mode 100644 lib/attachments/gridstore-storage.js diff --git a/README.md b/README.md index dea4f288..b914fe28 100644 --- a/README.md +++ b/README.md @@ -193,7 +193,7 @@ Shard the following collections by these keys: sh.enableSharding('wildduck'); sh.shardCollection('wildduck.messages', { mailbox: 1, uid: 1 }); sh.shardCollection('wildduck.threads', { user: 'hashed' }); -sh.shardCollection('wildduck.attachments.files', { 'metadata.h': 'hashed' }); +sh.shardCollection('wildduck.attachments.files', { _id: 'hashed' }); sh.shardCollection('wildduck.attachments.chunks', { files_id: 'hashed' }); ``` diff --git a/api.js b/api.js index cf6a5bad..ac4b211f 100644 --- a/api.js +++ b/api.js @@ -104,8 +104,9 @@ module.exports = done => { messageHandler = new MessageHandler({ database: db.database, users: db.users, + redis: db.redis, gridfs: db.gridfs, - redis: db.redis + attachments: config.attachments }); userHandler = new UserHandler({ diff --git a/config/default.toml b/config/default.toml index bb46d115..92bac061 100644 --- a/config/default.toml +++ b/config/default.toml @@ -52,6 +52,10 @@ maxForwards=2000 # used to push outbound emails to the sending queue #sender="zone-mta" +[attachments] + type="gridstore" + bucket="attachments" + [log] level="silly" # log to syslog if true diff --git a/imap.js b/imap.js index 49286418..490071aa 100644 --- a/imap.js +++ b/imap.js @@ -84,60 +84,6 @@ let mailboxHandler; let gcTimeout; let gcLock; -function deleteOrphanedAttachments(callback) { - // NB! scattered query - let cursor = db.gridfs.collection('attachments.files').find({ - 'metadata.c': 0, - 'metadata.m': 0 - }); - - let deleted = 0; - let processNext = () => { - cursor.next((err, attachment) => { - if (err) { - return callback(err); - } - if (!attachment) { - return cursor.close(() => { - // delete all attachments that do not have any active links to message objects - callback(null, deleted); - }); - } - - if (!attachment || (attachment.metadata && attachment.metadata.c)) { - // skip - return processNext(); - } - - // delete file entry first - db.gridfs.collection('attachments.files').deleteOne({ - _id: attachment._id, - // make sure that we do not delete a message that is already re-used - 'metadata.c': 0, - 'metadata.m': 0 - }, (err, result) => { - if (err || !result.deletedCount) { - return processNext(); - } - - // delete data chunks - db.gridfs.collection('attachments.chunks').deleteMany({ - files_id: attachment._id - }, err => { - if (err) { - // ignore as we don't really care if we have orphans or not - } - - deleted++; - processNext(); - }); - }); - }); - }; - - processNext(); -} - function clearExpiredMessages() { clearTimeout(gcTimeout); let startTime = Date.now(); @@ -181,7 +127,7 @@ function clearExpiredMessages() { if (config.imap.disableRetention) { // delete all attachments that do not have any active links to message objects - return deleteOrphanedAttachments(() => done(null, true)); + return messageHandler.attachmentStorage.deleteOrphaned(() => done(null, true)); } // find and delete all messages that are expired @@ -208,7 +154,7 @@ function clearExpiredMessages() { let clear = () => cursor.close(() => { // delete all attachments that do not have any active links to message objects - deleteOrphanedAttachments(() => { + messageHandler.attachmentStorage.deleteOrphaned(() => { if (deleted) { server.logger.debug( { @@ -297,8 +243,9 @@ module.exports = done => { messageHandler = new MessageHandler({ database: db.database, + redis: db.redis, gridfs: db.gridfs, - redis: db.redis + attachments: config.attachments }); userHandler = new UserHandler({ diff --git a/indexes.yaml b/indexes.yaml index 4cb87df8..908b5cf2 100644 --- a/indexes.yaml +++ b/indexes.yaml @@ -253,6 +253,12 @@ indexes: # attachments.files collection should be sharded by _id (hash) # attachments.chunks collection should be sharded by files_id (hash) +- collection: attachments.files + type: gridfs # index applies to gridfs database + index: + name: attachment_id_hashed + key: + _id: hashed - collection: attachments.files type: gridfs # index applies to gridfs database index: diff --git a/lib/attachment-storage.js b/lib/attachment-storage.js index 0817c72b..123c9964 100644 --- a/lib/attachment-storage.js +++ b/lib/attachment-storage.js @@ -1,8 +1,7 @@ 'use strict'; -const ObjectID = require('mongodb').ObjectID; +const GridstoreStorage = require('./attachments/gridstore-storage.js'); const crypto = require('crypto'); -const GridFSBucket = require('mongodb').GridFSBucket; let cryptoAsync; try { cryptoAsync = require('@ronomon/crypto-async'); // eslint-disable-line global-require @@ -12,30 +11,20 @@ try { class AttachmentStorage { constructor(options) { - this.bucketName = options.bucket || 'attachments'; - this.gridfs = options.gridfs; - this.gridstore = new GridFSBucket(this.gridfs, { - bucketName: this.bucketName - }); + this.options = options || {}; + + let type = (options.options && options.options.type) || 'gridstore'; + + switch (type) { + case 'gridstore': + default: + this.storage = new GridstoreStorage(this.options); + break; + } } get(attachmentId, callback) { - this.gridfs.collection('attachments.files').findOne({ - _id: attachmentId - }, (err, attachmentData) => { - if (err) { - return callback(err); - } - if (!attachmentData) { - return callback(new Error('This attachment does not exist')); - } - - return callback(null, { - contentType: attachmentData.contentType, - transferEncoding: attachmentData.metadata.transferEncoding, - metadata: attachmentData.metadata - }); - }); + return this.storage.get(attachmentId, callback); } create(attachment, callback) { @@ -43,68 +32,12 @@ class AttachmentStorage { if (err) { return callback(err); } - - this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({ - 'metadata.h': hash - }, { - $inc: { - 'metadata.c': 1, - 'metadata.m': attachment.magic - } - }, { - returnOriginal: false - }, (err, result) => { - if (err) { - return callback(err); - } - - if (result && result.value) { - return callback(null, result.value._id); - } - - let returned = false; - - let id = new ObjectID(); - let metadata = { - h: hash, - m: attachment.magic, - c: 1, - transferEncoding: attachment.transferEncoding - }; - Object.keys(attachment.metadata || {}).forEach(key => { - if (!(key in attachment.metadata)) { - metadata[key] = attachment.metadata[key]; - } - }); - - let store = this.gridstore.openUploadStreamWithId(id, null, { - contentType: attachment.contentType, - metadata - }); - - store.once('error', err => { - if (returned) { - return; - } - returned = true; - callback(err); - }); - - store.once('finish', () => { - if (returned) { - return; - } - returned = true; - return callback(null, id); - }); - - store.end(attachment.body); - }); + return this.storage.create(attachment, hash, callback); }); } createReadStream(id) { - return this.gridstore.openDownloadStream(id); + return this.storage.createReadStream(id); } deleteMany(ids, magic, callback) { @@ -120,57 +53,15 @@ class AttachmentStorage { } updateMany(ids, count, magic, callback) { - // update attachments - this.gridfs.collection(this.bucketName + '.files').updateMany( - { - _id: { - $in: ids - } - }, - { - $inc: { - 'metadata.c': count, - 'metadata.m': magic - } - }, - { - multi: true, - w: 1 - }, - callback - ); + this.storage.update(ids, count, magic, callback); } delete(id, magic, callback) { - this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({ - _id: id - }, { - $inc: { - 'metadata.c': -1, - 'metadata.m': -magic - } - }, { - returnOriginal: false - }, (err, result) => { - if (err) { - return callback(err); - } + this.storage.delete(id, magic, callback); + } - if (!result || !result.value) { - return callback(null, false); - } - - if (result.value.metadata.c === 0 && result.value.metadata.m === 0) { - return this.gridstore.delete(id, err => { - if (err) { - return callback(err); - } - callback(null, 1); - }); - } - - return callback(null, 0); - }); + deleteOrphaned(callback) { + this.storage.deleteOrphaned(callback); } calculateHash(input, callback) { diff --git a/lib/attachments/gridstore-storage.js b/lib/attachments/gridstore-storage.js new file mode 100644 index 00000000..03aeb778 --- /dev/null +++ b/lib/attachments/gridstore-storage.js @@ -0,0 +1,217 @@ +'use strict'; + +const ObjectID = require('mongodb').ObjectID; +const GridFSBucket = require('mongodb').GridFSBucket; + +class GridstoreStorage { + constructor(options) { + this.bucketName = (options.options && options.options.bucket) || 'attachments'; + this.gridfs = options.gridfs; + this.gridstore = new GridFSBucket(this.gridfs, { + bucketName: this.bucketName + }); + } + + get(attachmentId, callback) { + this.gridfs.collection(this.bucketName + '.files').findOne({ + _id: attachmentId + }, (err, attachmentData) => { + if (err) { + return callback(err); + } + if (!attachmentData) { + return callback(new Error('This attachment does not exist')); + } + + return callback(null, { + contentType: attachmentData.contentType, + transferEncoding: attachmentData.metadata.transferEncoding, + length: attachmentData.length, + count: attachmentData.metadata.c, + hash: attachmentData.metadata.h, + metadata: attachmentData.metadata + }); + }); + } + + create(attachment, hash, callback) { + this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({ + 'metadata.h': hash + }, { + $inc: { + 'metadata.c': 1, + 'metadata.m': attachment.magic + } + }, { + returnOriginal: false + }, (err, result) => { + if (err) { + return callback(err); + } + + if (result && result.value) { + return callback(null, result.value._id); + } + + let returned = false; + + let id = new ObjectID(); + let metadata = { + h: hash, + m: attachment.magic, + c: 1, + transferEncoding: attachment.transferEncoding + }; + Object.keys(attachment.metadata || {}).forEach(key => { + if (!(key in attachment.metadata)) { + metadata[key] = attachment.metadata[key]; + } + }); + + let store = this.gridstore.openUploadStreamWithId(id, null, { + contentType: attachment.contentType, + metadata + }); + + store.once('error', err => { + if (returned) { + return; + } + returned = true; + callback(err); + }); + + store.once('finish', () => { + if (returned) { + return; + } + returned = true; + return callback(null, id); + }); + + store.end(attachment.body); + }); + } + + createReadStream(id) { + return this.gridstore.openDownloadStream(id); + } + + delete(id, magic, callback) { + this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({ + _id: id + }, { + $inc: { + 'metadata.c': -1, + 'metadata.m': -magic + } + }, { + returnOriginal: false + }, (err, result) => { + if (err) { + return callback(err); + } + + if (!result || !result.value) { + return callback(null, false); + } + + /* + // disabled as it is preferred that attachments are not deleted immediately but + // after a while by a cleanup process. This gives the opportunity to reuse the + // attachment + + if (result.value.metadata.c === 0 && result.value.metadata.m === 0) { + return this.gridstore.delete(id, err => { + if (err) { + return callback(err); + } + callback(null, 1); + }); + } + */ + + return callback(null, true); + }); + } + + update(ids, count, magic, callback) { + // update attachments + this.gridfs.collection(this.bucketName + '.files').updateMany( + { + _id: Array.isArray(ids) + ? { + $in: ids + } + : ids + }, + { + $inc: { + 'metadata.c': count, + 'metadata.m': magic + } + }, + { + multi: true, + w: 1 + }, + callback + ); + } + + deleteOrphaned(callback) { + // NB! scattered query + let cursor = this.gridfs.collection(this.bucketName + '.files').find({ + 'metadata.c': 0, + 'metadata.m': 0 + }); + + let deleted = 0; + let processNext = () => { + cursor.next((err, attachment) => { + if (err) { + return callback(err); + } + if (!attachment) { + return cursor.close(() => { + // delete all attachments that do not have any active links to message objects + callback(null, deleted); + }); + } + + if (!attachment || (attachment.metadata && attachment.metadata.c)) { + // skip + return processNext(); + } + + // delete file entry first + this.gridfs.collection('attachments.files').deleteOne({ + _id: attachment._id, + // make sure that we do not delete a message that is already re-used + 'metadata.c': 0, + 'metadata.m': 0 + }, (err, result) => { + if (err || !result.deletedCount) { + return processNext(); + } + + // delete data chunks + this.gridfs.collection('attachments.chunks').deleteMany({ + files_id: attachment._id + }, err => { + if (err) { + // ignore as we don't really care if we have orphans or not + } + + deleted++; + processNext(); + }); + }); + }); + }; + + processNext(); + } +} + +module.exports = GridstoreStorage; diff --git a/lib/message-handler.js b/lib/message-handler.js index 84df5f41..f55d747f 100644 --- a/lib/message-handler.js +++ b/lib/message-handler.js @@ -26,7 +26,8 @@ class MessageHandler { this.attachmentStorage = options.attachmentStorage || new AttachmentStorage({ - gridfs: options.gridfs || options.database + gridfs: options.gridfs || options.database, + options: options.attachments }); this.indexer = new Indexer({ diff --git a/lmtp.js b/lmtp.js index 5f7e9ecb..733d3bc1 100644 --- a/lmtp.js +++ b/lmtp.js @@ -434,9 +434,10 @@ module.exports = done => { messageHandler = new MessageHandler({ database: db.database, - gridfs: db.gridfs, users: db.users, - redis: db.redis + redis: db.redis, + gridfs: db.gridfs, + attachments: config.attachments }); let started = false; diff --git a/pop3.js b/pop3.js index f7d760d5..2881c6b5 100644 --- a/pop3.js +++ b/pop3.js @@ -309,8 +309,9 @@ module.exports = done => { messageHandler = new MessageHandler({ database: db.database, + redis: db.redis, gridfs: db.gridfs, - redis: db.redis + attachments: config.attachments }); userHandler = new UserHandler({