Expand promise utils

This commit is contained in:
Ben Gotow 2016-07-13 17:22:02 -07:00
parent 1eeb9128c0
commit b43b623ca6
6 changed files with 28 additions and 23 deletions

View file

@ -14,6 +14,7 @@
"mysql": "^2.11.1",
"newrelic": "^1.28.1",
"pm2": "^1.1.3",
"promise-props": "^1.0.0",
"promise.prototype.finally": "^1.0.1",
"redis": "2.x.x",
"rx": "4.x.x",

View file

@ -47,9 +47,9 @@ class IMAPConnection extends EventEmitter {
connect() {
if (!this._connectPromise) {
this._connectPromise = this._resolveIMAPSettings()
.then((settings) => this._buildUnderlyingConnection(settings))
.thenReturn(this);
this._connectPromise = this._resolveIMAPSettings().then((settings) =>
this._buildUnderlyingConnection(settings)
);
}
return this._connectPromise;
}
@ -108,7 +108,7 @@ class IMAPConnection extends EventEmitter {
this._imap.on('update', () => this.emit('update'))
this._imap.once('ready', () => {
resolve()
resolve(this)
});
this._imap.once('error', (err) => {

View file

@ -1,16 +1,15 @@
/* eslint no-restricted-syntax: 0 */
require('promise.prototype.finally')
const _ = require('underscore')
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
global.Promise.props = require('promise-props');
function each(iterable, iterator) {
return Promise.resolve(iterable)
.then((iter) => Array.from(iter))
.then((array) => {
global.Promise.each = function each(iterable, iterator) {
return Promise.resolve(iterable).then((array) => {
return new Promise((resolve, reject) => {
array.reduce((prevPromise, item, idx, len) => (
Array.from(array).reduce((prevPromise, item, idx, len) => (
prevPromise.then(() => Promise.resolve(iterator(item, idx, len)))
), Promise.resolve())
.then(() => resolve(iterable))
@ -19,6 +18,14 @@ function each(iterable, iterator) {
})
}
global.Promise.sleep = function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms))
}
global.Promise.prototype.thenReturn = function thenReturnPolyfill(value) {
this.then(function then() { return Promise.resolve(value); })
}
function promisify(nodeFn) {
return function wrapper(...fnArgs) {
return new Promise((resolve, reject) => {
@ -34,7 +41,7 @@ function promisify(nodeFn) {
}
function promisifyAll(obj) {
for(const key in obj) {
for (const key in obj) {
if (!key.endsWith('Async') && _.isFunction(obj[key])) {
obj[`${key}Async`] = promisify(obj[key])
}
@ -43,8 +50,6 @@ function promisifyAll(obj) {
}
module.exports = {
each,
sleep,
promisify,
promisifyAll,
}

View file

@ -8,13 +8,12 @@ const HEARTBEAT_EXPIRES = 30; // 2 min in prod?
const CLAIM_DURATION = 10 * 60 * 1000; // 2 hours on prod?
const PromiseUtils = require('./promise-utils')
const PubsubConnector = require('./pubsub-connector');
const MessageTypes = require('./message-types')
const forEachAccountList = (forEachCallback) => {
const client = PubsubConnector.broadcastClient();
return PromiseUtils.each(client.keysAsync(`accounts:*`), (key) => {
return Promise.each(client.keysAsync(`accounts:*`), (key) => {
const processId = key.replace('accounts:', '');
return client.lrangeAsync(key, 0, 20000).then((foundIds) =>
forEachCallback(processId, foundIds)

View file

@ -1,7 +1,7 @@
const _ = require('underscore');
const Imap = require('imap');
const {IMAPConnection, PubsubConnector, PromiseUtils} = require('nylas-core');
const {IMAPConnection, PubsubConnector} = require('nylas-core');
const {Capabilities} = IMAPConnection;
const MessageFlagAttributes = ['id', 'folderImapUID', 'unread', 'starred', 'folderImapXGMLabels']
@ -176,7 +176,7 @@ class FetchMessagesInFolder {
uidsByPart[key].push(attributes.uid);
})
.then(() => {
return PromiseUtils.each(Object.keys(uidsByPart), (key) => {
return Promise.each(Object.keys(uidsByPart), (key) => {
const uids = uidsByPart[key];
const desiredParts = JSON.parse(key);
const bodies = ['HEADER'].concat(desiredParts.map(p => p.id));
@ -329,7 +329,7 @@ class FetchMessagesInFolder {
}
}
return PromiseUtils.each(desiredRanges, ({min, max}) => {
return Promise.each(desiredRanges, ({min, max}) => {
this._logger.info({
range: `${min}:${max}`,
}, `FetchMessagesInFolder: Fetching range`);

View file

@ -1,6 +1,6 @@
const os = require('os');
const SyncWorker = require('./sync-worker');
const {DatabaseConnector, PubsubConnector, SchedulerUtils, PromiseUtils} = require(`nylas-core`)
const {DatabaseConnector, PubsubConnector, SchedulerUtils} = require(`nylas-core`)
const IDENTITY = `${os.hostname()}-${process.pid}`;
@ -110,7 +110,7 @@ class SyncProcessManager {
this._logger.info("ProcessManager: Starting unassignment for processes missing heartbeats.")
PromiseUtils.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
Promise.each(client.keysAsync(`${ACCOUNTS_CLAIMED_PREFIX}*`), (key) => {
const id = key.replace(ACCOUNTS_CLAIMED_PREFIX, '');
return client.existsAsync(HEARTBEAT_FOR(id)).then((exists) =>
(exists ? Promise.resolve() : this.unassignAccountsAssignedTo(id))
@ -166,7 +166,7 @@ class SyncProcessManager {
// If we've added an account, wait a second before asking for another one.
// Spacing them out is probably healthy.
return PromiseUtils.sleep(2000);
return Promise.sleep(2000);
});
}