wildduck/lib/handlers/on-fetch.js
2017-07-20 16:10:36 +03:00

207 lines
6.9 KiB
JavaScript

'use strict';
const IMAPServerModule = require('../../imap-core');
const imapHandler = IMAPServerModule.imapHandler;
const util = require('util');
const db = require('../db');
const tools = require('../tools');
const consts = require('../consts');
module.exports = server => (path, options, session, callback) => {
server.logger.debug(
{
tnx: 'fetch',
cid: session.id
},
'[%s] Requested FETCH for "%s"',
session.id,
path
);
db.database.collection('mailboxes').findOne({
user: session.user.id,
path
}, (err, mailbox) => {
if (err) {
return callback(err);
}
if (!mailbox) {
return callback(null, 'NONEXISTENT');
}
let projection = {
uid: true,
modseq: true,
idate: true,
flags: true,
envelope: true,
bodystructure: true,
size: true
};
if (!options.metadataOnly) {
projection.mimeTree = true;
}
let query = {
mailbox: mailbox._id
};
if (options.changedSince) {
query = {
mailbox: mailbox._id,
modseq: {
$gt: options.changedSince
}
};
}
let queryAll = false;
if (options.messages.length !== session.selected.uidList.length) {
// do not use uid selector for 1:*
query.uid = tools.checkRangeQuery(options.messages);
} else {
// 1:*
queryAll = true;
}
let isUpdated = false;
let updateEntries = [];
let notifyEntries = [];
let done = (...args) => {
if (updateEntries.length) {
return db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
}, () => {
updateEntries = [];
server.notifier.addEntries(session.user.id, path, notifyEntries, () => {
notifyEntries = [];
server.notifier.fire(session.user.id, path);
return callback(...args);
});
});
}
if (isUpdated) {
server.notifier.fire(session.user.id, path);
}
return callback(...args);
};
let cursor = db.database.collection('messages').find(query).project(projection).sort([['uid', 1]]);
let rowCount = 0;
let processNext = () => {
cursor.next((err, message) => {
if (err) {
return done(err);
}
if (!message) {
return cursor.close(() => {
done(null, true);
});
}
if (queryAll && !session.selected.uidList.includes(message.uid)) {
// skip processing messages that we do not know about yet
return processNext();
}
let markAsSeen = options.markAsSeen && !message.flags.includes('\\Seen');
if (markAsSeen) {
message.flags.unshift('\\Seen');
}
let stream = imapHandler.compileStream(
session.formatResponse('FETCH', message.uid, {
query: options.query,
values: session.getQueryResponse(options.query, message, {
logger: server.logger,
fetchOptions: {},
database: db.database,
gridfs: db.gridfs,
acceptUTF8Enabled: session.isUTF8Enabled()
})
})
);
stream.description = util.format('* FETCH #%s uid=%s size=%sB ', ++rowCount, message.uid, message.size);
stream.on('error', err => {
session.socket.write('* BYE INTERNAL ERROR\n');
session.socket.destroy(); // ended up in erroneus state, kill the connection to abort
return cursor.close(() => done(err));
});
// send formatted response to socket
session.writeStream.write(stream, () => {
if (!markAsSeen) {
return processNext();
}
server.logger.debug(
{
tnx: 'flags',
cid: session.id
},
'[%s] UPDATE FLAGS for "%s"',
session.id,
message.uid
);
isUpdated = true;
updateEntries.push({
updateOne: {
filter: {
_id: message._id,
// include sharding key in query
mailbox: mailbox._id,
uid: message.uid
},
update: {
$addToSet: {
flags: '\\Seen'
},
$set: {
unseen: false
}
}
}
});
notifyEntries.push({
command: 'FETCH',
ignore: session.id,
uid: message.uid,
flags: message.flags,
message: message._id,
unseenChange: true
});
if (updateEntries.length >= consts.BULK_BATCH_SIZE) {
return db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
}, err => {
updateEntries = [];
if (err) {
return cursor.close(() => done(err));
}
server.notifier.addEntries(session.user.id, path, notifyEntries, () => {
notifyEntries = [];
server.notifier.fire(session.user.id, path);
processNext();
});
});
} else {
processNext();
}
});
});
};
processNext();
});
};