Remove bluebird

- Implement `each` and `promisifyAll` + other bluebird fns
This commit is contained in:
Juan Tejada 2016-07-13 16:23:25 -07:00
parent 93942e792d
commit ab9c01a249
12 changed files with 76 additions and 25 deletions

View file

@ -9,16 +9,17 @@
"bunyan-cloudwatch": "2.0.0", "bunyan-cloudwatch": "2.0.0",
"bunyan-loggly": "^1.0.0", "bunyan-loggly": "^1.0.0",
"bunyan-prettystream": "^0.1.3", "bunyan-prettystream": "^0.1.3",
"imap": "0.8.x",
"lerna": "2.0.0-beta.23", "lerna": "2.0.0-beta.23",
"mysql": "^2.11.1", "mysql": "^2.11.1",
"newrelic": "^1.28.1", "newrelic": "^1.28.1",
"pm2": "^1.1.3", "pm2": "^1.1.3",
"promise.prototype.finally": "^1.0.1",
"redis": "2.x.x", "redis": "2.x.x",
"rx": "4.x.x", "rx": "4.x.x",
"sequelize": "3.x.x", "sequelize": "3.x.x",
"underscore": "1.x.x", "underscore": "1.x.x",
"utf7": "https://github.com/truebit/utf7/archive/1f753bac59b99d93b17a5ef11681e232465e2558.tar.gz", "utf7": "https://github.com/truebit/utf7/archive/1f753bac59b99d93b17a5ef11681e232465e2558.tar.gz"
"imap": "0.8.x"
}, },
"devDependencies": { "devDependencies": {
"babel-eslint": "6.x", "babel-eslint": "6.x",

View file

@ -11,7 +11,6 @@ const fs = require('fs');
const path = require('path'); const path = require('path');
const {DatabaseConnector, SchedulerUtils, Logger} = require(`nylas-core`); const {DatabaseConnector, SchedulerUtils, Logger} = require(`nylas-core`);
global.Promise = require('bluebird');
global.Logger = Logger.createLogger('nylas-k2-api') global.Logger = Logger.createLogger('nylas-k2-api')
const onUnhandledError = (err) => global.Logger.fatal(err, 'Unhandled error') const onUnhandledError = (err) => global.Logger.fatal(err, 'Unhandled error')

View file

@ -3,6 +3,7 @@ const _ = require('underscore');
const xoauth2 = require('xoauth2'); const xoauth2 = require('xoauth2');
const EventEmitter = require('events'); const EventEmitter = require('events');
const PromiseUtils = require('./promise-utils')
const IMAPBox = require('./imap-box'); const IMAPBox = require('./imap-box');
const { const {
convertImapError, convertImapError,
@ -87,7 +88,7 @@ class IMAPConnection extends EventEmitter {
_buildUnderlyingConnection(settings) { _buildUnderlyingConnection(settings) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this._imap = Promise.promisifyAll(new Imap(settings)); this._imap = PromiseUtils.promisifyAll(new Imap(settings));
// Emitted when new mail arrives in the currently open mailbox. // Emitted when new mail arrives in the currently open mailbox.
// Fix https://github.com/mscdex/node-imap/issues/445 // Fix https://github.com/mscdex/node-imap/issues/445

View file

@ -1,5 +1,3 @@
global.Promise = require('bluebird');
module.exports = { module.exports = {
Provider: { Provider: {
Gmail: 'gmail', Gmail: 'gmail',
@ -13,4 +11,5 @@ module.exports = {
MessageTypes: require('./message-types'), MessageTypes: require('./message-types'),
Logger: require('./logger'), Logger: require('./logger'),
Errors: require('./imap-errors'), Errors: require('./imap-errors'),
PromiseUtils: require('./promise-utils'),
} }

View file

@ -0,0 +1,50 @@
require('promise.prototype.finally')
const _ = require('underscore')
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
function each(iterable, iterator) {
return Promise.resolve(iterable)
.then((iter) => Array.from(iter))
.then((array) => {
return new Promise((resolve, reject) => {
array.reduce((prevPromise, item, idx, len) => (
prevPromise.then(() => Promise.resolve(iterator(item, idx, len)))
), Promise.resolve())
.then(() => resolve(iterable))
.catch((err) => reject(err))
})
})
}
function promisify(nodeFn) {
return function wrapper(...fnArgs) {
return new Promise((resolve, reject) => {
nodeFn.call(this, ...fnArgs, (err, ...results) => {
if (err) {
reject(err)
return
}
resolve(...results)
});
})
}
}
function promisifyAll(obj) {
for(const key in obj) {
if (!key.endsWith('Async') && _.isFunction(obj[key])) {
obj[`${key}Async`] = promisify(obj[key])
}
}
return obj
}
module.exports = {
each,
sleep,
promisify,
promisifyAll,
}

View file

@ -1,9 +1,10 @@
const Rx = require('rx') const Rx = require('rx')
const redis = require("redis"); const redis = require("redis");
const PromiseUtils = require('./promise-utils')
const log = global.Logger || console const log = global.Logger || console
Promise.promisifyAll(redis.RedisClient.prototype); PromiseUtils.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype); PromiseUtils.promisifyAll(redis.Multi.prototype);
class PubsubConnector { class PubsubConnector {

View file

@ -8,12 +8,13 @@ const HEARTBEAT_EXPIRES = 30; // 2 min in prod?
const CLAIM_DURATION = 10 * 60 * 1000; // 2 hours on prod? const CLAIM_DURATION = 10 * 60 * 1000; // 2 hours on prod?
const PromiseUtils = require('./promise-utils')
const PubsubConnector = require('./pubsub-connector'); const PubsubConnector = require('./pubsub-connector');
const MessageTypes = require('./message-types') const MessageTypes = require('./message-types')
const forEachAccountList = (forEachCallback) => { const forEachAccountList = (forEachCallback) => {
const client = PubsubConnector.broadcastClient(); const client = PubsubConnector.broadcastClient();
return Promise.each(client.keysAsync(`accounts:*`), (key) => { return PromiseUtils.each(client.keysAsync(`accounts:*`), (key) => {
const processId = key.replace('accounts:', ''); const processId = key.replace('accounts:', '');
return client.lrangeAsync(key, 0, 20000).then((foundIds) => return client.lrangeAsync(key, 0, 20000).then((foundIds) =>
forEachCallback(processId, foundIds) forEachCallback(processId, foundIds)

View file

@ -5,7 +5,6 @@ const Hapi = require('hapi');
const HapiWebSocket = require('hapi-plugin-websocket'); const HapiWebSocket = require('hapi-plugin-websocket');
const {Logger} = require(`nylas-core`); const {Logger} = require(`nylas-core`);
global.Promise = require('bluebird');
global.Logger = Logger.createLogger('nylas-k2-dashboard') global.Logger = Logger.createLogger('nylas-k2-dashboard')
const server = new Hapi.Server(); const server = new Hapi.Server();

View file

@ -1,7 +1,6 @@
const {PubsubConnector, DatabaseConnector, Logger} = require(`nylas-core`) const {PubsubConnector, DatabaseConnector, Logger} = require(`nylas-core`)
const {processors} = require('./processors') const {processors} = require('./processors')
global.Promise = require('bluebird');
global.Logger = Logger.createLogger('nylas-k2-message-processor') global.Logger = Logger.createLogger('nylas-k2-message-processor')
// List of the attributes of Message that the processor should be allowed to change. // List of the attributes of Message that the processor should be allowed to change.

View file

@ -1,5 +1,4 @@
// require('newrelic'); // require('newrelic');
global.Promise = require('bluebird');
const {DatabaseConnector, Logger} = require(`nylas-core`) const {DatabaseConnector, Logger} = require(`nylas-core`)
const SyncProcessManager = require('./sync-process-manager'); const SyncProcessManager = require('./sync-process-manager');

View file

@ -1,7 +1,7 @@
const _ = require('underscore'); const _ = require('underscore');
const Imap = require('imap'); const Imap = require('imap');
const {IMAPConnection, PubsubConnector} = require('nylas-core'); const {IMAPConnection, PubsubConnector, PromiseUtils} = require('nylas-core');
const {Capabilities} = IMAPConnection; const {Capabilities} = IMAPConnection;
const MessageFlagAttributes = ['id', 'folderImapUID', 'unread', 'starred', 'folderImapXGMLabels'] const MessageFlagAttributes = ['id', 'folderImapUID', 'unread', 'starred', 'folderImapXGMLabels']
@ -113,12 +113,12 @@ class FetchMessagesInFolder {
const {Message} = this._db; const {Message} = this._db;
const removedUIDs = localMessageAttributes const removedUIDs = localMessageAttributes
.filter(msg => !remoteUIDAttributes[msg.folderImapUID]) .filter(msg => !remoteUIDAttributes[msg.folderImapUID])
.map(msg => msg.folderImapUID) .map(msg => msg.folderImapUID)
this._logger.info({ this._logger.info({
removed_messages: removedUIDs.length, removed_messages: removedUIDs.length,
}, `FetchMessagesInFolder: found messages no longer in the folder`) }, `FetchMessagesInFolder: found messages no longer in the folder`)
if (removedUIDs.length === 0) { if (removedUIDs.length === 0) {
return Promise.resolve(); return Promise.resolve();
@ -176,7 +176,7 @@ class FetchMessagesInFolder {
uidsByPart[key].push(attributes.uid); uidsByPart[key].push(attributes.uid);
}) })
.then(() => { .then(() => {
return Promise.each(Object.keys(uidsByPart), (key) => { return PromiseUtils.each(Object.keys(uidsByPart), (key) => {
const uids = uidsByPart[key]; const uids = uidsByPart[key];
const desiredParts = JSON.parse(key); const desiredParts = JSON.parse(key);
const bodies = ['HEADER'].concat(desiredParts.map(p => p.id)); const bodies = ['HEADER'].concat(desiredParts.map(p => p.id));
@ -329,7 +329,7 @@ class FetchMessagesInFolder {
} }
} }
return Promise.each(desiredRanges, ({min, max}) => { return PromiseUtils.each(desiredRanges, ({min, max}) => {
this._logger.info({ this._logger.info({
range: `${min}:${max}`, range: `${min}:${max}`,
}, `FetchMessagesInFolder: Fetching range`); }, `FetchMessagesInFolder: Fetching range`);
@ -343,7 +343,8 @@ class FetchMessagesInFolder {
timeFetchedUnseen: Date.now(), timeFetchedUnseen: Date.now(),
}); });
}) })
}).then(() => { })
.then(() => {
this._logger.info(`FetchMessagesInFolder: Fetching messages finished`); this._logger.info(`FetchMessagesInFolder: Fetching messages finished`);
}); });
} }

View file

@ -1,6 +1,6 @@
const os = require('os'); const os = require('os');
const SyncWorker = require('./sync-worker'); const SyncWorker = require('./sync-worker');
const {DatabaseConnector, PubsubConnector, SchedulerUtils} = require(`nylas-core`) const {DatabaseConnector, PubsubConnector, SchedulerUtils, PromiseUtils} = require(`nylas-core`)
const IDENTITY = `${os.hostname()}-${process.pid}`; const IDENTITY = `${os.hostname()}-${process.pid}`;
@ -110,12 +110,13 @@ class SyncProcessManager {
this._logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.") this._logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.")
Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => { PromiseUtils.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, ''); const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, '');
return client.existsAsync(HEARTBEAT_FOR(id)).then((exists) => return client.existsAsync(HEARTBEAT_FOR(id)).then((exists) =>
(exists ? Promise.resolve() : this.unassignAccountsAssignedTo(id)) (exists ? Promise.resolve() : this.unassignAccountsAssignedTo(id))
) )
}).finally(() => { })
.finally(() => {
const delay = HEARTBEAT_EXPIRES * 1000; const delay = HEARTBEAT_EXPIRES * 1000;
setTimeout(() => this.unassignAccountsMissingHeartbeats(), delay); setTimeout(() => this.unassignAccountsMissingHeartbeats(), delay);
}); });
@ -165,7 +166,7 @@ class SyncProcessManager {
// If we've added an account, wait a second before asking for another one. // If we've added an account, wait a second before asking for another one.
// Spacing them out is probably healthy. // Spacing them out is probably healthy.
return Promise.delay(2000); return PromiseUtils.sleep(2000);
}); });
} }