allow using other storage mechanisms for attachments than gridstore

This commit is contained in:
Andris Reinman 2017-08-07 11:29:29 +03:00
parent 4138bf2a2f
commit 177fbb7d60
10 changed files with 260 additions and 191 deletions

View file

@ -193,7 +193,7 @@ Shard the following collections by these keys:
sh.enableSharding('wildduck');
sh.shardCollection('wildduck.messages', { mailbox: 1, uid: 1 });
sh.shardCollection('wildduck.threads', { user: 'hashed' });
sh.shardCollection('wildduck.attachments.files', { 'metadata.h': 'hashed' });
sh.shardCollection('wildduck.attachments.files', { _id: 'hashed' });
sh.shardCollection('wildduck.attachments.chunks', { files_id: 'hashed' });
```

3
api.js
View file

@ -104,8 +104,9 @@ module.exports = done => {
messageHandler = new MessageHandler({
database: db.database,
users: db.users,
redis: db.redis,
gridfs: db.gridfs,
redis: db.redis
attachments: config.attachments
});
userHandler = new UserHandler({

View file

@ -52,6 +52,10 @@ maxForwards=2000
# used to push outbound emails to the sending queue
#sender="zone-mta"
[attachments]
type="gridstore"
bucket="attachments"
[log]
level="silly"
# log to syslog if true

61
imap.js
View file

@ -84,60 +84,6 @@ let mailboxHandler;
let gcTimeout;
let gcLock;
function deleteOrphanedAttachments(callback) {
// NB! scattered query
let cursor = db.gridfs.collection('attachments.files').find({
'metadata.c': 0,
'metadata.m': 0
});
let deleted = 0;
let processNext = () => {
cursor.next((err, attachment) => {
if (err) {
return callback(err);
}
if (!attachment) {
return cursor.close(() => {
// delete all attachments that do not have any active links to message objects
callback(null, deleted);
});
}
if (!attachment || (attachment.metadata && attachment.metadata.c)) {
// skip
return processNext();
}
// delete file entry first
db.gridfs.collection('attachments.files').deleteOne({
_id: attachment._id,
// make sure that we do not delete a message that is already re-used
'metadata.c': 0,
'metadata.m': 0
}, (err, result) => {
if (err || !result.deletedCount) {
return processNext();
}
// delete data chunks
db.gridfs.collection('attachments.chunks').deleteMany({
files_id: attachment._id
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
deleted++;
processNext();
});
});
});
};
processNext();
}
function clearExpiredMessages() {
clearTimeout(gcTimeout);
let startTime = Date.now();
@ -181,7 +127,7 @@ function clearExpiredMessages() {
if (config.imap.disableRetention) {
// delete all attachments that do not have any active links to message objects
return deleteOrphanedAttachments(() => done(null, true));
return messageHandler.attachmentStorage.deleteOrphaned(() => done(null, true));
}
// find and delete all messages that are expired
@ -208,7 +154,7 @@ function clearExpiredMessages() {
let clear = () =>
cursor.close(() => {
// delete all attachments that do not have any active links to message objects
deleteOrphanedAttachments(() => {
messageHandler.attachmentStorage.deleteOrphaned(() => {
if (deleted) {
server.logger.debug(
{
@ -297,8 +243,9 @@ module.exports = done => {
messageHandler = new MessageHandler({
database: db.database,
redis: db.redis,
gridfs: db.gridfs,
redis: db.redis
attachments: config.attachments
});
userHandler = new UserHandler({

View file

@ -253,6 +253,12 @@ indexes:
# attachments.files collection should be sharded by _id (hash)
# attachments.chunks collection should be sharded by files_id (hash)
- collection: attachments.files
type: gridfs # index applies to gridfs database
index:
name: attachment_id_hashed
key:
_id: hashed
- collection: attachments.files
type: gridfs # index applies to gridfs database
index:

View file

@ -1,8 +1,7 @@
'use strict';
const ObjectID = require('mongodb').ObjectID;
const GridstoreStorage = require('./attachments/gridstore-storage.js');
const crypto = require('crypto');
const GridFSBucket = require('mongodb').GridFSBucket;
let cryptoAsync;
try {
cryptoAsync = require('@ronomon/crypto-async'); // eslint-disable-line global-require
@ -12,30 +11,20 @@ try {
class AttachmentStorage {
constructor(options) {
this.bucketName = options.bucket || 'attachments';
this.gridfs = options.gridfs;
this.gridstore = new GridFSBucket(this.gridfs, {
bucketName: this.bucketName
});
this.options = options || {};
let type = (options.options && options.options.type) || 'gridstore';
switch (type) {
case 'gridstore':
default:
this.storage = new GridstoreStorage(this.options);
break;
}
}
get(attachmentId, callback) {
this.gridfs.collection('attachments.files').findOne({
_id: attachmentId
}, (err, attachmentData) => {
if (err) {
return callback(err);
}
if (!attachmentData) {
return callback(new Error('This attachment does not exist'));
}
return callback(null, {
contentType: attachmentData.contentType,
transferEncoding: attachmentData.metadata.transferEncoding,
metadata: attachmentData.metadata
});
});
return this.storage.get(attachmentId, callback);
}
create(attachment, callback) {
@ -43,68 +32,12 @@ class AttachmentStorage {
if (err) {
return callback(err);
}
this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({
'metadata.h': hash
}, {
$inc: {
'metadata.c': 1,
'metadata.m': attachment.magic
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
if (result && result.value) {
return callback(null, result.value._id);
}
let returned = false;
let id = new ObjectID();
let metadata = {
h: hash,
m: attachment.magic,
c: 1,
transferEncoding: attachment.transferEncoding
};
Object.keys(attachment.metadata || {}).forEach(key => {
if (!(key in attachment.metadata)) {
metadata[key] = attachment.metadata[key];
}
});
let store = this.gridstore.openUploadStreamWithId(id, null, {
contentType: attachment.contentType,
metadata
});
store.once('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
store.once('finish', () => {
if (returned) {
return;
}
returned = true;
return callback(null, id);
});
store.end(attachment.body);
});
return this.storage.create(attachment, hash, callback);
});
}
createReadStream(id) {
return this.gridstore.openDownloadStream(id);
return this.storage.createReadStream(id);
}
deleteMany(ids, magic, callback) {
@ -120,57 +53,15 @@ class AttachmentStorage {
}
updateMany(ids, count, magic, callback) {
// update attachments
this.gridfs.collection(this.bucketName + '.files').updateMany(
{
_id: {
$in: ids
}
},
{
$inc: {
'metadata.c': count,
'metadata.m': magic
}
},
{
multi: true,
w: 1
},
callback
);
this.storage.update(ids, count, magic, callback);
}
delete(id, magic, callback) {
this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({
_id: id
}, {
$inc: {
'metadata.c': -1,
'metadata.m': -magic
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
this.storage.delete(id, magic, callback);
}
if (!result || !result.value) {
return callback(null, false);
}
if (result.value.metadata.c === 0 && result.value.metadata.m === 0) {
return this.gridstore.delete(id, err => {
if (err) {
return callback(err);
}
callback(null, 1);
});
}
return callback(null, 0);
});
deleteOrphaned(callback) {
this.storage.deleteOrphaned(callback);
}
calculateHash(input, callback) {

View file

@ -0,0 +1,217 @@
'use strict';
const ObjectID = require('mongodb').ObjectID;
const GridFSBucket = require('mongodb').GridFSBucket;
class GridstoreStorage {
constructor(options) {
this.bucketName = (options.options && options.options.bucket) || 'attachments';
this.gridfs = options.gridfs;
this.gridstore = new GridFSBucket(this.gridfs, {
bucketName: this.bucketName
});
}
get(attachmentId, callback) {
this.gridfs.collection(this.bucketName + '.files').findOne({
_id: attachmentId
}, (err, attachmentData) => {
if (err) {
return callback(err);
}
if (!attachmentData) {
return callback(new Error('This attachment does not exist'));
}
return callback(null, {
contentType: attachmentData.contentType,
transferEncoding: attachmentData.metadata.transferEncoding,
length: attachmentData.length,
count: attachmentData.metadata.c,
hash: attachmentData.metadata.h,
metadata: attachmentData.metadata
});
});
}
create(attachment, hash, callback) {
this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({
'metadata.h': hash
}, {
$inc: {
'metadata.c': 1,
'metadata.m': attachment.magic
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
if (result && result.value) {
return callback(null, result.value._id);
}
let returned = false;
let id = new ObjectID();
let metadata = {
h: hash,
m: attachment.magic,
c: 1,
transferEncoding: attachment.transferEncoding
};
Object.keys(attachment.metadata || {}).forEach(key => {
if (!(key in attachment.metadata)) {
metadata[key] = attachment.metadata[key];
}
});
let store = this.gridstore.openUploadStreamWithId(id, null, {
contentType: attachment.contentType,
metadata
});
store.once('error', err => {
if (returned) {
return;
}
returned = true;
callback(err);
});
store.once('finish', () => {
if (returned) {
return;
}
returned = true;
return callback(null, id);
});
store.end(attachment.body);
});
}
createReadStream(id) {
return this.gridstore.openDownloadStream(id);
}
delete(id, magic, callback) {
this.gridfs.collection(this.bucketName + '.files').findOneAndUpdate({
_id: id
}, {
$inc: {
'metadata.c': -1,
'metadata.m': -magic
}
}, {
returnOriginal: false
}, (err, result) => {
if (err) {
return callback(err);
}
if (!result || !result.value) {
return callback(null, false);
}
/*
// disabled as it is preferred that attachments are not deleted immediately but
// after a while by a cleanup process. This gives the opportunity to reuse the
// attachment
if (result.value.metadata.c === 0 && result.value.metadata.m === 0) {
return this.gridstore.delete(id, err => {
if (err) {
return callback(err);
}
callback(null, 1);
});
}
*/
return callback(null, true);
});
}
update(ids, count, magic, callback) {
// update attachments
this.gridfs.collection(this.bucketName + '.files').updateMany(
{
_id: Array.isArray(ids)
? {
$in: ids
}
: ids
},
{
$inc: {
'metadata.c': count,
'metadata.m': magic
}
},
{
multi: true,
w: 1
},
callback
);
}
deleteOrphaned(callback) {
// NB! scattered query
let cursor = this.gridfs.collection(this.bucketName + '.files').find({
'metadata.c': 0,
'metadata.m': 0
});
let deleted = 0;
let processNext = () => {
cursor.next((err, attachment) => {
if (err) {
return callback(err);
}
if (!attachment) {
return cursor.close(() => {
// delete all attachments that do not have any active links to message objects
callback(null, deleted);
});
}
if (!attachment || (attachment.metadata && attachment.metadata.c)) {
// skip
return processNext();
}
// delete file entry first
this.gridfs.collection('attachments.files').deleteOne({
_id: attachment._id,
// make sure that we do not delete a message that is already re-used
'metadata.c': 0,
'metadata.m': 0
}, (err, result) => {
if (err || !result.deletedCount) {
return processNext();
}
// delete data chunks
this.gridfs.collection('attachments.chunks').deleteMany({
files_id: attachment._id
}, err => {
if (err) {
// ignore as we don't really care if we have orphans or not
}
deleted++;
processNext();
});
});
});
};
processNext();
}
}
module.exports = GridstoreStorage;

View file

@ -26,7 +26,8 @@ class MessageHandler {
this.attachmentStorage =
options.attachmentStorage ||
new AttachmentStorage({
gridfs: options.gridfs || options.database
gridfs: options.gridfs || options.database,
options: options.attachments
});
this.indexer = new Indexer({

View file

@ -434,9 +434,10 @@ module.exports = done => {
messageHandler = new MessageHandler({
database: db.database,
gridfs: db.gridfs,
users: db.users,
redis: db.redis
redis: db.redis,
gridfs: db.gridfs,
attachments: config.attachments
});
let started = false;

View file

@ -309,8 +309,9 @@ module.exports = done => {
messageHandler = new MessageHandler({
database: db.database,
redis: db.redis,
gridfs: db.gridfs,
redis: db.redis
attachments: config.attachments
});
userHandler = new UserHandler({