diff --git a/acme.js b/acme.js index 0943ade..918f574 100644 --- a/acme.js +++ b/acme.js @@ -21,7 +21,7 @@ const serverOptions = { const server = restify.createServer(serverOptions); // res.pipe does not work if Gzip is enabled -server.use(restify.plugins.gzipResponse()); +//server.use(restify.plugins.gzipResponse()); server.use( restify.plugins.queryParser({ diff --git a/config/default.toml b/config/default.toml index 342a11e..00d791e 100644 --- a/config/default.toml +++ b/config/default.toml @@ -109,3 +109,7 @@ url = "http://127.0.0.1:9200" user = "elastic" pass = "supersecret" index = "wildduck" + +[elasticsearch.indexer] +# idexing worker +enabled = false diff --git a/indexer.js b/indexer.js new file mode 100644 index 0000000..f0d49cc --- /dev/null +++ b/indexer.js @@ -0,0 +1,73 @@ +'use strict'; + +const log = require('npmlog'); +const config = require('wild-config'); +const Gelf = require('gelf'); +const os = require('os'); +const Queue = require('bull'); + +let loggelf; + +module.exports.start = callback => { + if (!config.elasticsearch || !config.elasticsearch.indexer || !config.elasticsearch.indexer.enabled) { + return setImmediate(() => callback(null, false)); + } + + const component = config.log.gelf.component || 'wildduck'; + const hostname = config.log.gelf.hostname || os.hostname(); + const gelf = + config.log.gelf && config.log.gelf.enabled + ? new Gelf(config.log.gelf.options) + : { + // placeholder + emit: (key, message) => log.info('Gelf', JSON.stringify(message)) + }; + + loggelf = message => { + if (typeof message === 'string') { + message = { + short_message: message + }; + } + + message = message || {}; + + if (!message.short_message || message.short_message.indexOf(component.toUpperCase()) !== 0) { + message.short_message = component.toUpperCase() + ' ' + (message.short_message || ''); + } + + message.facility = component; // facility is deprecated but set by the driver if not provided + message.host = hostname; + message.timestamp = Date.now() / 1000; + message._component = component; + Object.keys(message).forEach(key => { + if (!message[key]) { + delete message[key]; + } + }); + try { + gelf.emit('gelf.log', message); + } catch (err) { + log.error('Gelf', err); + } + }; + + const indexingQueue = new Queue('indexing', typeof config.dbs.redis === 'object' ? { redis: config.dbs.redis } : config.dbs.redis); + + indexingQueue.process(async job => { + try { + if (!job || !job.data || !job.data.ev) { + return false; + } + const data = job.data; + console.log('DATA FOR INDEXING', data); + + loggelf({ _msg: 'hellow world' }); + } catch (err) { + log.error('Indexing', err); + throw err; + } + }); + + callback(); +}; diff --git a/lib/acme/certs.js b/lib/acme/certs.js index 357b095..bad6505 100644 --- a/lib/acme/certs.js +++ b/lib/acme/certs.js @@ -268,10 +268,26 @@ const acquireCert = async (domain, acmeOptions, certificateData, certHandler) => let updated = await certHandler.update({ _id: certificateData._id }, updates, { certUpdated: true }); if (!updated) { log.error('ACME', 'Failed to generate certificate for %s. Update failed', domain); + certHandler.loggelf({ + short_message: `SNI cert provisioning failed for ${domain}`, + _sni_servername: domain, + _cert_id: certificateData._id.toString(), + _cert_action: 'provision', + _cert_error: 'No response', + _cert_error_code: 'NoResponse' + }); return cert; } log.info('ACME', 'Certificate successfully generated for %s (expires %s)', domain, parsed.validTo); + certHandler.loggelf({ + short_message: `SNI cert provisioned for ${domain}`, + _sni_servername: domain, + _cert_id: certificateData._id.toString(), + _cert_action: 'provision', + _cert_result: 'success', + _cert_expires: updates.expires.toISOString() + }); return await certHandler.getRecord({ _id: certificateData._id }, true); } catch (err) { try { @@ -299,11 +315,20 @@ const acquireCert = async (domain, acmeOptions, certificateData, certHandler) => } catch (err) { log.error('ACME', 'Failed to update certificate record domain=%s error=%s', domain, err.message); } - } - if (certificateData && certificateData.cert) { - // use existing certificate data if exists - return certificateData; + certHandler.loggelf({ + short_message: `SNI cert provisioning failed for ${domain}`, + _sni_servername: domain, + _cert_id: certificateData._id.toString(), + _cert_action: 'provision', + _cert_error: err.message, + _cert_error_code: err.code + }); + + if (certificateData && certificateData.cert) { + // use existing certificate data if exists + return certificateData; + } } throw err; diff --git a/lib/elasticsearch.js b/lib/elasticsearch.js index 9d83619..25dc226 100644 --- a/lib/elasticsearch.js +++ b/lib/elasticsearch.js @@ -44,6 +44,8 @@ const init = async () => { } else if (indexInfo && indexInfo.updated && indexInfo.changes) { log.info('ElasticSearch', 'Index "%s" updated (%s)', config.elasticsearch.index, JSON.stringify(indexInfo.changes)); } + + return true; }; module.exports = { init }; diff --git a/server.js b/server.js index 8cc84b8..ab85b7a 100644 --- a/server.js +++ b/server.js @@ -67,8 +67,10 @@ if (!processCount || processCount <= 1) { // single process mode, do not fork anything initElasticSearch() - .then(() => { - log.info('App', `ElasticSearch setup checked`); + .then(started => { + if (started) { + log.verbose('App', `ElasticSearch setup checked`); + } }) .catch(err => { log.error('App', `ElasticSearch setup failed: ${err.message}${err.meta?.statusCode ? ` (${err.meta?.statusCode})` : ''}`); @@ -98,8 +100,10 @@ if (!processCount || processCount <= 1) { // Fork workers. initElasticSearch() - .then(() => { - log.info('App', `ElasticSearch setup checked`); + .then(started => { + if (started) { + log.verbose('App', `ElasticSearch setup checked`); + } }) .catch(err => { log.error('App', `ElasticSearch setup failed: ${err.message}${err.meta?.statusCode ? ` (${err.meta?.statusCode})` : ''}`); diff --git a/worker.js b/worker.js index 43703b2..9f18c64 100644 --- a/worker.js +++ b/worker.js @@ -9,6 +9,7 @@ const api = require('./api'); const acme = require('./acme'); const tasks = require('./tasks'); const webhooks = require('./webhooks'); +const indexer = require('./indexer'); const plugins = require('./lib/plugins'); const db = require('./lib/db'); const errors = require('./lib/errors'); @@ -38,71 +39,79 @@ db.connect(err => { return setTimeout(() => process.exit(1), 3000); } - // Start IMAP server - imap(err => { + indexer.start(err => { if (err) { - log.error('App', 'Failed to start IMAP server. %s', err.message); + log.error('App', 'Failed to start indexer process. %s', err.message); errors.notify(err); return setTimeout(() => process.exit(1), 3000); } - // Start POP3 server - pop3(err => { + + // Start IMAP server + imap(err => { if (err) { - log.error('App', 'Failed to start POP3 server'); + log.error('App', 'Failed to start IMAP server. %s', err.message); errors.notify(err); return setTimeout(() => process.exit(1), 3000); } - // Start LMTP maildrop server - lmtp(err => { + // Start POP3 server + pop3(err => { if (err) { - log.error('App', 'Failed to start LMTP server'); + log.error('App', 'Failed to start POP3 server'); errors.notify(err); return setTimeout(() => process.exit(1), 3000); } - - // Start HTTP API server - api(err => { + // Start LMTP maildrop server + lmtp(err => { if (err) { - log.error('App', 'Failed to start API server'); + log.error('App', 'Failed to start LMTP server'); errors.notify(err); return setTimeout(() => process.exit(1), 3000); } - // Start HTTP ACME server - acme(err => { + // Start HTTP API server + api(err => { if (err) { - log.error('App', 'Failed to start ACME server'); + log.error('App', 'Failed to start API server'); errors.notify(err); return setTimeout(() => process.exit(1), 3000); } - // downgrade user and group if needed - if (config.group) { - try { - process.setgid(config.group); - log.info('App', 'Changed group to "%s" (%s)', config.group, process.getgid()); - } catch (E) { - log.error('App', 'Failed to change group to "%s" (%s)', config.group, E.message); - errors.notify(E); + // Start HTTP ACME server + acme(err => { + if (err) { + log.error('App', 'Failed to start ACME server'); + errors.notify(err); return setTimeout(() => process.exit(1), 3000); } - } - if (config.user) { - try { - process.setuid(config.user); - log.info('App', 'Changed user to "%s" (%s)', config.user, process.getuid()); - } catch (E) { - log.error('App', 'Failed to change user to "%s" (%s)', config.user, E.message); - errors.notify(E); - return setTimeout(() => process.exit(1), 3000); - } - } - plugins.init('receiver'); - plugins.handler.load(() => { - log.verbose('Plugins', 'Plugins loaded'); - plugins.handler.runHooks('init', [], () => { - log.info('App', 'All servers started, ready to process some mail'); + // downgrade user and group if needed + if (config.group) { + try { + process.setgid(config.group); + log.info('App', 'Changed group to "%s" (%s)', config.group, process.getgid()); + } catch (E) { + log.error('App', 'Failed to change group to "%s" (%s)', config.group, E.message); + errors.notify(E); + return setTimeout(() => process.exit(1), 3000); + } + } + if (config.user) { + try { + process.setuid(config.user); + log.info('App', 'Changed user to "%s" (%s)', config.user, process.getuid()); + } catch (E) { + log.error('App', 'Failed to change user to "%s" (%s)', config.user, E.message); + errors.notify(E); + return setTimeout(() => process.exit(1), 3000); + } + } + + plugins.init('receiver'); + plugins.handler.load(() => { + log.verbose('Plugins', 'Plugins loaded'); + plugins.handler.runHooks('init', [], () => { + log.info('App', 'All servers started, ready to process some mail'); + }); }); }); });