From 258b2f896e0012497311b1906f99642add231e95 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Fri, 7 Apr 2017 21:29:14 +0300 Subject: [PATCH] pop3 initial --- config/default.js | 7 ++ imap-core/lib/imap-server.js | 51 ++++++----- imap.js | 23 +++-- lib/pop3-connection.js | 158 +++++++++++++++++++++++++++++++++++ lib/pop3-server.js | 147 ++++++++++++++++++++++++++++++++ package.json | 1 - pop3.js | 63 ++++++++++++++ worker.js | 58 +++++++------ 8 files changed, 454 insertions(+), 54 deletions(-) create mode 100644 lib/pop3-connection.js create mode 100644 lib/pop3-server.js create mode 100644 pop3.js diff --git a/config/default.js b/config/default.js index 96680874..7a6c2927 100644 --- a/config/default.js +++ b/config/default.js @@ -53,6 +53,13 @@ module.exports = { maxMB: 5 }, + pop3: { + enabled: true, + port: 9995, + host: '0.0.0.0', + secure: true + }, + api: { port: 8080 } diff --git a/imap-core/lib/imap-server.js b/imap-core/lib/imap-server.js index 1357d664..963c40e7 100644 --- a/imap-core/lib/imap-server.js +++ b/imap-core/lib/imap-server.js @@ -1,12 +1,12 @@ 'use strict'; -let net = require('net'); -let tls = require('tls'); -let IMAPConnection = require('./imap-connection').IMAPConnection; -let tlsOptions = require('./tls-options'); -let EventEmitter = require('events').EventEmitter; -let util = require('util'); -let clone = require('clone'); +const net = require('net'); +const tls = require('tls'); +const IMAPConnection = require('./imap-connection').IMAPConnection; +const tlsOptions = require('./tls-options'); +const EventEmitter = require('events').EventEmitter; +const shared = require('nodemailer/lib/shared'); +const util = require('util'); const CLOSE_TIMEOUT = 1 * 1000; // how much to wait until pending connections are terminated @@ -21,25 +21,16 @@ class IMAPServer extends EventEmitter { constructor(options) { super(); - this.options = options ? clone(options) : {}; + this.options = options || {}; // apply TLS defaults if needed if (this.options.secure) { this.options = tlsOptions(this.options); } - // setup logger - if ('logger' in this.options) { - // use provided logger or use vanity logger if option is set to false - this.logger = this.options.logger || { - info: () => false, - debug: () => false, - error: () => false - }; - } else { - // create default console logger - this.logger = this._createDefaultLogger(); - } + this.logger = shared.getLogger(this.options, { + component: this.options.component || 'pop3-server' + }); /** * Timeout after close has been called until pending connections are forcibly closed @@ -86,13 +77,17 @@ class IMAPServer extends EventEmitter { // close active connections if (connections) { - this.logger.info('Server closing with %s pending connection%s, waiting %s seconds before terminating', connections, connections !== 1 ? 's' : '', timeout / 1000); + this.logger.info({ + tnx: 'close' + }, 'Server closing with %s pending connection%s, waiting %s seconds before terminating', connections, connections !== 1 ? 's' : '', timeout / 1000); } this._closeTimeout = setTimeout(() => { connections = this.connections.size; if (connections) { - this.logger.info('Closing %s pending connection%s to close the server', connections, connections !== 1 ? 's' : ''); + this.logger.info({ + tnx: 'close' + }, 'Closing %s pending connection%s to close the server', connections, connections !== 1 ? 's' : ''); this.connections.forEach(connection => { connection.send('* BYE System shutdown'); @@ -153,6 +148,14 @@ class IMAPServer extends EventEmitter { _onListening() { let address = this.server.address(); this.logger.info( + // + { + tnx: 'listen', + host: address.address, + port: address.port, + secure: !!this.options.secure, + protocol: 'IMAP' + }, '%sIMAP Server listening on %s:%s', this.options.secure ? 'Secure ' : '', address.family === 'IPv4' ? address.address : '[' + address.address + ']', @@ -165,7 +168,9 @@ class IMAPServer extends EventEmitter { * @event */ _onClose() { - this.logger.info('IMAP Server closed'); + this.logger.info({ + tnx: 'closed' + }, 'IMAP Server closed'); this.emit('close'); } diff --git a/imap.js b/imap.js index 0eacff41..ccbe520a 100644 --- a/imap.js +++ b/imap.js @@ -33,9 +33,18 @@ const serverOptions = { }, logger: { - info: log.silly.bind(log, 'IMAP'), - debug: log.silly.bind(log, 'IMAP'), - error: log.error.bind(log, 'IMAP') + info(...args) { + args.shift(); + log.info('IMAP', ...args); + }, + debug(...args) { + args.shift(); + log.silly('IMAP', ...args); + }, + error(...args) { + args.shift(); + log.error('IMAP', ...args); + } }, maxMessage: config.imap.maxMB * 1024 * 1024, @@ -1696,7 +1705,9 @@ module.exports = done => { started = true; return done(err); } - server.logger.error(err); + server.logger.error({ + err + }, err); }); // start listening @@ -1712,7 +1723,9 @@ module.exports = done => { let indexpos = 0; let ensureIndexes = () => { if (indexpos >= setupIndexes.length) { - server.logger.info('Setup indexes for %s collections', setupIndexes.length); + server.logger.info({ + tnx: 'mongo' + }, 'Setup indexes for %s collections', setupIndexes.length); return start(); } let index = setupIndexes[indexpos++]; diff --git a/lib/pop3-connection.js b/lib/pop3-connection.js new file mode 100644 index 00000000..fecb4b92 --- /dev/null +++ b/lib/pop3-connection.js @@ -0,0 +1,158 @@ +'use strict'; + +const crypto = require('crypto'); +const EventEmitter = require('events'); + +const SOCKET_TIMEOUT = 60 * 1000; + +class POP3Connection extends EventEmitter { + constructor(server, socket) { + super(); + this._server = server; + this._socket = socket; + + this._closed = false; + this._closing = false; + + this.remoteAddress = this._socket.remoteAddress; + this._id = crypto.randomBytes(9).toString('base64'); + + this.processing = false; + this.queue = []; + this._remainder = ''; + } + + init() { + this._setListeners(); + this._resetSession(); + this._server.logger.info({ + tnx: 'connection', + cid: this._id, + host: this.remoteAddress + }, 'Connection from %s', this.remoteAddress); + this._socket.write('+OK WDPop ready for requests from ' + this.remoteAddress + '\r\n'); + } + + _setListeners() { + this._socket.on('close', () => this._onClose()); + this._socket.on('error', err => this._onError(err)); + this._socket.setTimeout(this._server.options.socketTimeout || SOCKET_TIMEOUT, () => this._onTimeout()); + this._socket.on('readable', () => { + if (this.processing) { + return; + } + this.processing = true; + + this.read(); + }); + } + + /** + * Fired when the socket is closed + * @event + */ + _onClose( /* hadError */ ) { + if (this._closed) { + return; + } + + this.queue = []; + this.processing = false; + this._remainder = ''; + + this._closed = true; + this._closing = false; + + this._server.logger.info({ + tnx: 'close', + cid: this._id, + host: this.remoteAddress, + user: this.user + }, 'Connection closed to %s', this.remoteAddress); + + this.emit('close'); + } + + /** + * Fired when an error occurs with the socket + * + * @event + * @param {Error} err Error object + */ + _onError(err) { + if (err.code === 'ECONNRESET' || err.code === 'EPIPE') { + return this.close(); // mark connection as 'closing' + } + + this._server.logger.error({ + err, + tnx: 'error', + user: this.user + }, '%s', err.message); + this.emit('error', err); + } + + /** + * Fired when socket timeouts. Closes connection + * + * @event + */ + _onTimeout() { + this.close(); + } + + _resetSession() { + this.session = {}; + } + + close() { + if (!this._socket.destroyed && this._socket.writable) { + this._socket.end(); + } + this._closing = true; + } + + read() { + let chunk; + let data = this._remainder; + while ((chunk = this._socket.read()) !== null) { + data += chunk.toString('binary'); + if (data.indexOf('\n') >= 0) { + let lines = data.split(/\r?\n/).map(line => Buffer.from(line, 'binary').toString()); + this._remainder = lines.pop(); + + if (lines.length) { + if (this.queue.length) { + this.queue = this.queue.concat(lines); + } else { + this.queue = lines; + } + } + + return this.processQueue(); + } + } + + this.processing = false; + } + + processQueue() { + if (!this.queue.length) { + this.read(); // see if there's anything left to read + return; + } + let line = this.queue.shift().trim(); + let parts = line.split(' '); + let command = parts.shift().toUpperCase(); + let args = parts.join(' '); + + console.log({ + command, + args + }); + + setImmediate(() => this.processQueue()); + } +} + +module.exports = POP3Connection; diff --git a/lib/pop3-server.js b/lib/pop3-server.js new file mode 100644 index 00000000..da68d780 --- /dev/null +++ b/lib/pop3-server.js @@ -0,0 +1,147 @@ +'use strict'; + +const EventEmitter = require('events'); +const net = require('net'); +const tls = require('tls'); +const tlsOptions = require('../imap-core/lib/tls-options'); +const shared = require('nodemailer/lib/shared'); +const POP3Connection = require('./pop3-connection'); + +const CLOSE_TIMEOUT = 1 * 1000; // how much to wait until pending connections are terminated + +class POP3Server extends EventEmitter { + constructor(options) { + super(); + + this.options = options || {}; + + /** + * Timeout after close has been called until pending connections are forcibly closed + */ + this._closeTimeout = false; + + /** + * A set of all currently open connections + */ + this.connections = new Set(); + + // apply TLS defaults if needed + if (this.options.secure) { + this.options = tlsOptions(this.options); + } + + this.logger = shared.getLogger(this.options, { + component: this.options.component || 'pop3-server' + }); + + this.server = (this.options.secure ? tls : net) + .createServer(this.options, socket => this._onConnect(socket)); + + // ensure _sharedCreds, fixes an issue in node v4+ where STARTTLS fails because _sharedCreds does not exist + this.server._sharedCreds = this.server._sharedCreds || this.secureContext.get('default'); + + this._setListeners(); + } + + _setListeners() { + this.server.on('listening', () => this._onListening()); + this.server.on('close', () => this._onClose()); + this.server.on('error', err => this._onError(err)); + } + + /** + * Called when server started listening + * + * @event + */ + _onListening() { + let address = this.server.address(); + this.logger.info( + // + { + tnx: 'listen', + host: address.address, + port: address.port, + secure: !!this.options.secure, + protocol: 'POP3' + }, + '%s%s Server listening on %s:%s', + this.options.secure ? 'Secure ' : '', + 'POP3', + address.family === 'IPv4' ? address.address : '[' + address.address + ']', + address.port); + } + + /** + * Called when server is closed + * + * @event + */ + _onClose() { + this.logger.info({ + tnx: 'closed' + }, 'POP3 Server closed'); + this.emit('close'); + } + + /** + * Called when an error occurs with the server + * + * @event + */ + _onError(err) { + this.emit('error', err); + } + + _onConnect(socket) { + let connection = new POP3Connection(this, socket); + this.connections.add(connection); + connection.once('error', err => { + this.connections.delete(connection); + this._onError(err); + }); + connection.once('close', () => { + this.connections.delete(connection); + }); + connection.init(); + } + + close(callback) { + let connections = this.connections.size; + let timeout = this.options.closeTimeout || CLOSE_TIMEOUT; + + // stop accepting new connections + this.server.close(() => { + clearTimeout(this._closeTimeout); + if (typeof callback === 'function') { + return callback(); + } + }); + + // close active connections + if (connections) { + this.logger.info({ + tnx: 'close' + }, 'Server closing with %s pending connection%s, waiting %s seconds before terminating', connections, connections !== 1 ? 's' : '', timeout / 1000); + } + + this._closeTimeout = setTimeout(() => { + connections = this.connections.size; + if (connections) { + this.logger.info({ + tnx: 'close' + }, 'Closing %s pending connection%s to close the server', connections, connections !== 1 ? 's' : ''); + + this.connections.forEach(connection => { + connection.close(); + }); + } + }, timeout); + } + + listen(...args) { + this.server.listen(...args); + } +} + +module.exports = POP3Server; diff --git a/package.json b/package.json index 990dd53b..cba26e6c 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,6 @@ }, "dependencies": { "bcryptjs": "^2.4.3", - "clone": "^2.1.1", "config": "^1.25.1", "grid-fs": "^1.0.1", "html-to-text": "^3.2.0", diff --git a/pop3.js b/pop3.js new file mode 100644 index 00000000..15b8d963 --- /dev/null +++ b/pop3.js @@ -0,0 +1,63 @@ +'use strict'; + +const config = require('config'); +const log = require('npmlog'); +const POP3Server = require('./lib/pop3-server'); +const fs = require('fs'); + +const serverOptions = { + port: config.pop3.port, + host: config.pop3.host, + secure: config.pop3.secure, + + // log to console + logger: { + info(...args) { + args.shift(); + log.info('POP3', ...args); + }, + debug(...args) { + args.shift(); + log.silly('POP3', ...args); + }, + error(...args) { + args.shift(); + log.error('POP3', ...args); + } + } + +}; + +if (config.pop3.key) { + serverOptions.key = fs.readFileSync(config.pop3.key); +} + +if (config.pop3.cert) { + serverOptions.cert = fs.readFileSync(config.pop3.cert); +} + +const server = new POP3Server(serverOptions); + +module.exports = done => { + if (!config.pop3.enabled) { + return setImmediate(() => done(null, false)); + } + + let started = false; + + server.on('error', err => { + if (!started) { + started = true; + return done(err); + } + log.error('POP3', err); + }); + + server.listen(config.pop3.port, config.pop3.host, () => { + if (started) { + return server.close(); + } + started = true; + done(null, server); + }); +}; diff --git a/worker.js b/worker.js index 7167f40c..12d64dbb 100644 --- a/worker.js +++ b/worker.js @@ -3,6 +3,7 @@ let config = require('config'); let log = require('npmlog'); let imap = require('./imap'); +let pop3 = require('./pop3'); let smtp = require('./smtp'); let api = require('./api'); let db = require('./lib/db'); @@ -19,41 +20,48 @@ db.connect(err => { log.error('App', 'Failed to start IMAP server'); return process.exit(1); } - // Start SMTP maildrop server - smtp(err => { + // Start POP3 server + pop3(err => { if (err) { - log.error('App', 'Failed to start SMTP server'); + log.error('App', 'Failed to start POP3 server'); return process.exit(1); } - - // Start HTTP API server - api(err => { + // Start SMTP maildrop server + smtp(err => { if (err) { - log.error('App', 'Failed to start API server'); + log.error('App', 'Failed to start SMTP server'); return process.exit(1); } - log.info('App', 'All servers started, ready to process some mail'); + // Start HTTP API server + api(err => { + if (err) { + log.error('App', 'Failed to start API server'); + return process.exit(1); + } - // downgrade user and group if needed - if (config.group) { - try { - process.setgid(config.group); - log.info('App', 'Changed group to "%s" (%s)', config.group, process.getgid()); - } catch (E) { - log.error('App', 'Failed to change group to "%s" (%s)', config.group, E.message); - return process.exit(1); + log.info('App', 'All servers started, ready to process some mail'); + + // downgrade user and group if needed + if (config.group) { + try { + process.setgid(config.group); + log.info('App', 'Changed group to "%s" (%s)', config.group, process.getgid()); + } catch (E) { + log.error('App', 'Failed to change group to "%s" (%s)', config.group, E.message); + return process.exit(1); + } } - } - if (config.user) { - try { - process.setuid(config.user); - log.info('App', 'Changed user to "%s" (%s)', config.user, process.getuid()); - } catch (E) { - log.error('App', 'Failed to change user to "%s" (%s)', config.user, E.message); - return process.exit(1); + if (config.user) { + try { + process.setuid(config.user); + log.info('App', 'Changed user to "%s" (%s)', config.user, process.getuid()); + } catch (E) { + log.error('App', 'Failed to change user to "%s" (%s)', config.user, E.message); + return process.exit(1); + } } - } + }); }); }); });