Adds bunyan for JSON logging to every package!

- Bunyan logs JSON output, and we added a stream to ship our logs to
CloudWatch
- Replaces *all* instances of console.log. Turned the `no-console` eslint
rule back on, so we never use console.log again.
- Added npm scripts to view pretty logs
This commit is contained in:
Juan Tejada 2016-07-08 17:13:30 -07:00
parent 14b5bef0a7
commit dce872fac8
28 changed files with 216 additions and 111 deletions

View file

@ -17,7 +17,6 @@
"object-curly-spacing": "off",
"max-len": "off",
"new-cap": ["error", {"capIsNew": false}],
"no-console": "off",
"no-constant-condition": "off",
"no-loop-func": "off",
"no-shadow": "error",

View file

@ -28,7 +28,9 @@
# Developing Locally:
```
npm start
npm run start
npm run logs
npm run stop
```
We use [pm2](http://pm2.keymetrics.io/) to launch a variety of processes

View file

@ -5,6 +5,8 @@
"main": "",
"dependencies": {
"bluebird": "3.x.x",
"bunyan": "^1.8.1",
"bunyan-cloudwatch": "^2.0.0",
"lerna": "2.0.0-beta.23",
"mysql": "^2.11.1",
"newrelic": "^1.28.1",
@ -25,8 +27,10 @@
"sqlite3": "https://github.com/bengotow/node-sqlite3/archive/bengotow/usleep-v3.1.4.tar.gz"
},
"scripts": {
"start": "./node_modules/pm2/bin/pm2 start ./pm2-dev.yml --no-daemon",
"postinstall": "node_modules/.bin/lerna bootstrap"
"start": "pm2 start ./pm2-dev.yml",
"logs": "pm2 logs --raw | bunyan -o short",
"stop": "pm2 delete all",
"postinstall": "lerna bootstrap"
},
"repository": {
"type": "git",

View file

@ -7,8 +7,10 @@ const Vision = require('vision');
const Package = require('./package');
const fs = require('fs');
const path = require('path');
const {DatabaseConnector, SchedulerUtils, Logger} = require(`nylas-core`);
global.Promise = require('bluebird');
global.Logger = Logger.createLogger('nylas-k2-api')
const server = new Hapi.Server({
connections: {
@ -33,8 +35,6 @@ const plugins = [Inert, Vision, HapiBasicAuth, HapiBoom, {
let sharedDb = null;
const validate = (request, username, password, callback) => {
const {DatabaseConnector, SchedulerUtils} = require(`nylas-core`);
let getSharedDb = null;
if (sharedDb) {
getSharedDb = Promise.resolve(sharedDb)
@ -88,6 +88,6 @@ server.register(plugins, (err) => {
server.start((startErr) => {
if (startErr) { throw startErr; }
console.log('API running at:', server.info.uri);
global.Logger.info({url: server.info.uri}, 'API running');
});
});

View file

@ -7,4 +7,7 @@ module.exports = (server) => {
const account = this.auth.credentials;
return DatabaseConnector.forAccount(account.id);
});
server.decorate('request', 'logger', (request) => {
return global.Logger.forAccount(request.auth.credentials)
}, {apply: true});
}

View file

@ -97,7 +97,7 @@ module.exports = (server) => {
const {settings, email, provider, name} = request.payload;
if (provider === 'imap') {
connectionChecks.push(IMAPConnection.connect(dbStub, settings))
connectionChecks.push(IMAPConnection.connect({db: dbStub, settings}))
}
Promise.all(connectionChecks).then(() => {
@ -188,7 +188,7 @@ module.exports = (server) => {
}
Promise.all([
IMAPConnection.connect({}, Object.assign({}, settings, credentials)),
IMAPConnection.connect({db: {}, settings: Object.assign({}, settings, credentials)}),
])
.then(() =>
buildAccountWith({

View file

@ -73,9 +73,7 @@ module.exports = (server) => {
},
handler: (request, reply) => {
request.getAccountDatabase().then(({File}) => {
const {headers: {accept}} = request
const {params: {id}} = request
const account = request.auth.credentials
File.findOne({where: {id}}).then((file) => {
if (!file) {
@ -83,9 +81,9 @@ module.exports = (server) => {
}
return reply(Serialization.jsonStringify(file));
})
.catch((error) => {
console.log('Error fetching file: ', error)
reply(error)
.catch((err) => {
request.logger.error(err, 'Error fetching file')
reply(err)
})
})
},
@ -107,7 +105,6 @@ module.exports = (server) => {
handler: (request, reply) => {
request.getAccountDatabase()
.then((db) => {
const {headers: {accept}} = request
const {params: {id}} = request
const account = request.auth.credentials
@ -116,12 +113,12 @@ module.exports = (server) => {
if (!file) {
return reply.notFound(`File ${id} not found`)
}
return file.fetch({account, db})
return file.fetch({account, db, logger: request.logger})
.then((stream) => reply(stream))
})
.catch((error) => {
console.log('Error fetching file: ', error)
reply(error)
.catch((err) => {
request.logger.error(err, 'Error downloading file')
reply(err)
})
})
},

View file

@ -123,15 +123,16 @@ module.exports = (server) => {
return reply.notFound(`Message ${id} not found`)
}
if (accept === 'message/rfc822') {
return message.fetchRaw({account, db}).then((rawMessage) =>
return message.fetchRaw({account, db, logger: request.logger})
.then((rawMessage) =>
reply(rawMessage)
)
}
return reply(Serialization.jsonStringify(message));
})
.catch((error) => {
console.log('Error fetching message: ', error)
reply(error)
.catch((err) => {
request.logger.error(err, 'Error fetching message')
reply(err)
})
})
},

View file

@ -6,8 +6,8 @@ module.exports = (server) => {
auth: false,
},
handler: (request, reply) => {
console.log("---> Ping!")
reply("pong")
request.logger.info('----> Pong!')
// Keep the original lowercase "pong" response: changing the payload to
// "Pong" is a breaking change for any client string-matching this
// health-check endpoint.
reply("pong")
},
});
};

View file

@ -176,8 +176,9 @@ class IMAPConnection extends EventEmitter {
return new IMAPConnection(...args).connect()
}
constructor(db, settings) {
constructor({db, settings, logger = console} = {}) {
super();
this._logger = logger;
this._db = db;
this._queue = [];
this._currentOperation = null;
@ -231,13 +232,13 @@ class IMAPConnection extends EventEmitter {
this._imap = Promise.promisifyAll(new Imap(settings));
this._imap.once('end', () => {
console.log('Underlying IMAP Connection ended');
this._logger.info('Underlying IMAP Connection ended');
this._connectPromise = null;
this._imap = null;
});
this._imap.on('alert', (msg) => {
console.log(`IMAP SERVER SAYS: ${msg}`)
this._logger.info({imap_server_msg: msg}, `IMAP server message`)
})
// Emitted when new mail arrives in the currently open mailbox.
@ -346,14 +347,20 @@ class IMAPConnection extends EventEmitter {
result.then(() => {
this._currentOperation = null;
console.log(`Finished task: ${operation.description()}`)
this._logger.info({
operation_type: operation.constructor.name,
operation_description: operation.description(),
}, `Finished sync operation`)
resolve();
this.processNextOperation();
})
.catch((err) => {
this._currentOperation = null;
console.log(`Task errored: ${operation.description()}`)
console.error(err)
this._logger.error({
err,
operation_type: operation.constructor.name,
operation_description: operation.description(),
}, `Sync operation errored`)
reject(err);
})
}

View file

@ -11,4 +11,5 @@ module.exports = {
SyncPolicy: require('./sync-policy'),
SchedulerUtils: require('./scheduler-utils'),
MessageTypes: require('./message-types'),
Logger: require('./logger'),
}

View file

@ -0,0 +1,52 @@
const bunyan = require('bunyan')
const createCWStream = require('bunyan-cloudwatch')
const NODE_ENV = process.env.NODE_ENV || 'unknown'
// Build the list of bunyan stream configs for a logger named `name`.
// In development we log only to stdout; in any other environment we
// additionally ship raw log records to CloudWatch (log group `k2-<env>`,
// log stream `<name>-<env>`, region us-east-1).
function getLogStreams(name, env) {
  const streams = [{
    stream: process.stdout,
    level: 'info',
  }];
  if (env !== 'development') {
    streams.push({
      // `raw` tells bunyan to hand the stream plain record objects
      // instead of serialized strings, which bunyan-cloudwatch expects.
      type: 'raw',
      stream: createCWStream({
        logGroup: `k2-${env}`,
        logStream: `${name}-${env}`,
        cloudWatchLogsOptions: {
          region: 'us-east-1',
        },
      }),
    });
  }
  return streams;
}
// Create the root bunyan logger for a process, augmented with a
// `forAccount` helper that returns (and memoizes per account id) a child
// logger carrying `account_id` / `account_email` fields.
function createLogger(name, env = NODE_ENV) {
  const perAccountLoggers = new Map();
  const rootLogger = bunyan.createLogger({
    name,
    serializers: bunyan.stdSerializers,
    streams: getLogStreams(name, env),
  });

  const forAccount = (account = {}) => {
    let child = perAccountLoggers.get(account.id);
    if (!child) {
      child = rootLogger.child({
        account_id: account.id,
        account_email: account.emailAddress,
      });
      perAccountLoggers.set(account.id, child);
    }
    return child;
  };

  return Object.assign(rootLogger, {forAccount});
}
module.exports = {
createLogger,
}

View file

@ -15,11 +15,11 @@ module.exports = (sequelize, Sequelize) => {
},
},
instanceMethods: {
fetch: function fetch({account, db}) {
fetch: function fetch({account, db, logger}) {
const settings = Object.assign({}, account.connectionSettings, account.decryptedCredentials())
return Promise.props({
message: this.getMessage(),
connection: IMAPConnection.connect(db, settings),
connection: IMAPConnection.connect({db, settings, logger}),
})
.then(({message, connection}) => {
return message.getFolder()

View file

@ -69,11 +69,11 @@ module.exports = (sequelize, Sequelize) => {
)
},
fetchRaw: function fetchRaw({account, db}) {
fetchRaw: function fetchRaw({account, db, logger}) {
const settings = Object.assign({}, account.connectionSettings, account.decryptedCredentials())
return Promise.props({
folder: this.getFolder(),
connection: IMAPConnection.connect(db, settings),
connection: IMAPConnection.connect({db, settings, logger}),
})
.then(({folder, connection}) => {
return connection.openBox(folder.name)

View file

@ -4,6 +4,7 @@
"description": "Core shared packages",
"main": "index.js",
"dependencies": {
"bunyan": "^1.8.1",
"imap": "0.8.x",
"xoauth2": "1.x.x"
},

View file

@ -1,9 +1,11 @@
const Rx = require('rx')
const redis = require("redis");
const log = global.Logger || console
Promise.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype);
class PubsubConnector {
constructor() {
this._broadcastClient = null;
@ -13,7 +15,7 @@ class PubsubConnector {
buildClient() {
const client = redis.createClient(process.env.REDIS_URL || null);
client.on("error", console.error);
// Bind the logger call: bunyan log methods depend on `this`, so passing
// `log.error` as a bare callback loses the receiver and throws inside
// bunyan when the first redis error fires. (`console.error` happened to
// be bound-safe in Node; a bunyan logger's method is not.)
client.on("error", (err) => log.error(err));
return client;
}

View file

@ -22,7 +22,9 @@ const forEachAccountList = (forEachCallback) => {
}
const assignPolicy = (accountId, policy) => {
console.log(`Changing policy for ${accountId} to ${JSON.stringify(policy)}`)
const log = global.Logger || console
log.info({policy, account_id: accountId}, `Changing single policy`)
const DatabaseConnector = require('./database-connector');
return DatabaseConnector.forShared().then(({Account}) => {
Account.find({where: {id: accountId}}).then((account) => {
@ -33,7 +35,9 @@ const assignPolicy = (accountId, policy) => {
}
const assignPolicyToAcounts = (accountIds, policy) => {
console.log(`Changing policy for ${accountIds} to ${JSON.stringify(policy)}`)
const log = global.Logger || console
log.info({policy, account_ids: accountIds}, `Changing multiple policies`)
const DatabaseConnector = require('./database-connector');
return DatabaseConnector.forShared().then(({Account}) => {
Account.findAll({where: {id: {$or: accountIds}}}).then((accounts) => {

View file

@ -1,11 +1,12 @@
const Hapi = require('hapi');
const HapiWebSocket = require('hapi-plugin-websocket');
const Inert = require('inert');
const {DatabaseConnector, PubsubConnector, SchedulerUtils} = require(`nylas-core`);
const {DatabaseConnector, PubsubConnector, SchedulerUtils, Logger} = require(`nylas-core`);
const fs = require('fs');
const path = require('path');
global.Promise = require('bluebird');
global.Logger = Logger.createLogger('nylas-k2-dashboard')
const server = new Hapi.Server();
server.connection({ port: process.env.PORT });
@ -98,6 +99,6 @@ server.register([HapiWebSocket, Inert], () => {
server.start((startErr) => {
if (startErr) { throw startErr; }
console.log('Dashboard running at:', server.info.uri);
global.Logger.info({uri: server.info.uri}, 'Dashboard running');
});
});

View file

@ -1,7 +1,8 @@
const {PubsubConnector, DatabaseConnector} = require(`nylas-core`)
const {PubsubConnector, DatabaseConnector, Logger} = require(`nylas-core`)
const {processors} = require('./processors')
global.Promise = require('bluebird');
global.Logger = Logger.createLogger('nylas-k2-message-processor')
// List of the attributes of Message that the processor should be allowed to change.
// The message may move between folders, get starred, etc. while it's being
@ -11,15 +12,13 @@ const MessageProcessorVersion = 1;
const redis = PubsubConnector.buildClient();
function runPipeline({db, accountId, message}) {
console.log(`Processing message ${message.id}`)
function runPipeline({db, accountId, message, logger}) {
logger.info(`MessageProcessor: Processing message`)
return processors.reduce((prevPromise, processor) => (
prevPromise.then((prevMessage) => {
const processed = processor({message: prevMessage, accountId, db});
if (!(processed instanceof Promise)) {
throw new Error(`processor ${processor} did not return a promise.`)
}
return processed.then((nextMessage) => {
const processed = processor({message: prevMessage, accountId, db, logger});
return Promise.resolve(processed)
.then((nextMessage) => {
if (!nextMessage.body) {
throw new Error("processor did not resolve with a valid message object.")
}
@ -46,26 +45,28 @@ function dequeueJob() {
try {
json = JSON.parse(item[1]);
} catch (error) {
console.error(`MessageProcessor Failed: Found invalid JSON item in queue: ${item}`)
global.Logger.error({item}, `MessageProcessor: Found invalid JSON item in queue`)
return dequeueJob();
}
const {messageId, accountId} = json;
const logger = global.Logger.forAccount({id: accountId}).child({message_id: messageId})
DatabaseConnector.forAccount(accountId).then((db) =>
db.Message.find({
DatabaseConnector.forAccount(accountId).then((db) => {
return db.Message.find({
where: {id: messageId},
include: [{model: db.Folder}, {model: db.Label}],
}).then((message) => {
if (!message) {
return Promise.reject(new Error(`Message not found (${messageId}). Maybe account was deleted?`))
}
return runPipeline({db, accountId, message}).then((processedMessage) =>
return runPipeline({db, accountId, message, logger}).then((processedMessage) =>
saveMessage(processedMessage)
).catch((err) =>
console.error(`MessageProcessor Failed: ${err} ${err.stack}`)
logger.error(err, `MessageProcessor: Failed`)
)
})
).finally(() => {
})
.finally(() => {
dequeueJob()
});

View file

@ -11,7 +11,7 @@ function Contact({name, address} = {}) {
const extractContacts = (values) =>
(values || []).map(v => Contact(mimelib.parseAddresses(v).pop()))
function processMessage({message}) {
function processMessage({message, logger}) {
if (message.snippet) {
// trim and clean snippet which is alreay present (from message plaintext)
message.snippet = message.snippet.replace(/[\n\r]/g, ' ').replace(/\s\s+/g, ' ')
@ -24,7 +24,7 @@ function processMessage({message}) {
// TODO: Fanciness
message.snippet = message.body.substr(0, Math.min(message.body.length, SNIPPET_SIZE));
} else {
console.log("Received message has no body or snippet.")
logger.info("MessageProcessor: Parsing - Received message has no body or snippet.")
}
message.to = extractContacts(message.headers.to);

View file

@ -9,11 +9,14 @@ class ThreadingProcessor {
// conversation. Put it back soonish.
// const messageEmails = _.uniq([].concat(message.to, message.cc, message.from).map(p => p.email));
// console.log(`Found ${threads.length} candidate threads for message with subject: ${message.subject}`)
// this.logger.info({
// num_candidate_threads: threads.length,
// message_subject: message.subject,
// }, `Found candidate threads for message`)
//
// for (const thread of threads) {
// const threadEmails = _.uniq([].concat(thread.participants).map(p => p.email));
// console.log(`Intersection: ${_.intersection(threadEmails, messageEmails).join(',')}`)
// this.logger.info(`Intersection: ${_.intersection(threadEmails, messageEmails).join(',')}`)
//
// if (_.intersection(threadEmails, messageEmails) >= threadEmails.length * 0.9) {
// return thread;
@ -66,7 +69,7 @@ class ThreadingProcessor {
})
}
processMessage({db, message}) {
processMessage({db, message, logger}) {
if (!(message.labels instanceof Array)) {
throw new Error("Threading processMessage expects labels to be an inflated array.");
}
@ -74,6 +77,8 @@ class ThreadingProcessor {
throw new Error("Threading processMessage expects folder value to be present.");
}
this.logger = logger
const {Folder, Label} = db;
let findOrCreateThread = null;
if (message.headers['x-gm-thrid']) {

View file

@ -43,7 +43,6 @@ it('adds the message to the thread', (done) => {
},
create: (message) => {
message.setThread = (thread) => {
console.log("setting")
message.thread = thread.id
}
return Promise.resolve(message)

View file

@ -1,16 +1,17 @@
global.Promise = require('bluebird');
const {DatabaseConnector} = require(`nylas-core`)
const {DatabaseConnector, Logger} = require(`nylas-core`)
const SyncProcessManager = require('./sync-process-manager');
global.Logger = Logger.createLogger('nylas-k2-sync')
const manager = new SyncProcessManager();
DatabaseConnector.forShared().then((db) => {
const {Account} = db;
Account.findAll().then((accounts) => {
if (accounts.length === 0) {
console.log(`Couldn't find any accounts to sync. Run this CURL command to auth one!`)
console.log(`curl -X POST -H "Content-Type: application/json" -d '{"email":"inboxapptest2@fastmail.fm", "name":"Ben Gotow", "provider":"imap", "settings":{"imap_username":"inboxapptest1@fastmail.fm","imap_host":"mail.messagingengine.com","imap_port":993,"smtp_host":"mail.messagingengine.com","smtp_port":0,"smtp_username":"inboxapptest1@fastmail.fm", "smtp_password":"trar2e","imap_password":"trar2e","ssl_required":true}}' "http://localhost:5100/auth?client_id=123"`)
global.Logger.info(`Couldn't find any accounts to sync. Run this CURL command to auth one!`)
global.Logger.info(`curl -X POST -H "Content-Type: application/json" -d '{"email":"inboxapptest2@fastmail.fm", "name":"Ben Gotow", "provider":"imap", "settings":{"imap_username":"inboxapptest1@fastmail.fm","imap_host":"mail.messagingengine.com","imap_port":993,"smtp_host":"mail.messagingengine.com","smtp_port":0,"smtp_username":"inboxapptest1@fastmail.fm", "smtp_password":"trar2e","imap_password":"trar2e","ssl_required":true}}' "http://localhost:5100/auth?client_id=123"`)
}
manager.ensureAccountIDsInRedis(accounts.map(a => a.id)).then(() => {
manager.start();

View file

@ -3,8 +3,9 @@ const {Provider} = require('nylas-core');
const GMAIL_FOLDERS = ['[Gmail]/All Mail', '[Gmail]/Trash', '[Gmail]/Spam'];
class FetchFolderList {
constructor(provider) {
constructor(provider, logger = console) {
this._provider = provider;
this._logger = logger;
}
description() {

View file

@ -11,12 +11,13 @@ const FETCH_MESSAGES_FIRST_COUNT = 100;
const FETCH_MESSAGES_COUNT = 200;
class FetchMessagesInFolder {
constructor(category, options) {
constructor(category, options, logger = console) {
this._imap = null
this._box = null
this._db = null
this._category = category;
this._options = options;
this._logger = logger;
if (!this._category) {
throw new Error("FetchMessagesInFolder requires a category")
}
@ -86,9 +87,13 @@ class FetchMessagesInFolder {
}
})
console.log(` --- found ${flagChangeMessages.length || 'no'} flag changes`)
this._logger.info({
flag_changes: flagChangeMessages.length,
}, `FetchMessagesInFolder: found flag changes`)
if (createdUIDs.length > 0) {
console.log(` --- found ${createdUIDs.length} new messages. These will not be processed because we assume that they will be assigned uid = uidnext, and will be picked up in the next sync when we discover unseen messages.`)
this._logger.info({
new_messages: createdUIDs.length,
}, `FetchMessagesInFolder: found new messages. These will not be processed because we assume that they will be assigned uid = uidnext, and will be picked up in the next sync when we discover unseen messages.`)
}
if (flagChangeMessages.length === 0) {
@ -111,7 +116,9 @@ class FetchMessagesInFolder {
.filter(msg => !remoteUIDAttributes[msg.folderImapUID])
.map(msg => msg.folderImapUID)
console.log(` --- found ${removedUIDs.length} messages no longer in the folder`)
this._logger.info({
removed_messages: removedUIDs.length,
}, `FetchMessagesInFolder: found messages no longer in the folder`)
if (removedUIDs.length === 0) {
return Promise.resolve();
@ -148,7 +155,9 @@ class FetchMessagesInFolder {
}
if (desired.length === 0) {
console.warn(`Could not find good part. Options are: ${available.join(', ')}`)
this._logger.warn({
available_options: available.join(', '),
}, `FetchMessagesInFolder: Could not find good part`)
}
return desired;
@ -173,7 +182,10 @@ class FetchMessagesInFolder {
const uids = uidsByPart[key];
const desiredParts = JSON.parse(key);
const bodies = ['HEADER'].concat(desiredParts.map(p => p.id));
console.log(`Fetching parts ${key} for ${uids.length} messages`)
// NOTE(review): a comma was missing before the message template literal,
// which turned it into a *tagged template* with the object literal as the
// tag — an object is not callable, so this line threw
// `TypeError: ... is not a function` at runtime on every parts fetch.
this._logger.info({
  key,
  num_messages: uids.length,
}, `FetchMessagesInFolder: Fetching parts for messages`)
// note: the order of UIDs in the array doesn't matter, Gmail always
// returns them in ascending (oldest => newest) order.
@ -258,11 +270,17 @@ class FetchMessagesInFolder {
)
.then((message) => {
if (created) {
console.log(`Created message ID: ${message.id}, UID: ${attributes.uid}`)
this._logger.info({
message_id: message.id,
uid: attributes.uid,
}, `FetchMessagesInFolder: Created message`)
this._createFilesFromStruct({message, struct: attributes.struct})
PubsubConnector.queueProcessMessage({accountId, messageId: message.id});
} else {
console.log(`Updated message ID: ${message.id}, UID: ${attributes.uid}`)
this._logger.info({
message_id: message.id,
uid: attributes.uid,
}, `FetchMessagesInFolder: Updated message`)
}
})
@ -291,7 +309,9 @@ class FetchMessagesInFolder {
const desiredRanges = [];
console.log(` - Fetching messages. Currently have range: ${savedSyncState.fetchedmin}:${savedSyncState.fetchedmax}`)
this._logger.info({
range: `${savedSyncState.fetchedmin}:${savedSyncState.fetchedmax}`,
}, `FetchMessagesInFolder: Fetching messages.`)
// Todo: In the future, this is where logic should go that limits
// sync based on number of messages / age of messages.
@ -303,18 +323,20 @@ class FetchMessagesInFolder {
if (savedSyncState.fetchedmax < boxUidnext) {
desiredRanges.push({min: savedSyncState.fetchedmax, max: boxUidnext})
} else {
console.log(" --- fetchedmax == uidnext, nothing more recent to fetch.")
this._logger.info('FetchMessagesInFolder: fetchedmax == uidnext, nothing more recent to fetch.')
}
if (savedSyncState.fetchedmin > 1) {
const lowerbound = Math.max(1, savedSyncState.fetchedmin - FETCH_MESSAGES_COUNT);
desiredRanges.push({min: lowerbound, max: savedSyncState.fetchedmin})
} else {
console.log(" --- fetchedmin == 1, nothing older to fetch.")
this._logger.info("FetchMessagesInFolder: fetchedmin == 1, nothing older to fetch.")
}
}
return Promise.each(desiredRanges, ({min, max}) => {
console.log(` --- fetching range: ${min}:${max}`);
this._logger.info({
range: `${min}:${max}`,
}, `FetchMessagesInFolder: Fetching range`);
return this._fetchMessagesAndQueueForProcessing(`${min}:${max}`).then(() => {
const {fetchedmin, fetchedmax} = this._category.syncState;
@ -326,7 +348,7 @@ class FetchMessagesInFolder {
});
})
}).then(() => {
console.log(` - Fetching messages finished`);
this._logger.info(`FetchMessagesInFolder: Fetching messages finished`);
});
}
@ -350,15 +372,15 @@ class FetchMessagesInFolder {
let shallowFetch = null;
if (this._imap.serverSupports(Capabilities.Condstore)) {
console.log(` - Shallow attribute scan (using CONDSTORE)`)
this._logger.info(`FetchMessagesInFolder: Shallow attribute scan (using CONDSTORE)`)
if (nextHighestmodseq === highestmodseq) {
console.log(" --- highestmodseq matches, nothing more to fetch")
this._logger.info('FetchMessagesInFolder: highestmodseq matches, nothing more to fetch')
return Promise.resolve();
}
shallowFetch = this._box.fetchUIDAttributes(`1:*`, {changedsince: highestmodseq});
} else {
const range = `${this._getLowerBoundUID(SHALLOW_SCAN_UID_COUNT)}:*`;
console.log(` - Shallow attribute scan (using range: ${range})`)
this._logger.info({range}, `FetchMessagesInFolder: Shallow attribute scan`)
shallowFetch = this._box.fetchUIDAttributes(range);
}
@ -372,7 +394,7 @@ class FetchMessagesInFolder {
this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes)
))
.then(() => {
console.log(` - finished fetching changes to messages`);
this._logger.info(`FetchMessagesInFolder: finished fetching changes to messages`);
return this.updateFolderSyncState({
highestmodseq: nextHighestmodseq,
timeShallowScan: Date.now(),
@ -386,7 +408,7 @@ class FetchMessagesInFolder {
const {fetchedmin, fetchedmax} = this._category.syncState;
const range = `${fetchedmin}:${fetchedmax}`;
console.log(` - Deep attribute scan: fetching attributes in range: ${range}`)
this._logger.info({range}, `FetchMessagesInFolder: Deep attribute scan: fetching attributes in range`)
return this._box.fetchUIDAttributes(range)
.then((remoteUIDAttributes) => {
@ -401,7 +423,7 @@ class FetchMessagesInFolder {
})
))
.then(() => {
console.log(` - Deep scan finished.`);
this._logger.info(`FetchMessagesInFolder: Deep scan finished.`);
return this.updateFolderSyncState({
highestmodseq: this._box.highestmodseq,
timeDeepScan: Date.now(),

View file

@ -43,7 +43,7 @@ class SyncProcessManager {
}
start() {
console.log(`ProcessManager: Starting with ID ${IDENTITY}`)
global.Logger.info(`ProcessManager: Starting with ID ${IDENTITY}`)
this.unassignAccountsAssignedTo(IDENTITY).then(() => {
this.unassignAccountsMissingHeartbeats();
@ -63,12 +63,12 @@ class SyncProcessManager {
client.setAsync(key, Date.now()).then(() =>
client.expireAsync(key, HEARTBEAT_EXPIRES)
).then(() =>
console.log("ProcessManager: 💘")
global.Logger.info("ProcessManager: 💘")
)
}
onSigInt() {
console.log(`ProcessManager: Exiting...`)
global.Logger.info(`ProcessManager: Exiting...`)
this._exiting = true;
this.unassignAccountsAssignedTo(IDENTITY).then(() =>
@ -85,7 +85,7 @@ class SyncProcessManager {
let unseenIds = [].concat(accountIds);
console.log("ProcessManager: Starting scan for accountIds in database that are not present in Redis.")
global.Logger.info("ProcessManager: Starting scan for accountIds in database that are not present in Redis.")
return forEachAccountList((foundProcessIdentity, foundIds) => {
unseenIds = unseenIds.filter((a) => !foundIds.includes(`${a}`))
@ -94,7 +94,7 @@ class SyncProcessManager {
if (unseenIds.length === 0) {
return;
}
console.log(`ProcessManager: Adding account IDs ${unseenIds.join(',')} to ${ACCOUNTS_UNCLAIMED}.`)
global.Logger.info(`ProcessManager: Adding account IDs ${unseenIds.join(',')} to ${ACCOUNTS_UNCLAIMED}.`)
unseenIds.map((id) => client.lpushAsync(ACCOUNTS_UNCLAIMED, id));
});
}
@ -102,7 +102,7 @@ class SyncProcessManager {
unassignAccountsMissingHeartbeats() {
const client = PubsubConnector.broadcastClient();
console.log("ProcessManager: Starting unassignment for processes missing heartbeats.")
global.Logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.")
Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, '');
@ -125,12 +125,12 @@ class SyncProcessManager {
)
return unassignOne(0).then((returned) => {
console.log(`ProcessManager: Returned ${returned} accounts assigned to ${identity}.`)
global.Logger.info(`ProcessManager: Returned ${returned} accounts assigned to ${identity}.`)
});
}
update() {
console.log(`ProcessManager: Searching for an unclaimed account to sync.`)
global.Logger.info(`ProcessManager: Searching for an unclaimed account to sync.`)
this.acceptUnclaimedAccount().finally(() => {
if (this._exiting) {
@ -170,7 +170,7 @@ class SyncProcessManager {
if (this._exiting || this._workers[account.id]) {
return;
}
console.log(`ProcessManager: Starting worker for Account ${accountId}`)
global.Logger.info(`ProcessManager: Starting worker for Account ${accountId}`)
this._workers[account.id] = new SyncWorker(account, db, () => {
this.removeWorkerForAccountId(accountId)
});

View file

@ -11,8 +11,8 @@ const {
const {CLAIM_DURATION} = SchedulerUtils;
const FetchFolderList = require('./imap/fetch-category-list')
const FetchMessagesInFolder = require('./imap/fetch-messages-in-category')
const FetchFolderList = require('./imap/fetch-folder-list')
const FetchMessagesInFolder = require('./imap/fetch-messages-in-folder')
const SyncbackTaskFactory = require('./syncback-task-factory')
@ -24,6 +24,7 @@ class SyncWorker {
this._startTime = Date.now();
this._lastSyncTime = null;
this._onExpired = onExpired;
this._logger = global.Logger.forAccount(account)
this._syncTimer = null;
this._expirationTimer = null;
@ -100,7 +101,7 @@ class SyncWorker {
return Promise.reject(new Error("ensureConnection: There are no IMAP connection credentials for this account."))
}
const conn = new IMAPConnection(this._db, Object.assign({}, settings, credentials));
const conn = new IMAPConnection({db: this._db, settings: Object.assign({}, settings, credentials), logger: this._logger});
conn.on('mail', () => {
this._onConnectionIdleUpdate();
})
@ -145,7 +146,7 @@ class SyncWorker {
)
return Promise.all(categoriesToSync.map((cat) =>
this._conn.runOperation(new FetchMessagesInFolder(cat, folderSyncOptions))
this._conn.runOperation(new FetchMessagesInFolder(cat, folderSyncOptions, this._logger))
))
});
}
@ -155,10 +156,10 @@ class SyncWorker {
this._syncTimer = null;
if (!process.env.SYNC_AFTER_ERRORS && this._account.errored()) {
console.log(`SyncWorker: Account ${this._account.emailAddress} (${this._account.id}) is in error state - Skipping sync`)
this._logger.info(`SyncWorker: Account is in error state - Skipping sync`)
return
}
console.log(`SyncWorker: Account ${this._account.emailAddress} (${this._account.id}) sync started (${reason})`)
this._logger.info({reason}, `SyncWorker: Account sync started`)
this.ensureConnection()
.then(() => this._account.update({syncError: null}))
@ -174,7 +175,7 @@ class SyncWorker {
}
onSyncError(error) {
console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} (${this._account.id})`, error)
this._logger.error(error, `SyncWorker: Error while syncing account`)
this.closeConnection()
if (error.source.includes('socket') || error.source.includes('timeout')) {
@ -203,16 +204,16 @@ class SyncWorker {
this._account.lastSyncCompletions = lastSyncCompletions
this._account.save()
console.log('Syncworker: Completed sync cycle')
this._logger.info('Syncworker: Completed sync cycle')
if (afterSync === 'idle') {
return this._getIdleFolder()
.then((idleFolder) => this._conn.openBox(idleFolder.name))
.then(() => console.log('SyncWorker: - Idling on inbox category'))
.then(() => this._logger.info('SyncWorker: - Idling on inbox category'))
}
if (afterSync === 'close') {
console.log('SyncWorker: - Closing connection');
this._logger.info('SyncWorker: - Closing connection');
this.closeConnection()
return Promise.resolve()
}
@ -226,7 +227,7 @@ class SyncWorker {
scheduleNextSync() {
if (Date.now() - this._startTime > CLAIM_DURATION) {
console.log("SyncWorker: - Has held account for more than CLAIM_DURATION, returning to pool.");
this._logger.info("SyncWorker: - Has held account for more than CLAIM_DURATION, returning to pool.");
this.cleanup();
this._onExpired();
return;
@ -238,7 +239,10 @@ class SyncWorker {
if (interval) {
const target = this._lastSyncTime + interval;
console.log(`SyncWorker: Account ${active ? 'active' : 'inactive'}. Next sync scheduled for ${new Date(target).toLocaleString()}`);
this._logger.info({
is_active: active,
next_sync: new Date(target).toLocaleString(),
}, `SyncWorker: Next sync scheduled`);
this._syncTimer = setTimeout(() => {
this.syncNow({reason: 'Scheduled'});
}, target - Date.now());

View file

@ -12,25 +12,23 @@ apps:
GMAIL_CLIENT_ID : "271342407743-nibas08fua1itr1utq9qjladbkv3esdm.apps.googleusercontent.com"
GMAIL_CLIENT_SECRET : "WhmxErj-ei6vJXLocNhBbfBF"
GMAIL_REDIRECT_URL : "http://localhost:5100/auth/gmail/oauthcallback"
NODE_ENV: 'development'
- script : packages/nylas-sync/app.js
name : sync
env :
DB_ENCRYPTION_ALGORITHM : "aes-256-ctr"
DB_ENCRYPTION_PASSWORD : "d6F3Efeq"
NODE_ENV: 'development'
- script : packages/nylas-dashboard/app.js
name : dashboard
env :
PORT: 5101
DB_ENCRYPTION_ALGORITHM : "aes-256-ctr"
DB_ENCRYPTION_PASSWORD : "d6F3Efeq"
NODE_ENV: 'development'
- script : packages/nylas-message-processor/app.js
name : processor
env :
DB_ENCRYPTION_ALGORITHM : "aes-256-ctr"
DB_ENCRYPTION_PASSWORD : "d6F3Efeq"
NODE_ENV: 'development'