Added API endpoint to push changes in user account

This commit is contained in:
Andris Reinman 2017-07-18 17:38:05 +03:00
parent ad1ac6d280
commit ba3e111ed2
17 changed files with 388 additions and 140 deletions

View file

@ -148,7 +148,7 @@ If a messages is downloaded by a client this message gets marked as _Seen_
If a messages is deleted by a client this message gets marked as Seen and moved to Trash folder
## HTTP API
# HTTP API
> **NB!** The HTTP API is being re-designed
@ -160,7 +160,7 @@ TODO:
2. Search/list messages
3. Expose journal updates through WebSocket or similar
#### Responses
### Responses
All failed responses look like the following:
@ -170,13 +170,13 @@ All failed responses look like the following:
}
```
### Users
## Users
User accounts
#### Get one user
### Get one user
##### GET /users/{user}
#### GET /users/{user}
Returns data about a specific user
@ -220,9 +220,9 @@ Response for a successful operation:
Recipient/forward limits assume that messages are sent using ZoneMTA with [zonemta-wildduck](https://github.com/wildduck-email/zonemta-wildduck) plugin, otherwise the counters are not updated.
#### Add a new user
### Add a new user
##### POST /users
#### POST /users
Creates a new user, returns the ID upon success.
@ -258,9 +258,9 @@ Response for a successful operation:
After you have created an user you can use these credentials to log in to the IMAP server.
#### Update user details
### Update user details
##### PUT /users/{user}
#### PUT /users/{user}
Updates the properties of an user. Only specify these fields that you want to be updated.
@ -292,13 +292,13 @@ Response for a successful operation:
}
```
### UserAddresses
## UserAddresses
Manage email addresses and aliases for an user.
#### List addresses
### List addresses
##### GET /users/{user}/addresses
#### GET /users/{user}/addresses
Lists all registered email addresses for an user.
@ -334,9 +334,9 @@ Response for a successful operation:
}
```
#### Get one address
### Get one address
##### GET /users/{user}/addresses/{address}
#### GET /users/{user}/addresses/{address}
Returns data about a specific address.
@ -363,9 +363,9 @@ Response for a successful operation:
}
```
#### Add a new address
### Add a new address
##### POST /users/{user}/addresses
#### POST /users/{user}/addresses
Creates a new email address alias for an existing user, returns the ID upon success.
@ -394,9 +394,9 @@ Response for a successful operation:
After you have registered a new address then LMTP maildrop server starts accepting mail for it and stores messages to the users mailbox.
#### Update address details
### Update address details
##### PUT /users/{user}/addresses/{address}
#### PUT /users/{user}/addresses/{address}
Updates the properties of an address. Currently, only `main` can be updated.
@ -421,9 +421,9 @@ Response for a successful operation:
"success": true
}
```
#### Delete an alias address
### Delete an alias address
##### DELETE /users/{user}/addresses/{address}
#### DELETE /users/{user}/addresses/{address}
Deletes an email address alias from an existing user.
@ -446,9 +446,11 @@ Response for a successful operation:
}
```
#### Recalculate user quota
## UserQuota
##### POST /users/{user}/quota/reset
### Recalculate user quota
#### POST /users/{user}/quota/reset
Recalculates used storage for an user. Use this when it seems that quota counters for an user do not match with reality.
@ -473,6 +475,40 @@ Response for a successful operation:
Be aware though that this method is not atomic and should be done only if quota counters are way off.
## UserUpdates
Get user related events as an Event Source stream
### Stream update events
#### GET /users/{user}/updates
Streams changes in user account as EventSource stream
**Parameters**
- **user** (required) is the ID of the user
**Example**
```
curl "http://localhost:8080/users/59467f27535f8f0f067ba8e6/updates"
```
Response stream:
```
data: {"command":"EXISTS", "message":"596e0703f0bdd512aeac3600", "mailbox":"596c9c37ef2213165daadc65",...}
id: 596e0703f0bdd512aeac3605
data: {"command":"CREATE","mailbox":"596e09853f845a14f3620b5c","name":"My Mail",...}
id: 596e09853f845a14f3620b5d
```
First entry in the event stream indicates that a message with id `596e0703f0bdd512aeac3600` was added to mailbox `596c9c37ef2213165daadc65`, second entry indicates that a new mailbox called *"My Mail"* with id `596e09853f845a14f3620b5c` was created.
Be aware though that this connection needs to be properly closed if you do not want to end up with memory leaks.
## Message filtering
> The filtering system is subject to change with the API updates. Most probably the filters are going to reside in separate collection and not as part of the user object.
@ -514,7 +550,7 @@ Filters are configuration objects stored in the `filters` array of the users obj
action: {
// mark message as seen
seen: true,
unseen: false,
// mark message as flagged
flag: true,

162
api.js
View file

@ -5,9 +5,11 @@ const restify = require('restify');
const log = require('npmlog');
const Joi = require('joi');
const bcrypt = require('bcryptjs');
const crypto = require('crypto');
const tools = require('./lib/tools');
const consts = require('./lib/consts');
const UserHandler = require('./lib/user-handler');
const ImapNotifier = require('./lib/imap-notifier');
const db = require('./lib/db');
const certs = require('./lib/certs').get('api');
const ObjectID = require('mongodb').ObjectID;
@ -27,6 +29,7 @@ if (certs && config.api.secure) {
const server = restify.createServer(serverOptions);
let userHandler;
let notifier;
server.use(restify.plugins.queryParser());
server.use(
@ -1034,6 +1037,161 @@ server.get('/users/:user/mailboxes/:mailbox', (req, res, next) => {
});
});
server.get('/users/:user/updates', (req, res, next) => {
res.charSet('utf-8');
const schema = Joi.object().keys({
user: Joi.string().hex().lowercase().length(24).required(),
'Last-Event-ID': Joi.string().hex().lowercase().length(24)
});
if (req.header('Last-Event-ID')) {
req.params['Last-Event-ID'] = req.header('Last-Event-ID');
}
const result = Joi.validate(req.params, schema, {
abortEarly: false,
convert: true
});
if (result.error) {
res.json({
error: result.error.message
});
return next();
}
let user = new ObjectID(result.value.user);
let lastEventId = result.value['Last-Event-ID'] ? new ObjectID(result.value['Last-Event-ID']) : false;
db.users.collection('users').findOne({
_id: user
}, {
fields: {
username: true,
address: true
}
}, (err, userData) => {
if (err) {
res.json({
error: 'MongoDB Error: ' + err.message
});
return next();
}
if (!userData) {
res.json({
error: 'This user does not exist'
});
return next();
}
let session = { id: crypto.randomBytes(10).toString('base64'), user: { id: userData._id, username: userData.username } };
let journalReading = false;
let journalReader = () => {
if (journalReading) {
return;
}
journalReading = true;
loadJournalStream(req, res, user, lastEventId, (err, info) => {
if (err) {
// ignore?
}
lastEventId = info && info.lastEventId;
journalReading = false;
});
};
let close = () => {
notifier.removeListener(session, '*', journalReader);
};
let setup = () => {
notifier.addListener(session, '*', journalReader);
let finished = false;
let done = () => {
if (finished) {
return;
}
finished = true;
close();
return next();
};
req.connection.setTimeout(30 * 60 * 1000, done);
req.connection.on('end', done);
};
res.writeHead(200, { 'Content-Type': 'text/event-stream', Connection: 'close' });
if (lastEventId) {
loadJournalStream(req, res, user, lastEventId, setup);
} else {
db.database.collection('journal').findOne({ user }, { sort: { _id: -1 } }, (err, latest) => {
if (!err && latest) {
lastEventId = latest._id;
}
setup();
});
}
});
});
function formatJournalData(e) {
let data = {};
Object.keys(e).forEach(key => {
if (!['_id', 'ignore', 'user'].includes(key)) {
data[key] = e[key];
}
});
let response = [];
response.push('data: ' + JSON.stringify(data));
response.push('id: ' + e._id.toString());
return response.join('\n') + '\n\n';
}
function loadJournalStream(req, res, user, lastEventId, done) {
let query = { user };
if (lastEventId) {
query._id = { $gt: lastEventId };
}
let cursor = db.database.collection('journal').find(query).sort({ _id: 1 });
let processed = 0;
let processNext = () => {
cursor.next((err, e) => {
if (err) {
return done(err);
}
if (!e) {
return cursor.close(() => {
// delete all attachments that do not have any active links to message objects
done(null, {
lastEventId,
processed
});
});
}
lastEventId = e._id;
if (!e || !e.command) {
// skip
return processNext();
}
res.write(formatJournalData(e));
processed++;
processNext();
});
};
processNext();
}
module.exports = done => {
if (!config.imap.enabled) {
return setImmediate(() => done(null, false));
@ -1042,6 +1200,10 @@ module.exports = done => {
let started = false;
userHandler = new UserHandler({ database: db.database, users: db.users, redis: db.redis });
notifier = new ImapNotifier({
database: db.database,
redis: db.redis
});
server.on('error', err => {
if (!started) {

View file

@ -56,7 +56,7 @@ module.exports = {
});
}
this._server.notifier.fire(this.session.user.username, mailbox, {
this._server.notifier.fire(this.session.user.id, mailbox, {
action: 'DELETE',
mailbox
});

View file

@ -279,7 +279,7 @@ module.exports = function(options) {
uidNext: folder.uidNext,
uidValidity: folder.uidValidity,
highestModseq: folder.modifyIndex,
unseen: folder.messages.filter(message => message.flags.indexOf('\\Seen') < 0).length
unseen: folder.messages.filter(message => !message.flags.includes('\\Seen')).length
});
};
@ -313,7 +313,7 @@ module.exports = function(options) {
uid: message.uid
},
() => {
this.notifier.fire(session.user.username, mailbox);
this.notifier.fire(session.user.id, mailbox);
return callback(null, true, {
uidValidity: folder.uidValidity,
@ -339,7 +339,7 @@ module.exports = function(options) {
let processMessages = () => {
if (i >= folder.messages.length) {
this.notifier.fire(session.user.username, mailbox);
this.notifier.fire(session.user.id, mailbox);
return callback(null, true, modified);
}
@ -445,7 +445,7 @@ module.exports = function(options) {
}
let entries = [];
for ((i = 0), (len = deleted.length); i < len; i++) {
for (i = 0, len = deleted.length; i < len; i++) {
entries.push({
command: 'EXPUNGE',
ignore: session.id,
@ -457,7 +457,7 @@ module.exports = function(options) {
}
this.notifier.addEntries(session.user.username, mailbox, entries, () => {
this.notifier.fire(session.user.username, mailbox);
this.notifier.fire(session.user.id, mailbox);
return callback(null, true);
});
};
@ -490,7 +490,7 @@ module.exports = function(options) {
}
}
for ((i = 0), (len = messages.length); i < len; i++) {
for (i = 0, len = messages.length; i < len; i++) {
messages[i].uid = destinationFolder.uidNext++;
destinationUid.push(messages[i].uid);
destinationFolder.messages.push(messages[i]);
@ -532,7 +532,7 @@ module.exports = function(options) {
}
// if BODY[] is touched, then add \Seen flag and notify other clients
if (message.flags.indexOf('\\Seen') < 0) {
if (!message.flags.includes('\\Seen')) {
message.flags.unshift('\\Seen');
entries.push({
command: 'FETCH',
@ -549,7 +549,7 @@ module.exports = function(options) {
let processMessage = () => {
if (pos >= folder.messages.length) {
// once messages are processed show relevant updates
this.notifier.fire(session.user.username, mailbox);
this.notifier.fire(session.user.id, mailbox);
return callback(null, true);
}
let message = folder.messages[pos++];

View file

@ -151,16 +151,18 @@ indexes:
text: 5
- collection: messages
index:
name: mailbox_seen_flag
# in most cases we only care about unseen, not seen messages
name: mailbox_unseen_flag
key:
mailbox: 1
seen: 1
unseen: 1
- collection: messages
index:
name: mailbox_deleted_flag
# some mail agents list messages that do not have the \Deleted flag set
name: mailbox_undeleted_flag
key:
mailbox: 1
deleted: 1
undeleted: 1
- collection: messages
index:
name: mailbox_flagged_flag
@ -219,10 +221,25 @@ indexes:
- collection: journal
index:
# this index is used to apply changes in a mailbox for IMAP session
name: mailbox_modseq
key:
mailbox: 1
modseq: 1
- collection: journal
index:
# this index is used to send updates to a logged in webmail user
name: user_limit_id
key:
user: 1
_id: 1
- collection: journal
index:
# this index is used to find the latest journal entry
name: user_limit_id_reverse
key:
user: 1
_id: -1
- collection: journal
index:
name: autoexpire

View file

@ -46,11 +46,23 @@ module.exports = server => (path, session, callback) => {
retention: user.retention
};
db.database.collection('mailboxes').insertOne(mailbox, err => {
db.database.collection('mailboxes').insertOne(mailbox, (err, r) => {
if (err) {
return callback(err);
}
return callback(null, true);
return server.notifier.addEntries(
session.user.id,
path,
{
command: 'CREATE',
mailbox: r.insertId,
name: path
},
() => {
server.notifier.fire(session.user.id, path);
return callback(null, true);
}
);
});
});
});

View file

@ -27,83 +27,87 @@ module.exports = server => (path, session, callback) => {
return callback(null, 'CANNOT');
}
db.database.collection('mailboxes').deleteOne({
_id: mailbox._id
}, err => {
if (err) {
return callback(err);
}
// calculate mailbox size by aggregating the size's of all messages
db.database
.collection('messages')
.aggregate(
[
{
$match: {
mailbox: mailbox._id
}
},
{
$group: {
_id: {
mailbox: '$mailbox'
},
storageUsed: {
$sum: '$size'
}
}
}
],
{
cursor: {
batchSize: 1
}
}
)
.toArray((err, res) => {
server.notifier.addEntries(
session.user.id,
path,
{
command: 'DROP',
mailbox: mailbox._id
},
() => {
db.database.collection('mailboxes').deleteOne({
_id: mailbox._id
}, err => {
if (err) {
return callback(err);
}
let storageUsed = (res && res[0] && res[0].storageUsed) || 0;
// calculate mailbox size by aggregating the size's of all messages
db.database
.collection('messages')
.aggregate(
[
{
$match: {
mailbox: mailbox._id
}
},
{
$group: {
_id: {
mailbox: '$mailbox'
},
storageUsed: {
$sum: '$size'
}
}
}
],
{
cursor: {
batchSize: 1
}
}
)
.toArray((err, res) => {
if (err) {
return callback(err);
}
db.database.collection('messages').deleteMany({
mailbox: mailbox._id
}, err => {
if (err) {
return callback(err);
}
let storageUsed = (res && res[0] && res[0].storageUsed) || 0;
let done = () => {
db.database.collection('journal').deleteMany({
db.database.collection('messages').deleteMany({
mailbox: mailbox._id
}, err => {
if (err) {
return callback(err);
}
callback(null, true);
});
};
if (!storageUsed) {
return done();
}
let done = () => {
server.notifier.fire(session.user.id, path);
callback(null, true);
};
// decrement quota counters
db.users.collection('users').findOneAndUpdate(
{
_id: mailbox.user
},
{
$inc: {
storageUsed: -Number(storageUsed) || 0
if (!storageUsed) {
return done();
}
},
done
);
});
// decrement quota counters
db.users.collection('users').findOneAndUpdate(
{
_id: mailbox.user
},
{
$inc: {
storageUsed: -Number(storageUsed) || 0
}
},
done
);
});
});
});
});
}
);
});
};

View file

@ -29,7 +29,7 @@ module.exports = server => (path, update, session, callback) => {
.find({
user: session.user.id,
mailbox: mailbox._id,
deleted: true
undeleted: false
})
.project({
_id: true,

View file

@ -164,7 +164,7 @@ module.exports = server => (path, options, session, callback) => {
flags: '\\Seen'
},
$set: {
seen: true
unseen: false
}
}
}

View file

@ -25,25 +25,35 @@ module.exports = server => (path, newname, session, callback) => {
if (mailbox) {
return callback(null, 'ALREADYEXISTS');
}
return server.notifier.addEntries(
session.user.id,
path,
{
command: 'RENAME',
name: newname
},
() => {
db.database.collection('mailboxes').findOneAndUpdate({
user: session.user.id,
path
}, {
$set: {
path: newname
}
}, {}, (err, item) => {
if (err) {
return callback(err);
}
db.database.collection('mailboxes').findOneAndUpdate({
user: session.user.id,
path
}, {
$set: {
path: newname
}
}, {}, (err, item) => {
if (err) {
return callback(err);
}
if (!item || !item.value) {
// was not able to acquire a lock
return callback(null, 'NONEXISTENT');
}
if (!item || !item.value) {
// was not able to acquire a lock
return callback(null, 'NONEXISTENT');
server.notifier.fire(session.user.id, path);
return callback(null, true);
});
}
callback(null, true);
});
);
});
};

View file

@ -37,7 +37,7 @@ module.exports = server => (path, session, callback) => {
.collection('messages')
.find({
mailbox: mailbox._id,
seen: false
unseen: true
})
.count((err, unseen) => {
if (err) {

View file

@ -130,9 +130,9 @@ module.exports = server => (path, update, session, callback) => {
flagsupdate = {
$set: {
flags: message.flags,
seen: message.flags.includes('\\Seen'),
unseen: !message.flags.includes('\\Seen'),
flagged: message.flags.includes('\\Flagged'),
deleted: message.flags.includes('\\Deleted'),
undeleted: !message.flags.includes('\\Deleted'),
draft: message.flags.includes('\\Draft')
}
};
@ -171,7 +171,7 @@ module.exports = server => (path, update, session, callback) => {
flagsupdate.$set = {};
if (newFlags.includes('\\Seen')) {
flagsupdate.$set = {
seen: true
unseen: false
};
}
if (newFlags.includes('\\Flagged')) {
@ -181,7 +181,7 @@ module.exports = server => (path, update, session, callback) => {
}
if (newFlags.includes('\\Deleted')) {
flagsupdate.$set = {
deleted: true
undeleted: false
};
}
if (newFlags.includes('\\Draft')) {
@ -225,7 +225,7 @@ module.exports = server => (path, update, session, callback) => {
flagsupdate.$set = {};
if (oldFlags.includes('\\Seen')) {
flagsupdate.$set = {
seen: false
unseen: true
};
}
if (oldFlags.includes('\\Flagged')) {
@ -235,7 +235,7 @@ module.exports = server => (path, update, session, callback) => {
}
if (oldFlags.includes('\\Deleted')) {
flagsupdate.$set = {
deleted: false
undeleted: true
};
}
if (oldFlags.includes('\\Draft')) {

View file

@ -41,6 +41,7 @@ class ImapNotifier extends EventEmitter {
clearTimeout(data.timeout);
publishTimers.delete(ev);
this._listeners.emit(ev);
this._listeners.emit(ev.split(':').shift() + ':*');
};
if (publishTimers.has(ev)) {
@ -49,7 +50,7 @@ class ImapNotifier extends EventEmitter {
data.count++;
if (data.initial < Date.now() - 1000) {
// if the event has been held back already for a second, the fire immediatelly
// if the event has been held back already for a second, then fire immediatelly
return fire();
}
} else {
@ -76,8 +77,9 @@ class ImapNotifier extends EventEmitter {
let data = JSON.parse(message);
if (data.e && !data.p) {
scheduleDataEvent(data.e);
} else {
} else if (data.e) {
this._listeners.emit(data.e, data.p);
this._listeners.emit(data.e.split(':').shift() + ':*', data.p);
}
} catch (E) {
//
@ -94,8 +96,11 @@ class ImapNotifier extends EventEmitter {
* @param {String} user
* @returns {String} md5 hex
*/
_eventName(path, user) {
return crypto.createHash('md5').update(user.toString() + ':' + path).digest('hex');
_eventName(user, path) {
if (path.length >= 32) {
path = crypto.createHash('md5').update(path).digest('hex');
}
return user + ':' + path;
}
/**
@ -213,6 +218,7 @@ class ImapNotifier extends EventEmitter {
entry.modseq = entry.modseq || modseq;
entry.created = entry.created || created;
entry.mailbox = entry.mailbox || mailbox._id;
entry.user = mailbox.user;
});
if (updated.length) {

View file

@ -160,9 +160,9 @@ class MessageHandler {
msgid,
// use boolean for more commonly used (and searched for) flags
seen: flags.includes('\\Seen'),
unseen: !flags.includes('\\Seen'),
flagged: flags.includes('\\Flagged'),
deleted: flags.includes('\\Deleted'),
undeleted: !flags.includes('\\Deleted'),
draft: flags.includes('\\Draft'),
magic: maildata.magic,
@ -653,7 +653,7 @@ class MessageHandler {
message.rdate = Date.now() + (target.retention || 0);
if (options.markAsSeen) {
message.seen = true;
message.unseen = false;
if (!message.flags.includes('\\Seen')) {
message.flags.push('\\Seen');
}
@ -683,7 +683,8 @@ class MessageHandler {
removeEntries.push({
command: 'EXPUNGE',
ignore: options.session && options.session.id,
uid: sourceUid
uid: messageUid,
message: messageId
});
existsEntries.push({

View file

@ -438,7 +438,7 @@ class POP3Connection extends EventEmitter {
this.session.state = 'UPDATE';
let deleted = this.session.listing.messages.filter(message => message.popped);
let seen = this.session.listing.messages.filter(message => !message.seen && message.fetched && !message.popped);
let seen = this.session.listing.messages.filter(message => message.seen && message.fetched && !message.popped);
if (!deleted.length && !seen.length) {
return finish();

View file

@ -188,7 +188,7 @@ class POP3Server extends EventEmitter {
* @param {Function} callback Callback to run with message listing
*/
onListMessages(session, callback) {
// messages are objects {id: 'abc', size: 123, seen: false}
// messages are objects {id: 'abc', size: 123, seen: true}
return callback(null, {
messages: [],
count: 0,

View file

@ -93,7 +93,7 @@ const serverOptions = {
mailbox: true,
// required to decide if we need to update flags after RETR
flags: true,
seen: true
unseen: true
})
.sort([['uid', -1]])
.limit(config.pop3.maxMessages || MAX_MESSAGES)
@ -113,7 +113,7 @@ const serverOptions = {
mailbox: message.mailbox,
size: message.size,
flags: message.flags,
seen: message.seen
seen: !message.unseen
})),
count: messages.length,
size: messages.reduce((acc, message) => acc + message.size, 0)
@ -263,7 +263,7 @@ function markAsSeen(session, messages, callback) {
}, {
$set: {
modseq: mailboxData.modifyIndex,
seen: true
unseen: false
},
$addToSet: {
flags: '\\Seen'