Limit daily download size for IMAP

This commit is contained in:
Andris Reinman 2017-11-12 15:13:32 +02:00
parent b284477a45
commit 6f28f90569
5 changed files with 232 additions and 174 deletions

View file

@ -25,6 +25,13 @@ module.exports = redis => {
return {
ttlcounter(key, count, max, windowSize, callback) {
if (!max || isNaN(max)) {
return callback(null, {
success: true,
value: 0,
ttl: 0
});
}
redis.ttlcounter(key, count, max, windowSize || 86400, (err, res) => {
if (err) {
return callback(err);

View file

@ -1,11 +1,13 @@
'use strict';
const config = require('wild-config');
const IMAPServerModule = require('../../imap-core');
const imapHandler = IMAPServerModule.imapHandler;
const util = require('util');
const db = require('../db');
const tools = require('../tools');
const consts = require('../consts');
const LimitedFetch = require('../limited-fetch');
module.exports = (server, messageHandler) => (path, options, session, callback) => {
server.logger.debug(
@ -28,201 +30,219 @@ module.exports = (server, messageHandler) => (path, options, session, callback)
return callback(null, 'NONEXISTENT');
}
let projection = {
uid: true,
modseq: true,
idate: true,
flags: true,
envelope: true,
bodystructure: true,
size: true
};
if (!options.metadataOnly) {
projection.mimeTree = true;
}
let query = {
mailbox: mailboxData._id
};
if (options.changedSince) {
query = {
mailbox: mailboxData._id,
modseq: {
$gt: options.changedSince
}
};
}
let queryAll = false;
if (options.messages.length !== session.selected.uidList.length) {
// do not use uid selector for 1:*
query.uid = tools.checkRangeQuery(options.messages);
} else {
// 1:*
queryAll = true;
// uid is part of the sharding key so we need it somehow represented in the query
query.uid = {
$gt: 0,
$lt: mailboxData.uidNext
};
}
let isUpdated = false;
let updateEntries = [];
let notifyEntries = [];
let done = (...args) => {
if (updateEntries.length) {
return db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
}, () => {
updateEntries = [];
server.notifier.addEntries(session.user.id, path, notifyEntries, () => {
notifyEntries = [];
server.notifier.fire(session.user.id, path);
return callback(...args);
});
});
messageHandler.counters.ttlcounter('idw:' + session.user.id, 0, config.imap.maxDownloadMB * 1024 * 1024, false, (err, res) => {
if (err) {
return callback(err);
}
if (isUpdated) {
server.notifier.fire(session.user.id, path);
if (!res.success) {
let err = new Error('Download was rate limited. Check again in ' + res.ttl + ' seconds');
err.response = 'NO';
return callback(err);
}
return callback(...args);
};
let cursor = db.database
.collection('messages')
.find(query)
.project(projection)
.sort([['uid', 1]]);
let projection = {
uid: true,
modseq: true,
idate: true,
flags: true,
envelope: true,
bodystructure: true,
size: true
};
let rowCount = 0;
let processNext = () => {
cursor.next((err, message) => {
if (err) {
return done(err);
}
if (!message) {
return cursor.close(() => {
done(null, true);
if (!options.metadataOnly) {
projection.mimeTree = true;
}
let query = {
mailbox: mailboxData._id
};
if (options.changedSince) {
query = {
mailbox: mailboxData._id,
modseq: {
$gt: options.changedSince
}
};
}
let queryAll = false;
if (options.messages.length !== session.selected.uidList.length) {
// do not use uid selector for 1:*
query.uid = tools.checkRangeQuery(options.messages);
} else {
// 1:*
queryAll = true;
// uid is part of the sharding key so we need it somehow represented in the query
query.uid = {
$gt: 0,
$lt: mailboxData.uidNext
};
}
let isUpdated = false;
let updateEntries = [];
let notifyEntries = [];
let done = (...args) => {
if (updateEntries.length) {
return db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
}, () => {
updateEntries = [];
server.notifier.addEntries(session.user.id, path, notifyEntries, () => {
notifyEntries = [];
server.notifier.fire(session.user.id, path);
return callback(...args);
});
});
}
if (queryAll && !session.selected.uidList.includes(message.uid)) {
// skip processing messages that we do not know about yet
return processNext();
if (isUpdated) {
server.notifier.fire(session.user.id, path);
}
return callback(...args);
};
let markAsSeen = options.markAsSeen && !message.flags.includes('\\Seen');
if (markAsSeen) {
message.flags.unshift('\\Seen');
}
let cursor = db.database
.collection('messages')
.find(query)
.project(projection)
.sort([['uid', 1]]);
let stream = imapHandler.compileStream(
session.formatResponse('FETCH', message.uid, {
query: options.query,
values: session.getQueryResponse(options.query, message, {
logger: server.logger,
fetchOptions: {},
database: db.database,
attachmentStorage: messageHandler.attachmentStorage,
acceptUTF8Enabled: session.isUTF8Enabled()
})
})
);
let rowCount = 0;
let processNext = () => {
cursor.next((err, message) => {
if (err) {
return done(err);
}
if (!message) {
return cursor.close(() => {
done(null, true);
});
}
stream.description = util.format('* FETCH #%s uid=%s size=%sB ', ++rowCount, message.uid, message.size);
stream.once('error', err => {
err.processed = true;
server.logger.error(
{
err,
tnx: 'fetch',
cid: session.id
},
'[%s] FETCHFAIL %s. %s',
session.id,
message._id,
err.message
);
session.socket.end('\n* BYE Internal Server Error\n');
return cursor.close(() => done());
});
// send formatted response to socket
session.writeStream.write(stream, () => {
if (!markAsSeen) {
if (queryAll && !session.selected.uidList.includes(message.uid)) {
// skip processing messages that we do not know about yet
return processNext();
}
server.logger.debug(
{
tnx: 'flags',
cid: session.id
},
'[%s] UPDATE FLAGS for "%s"',
session.id,
message.uid
let markAsSeen = options.markAsSeen && !message.flags.includes('\\Seen');
if (markAsSeen) {
message.flags.unshift('\\Seen');
}
let stream = imapHandler.compileStream(
session.formatResponse('FETCH', message.uid, {
query: options.query,
values: session.getQueryResponse(options.query, message, {
logger: server.logger,
fetchOptions: {},
database: db.database,
attachmentStorage: messageHandler.attachmentStorage,
acceptUTF8Enabled: session.isUTF8Enabled()
})
})
);
isUpdated = true;
stream.description = util.format('* FETCH #%s uid=%s size=%sB ', ++rowCount, message.uid, message.size);
updateEntries.push({
updateOne: {
filter: {
_id: message._id,
// include sharding key in query
mailbox: mailboxData._id,
uid: message.uid
stream.once('error', err => {
err.processed = true;
server.logger.error(
{
err,
tnx: 'fetch',
cid: session.id
},
update: {
$addToSet: {
flags: '\\Seen'
'[%s] FETCHFAIL %s. %s',
session.id,
message._id,
err.message
);
session.socket.end('\n* BYE Internal Server Error\n');
return cursor.close(() => done());
});
let limiter = new LimitedFetch({
ttlcounter: messageHandler.counters.ttlcounter,
maxBytes: config.imap.maxDownloadMB * 1024 * 1024,
user: session.user.id
});
stream.pipe(limiter);
// send formatted response to socket
session.writeStream.write(limiter, () => {
if (!markAsSeen) {
return processNext();
}
server.logger.debug(
{
tnx: 'flags',
cid: session.id
},
'[%s] UPDATE FLAGS for "%s"',
session.id,
message.uid
);
isUpdated = true;
updateEntries.push({
updateOne: {
filter: {
_id: message._id,
// include sharding key in query
mailbox: mailboxData._id,
uid: message.uid
},
$set: {
unseen: false
update: {
$addToSet: {
flags: '\\Seen'
},
$set: {
unseen: false
}
}
}
});
notifyEntries.push({
command: 'FETCH',
ignore: session.id,
uid: message.uid,
flags: message.flags,
message: message._id,
unseenChange: true
});
if (updateEntries.length >= consts.BULK_BATCH_SIZE) {
return db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
}, err => {
updateEntries = [];
if (err) {
return cursor.close(() => done(err));
}
server.notifier.addEntries(session.user.id, path, notifyEntries, () => {
notifyEntries = [];
server.notifier.fire(session.user.id, path);
processNext();
});
});
} else {
processNext();
}
});
notifyEntries.push({
command: 'FETCH',
ignore: session.id,
uid: message.uid,
flags: message.flags,
message: message._id,
unseenChange: true
});
if (updateEntries.length >= consts.BULK_BATCH_SIZE) {
return db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
}, err => {
updateEntries = [];
if (err) {
return cursor.close(() => done(err));
}
server.notifier.addEntries(session.user.id, path, notifyEntries, () => {
notifyEntries = [];
server.notifier.fire(session.user.id, path);
processNext();
});
});
} else {
processNext();
}
});
});
};
};
processNext();
processNext();
});
});
};

26
lib/limited-fetch.js Normal file
View file

@ -0,0 +1,26 @@
'use strict';
const Transform = require('stream').Transform;
class LimitedFetch extends Transform {
    /**
     * Pass-through Transform stream that tallies how many bytes flow
     * through it and, once the stream finishes, charges the total against
     * a TTL-based download counter (used to enforce daily IMAP quotas).
     *
     * @param {Object} [options]
     * @param {Function} options.ttlcounter Counter function with signature
     *     (key, count, max, windowSize, callback)
     * @param {Number} options.maxBytes Quota ceiling in bytes; a falsy
     *     value disables reporting entirely
     * @param {String|Object} options.user User id used to build the counter key
     */
    constructor(options) {
        super();
        this.options = options || {};
        // Running total of bytes seen so far
        this.bytes = 0;
    }

    _transform(chunk, encoding, done) {
        // Forward the chunk untouched; only record its size
        this.bytes += chunk.length;
        this.push(chunk);
        done();
    }

    _flush(done) {
        const { maxBytes, ttlcounter, user } = this.options;
        if (!maxBytes) {
            // Quota disabled — nothing to report
            return done();
        }
        // Best effort: the counter result (and any error) is intentionally
        // ignored; the stream ends cleanly either way
        ttlcounter('idw:' + user, this.bytes, maxBytes, false, () => done());
    }
}
module.exports = LimitedFetch;

View file

@ -333,6 +333,11 @@ class MessageHandler {
return rollback(err);
}
let logTime = messageData.meta.time || new Date();
if (typeof logTime === 'number') {
logTime = new Date(logTime);
}
this.database.collection('messagelog').insertOne({
id: messageData.meta.queueId || messageData._id.toString(),
action: 'STORE',
@ -343,7 +348,7 @@ class MessageHandler {
from: messageData.meta.from,
to: messageData.meta.to,
transtype: messageData.meta.transtype,
created: messageData.meta.time || new Date()
created: logTime
}, () => {
let uidValidity = mailboxData.uidValidity;
let uid = messageData.uid;

View file

@ -766,7 +766,7 @@ class UserHandler {
[mailboxQueryKey]: mailboxQueryValue,
meta: {
source: 'AUTO',
time: Date.now()
time: new Date()
},
flags,
raw: message