resolved merge -- Accounts and contacts are now both added to serilaization

This commit is contained in:
Annie 2016-07-08 17:09:57 -07:00
commit cb20f8cecd
18 changed files with 204 additions and 156 deletions

View file

@ -22,4 +22,4 @@ EXPOSE 5100
# We use a start-aws command that automatically spawns the correct process
# based on environment variables (which changes instance to instance)
CMD [ "./node_modules/pm2/bin/pm2", "start", "./pm2-prod-${AWS_SERVICE_NAME}.yml"]
CMD ./node_modules/pm2/bin/pm2 start --no-daemon ./pm2-prod-${AWS_SERVICE_NAME}.yml

View file

@ -9,6 +9,7 @@
3. Install [Docker for Mac](https://docs.docker.com/docker-for-mac/)
4. Install [NVM](https://github.com/creationix/nvm) `brew install nvm`
5. Install Node 6+ via NVM: `nvm install 6`
6. Install Redis locally `brew install redis`
## New to AWS:
@ -30,4 +31,16 @@
npm start
```
We use [pm2](http://pm2.keymetrics.io/) to launch a variety of processes
(sync, api, dashboard, processor, etc).
You can see the scripts that are running and their arguments in
`/pm2-dev.yml`
To test to see if the basic API is up go to: `http://lvh.me:5100/ping`. You
should see `pong`.
`lvh.me` is a DNS hack that redirects back to 127.0.0.1 with the added
benefit of letting us use subdomains.
# Deploying

View file

@ -6,23 +6,26 @@ function replacer(key, value) {
}
function jsonSchema(modelName) {
const models = ['Message', 'Thread', 'File', 'Error', 'SyncbackRequest', 'Contact']
const models = ['Message', 'Thread', 'File', 'Error', 'SyncbackRequest', 'Account', 'Contact']
if (models.includes(modelName)) {
return Joi.object();
}
if (modelName === 'Account') {
return Joi.object().keys({
id: Joi.number(),
object: Joi.string(),
email_address: Joi.string(),
provider: Joi.string(),
organization_unit: Joi.string(),
connection_settings: Joi.object(),
sync_policy: Joi.object(),
sync_error: Joi.object().allow(null),
first_sync_completed_at: Joi.number().allow(null),
last_sync_completions: Joi.array(),
})
// Ben: Disabled temporarily because folks keep changing the keys and it's hard
// to keep in sync. Might need to consider another approach to these.
// return Joi.object().keys({
// id: Joi.number(),
// object: Joi.string(),
// email_address: Joi.string(),
// provider: Joi.string(),
// organization_unit: Joi.string(),
// connection_settings: Joi.object(),
// sync_policy: Joi.object(),
// sync_error: Joi.object().allow(null),
// first_sync_completed_at: Joi.number().allow(null),
// last_sync_completions: Joi.array(),
// })
}
if (modelName === 'Folder') {
return Joi.object().keys({

View file

@ -1,21 +1,23 @@
const Sequelize = require('sequelize');
module.exports = {
JSONType: (fieldName, {defaultValue = '{}'} = {}) => ({
type: Sequelize.STRING,
defaultValue,
JSONType: (fieldName, {defaultValue = {}} = {}) => ({
type: Sequelize.TEXT,
get: function get() {
return JSON.parse(this.getDataValue(fieldName))
const val = this.getDataValue(fieldName);
if (!val) { return defaultValue }
return JSON.parse(val);
},
set: function set(val) {
this.setDataValue(fieldName, JSON.stringify(val));
},
}),
JSONARRAYType: (fieldName, {defaultValue = '[]'} = {}) => ({
type: Sequelize.STRING,
defaultValue,
JSONARRAYType: (fieldName, {defaultValue = []} = {}) => ({
type: Sequelize.TEXT,
get: function get() {
return JSON.parse(this.getDataValue(fieldName))
const val = this.getDataValue(fieldName);
if (!val) { return defaultValue }
return JSON.parse(val);
},
set: function set(val) {
this.setDataValue(fieldName, JSON.stringify(val));

View file

@ -9,13 +9,31 @@ module.exports = (db, sequelize) => {
}
}
const isTransaction = ({$modelOptions}) => {
return $modelOptions.name.singular === "transaction"
const isSilent = (data) => {
data._previousDataValues
data._changed
if (data.$modelOptions.name.singular === "transaction") {
return true
}
if (data._changed && data._changed.syncState) {
for (const key of Object.keys(data._changed)) {
if (key === "syncState") { continue }
if (data._changed[key] !== data._previousDataValues[key]) {
// SyncState changed, but so did something else
return false;
}
}
// Be silent due to nothing but sync state changing
return true;
}
}
const transactionLogger = (type) => {
return (sequelizeHookData) => {
if (isTransaction(sequelizeHookData)) return;
if (isSilent(sequelizeHookData)) return;
const transactionData = Object.assign({type: type},
parseHookData(sequelizeHookData)
);

View file

@ -4,6 +4,14 @@ const _ = require('underscore');
const xoauth2 = require('xoauth2');
const EventEmitter = require('events');
class IMAPConnectionNotReadyError extends Error {
constructor(funcName) {
super(`${funcName} - You must call connect() first.`);
// hack so that the error matches the ones used by node-imap
this.source = 'socket';
}
}
class IMAPBox {
@ -123,28 +131,28 @@ class IMAPBox {
addFlags(range, flags) {
if (!this._imap) {
throw new Error(`IMAPBox::addFlags - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPBox::addFlags`)
}
return this._imap.addFlagsAsync(range, flags)
}
delFlags(range, flags) {
if (!this._imap) {
throw new Error(`IMAPBox::delFlags - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPBox::delFlags`)
}
return this._imap.delFlagsAsync(range, flags)
}
moveFromBox(range, folderName) {
if (!this._imap) {
throw new Error(`IMAPBox::moveFromBox - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPBox::moveFromBox`)
}
return this._imap.moveAsync(range, folderName)
}
closeBox({expunge = true} = {}) {
if (!this._imap) {
throw new Error(`IMAPBox::closeBox - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPBox::closeBox`)
}
return this._imap.closeBoxAsync(expunge)
}
@ -174,7 +182,8 @@ class IMAPConnection extends EventEmitter {
this._queue = [];
this._currentOperation = null;
this._settings = settings;
this._imap = null
this._imap = null;
this._connectPromise = null;
}
connect() {
@ -222,7 +231,9 @@ class IMAPConnection extends EventEmitter {
this._imap = Promise.promisifyAll(new Imap(settings));
this._imap.once('end', () => {
console.log('Connection ended');
console.log('Underlying IMAP Connection ended');
this._connectPromise = null;
this._imap = null;
});
this._imap.on('alert', (msg) => {
@ -255,11 +266,12 @@ class IMAPConnection extends EventEmitter {
this._queue = [];
this._imap.end();
this._imap = null;
this._connectPromise = null;
}
serverSupports(capability) {
if (!this._imap) {
throw new Error(`IMAPConnection::serverSupports - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPConnection::serverSupports`)
}
this._imap.serverSupports(capability);
}
@ -269,7 +281,7 @@ class IMAPConnection extends EventEmitter {
*/
openBox(folderName, {readOnly = false} = {}) {
if (!this._imap) {
throw new Error(`IMAPConnection::openBox - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPConnection::openBox`)
}
return this._imap.openBoxAsync(folderName, readOnly).then((box) =>
new IMAPBox(this._imap, box)
@ -278,35 +290,35 @@ class IMAPConnection extends EventEmitter {
getBoxes() {
if (!this._imap) {
throw new Error(`IMAPConnection::getBoxes - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPConnection::getBoxes`)
}
return this._imap.getBoxesAsync()
}
addBox(folderName) {
if (!this._imap) {
throw new Error(`IMAPConnection::addBox - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPConnection::addBox`)
}
return this._imap.addBoxAsync(folderName)
}
renameBox(oldFolderName, newFolderName) {
if (!this._imap) {
throw new Error(`IMAPConnection::renameBox - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPConnection::renameBox`)
}
return this._imap.renameBoxAsync(oldFolderName, newFolderName)
}
delBox(folderName) {
if (!this._imap) {
throw new Error(`IMAPConnection::delBox - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPConnection::delBox`)
}
return this._imap.delBoxAsync(folderName)
}
runOperation(operation) {
if (!this._imap) {
throw new Error(`IMAPConnection::runOperation - You need to call connect() first.`)
throw new IMAPConnectionNotReadyError(`IMAPConnection::runOperation`)
}
return new Promise((resolve, reject) => {
this._queue.push({operation, resolve, reject});
@ -317,11 +329,13 @@ class IMAPConnection extends EventEmitter {
}
processNextOperation() {
if (this._currentOperation) { return }
if (this._currentOperation) {
return;
}
this._currentOperation = this._queue.shift();
if (!this._currentOperation) {
this.emit('queue-empty');
return
return;
}
const {operation, resolve, reject} = this._currentOperation;
@ -329,8 +343,8 @@ class IMAPConnection extends EventEmitter {
if (result instanceof Promise === false) {
reject(new Error(`Expected ${operation.constructor.name} to return promise.`))
}
result
.then(() => {
result.then(() => {
this._currentOperation = null;
console.log(`Finished task: ${operation.description()}`)
resolve();
@ -344,6 +358,6 @@ class IMAPConnection extends EventEmitter {
})
}
}
IMAPConnection.Capabilities = Capabilities;
IMAPConnection.Capabilities = Capabilities;
module.exports = IMAPConnection

View file

@ -4,9 +4,9 @@ module.exports = (sequelize, Sequelize) => {
const File = sequelize.define('file', {
accountId: { type: Sequelize.STRING, allowNull: false },
version: Sequelize.INTEGER,
filename: Sequelize.STRING,
filename: Sequelize.STRING(500),
partId: Sequelize.STRING,
contentType: Sequelize.STRING,
contentType: Sequelize.STRING(500),
size: Sequelize.INTEGER,
}, {
classMethods: {

View file

@ -8,11 +8,11 @@ module.exports = (sequelize, Sequelize) => {
accountId: { type: Sequelize.STRING, allowNull: false },
version: Sequelize.INTEGER,
headerMessageId: Sequelize.STRING,
body: Sequelize.STRING,
body: Sequelize.TEXT,
headers: JSONType('headers'),
subject: Sequelize.STRING,
snippet: Sequelize.STRING,
hash: Sequelize.STRING,
subject: Sequelize.STRING(500),
snippet: Sequelize.STRING(255),
hash: Sequelize.STRING(65),
date: Sequelize.DATE,
unread: Sequelize.BOOLEAN,
starred: Sequelize.BOOLEAN,
@ -23,7 +23,7 @@ module.exports = (sequelize, Sequelize) => {
bcc: JSONARRAYType('bcc'),
replyTo: JSONARRAYType('replyTo'),
folderImapUID: { type: Sequelize.STRING, allowNull: true},
folderImapXGMLabels: { type: Sequelize.STRING, allowNull: true},
folderImapXGMLabels: { type: Sequelize.TEXT, allowNull: true},
}, {
indexes: [
{

View file

@ -1,4 +1,4 @@
const {typeJSON} = require('../model-helpers')
const {JSONType} = require('../../database-types');
module.exports = (sequelize, Sequelize) => {
const SyncbackRequest = sequelize.define('syncbackRequest', {
@ -8,8 +8,8 @@ module.exports = (sequelize, Sequelize) => {
defaultValue: "NEW",
allowNull: false,
},
error: typeJSON('error'),
props: typeJSON('props'),
error: JSONType('error'),
props: JSONType('props'),
});
return SyncbackRequest;

View file

@ -5,8 +5,8 @@ module.exports = (sequelize, Sequelize) => {
accountId: { type: Sequelize.STRING, allowNull: false },
version: Sequelize.INTEGER,
threadId: Sequelize.STRING,
subject: Sequelize.STRING,
snippet: Sequelize.STRING,
subject: Sequelize.STRING(500),
snippet: Sequelize.STRING(255),
unreadCount: Sequelize.INTEGER,
starredCount: Sequelize.INTEGER,
firstMessageDate: Sequelize.DATE,

View file

@ -1,17 +1,11 @@
const {JSONARRAYType} = require('../../database-types');
module.exports = (sequelize, Sequelize) => {
const Transaction = sequelize.define('transaction', {
type: Sequelize.STRING,
objectId: Sequelize.STRING,
modelName: Sequelize.STRING,
changedFields: {
type: Sequelize.STRING,
get: function get() {
return JSON.parse(this.getDataValue('changedFields'))
},
set: function set(val) {
this.setDataValue('changedFields', JSON.stringify(val));
},
},
changedFields: JSONARRAYType('changedFields'),
});
return Transaction;

View file

@ -1,31 +0,0 @@
const Sequelize = require('sequelize');
module.exports = {
typeJSON: function typeJSON(key) {
return {
type: Sequelize.STRING,
get: function get() {
const val = this.getDataValue(key);
if (typeof val === 'string') {
try {
return JSON.parse(val)
} catch (e) {
return val
}
}
return val
},
set: function set(val) {
let valToSet = val
if (typeof val !== 'string') {
try {
valToSet = JSON.stringify(val)
} catch (e) {
valToSet = val;
}
}
return this.setDataValue(key, valToSet)
},
}
},
}

View file

@ -9,7 +9,7 @@ module.exports = (sequelize, Sequelize) => {
provider: Sequelize.STRING,
emailAddress: Sequelize.STRING,
connectionSettings: JSONType('connectionSettings'),
connectionCredentials: Sequelize.STRING,
connectionCredentials: Sequelize.TEXT,
syncPolicy: JSONType('syncPolicy'),
syncError: JSONType('syncError', {defaultValue: null}),
firstSyncCompletedAt: Sequelize.INTEGER,

View file

@ -20,18 +20,18 @@ const attach = (directory) => {
});
}
DatabaseConnector.forShared().then(({Account}) => {
server.register([HapiWebSocket, Inert], () => {
attach('./routes/')
server.register([HapiWebSocket, Inert], () => {
attach('./routes/')
server.route({
method: "POST",
path: "/accounts",
config: {
plugins: {
websocket: {
only: true,
connect: (wss, ws) => {
server.route({
method: "POST",
path: "/accounts",
config: {
plugins: {
websocket: {
only: true,
connect: (wss, ws) => {
DatabaseConnector.forShared().then(({Account}) => {
Account.findAll().then((accounts) => {
accounts.forEach((acct) => {
ws.send(JSON.stringify({ cmd: "ACCOUNT", payload: acct }));
@ -57,35 +57,47 @@ DatabaseConnector.forShared().then(({Account}) => {
ws.send(JSON.stringify({ cmd: "ASSIGNMENTS", payload: assignments}))
)
}, 1000);
},
disconnect: () => {
clearInterval(this.pollInterval);
this.observable.dispose();
},
});
},
disconnect: () => {
clearInterval(this.pollInterval);
this.observable.dispose();
},
},
},
handler: (request, reply) => {
if (request.payload.cmd === "PING") {
reply(JSON.stringify({ result: "PONG" }));
return;
}
},
});
},
handler: (request, reply) => {
if (request.payload.cmd === "PING") {
reply(JSON.stringify({ result: "PONG" }));
return;
}
},
});
server.route({
method: 'GET',
path: '/{param*}',
handler: {
directory: {
path: require('path').join(__dirname, 'public'),
},
},
});
server.route({
method: 'GET',
path: '/ping',
config: {
auth: false,
},
handler: (request, reply) => {
console.log("---> Ping!")
reply("pong")
},
});
server.start((startErr) => {
if (startErr) { throw startErr; }
console.log('Dashboard running at:', server.info.uri);
});
server.route({
method: 'GET',
path: '/{param*}',
handler: {
directory: {
path: require('path').join(__dirname, 'public'),
},
},
});
server.start((startErr) => {
if (startErr) { throw startErr; }
console.log('Dashboard running at:', server.info.uri);
});
});

View file

@ -29,7 +29,7 @@ class SyncWorker {
this._expirationTimer = null;
this._destroyed = false;
this.syncNow();
this.syncNow({reason: 'Initial'});
this._onMessage = this._onMessage.bind(this);
this._listener = PubsubConnector.observeAccount(account.id).subscribe(this._onMessage)
@ -45,7 +45,6 @@ class SyncWorker {
if (this._conn) {
this._conn.end();
}
this._conn = null
}
_onMessage(msg) {
@ -54,22 +53,27 @@ class SyncWorker {
case MessageTypes.ACCOUNT_UPDATED:
this._onAccountUpdated(); break;
case MessageTypes.SYNCBACK_REQUESTED:
this.syncNow(); break;
this.syncNow({reason: 'Syncback Action Queued'}); break;
default:
throw new Error(`Invalid message: ${msg}`)
}
}
_onAccountUpdated() {
console.log("SyncWorker: Detected change to account. Reloading and syncing now.");
if (!this.isWaitingForNextSync()) {
return;
}
this._getAccount().then((account) => {
this._account = account;
this.syncNow();
})
this.syncNow({reason: 'Account Modification'});
});
}
_onConnectionIdleUpdate() {
this.syncNow();
if (!this.isWaitingForNextSync()) {
return;
}
this.syncNow({reason: 'IMAP IDLE Fired'});
}
_getAccount() {
@ -126,7 +130,8 @@ class SyncWorker {
.catch((error) => {
syncbackRequest.error = error
syncbackRequest.status = "FAILED"
}).finally(() => syncbackRequest.save())
})
.finally(() => syncbackRequest.save())
}
syncAllCategories() {
@ -145,23 +150,21 @@ class SyncWorker {
});
}
performSync() {
return this.syncbackMessageActions()
.then(() => this._conn.runOperation(new FetchFolderList(this._account.provider)))
.then(() => this.syncAllCategories())
}
syncNow() {
syncNow({reason} = {}) {
clearTimeout(this._syncTimer);
this._syncTimer = null;
if (!process.env.SYNC_AFTER_ERRORS && this._account.errored()) {
console.log(`SyncWorker: Account ${this._account.emailAddress} is in error state - Skipping sync`)
console.log(`SyncWorker: Account ${this._account.emailAddress} (${this._account.id}) is in error state - Skipping sync`)
return
}
console.log(`SyncWorker: Account ${this._account.emailAddress} (${this._account.id}) sync started (${reason})`)
this.ensureConnection()
.then(() => this._account.update({syncError: null}))
.then(() => this.performSync())
.then(() => this.syncbackMessageActions())
.then(() => this._conn.runOperation(new FetchFolderList(this._account.provider)))
.then(() => this.syncAllCategories())
.then(() => this.onSyncDidComplete())
.catch((error) => this.onSyncError(error))
.finally(() => {
@ -171,13 +174,14 @@ class SyncWorker {
}
onSyncError(error) {
console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} `, error)
console.error(`SyncWorker: Error while syncing account ${this._account.emailAddress} (${this._account.id})`, error)
this.closeConnection()
if (error.source === 'socket') {
if (error.source.includes('socket') || error.source.includes('timeout')) {
// Continue to retry if it was a network error
return Promise.resolve()
}
this._account.syncError = jsonError(error)
return this._account.save()
}
@ -216,6 +220,10 @@ class SyncWorker {
throw new Error(`SyncWorker.onSyncDidComplete: Unknown afterSync behavior: ${afterSync}. Closing connection`)
}
isWaitingForNextSync() {
return this._syncTimer != null;
}
scheduleNextSync() {
if (Date.now() - this._startTime > CLAIM_DURATION) {
console.log("SyncWorker: - Has held account for more than CLAIM_DURATION, returning to pool.");
@ -232,7 +240,7 @@ class SyncWorker {
const target = this._lastSyncTime + interval;
console.log(`SyncWorker: Account ${active ? 'active' : 'inactive'}. Next sync scheduled for ${new Date(target).toLocaleString()}`);
this._syncTimer = setTimeout(() => {
this.syncNow();
this.syncNow({reason: 'Scheduled'});
}, target - Date.now());
}
});

View file

@ -1,4 +1,8 @@
apps:
- script : redis-server
name : redis
- script : packages/nylas-api/app.js
name : api
env :
@ -8,17 +12,23 @@ apps:
GMAIL_CLIENT_ID : "271342407743-nibas08fua1itr1utq9qjladbkv3esdm.apps.googleusercontent.com"
GMAIL_CLIENT_SECRET : "WhmxErj-ei6vJXLocNhBbfBF"
GMAIL_REDIRECT_URL : "http://localhost:5100/auth/gmail/oauthcallback"
- script : packages/nylas-sync/app.js
name : sync
env :
DB_ENCRYPTION_ALGORITHM : "aes-256-ctr"
DB_ENCRYPTION_PASSWORD : "d6F3Efeq"
- script : packages/nylas-dashboard/app.js
name : dashboard
env :
PORT: 5101
DB_ENCRYPTION_ALGORITHM : "aes-256-ctr"
DB_ENCRYPTION_PASSWORD : "d6F3Efeq"
- script : packages/nylas-message-processor/app.js
name : processor
env :

View file

@ -3,7 +3,5 @@ apps:
name : api
instances: 0
exec_mode: cluster
- script : packages/nylas-dashboard/app.js
name : dashboard
instances: 1
exec_mode: cluster
env :
PORT: 5100

7
pm2-prod-dashboard.yml Normal file
View file

@ -0,0 +1,7 @@
apps:
- script : packages/nylas-dashboard/app.js
name : dashboard
instances: 0
exec_mode: cluster
env :
PORT: 5100