Change to notify / observe for syncback requests

This commit is contained in:
Evan Morikawa 2016-06-28 15:35:35 -07:00
parent a10543c1c8
commit 14cffcf8a5
10 changed files with 67 additions and 53 deletions

View file

@ -0,0 +1,16 @@
const {PubsubConnector, MessageTypes} = require('nylas-core')
module.exports = {
createSyncbackRequest: function createSyncbackRequest(request, reply, syncRequestArgs) {
request.getAccountDatabase().then((db) => {
db.SyncbackRequest.create(syncRequestArgs).then((syncbackRequest) => {
PubsubConnector.notify({
accountId: db.accountId,
type: MessageTypes.SYNCBACK_REQUESTED,
data: syncbackRequest.id,
});
reply(Serialization.jsonStringify(syncbackRequest))
})
})
}
}

View file

@ -53,7 +53,7 @@ module.exports = (server) => {
request.getAccountDatabase().then((db) => {
const source = Rx.Observable.merge(
PubsubConnector.observableForAccountDeltas(account.id),
PubsubConnector.observeDeltas(account.id),
initialTransactions(db, request),
keepAlive(request)
).subscribe(outputStream.pushJSON)

View file

@ -1,5 +1,6 @@
const Joi = require('joi');
const Serialization = require('../serialization');
const {createSyncbackRequest} = require('../route-helpers')
module.exports = (server) => {
server.route({
@ -50,16 +51,12 @@ module.exports = (server) => {
},
},
handler: (request, reply) => {
request.getAccountDatabase().then((db) => {
db.SyncbackRequest.create({
type: "MoveToFolder",
props: {
folderId: request.params.folder_id,
threadId: request.params.id,
},
}).then((syncbackRequest) => {
reply(Serialization.jsonStringify(syncbackRequest))
})
createSyncbackRequest(request, reply, {
type: "MoveToFolder",
props: {
folderId: request.params.folder_id,
threadId: requres.params.id,
}
})
},
});

View file

@ -1,15 +1,22 @@
const PubsubConnector = require('./pubsub-connector')
const MessageTypes = require('./message-types')
module.exports = (db, sequelize) => {
sequelize.addHook("afterCreate", ({dataValues, $modelOptions}) => {
if ($modelOptions.name.singular === 'Account') {
PubsubConnector.broadcastClient().lpushAsync('accounts:unclaimed', dataValues.id);
PubsubConnector.notifyAccountChange(dataValues.id);
PubsubConnector.notify({
accountId: dataValues.id,
type: MessageTypes.ACCOUNT_UPDATED
});
}
})
sequelize.addHook("afterUpdate", ({dataValues, $modelOptions}) => {
if ($modelOptions.name.singular === 'Account') {
PubsubConnector.notifyAccountChange(dataValues.id);
PubsubConnector.notify({
accountId: dataValues.id,
type: MessageTypes.ACCOUNT_UPDATED
});
}
})
// TODO delete account from redis

View file

@ -22,7 +22,7 @@ module.exports = (db, sequelize) => {
db.Transaction.create(transactionData);
transactionData.object = sequelizeHookData.dataValues;
PubsubConnector.notifyAccountDeltas(db.accountId, transactionData);
PubsubConnector.notifyDelta(db.accountId, transactionData);
}
}

View file

@ -11,5 +11,6 @@ module.exports = {
IMAPConnection: require('./imap-connection'),
SyncPolicy: require('./sync-policy'),
SchedulerUtils: require('./scheduler-utils'),
MessageTypes: require('./message-types'),
NylasError,
}

View file

@ -0,0 +1,4 @@
module.exports = {
ACCOUNT_UPDATED: "ACCOUNT_UPDATED",
SYNCBACK_REQUESTED: "SYNCBACK_REQUESTED",
}

View file

@ -26,16 +26,7 @@ class PubsubConnector {
return this._broadcastClient;
}
channelForAccount(accountId) {
return `a-${accountId}`;
}
channelForAccountDeltas(accountId) {
return `deltas-${accountId}`;
}
// Shared channel
_observableForChannelOnSharedListener(channel) {
if (!this._listenClient) {
this._listenClient = this.buildClient();
@ -63,47 +54,29 @@ class PubsubConnector {
});
}
notifyAccountChange(accountId) {
const channel = this.channelForAccount(accountId);
this.broadcastClient().publish(channel, 'modified');
notify({accountId, type, data}) {
this.broadcastClient().publish(`channel-${accountId}`, {type, data});
}
observableForAccountChanges(accountId) {
const channel = this.channelForAccount(accountId);
return this._observableForChannelOnSharedListener(channel);
observe(accountId) {
return this._observableForChannelOnSharedListener(`channel-${accountId}`);
}
notifyMessageCreated(payload) {
this.broadcastClient().publish('message-created', payload);
notifyDelta(accountId, data) {
this.broadcastClient().publish(`channel-${accountId}-deltas`, JSON.stringify(data))
}
observableForMessageCreation(accountId) {
return this._observableForChannelOnSharedListener('message-created');
}
// Account (delta streaming) channels
notifyAccountDeltas(accountId, data) {
const channel = this.channelForAccountDeltas(accountId);
this.broadcastClient().publish(channel, JSON.stringify(data))
}
observableForAccountDeltas(accountId) {
observeDeltas(accountId) {
return Rx.Observable.create((observer) => {
const sub = this.buildClient();
sub.on("message", (channel, message) => observer.onNext(message));
sub.subscribe(this.channelForAccountDeltas(accountId));
sub.subscribe(`channel-${accountId}-deltas`);
return () => {
sub.unsubscribe();
sub.quit();
}
})
}
queueSyncbackTask({taskName, props}) {
const channel = this.channelForSyncbackTaskQueue(accountId);
this.broadcastClient().publish(channel, JSON.stringify(data))
}
}
module.exports = new PubsubConnector()

View file

@ -10,6 +10,7 @@ const HEARTBEAT_EXPIRES = 30; // 2 min in prod?
const CLAIM_DURATION = 10 * 60 * 1000; // 2 hours on prod?
const PubsubConnector = require('./pubsub-connector');
const MessageTypes = require('./message-types')
const forEachAccountList = (forEachCallback) => {
const client = PubsubConnector.broadcastClient();
@ -44,7 +45,10 @@ const notifyAccountIsActive = (accountId) => {
client.incrAsync(key).then((val) => {
client.expireAsync(key, 15 * 60 * 1000); // 15 min
if (val === 1) {
PubsubConnector.notifyAccountChange(accountId);
PubsubConnector.notify({
accountId: accountId,
type: MessageTypes.ACCOUNT_UPDATED
});
}
});
}

View file

@ -4,6 +4,7 @@ const {
IMAPConnection,
PubsubConnector,
DatabaseConnector,
MessageTypes,
} = require('nylas-core');
const FetchCategoryList = require('./imap/fetch-category-list')
@ -24,8 +25,8 @@ class SyncWorker {
this.syncNow();
this._listener = PubsubConnector.observableForAccountChanges(account.id)
.subscribe(() => this.onAccountChanged())
this._onMessage = this._onMessage.bind(this)
this._listener = PubsubConnector.observe(account.id).subscribe(this._onMessage)
}
cleanup() {
@ -39,7 +40,18 @@ class SyncWorker {
this._conn = null
}
onAccountChanged() {
_onMessage(msg = {}) {
switch(msg.type) {
case MessageTypes.ACCOUNT_UPDATED:
this._onAccountUpdated(); break;
case MessageTypes.SYNCBACK_REQUESTED:
this.syncNow(); break;
default:
throw new Error(`Invalid message: ${JSON.stringify(msg)}`)
}
}
_onAccountUpdated() {
console.log("SyncWorker: Detected change to account. Reloading and syncing now.")
DatabaseConnector.forShared().then(({Account}) => {
Account.find({where: {id: this._account.id}}).then((account) => {