List flagged messages

This commit is contained in:
Andris Reinman 2017-11-22 16:22:36 +02:00
parent 6db874255c
commit 4008a3bdd9
7 changed files with 2511 additions and 2072 deletions

View file

@ -142,7 +142,7 @@ All failed responses look like the following:
### Paging
For paging lists longer than allowed limit, Wild Duck API returns URLs for `next` and `previous` pages. When paging do not change these URLs yourself. If query arguments are changed then the results might be unreliable.
For paging lists longer than allowed limit, Wild Duck API returns cursors for `next` and `previous` pages.
```json
{
@ -155,6 +155,16 @@ For paging lists longer than allowed limit, Wild Duck API returns URLs for `next
}
```
The `page` property in return value is a "soft" argument, you need to set it yourself with the query argument. WildDuck does not know from which page the request was made to but your application does.
Fetch next page using cursors:
GET /users/{user}/mailboxes/{mailbox}/messages?next={nextCursor}&page={page+1}
Fetch previous page using cursors:
GET /users/{user}/mailboxes/{mailbox}/messages?previous={nextCursor}&page={page-1}
## Users
User accounts
@ -1290,6 +1300,54 @@ Response for a successful operation:
}
```
### List flagged messages
#### GET /user/{user}/flagged
Lists flagged messages in the account (excludes Spam and Trash)
**Parameters**
- **user** (required) is the ID of the user
- **order** optional message ordering, either "asc" or "desc". Defaults to "desc" (newer first)
**Example**
```
curl "http://localhost:8080/users/59467f27535f8f0f067ba8e6/flagged"
```
Response for a successful operation:
```json
{
"success": true,
"total": 1,
"page": 1,
"previousCursor": false,
"nextCursor": false,
"results": [
{
"id": 444,
"mailbox": "59467f27535f8f0f067ba8e6",
"thread": "5971da7754cfdc7f0983bbde",
"from": {
"address": "sender@example.com",
"name": "Sender Name"
},
"subject": "Subject line",
"date": "2011-11-02T19:19:08.000Z",
"intro": "Beginning text in the message…",
"attachments": false,
"seen": true,
"deleted": false,
"flagged": true,
"draft": false
}
]
}
```
### Search for messages
#### GET /user/{user}/search

51
imap.js
View file

@ -249,6 +249,7 @@ function clearExpiredMessages() {
uid: true,
size: true,
'mimeTree.attachmentMap': true,
'meta.queueId': true,
magic: true,
unseen: true
});
@ -307,28 +308,38 @@ function clearExpiredMessages() {
let attachmentIds = Object.keys(messageData.mimeTree.attachmentMap || {}).map(key => messageData.mimeTree.attachmentMap[key]);
if (!attachmentIds.length) {
// no stored attachments
deleted++;
if (consts.GC_DELAY_DELETE) {
setTimeout(processNext, consts.GC_DELAY_DELETE);
} else {
setImmediate(processNext);
}
return;
}
return db.database.collection('messagelog').insertOne(
{
id: messageData.meta.queueId || messageData._id.toString(),
action: 'DELETED',
parentId: messageData._id,
created: new Date()
},
() => {
if (!attachmentIds.length) {
// no stored attachments
deleted++;
if (consts.GC_DELAY_DELETE) {
setTimeout(processNext, consts.GC_DELAY_DELETE);
} else {
setImmediate(processNext);
}
return;
}
messageHandler.attachmentStorage.updateMany(attachmentIds, -1, -messageData.magic, err => {
if (err) {
// should we care about this error?
messageHandler.attachmentStorage.updateMany(attachmentIds, -1, -messageData.magic, err => {
if (err) {
// should we care about this error?
}
deleted++;
if (consts.GC_DELAY_DELETE) {
setTimeout(processNext, consts.GC_DELAY_DELETE);
} else {
setImmediate(processNext);
}
});
}
deleted++;
if (consts.GC_DELAY_DELETE) {
setTimeout(processNext, consts.GC_DELAY_DELETE);
} else {
setImmediate(processNext);
}
});
);
});
});
};

File diff suppressed because it is too large Load diff

View file

@ -4,14 +4,22 @@ const crypto = require('crypto');
const Joi = require('joi');
const ObjectID = require('mongodb').ObjectID;
const tools = require('../tools');
const base32 = require('base32.js');
module.exports = (db, server, notifier) => {
server.get('/users/:user/updates', (req, res, next) => {
res.charSet('utf-8');
const schema = Joi.object().keys({
user: Joi.string().hex().lowercase().length(24).required(),
'Last-Event-ID': Joi.string().hex().lowercase().length(24)
user: Joi.string()
.hex()
.lowercase()
.length(24)
.required(),
'Last-Event-ID': Joi.string()
.hex()
.lowercase()
.length(24)
});
if (req.header('Last-Event-ID')) {
@ -33,116 +41,127 @@ module.exports = (db, server, notifier) => {
let user = new ObjectID(result.value.user);
let lastEventId = result.value['Last-Event-ID'] ? new ObjectID(result.value['Last-Event-ID']) : false;
db.users.collection('users').findOne({
_id: user
}, {
fields: {
username: true,
address: true
}
}, (err, userData) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message
});
return next();
}
if (!userData) {
res.json({
error: 'This user does not exist'
});
return next();
}
let session = { id: crypto.randomBytes(10).toString('base64'), user: { id: userData._id, username: userData.username } };
let closed = false;
let idleTimer = false;
let idleCounter = 0;
let sendIdleComment = () => {
clearTimeout(idleTimer);
if (closed) {
return;
db.users.collection('users').findOne(
{
_id: user
},
{
fields: {
username: true,
address: true
}
res.write(': idling ' + ++idleCounter + '\n\n');
idleTimer = setTimeout(sendIdleComment, 15 * 1000);
};
let resetIdleComment = () => {
clearTimeout(idleTimer);
if (closed) {
return;
}
idleTimer = setTimeout(sendIdleComment, 15 * 1000);
};
let journalReading = false;
let journalReader = () => {
if (journalReading || closed) {
return;
}
journalReading = true;
loadJournalStream(db, req, res, user, lastEventId, (err, info) => {
if (err) {
// ignore?
}
lastEventId = info && info.lastEventId;
journalReading = false;
if (info && info.processed) {
resetIdleComment();
}
});
};
let close = () => {
closed = true;
clearTimeout(idleTimer);
notifier.removeListener(session, '*', journalReader);
};
let setup = () => {
notifier.addListener(session, '*', journalReader);
let finished = false;
let done = () => {
if (finished) {
return;
}
finished = true;
close();
},
(err, userData) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message
});
return next();
}
if (!userData) {
res.json({
error: 'This user does not exist'
});
return next();
}
let session = {
id: 'api.' + base32.encode(crypto.randomBytes(10)).toLowerCase(),
user: {
id: userData._id,
username: userData.username
}
};
req.connection.setTimeout(30 * 60 * 1000, done);
req.connection.on('end', done);
req.connection.on('error', done);
};
let closed = false;
let idleTimer = false;
let idleCounter = 0;
res.writeHead(200, { 'Content-Type': 'text/event-stream' });
if (lastEventId) {
loadJournalStream(db, req, res, user, lastEventId, (err, info) => {
if (err) {
res.write('event: error\ndata: ' + err.message.split('\n').join('\ndata: ') + '\n\n');
// ignore
let sendIdleComment = () => {
clearTimeout(idleTimer);
if (closed) {
return;
}
setup();
if (info && info.processed) {
resetIdleComment();
} else {
res.write(': idling ' + ++idleCounter + '\n\n');
idleTimer = setTimeout(sendIdleComment, 15 * 1000);
};
let resetIdleComment = () => {
clearTimeout(idleTimer);
if (closed) {
return;
}
idleTimer = setTimeout(sendIdleComment, 15 * 1000);
};
let journalReading = false;
let journalReader = () => {
if (journalReading || closed) {
return;
}
journalReading = true;
loadJournalStream(db, req, res, user, lastEventId, (err, info) => {
if (err) {
// ignore?
}
lastEventId = info && info.lastEventId;
journalReading = false;
if (info && info.processed) {
resetIdleComment();
}
});
};
let close = () => {
closed = true;
clearTimeout(idleTimer);
notifier.removeListener(session, '*', journalReader);
};
let setup = () => {
notifier.addListener(session, '*', journalReader);
let finished = false;
let done = () => {
if (finished) {
return;
}
finished = true;
close();
return next();
};
req.connection.setTimeout(30 * 60 * 1000, done);
req.connection.on('end', done);
req.connection.on('error', done);
};
res.writeHead(200, { 'Content-Type': 'text/event-stream' });
if (lastEventId) {
loadJournalStream(db, req, res, user, lastEventId, (err, info) => {
if (err) {
res.write('event: error\ndata: ' + err.message.split('\n').join('\ndata: ') + '\n\n');
// ignore
}
setup();
if (info && info.processed) {
resetIdleComment();
} else {
sendIdleComment();
}
});
} else {
db.database.collection('journal').findOne({ user }, { sort: { _id: -1 } }, (err, latest) => {
if (!err && latest) {
lastEventId = latest._id;
}
setup();
sendIdleComment();
}
});
} else {
db.database.collection('journal').findOne({ user }, { sort: { _id: -1 } }, (err, latest) => {
if (!err && latest) {
lastEventId = latest._id;
}
setup();
sendIdleComment();
});
});
}
}
});
);
});
};
@ -158,7 +177,12 @@ function formatJournalData(e) {
});
let response = [];
response.push('data: ' + JSON.stringify(data, false, 2).split('\n').join('\ndata: '));
response.push(
'data: ' +
JSON.stringify(data, false, 2)
.split('\n')
.join('\ndata: ')
);
if (e._id) {
response.push('id: ' + e._id.toString());
}
@ -174,7 +198,10 @@ function loadJournalStream(db, req, res, user, lastEventId, done) {
let mailboxes = new Set();
let cursor = db.database.collection('journal').find(query).sort({ _id: 1 });
let cursor = db.database
.collection('journal')
.find(query)
.sort({ _id: 1 });
let processed = 0;
let processNext = () => {
cursor.next((err, e) => {

View file

@ -1,10 +1,9 @@
'use strict';
const consts = require('../consts');
const db = require('../db');
// EXPUNGE deletes all messages in selected mailbox marked with \Delete
module.exports = server => (path, update, session, callback) => {
module.exports = (server, messageHandler) => (path, update, session, callback) => {
server.logger.debug(
{
tnx: 'expunge',
@ -14,118 +13,123 @@ module.exports = server => (path, update, session, callback) => {
session.id,
path
);
db.database.collection('mailboxes').findOne({
user: session.user.id,
path
}, (err, mailboxData) => {
if (err) {
return callback(err);
}
if (!mailboxData) {
return callback(null, 'NONEXISTENT');
}
let cursor = db.database
.collection('messages')
.find({
user: session.user.id,
mailbox: mailboxData._id,
undeleted: false,
// uid is part of the sharding key so we need it somehow represented in the query
uid: {
$gt: 0,
$lt: mailboxData.uidNext
}
})
.sort([['uid', 1]]);
let deletedMessages = 0;
let deletedStorage = 0;
let updateQuota = next => {
if (!deletedMessages) {
return next();
db.database.collection('mailboxes').findOne(
{
user: session.user.id,
path
},
(err, mailboxData) => {
if (err) {
return callback(err);
}
if (!mailboxData) {
return callback(null, 'NONEXISTENT');
}
db.users.collection('users').findOneAndUpdate(
{
_id: mailboxData.user
},
{
$inc: {
storageUsed: -deletedStorage
let cursor = db.database
.collection('messages')
.find({
user: session.user.id,
mailbox: mailboxData._id,
undeleted: false,
// uid is part of the sharding key so we need it somehow represented in the query
uid: {
$gt: 0,
$lt: mailboxData.uidNext
}
},
next
);
};
})
.sort([['uid', 1]]);
let processNext = () => {
cursor.next((err, messageData) => {
if (err) {
return updateQuota(() => callback(err));
}
if (!messageData) {
return cursor.close(() => {
updateQuota(() => {
server.notifier.fire(session.user.id, path);
if (session && session.selected && session.selected.uidList) {
session.writeStream.write({
tag: '*',
command: String(session.selected.uidList.length),
attributes: [
{
type: 'atom',
value: 'EXISTS'
}
]
});
}
return callback(null, true);
});
});
let deletedMessages = 0;
let deletedStorage = 0;
let updateQuota = next => {
if (!deletedMessages) {
return next();
}
messageData.exp = true;
messageData.rdate = Date.now() + consts.ARCHIVE_TIME;
db.database.collection('archived').insertOne(messageData, err => {
if (err) {
return updateQuota(() => cursor.close(() => callback(err)));
}
if (!update.silent) {
session.writeStream.write(session.formatResponse('EXPUNGE', messageData.uid));
}
db.database.collection('messages').deleteOne({
_id: messageData._id,
mailbox: mailboxData._id,
uid: messageData.uid
}, err => {
if (err) {
return updateQuota(() => cursor.close(() => callback(err)));
db.users.collection('users').findOneAndUpdate(
{
_id: mailboxData.user
},
{
$inc: {
storageUsed: -deletedStorage
}
},
next
);
};
deletedMessages++;
deletedStorage += Number(messageData.size) || 0;
let processNext = () => {
cursor.next((err, messageData) => {
if (err) {
return updateQuota(() => callback(err));
}
if (!messageData) {
return cursor.close(() => {
updateQuota(() => {
server.notifier.fire(session.user.id, path);
if (!update.silent && session && session.selected && session.selected.uidList) {
session.writeStream.write({
tag: '*',
command: String(session.selected.uidList.length),
attributes: [
{
type: 'atom',
value: 'EXISTS'
}
]
});
}
return callback(null, true);
});
});
}
return server.notifier.addEntries(
session.user.id,
path,
{
command: 'EXPUNGE',
ignore: session.id,
uid: messageData.uid,
message: messageData._id,
unseen: messageData.unseen
},
processNext
);
});
messageHandler.del(
{
messageData,
session,
// do not archive drafts
archive: !messageData.flags.includes('\\Draft'),
delayNotifications: true
},
err => {
if (err) {
server.logger.error(
{
tnx: 'EXPUNGE',
err
},
'Failed to delete message id=%s. %s',
messageData._id,
err.message
);
return cursor.close(() => updateQuota(() => callback(err)));
}
server.logger.debug(
{
tnx: 'EXPUNGE',
err
},
'Deleted message id=%s',
messageData._id
);
deletedMessages++;
deletedStorage += Number(messageData.size) || 0;
if (!update.silent) {
session.writeStream.write(session.formatResponse('EXPUNGE', messageData.uid));
}
setImmediate(processNext);
}
);
});
});
};
};
processNext();
});
processNext();
}
);
};

View file

@ -118,6 +118,8 @@ class ImapNotifier extends EventEmitter {
addListener(session, path, handler) {
let eventName = this._eventName(session.user.id.toString(), path);
this._listeners.addListener(eventName, handler);
this.logger.debug('[%s] New journal listener for %s ("%s:%s")', session.id, eventName, session.user.username, path);
}
/**
@ -171,19 +173,24 @@ class ImapNotifier extends EventEmitter {
if (updated.length) {
// provision new modseq value
return this.database.collection('mailboxes').findOneAndUpdate(mailboxQuery, {
$inc: {
modifyIndex: 1
}
}, {
returnOriginal: false
}, (err, item) => {
if (err) {
return callback(err);
}
return this.database.collection('mailboxes').findOneAndUpdate(
mailboxQuery,
{
$inc: {
modifyIndex: 1
}
},
{
returnOriginal: false
},
(err, item) => {
if (err) {
return callback(err);
}
next(null, item && item.value);
});
next(null, item && item.value);
}
);
}
if (mailbox) {
return next(null, mailbox);
@ -193,18 +200,22 @@ class ImapNotifier extends EventEmitter {
// final action to push entries to journal
let pushToJournal = () => {
this.database.collection('journal').insertMany(entries, {
w: 1,
ordered: false
}, (err, r) => {
if (err) {
return callback(err);
this.database.collection('journal').insertMany(
entries,
{
w: 1,
ordered: false
},
(err, r) => {
if (err) {
return callback(err);
}
setImmediate(() => this.updateCounters(entries));
return callback(null, r.insertedCount);
}
setImmediate(() => this.updateCounters(entries));
return callback(null, r.insertedCount);
});
);
};
getMailbox((err, mailboxData) => {
@ -226,22 +237,26 @@ class ImapNotifier extends EventEmitter {
if (updated.length) {
this.logger.debug('Updating message collection %s %s entries', mailboxData._id, updated.length);
this.database.collection('messages').updateMany({
_id: {
$in: updated
this.database.collection('messages').updateMany(
{
_id: {
$in: updated
},
mailbox: mailboxData._id
},
mailbox: mailboxData._id
}, {
// only update modseq if the new value is larger than old one
$max: {
modseq
{
// only update modseq if the new value is larger than old one
$max: {
modseq
}
},
err => {
if (err) {
this.logger.error('Error updating modseq for messages. %s', err.message);
}
pushToJournal();
}
}, err => {
if (err) {
this.logger.error('Error updating modseq for messages. %s', err.message);
}
pushToJournal();
});
);
} else {
pushToJournal();
}
@ -277,27 +292,30 @@ class ImapNotifier extends EventEmitter {
modifyIndex = Number(modifyIndex) || 0;
let user = session.user.id;
this.database.collection('mailboxes').findOne({
user,
path
}, (err, mailbox) => {
if (err) {
return callback(err);
this.database.collection('mailboxes').findOne(
{
user,
path
},
(err, mailbox) => {
if (err) {
return callback(err);
}
if (!mailbox) {
return callback(null, 'NONEXISTENT');
}
this.database
.collection('journal')
.find({
mailbox: mailbox._id,
modseq: {
$gt: modifyIndex
}
})
.sort([['modseq', 1]])
.toArray(callback);
}
if (!mailbox) {
return callback(null, 'NONEXISTENT');
}
this.database
.collection('journal')
.find({
mailbox: mailbox._id,
modseq: {
$gt: modifyIndex
}
})
.sort([['modseq', 1]])
.toArray(callback);
});
);
}
updateCounters(entries) {

File diff suppressed because it is too large Load diff