pop3 initial

This commit is contained in:
Andris Reinman 2017-04-07 21:29:14 +03:00
parent 18d4c34f70
commit 258b2f896e
8 changed files with 454 additions and 54 deletions

View file

@ -53,6 +53,13 @@ module.exports = {
maxMB: 5
},
pop3: {
enabled: true,
port: 9995,
host: '0.0.0.0',
secure: true
},
api: {
port: 8080
}

View file

@ -1,12 +1,12 @@
'use strict';
let net = require('net');
let tls = require('tls');
let IMAPConnection = require('./imap-connection').IMAPConnection;
let tlsOptions = require('./tls-options');
let EventEmitter = require('events').EventEmitter;
let util = require('util');
let clone = require('clone');
const net = require('net');
const tls = require('tls');
const IMAPConnection = require('./imap-connection').IMAPConnection;
const tlsOptions = require('./tls-options');
const EventEmitter = require('events').EventEmitter;
const shared = require('nodemailer/lib/shared');
const util = require('util');
const CLOSE_TIMEOUT = 1 * 1000; // how much to wait until pending connections are terminated
@ -21,25 +21,16 @@ class IMAPServer extends EventEmitter {
constructor(options) {
super();
this.options = options ? clone(options) : {};
this.options = options || {};
// apply TLS defaults if needed
if (this.options.secure) {
this.options = tlsOptions(this.options);
}
// setup logger
if ('logger' in this.options) {
// use provided logger or use vanity logger if option is set to false
this.logger = this.options.logger || {
info: () => false,
debug: () => false,
error: () => false
};
} else {
// create default console logger
this.logger = this._createDefaultLogger();
}
this.logger = shared.getLogger(this.options, {
component: this.options.component || 'pop3-server'
});
/**
* Timeout after close has been called until pending connections are forcibly closed
@ -86,13 +77,17 @@ class IMAPServer extends EventEmitter {
// close active connections
if (connections) {
this.logger.info('Server closing with %s pending connection%s, waiting %s seconds before terminating', connections, connections !== 1 ? 's' : '', timeout / 1000);
this.logger.info({
tnx: 'close'
}, 'Server closing with %s pending connection%s, waiting %s seconds before terminating', connections, connections !== 1 ? 's' : '', timeout / 1000);
}
this._closeTimeout = setTimeout(() => {
connections = this.connections.size;
if (connections) {
this.logger.info('Closing %s pending connection%s to close the server', connections, connections !== 1 ? 's' : '');
this.logger.info({
tnx: 'close'
}, 'Closing %s pending connection%s to close the server', connections, connections !== 1 ? 's' : '');
this.connections.forEach(connection => {
connection.send('* BYE System shutdown');
@ -153,6 +148,14 @@ class IMAPServer extends EventEmitter {
_onListening() {
let address = this.server.address();
this.logger.info(
//
{
tnx: 'listen',
host: address.address,
port: address.port,
secure: !!this.options.secure,
protocol: 'IMAP'
},
'%sIMAP Server listening on %s:%s',
this.options.secure ? 'Secure ' : '',
address.family === 'IPv4' ? address.address : '[' + address.address + ']',
@ -165,7 +168,9 @@ class IMAPServer extends EventEmitter {
* @event
*/
_onClose() {
this.logger.info('IMAP Server closed');
this.logger.info({
tnx: 'closed'
}, 'IMAP Server closed');
this.emit('close');
}

23
imap.js
View file

@ -33,9 +33,18 @@ const serverOptions = {
},
logger: {
info: log.silly.bind(log, 'IMAP'),
debug: log.silly.bind(log, 'IMAP'),
error: log.error.bind(log, 'IMAP')
info(...args) {
args.shift();
log.info('IMAP', ...args);
},
debug(...args) {
args.shift();
log.silly('IMAP', ...args);
},
error(...args) {
args.shift();
log.error('IMAP', ...args);
}
},
maxMessage: config.imap.maxMB * 1024 * 1024,
@ -1696,7 +1705,9 @@ module.exports = done => {
started = true;
return done(err);
}
server.logger.error(err);
server.logger.error({
err
}, err);
});
// start listening
@ -1712,7 +1723,9 @@ module.exports = done => {
let indexpos = 0;
let ensureIndexes = () => {
if (indexpos >= setupIndexes.length) {
server.logger.info('Setup indexes for %s collections', setupIndexes.length);
server.logger.info({
tnx: 'mongo'
}, 'Setup indexes for %s collections', setupIndexes.length);
return start();
}
let index = setupIndexes[indexpos++];

158
lib/pop3-connection.js Normal file
View file

@ -0,0 +1,158 @@
'use strict';
const crypto = require('crypto');
const EventEmitter = require('events');
const SOCKET_TIMEOUT = 60 * 1000;
class POP3Connection extends EventEmitter {
constructor(server, socket) {
super();
this._server = server;
this._socket = socket;
this._closed = false;
this._closing = false;
this.remoteAddress = this._socket.remoteAddress;
this._id = crypto.randomBytes(9).toString('base64');
this.processing = false;
this.queue = [];
this._remainder = '';
}
init() {
this._setListeners();
this._resetSession();
this._server.logger.info({
tnx: 'connection',
cid: this._id,
host: this.remoteAddress
}, 'Connection from %s', this.remoteAddress);
this._socket.write('+OK WDPop ready for requests from ' + this.remoteAddress + '\r\n');
}
_setListeners() {
this._socket.on('close', () => this._onClose());
this._socket.on('error', err => this._onError(err));
this._socket.setTimeout(this._server.options.socketTimeout || SOCKET_TIMEOUT, () => this._onTimeout());
this._socket.on('readable', () => {
if (this.processing) {
return;
}
this.processing = true;
this.read();
});
}
/**
* Fired when the socket is closed
* @event
*/
_onClose( /* hadError */ ) {
if (this._closed) {
return;
}
this.queue = [];
this.processing = false;
this._remainder = '';
this._closed = true;
this._closing = false;
this._server.logger.info({
tnx: 'close',
cid: this._id,
host: this.remoteAddress,
user: this.user
}, 'Connection closed to %s', this.remoteAddress);
this.emit('close');
}
/**
* Fired when an error occurs with the socket
*
* @event
* @param {Error} err Error object
*/
_onError(err) {
if (err.code === 'ECONNRESET' || err.code === 'EPIPE') {
return this.close(); // mark connection as 'closing'
}
this._server.logger.error({
err,
tnx: 'error',
user: this.user
}, '%s', err.message);
this.emit('error', err);
}
/**
* Fired when socket timeouts. Closes connection
*
* @event
*/
_onTimeout() {
this.close();
}
_resetSession() {
this.session = {};
}
close() {
if (!this._socket.destroyed && this._socket.writable) {
this._socket.end();
}
this._closing = true;
}
read() {
let chunk;
let data = this._remainder;
while ((chunk = this._socket.read()) !== null) {
data += chunk.toString('binary');
if (data.indexOf('\n') >= 0) {
let lines = data.split(/\r?\n/).map(line => Buffer.from(line, 'binary').toString());
this._remainder = lines.pop();
if (lines.length) {
if (this.queue.length) {
this.queue = this.queue.concat(lines);
} else {
this.queue = lines;
}
}
return this.processQueue();
}
}
this.processing = false;
}
processQueue() {
if (!this.queue.length) {
this.read(); // see if there's anything left to read
return;
}
let line = this.queue.shift().trim();
let parts = line.split(' ');
let command = parts.shift().toUpperCase();
let args = parts.join(' ');
console.log({
command,
args
});
setImmediate(() => this.processQueue());
}
}
module.exports = POP3Connection;

147
lib/pop3-server.js Normal file
View file

@ -0,0 +1,147 @@
'use strict';
const EventEmitter = require('events');
const net = require('net');
const tls = require('tls');
const tlsOptions = require('../imap-core/lib/tls-options');
const shared = require('nodemailer/lib/shared');
const POP3Connection = require('./pop3-connection');
const CLOSE_TIMEOUT = 1 * 1000; // how much to wait until pending connections are terminated
class POP3Server extends EventEmitter {
constructor(options) {
super();
this.options = options || {};
/**
* Timeout after close has been called until pending connections are forcibly closed
*/
this._closeTimeout = false;
/**
* A set of all currently open connections
*/
this.connections = new Set();
// apply TLS defaults if needed
if (this.options.secure) {
this.options = tlsOptions(this.options);
}
this.logger = shared.getLogger(this.options, {
component: this.options.component || 'pop3-server'
});
this.server = (this.options.secure ? tls : net)
.createServer(this.options, socket => this._onConnect(socket));
// ensure _sharedCreds, fixes an issue in node v4+ where STARTTLS fails because _sharedCreds does not exist
this.server._sharedCreds = this.server._sharedCreds || this.secureContext.get('default');
this._setListeners();
}
_setListeners() {
this.server.on('listening', () => this._onListening());
this.server.on('close', () => this._onClose());
this.server.on('error', err => this._onError(err));
}
/**
* Called when server started listening
*
* @event
*/
_onListening() {
let address = this.server.address();
this.logger.info(
//
{
tnx: 'listen',
host: address.address,
port: address.port,
secure: !!this.options.secure,
protocol: 'POP3'
},
'%s%s Server listening on %s:%s',
this.options.secure ? 'Secure ' : '',
'POP3',
address.family === 'IPv4' ? address.address : '[' + address.address + ']',
address.port);
}
/**
* Called when server is closed
*
* @event
*/
_onClose() {
this.logger.info({
tnx: 'closed'
}, 'POP3 Server closed');
this.emit('close');
}
/**
* Called when an error occurs with the server
*
* @event
*/
_onError(err) {
this.emit('error', err);
}
_onConnect(socket) {
let connection = new POP3Connection(this, socket);
this.connections.add(connection);
connection.once('error', err => {
this.connections.delete(connection);
this._onError(err);
});
connection.once('close', () => {
this.connections.delete(connection);
});
connection.init();
}
close(callback) {
let connections = this.connections.size;
let timeout = this.options.closeTimeout || CLOSE_TIMEOUT;
// stop accepting new connections
this.server.close(() => {
clearTimeout(this._closeTimeout);
if (typeof callback === 'function') {
return callback();
}
});
// close active connections
if (connections) {
this.logger.info({
tnx: 'close'
}, 'Server closing with %s pending connection%s, waiting %s seconds before terminating', connections, connections !== 1 ? 's' : '', timeout / 1000);
}
this._closeTimeout = setTimeout(() => {
connections = this.connections.size;
if (connections) {
this.logger.info({
tnx: 'close'
}, 'Closing %s pending connection%s to close the server', connections, connections !== 1 ? 's' : '');
this.connections.forEach(connection => {
connection.close();
});
}
}, timeout);
}
listen(...args) {
this.server.listen(...args);
}
}
module.exports = POP3Server;

View file

@ -20,7 +20,6 @@
},
"dependencies": {
"bcryptjs": "^2.4.3",
"clone": "^2.1.1",
"config": "^1.25.1",
"grid-fs": "^1.0.1",
"html-to-text": "^3.2.0",

63
pop3.js Normal file
View file

@ -0,0 +1,63 @@
'use strict';
const config = require('config');
const log = require('npmlog');
const POP3Server = require('./lib/pop3-server');
const fs = require('fs');
const serverOptions = {
port: config.pop3.port,
host: config.pop3.host,
secure: config.pop3.secure,
// log to console
logger: {
info(...args) {
args.shift();
log.info('POP3', ...args);
},
debug(...args) {
args.shift();
log.silly('POP3', ...args);
},
error(...args) {
args.shift();
log.error('POP3', ...args);
}
}
};
if (config.pop3.key) {
serverOptions.key = fs.readFileSync(config.pop3.key);
}
if (config.pop3.cert) {
serverOptions.cert = fs.readFileSync(config.pop3.cert);
}
const server = new POP3Server(serverOptions);
module.exports = done => {
if (!config.pop3.enabled) {
return setImmediate(() => done(null, false));
}
let started = false;
server.on('error', err => {
if (!started) {
started = true;
return done(err);
}
log.error('POP3', err);
});
server.listen(config.pop3.port, config.pop3.host, () => {
if (started) {
return server.close();
}
started = true;
done(null, server);
});
};

View file

@ -3,6 +3,7 @@
let config = require('config');
let log = require('npmlog');
let imap = require('./imap');
let pop3 = require('./pop3');
let smtp = require('./smtp');
let api = require('./api');
let db = require('./lib/db');
@ -19,41 +20,48 @@ db.connect(err => {
log.error('App', 'Failed to start IMAP server');
return process.exit(1);
}
// Start SMTP maildrop server
smtp(err => {
// Start POP3 server
pop3(err => {
if (err) {
log.error('App', 'Failed to start SMTP server');
log.error('App', 'Failed to start POP3 server');
return process.exit(1);
}
// Start HTTP API server
api(err => {
// Start SMTP maildrop server
smtp(err => {
if (err) {
log.error('App', 'Failed to start API server');
log.error('App', 'Failed to start SMTP server');
return process.exit(1);
}
log.info('App', 'All servers started, ready to process some mail');
// Start HTTP API server
api(err => {
if (err) {
log.error('App', 'Failed to start API server');
return process.exit(1);
}
// downgrade user and group if needed
if (config.group) {
try {
process.setgid(config.group);
log.info('App', 'Changed group to "%s" (%s)', config.group, process.getgid());
} catch (E) {
log.error('App', 'Failed to change group to "%s" (%s)', config.group, E.message);
return process.exit(1);
log.info('App', 'All servers started, ready to process some mail');
// downgrade user and group if needed
if (config.group) {
try {
process.setgid(config.group);
log.info('App', 'Changed group to "%s" (%s)', config.group, process.getgid());
} catch (E) {
log.error('App', 'Failed to change group to "%s" (%s)', config.group, E.message);
return process.exit(1);
}
}
}
if (config.user) {
try {
process.setuid(config.user);
log.info('App', 'Changed user to "%s" (%s)', config.user, process.getuid());
} catch (E) {
log.error('App', 'Failed to change user to "%s" (%s)', config.user, E.message);
return process.exit(1);
if (config.user) {
try {
process.setuid(config.user);
log.info('App', 'Changed user to "%s" (%s)', config.user, process.getuid());
} catch (E) {
log.error('App', 'Failed to change user to "%s" (%s)', config.user, E.message);
return process.exit(1);
}
}
}
});
});
});
});