[local-sync, cloud-api, cloud-workers] Fix msg id collision, tracking and sending issues, some refactoring

Summary:
This diff solves a few separate issues from T7313, T7316, T7282, and it refactors
the send code a little bit.

Initially, the problem that led to this diff was generating message ids that
wouldn't collide (which was causing errors in the message-processor). Collisions
in ids were being caused by messages that contained the exact same participants,
subject and date (most likely due bots or scripts sending emails in quick
succession)

To prevent collisions this commit adds the `message-id` header as part of the
database message id, and ensures that we set it correctly before sending, and
that it remains consistent through send, multi-send, and the sync loop.

During the refactor and review, I removed some code that assumed that we were
syncing drafts (which we aren't), and also fixes a few other known and
unknown issues around sending, message creation, and tracking, like assigning
the correct date header (we were previously assigning the draft creation date
from within N1), fixing the tracking regex, among other smaller bugs/typos.

Will address inline TODOs in a separate diff

Test Plan: TODO!!! I will add tests in another diff

Reviewers: evan, halla, jackie, khamidou

Reviewed By: halla, jackie

Differential Revision: https://phab.nylas.com/D3507
This commit is contained in:
Juan Tejada 2016-12-14 19:35:19 -08:00
parent 28f792558d
commit b79488ae43
20 changed files with 397 additions and 338 deletions

2
.gitignore vendored
View file

@ -1,7 +1,7 @@
*.swp
*~
.DS_Store
node_modules/*
node_modules
dump.rdb
*npm-debug.log
storage/

View file

@ -1,30 +1,44 @@
const Sequelize = require('sequelize');
module.exports = {
buildJSONColumnOptions: (fieldName, {defaultValue = {}} = {}) => ({
type: Sequelize.TEXT,
get: function get() {
const val = this.getDataValue(fieldName);
if (!val) {
return defaultValue ? Object.assign({}, defaultValue) : null;
}
return JSON.parse(val);
},
set: function set(val) {
this.setDataValue(fieldName, JSON.stringify(val));
},
}),
buildJSONARRAYColumnOptions: (fieldName) => ({
type: Sequelize.TEXT,
get: function get() {
const val = this.getDataValue(fieldName);
if (!val) {
return [];
}
return JSON.parse(val);
},
set: function set(val) {
this.setDataValue(fieldName, JSON.stringify(val));
},
}),
JSONColumn(fieldName, options = {}) {
return Object.assign(options, {
type: Sequelize.TEXT,
get() {
const val = this.getDataValue(fieldName);
if (!val) {
const {defaultValue} = options
return defaultValue ? Object.assign({}, defaultValue) : {};
}
return JSON.parse(val);
},
set(val) {
this.setDataValue(fieldName, JSON.stringify(val));
},
defaultValue: undefined,
})
},
JSONArrayColumn(fieldName, options = {}) {
return Object.assign(options, {
type: Sequelize.TEXT,
get() {
const val = this.getDataValue(fieldName);
if (!val) {
return [];
}
const arr = JSON.parse(val)
if (!Array.isArray(arr)) {
throw new Error('JSONArrayType should be an array')
}
return JSON.parse(val);
},
set(val) {
if (!Array.isArray(val)) {
throw new Error('JSONArrayType should be an array')
}
this.setDataValue(fieldName, JSON.stringify(val));
},
defaultValue: undefined,
})
},
}

View file

@ -26,7 +26,7 @@ class IMAPConnection extends EventEmitter {
return new IMAPConnection(...args).connect()
}
constructor({db, settings, logger} = {}) {
constructor({db, account, settings, logger} = {}) {
super();
if (!(settings instanceof Object)) {
@ -38,6 +38,7 @@ class IMAPConnection extends EventEmitter {
this._logger = logger;
this._db = db;
this._account = account;
this._queue = [];
this._currentOperation = null;
this._settings = settings;
@ -45,6 +46,14 @@ class IMAPConnection extends EventEmitter {
this._connectPromise = null;
}
get account() {
return this._account
}
get logger() {
return this._logger
}
connect() {
if (!this._connectPromise) {
this._connectPromise = this._resolveIMAPSettings().then((settings) => {

View file

@ -1,5 +1,5 @@
const crypto = require('crypto');
const {buildJSONColumnOptions, buildJSONARRAYColumnOptions} = require('../database-types');
const {JSONColumn, JSONArrayColumn} = require('../database-types');
const {DB_ENCRYPTION_ALGORITHM, DB_ENCRYPTION_PASSWORD} = process.env;
@ -9,16 +9,16 @@ module.exports = (sequelize, Sequelize) => {
name: Sequelize.STRING,
provider: Sequelize.STRING,
emailAddress: Sequelize.STRING,
connectionSettings: buildJSONColumnOptions('connectionSettings'),
connectionSettings: JSONColumn('connectionSettings'),
connectionCredentials: Sequelize.TEXT,
syncPolicy: buildJSONColumnOptions('syncPolicy'),
syncError: buildJSONColumnOptions('syncError', {defaultValue: null}),
syncPolicy: JSONColumn('syncPolicy'),
syncError: JSONColumn('syncError'),
firstSyncCompletion: {
type: Sequelize.STRING(14),
allowNull: true,
defaultValue: null,
},
lastSyncCompletions: buildJSONARRAYColumnOptions('lastSyncCompletions'),
lastSyncCompletions: JSONArrayColumn('lastSyncCompletions'),
}, {
indexes: [
{

View file

@ -1,4 +1,4 @@
const {buildJSONARRAYColumnOptions} = require('../database-types');
const {JSONArrayColumn} = require('../database-types');
module.exports = (sequelize, Sequelize) => {
return sequelize.define('transaction', {
@ -6,7 +6,7 @@ module.exports = (sequelize, Sequelize) => {
object: Sequelize.STRING,
objectId: Sequelize.STRING,
accountId: Sequelize.STRING,
changedFields: buildJSONARRAYColumnOptions('changedFields'),
changedFields: JSONArrayColumn('changedFields'),
}, {
instanceMethods: {
toJSON: function toJSON() {

View file

@ -46,10 +46,8 @@ module.exports = (server) => {
description: `Create ${term}`,
tags: [term],
validate: {
params: {
payload: {
display_name: Joi.string().required(),
},
payload: {
display_name: Joi.string().required(),
},
},
response: {
@ -84,9 +82,9 @@ module.exports = (server) => {
validate: {
params: {
id: Joi.string().required(),
payload: {
display_name: Joi.string().required(),
},
},
payload: {
display_name: Joi.string().required(),
},
},
response: {

View file

@ -1,10 +1,12 @@
const Joi = require('joi');
const Utils = require('../../shared/utils');
const SendmailClient = require('../../shared/sendmail-client');
const MessageFactory = require('../../shared/message-factory');
const {HTTPError} = require('../../shared/errors');
const LocalDatabaseConnector = require('../../shared/local-database-connector');
const Errors = require('../../shared/errors');
const SendingUtils = require('../sending-utils');
const SendmailClient = require('../sendmail-client');
const SEND_TIMEOUT = 1000 * 60; // millliseconds
const SEND_TIMEOUT_MS = 1000 * 60; // millliseconds
const recipient = Joi.object().keys({
name: Joi.string().required(),
@ -17,14 +19,15 @@ const recipient = Joi.object().keys({
server_id: Joi.string(),
object: Joi.string(),
});
const recipientList = Joi.array().items(recipient);
const respondWithError = (request, reply, error) => {
const replyWithError = (request, reply, error) => {
if (!error.httpCode) {
error.type = 'apiError';
error.type = 'ApiError';
error.httpCode = 500;
}
request.logger.error('responding with error', error, error.logContext);
request.logger.error('Replying with error', error, error.logContext);
reply(JSON.stringify(error)).code(error.httpCode);
}
@ -32,19 +35,55 @@ module.exports = (server) => {
server.route({
method: 'POST',
path: '/send',
handler: async (request, reply) => {
config: {
validate: {
payload: {
to: recipientList,
cc: recipientList,
bcc: recipientList,
from: recipientList.length(1).required(),
reply_to: recipientList.min(0).max(1),
subject: Joi.string().required(),
body: Joi.string().required(),
thread_id: Joi.string(),
reply_to_message_id: Joi.string(),
client_id: Joi.string().required(),
account_id: Joi.string(),
id: Joi.string(),
object: Joi.string().equal('draft'),
metadata: Joi.array().items(Joi.object()),
date: Joi.number(),
files: Joi.array().items(Joi.string()),
file_ids: Joi.array(),
uploads: Joi.array(),
events: Joi.array(),
pristine: Joi.boolean(),
categories: Joi.array().items(Joi.string()),
draft: Joi.boolean(),
},
},
},
async handler(request, reply) {
// TODO make this a task to trigger a sync loop run
try {
const account = request.auth.credentials;
const db = await LocalDatabaseConnector.forAccount(account.id)
const draft = await SendingUtils.findOrCreateMessageFromJSON(request.payload, db);
// Calculate the response now to prevent errors after the draft has
// already been sent.
const responseOnSuccess = draft.toJSON();
const message = await MessageFactory.buildForSend(db, request.payload)
const sender = new SendmailClient(account, request.logger);
await sender.send(draft);
reply(responseOnSuccess);
await sender.send(message);
// We don't save the message until after successfully sending it.
// In the next sync loop, the message's labels and other data will be
// updated, and we can guarantee this because we control message id
// generation.
// The thread will be created or updated when we detect this
// message in the sync loop
message.setIsSent(true)
await message.save();
// TODO save to sent folder if non-gmail
reply(message.toJSON());
} catch (err) {
respondWithError(request, reply, err);
replyWithError(request, reply, err);
}
},
});
@ -65,10 +104,10 @@ module.exports = (server) => {
body: Joi.string().required(),
thread_id: Joi.string(),
reply_to_message_id: Joi.string(),
client_id: Joi.string(),
client_id: Joi.string().required(),
account_id: Joi.string(),
id: Joi.string(),
object: Joi.string(),
object: Joi.string().equal('draft'),
metadata: Joi.array().items(Joi.object()),
date: Joi.number(),
files: Joi.array().items(Joi.string()),
@ -81,22 +120,18 @@ module.exports = (server) => {
},
},
},
handler: async (request, reply) => {
async handler(request, reply) {
try {
const accountId = request.auth.credentials.id;
const db = await LocalDatabaseConnector.forAccount(accountId)
const draftData = Object.assign(request.payload, {
unread: true,
is_draft: false,
is_sent: false,
version: 0,
})
const draft = await SendingUtils.findOrCreateMessageFromJSON(draftData, db)
await (draft.isSending = true);
const savedDraft = await draft.save();
reply(savedDraft.toJSON());
const message = await MessageFactory.buildForSend(db,
Object.assign(request.payload, {draft: false})
)
message.setIsSending(true)
await message.save();
reply(message.toJSON());
} catch (err) {
respondWithError(request, reply, err);
replyWithError(request, reply, err);
}
},
});
@ -108,48 +143,53 @@ module.exports = (server) => {
// deleted from it.
server.route({
method: 'POST',
path: '/send-multiple/{draftId}',
path: '/send-multiple/{messageId}',
config: {
validate: {
params: {
draftId: Joi.string(),
messageId: Joi.string(),
},
payload: {
send_to: recipient,
body: Joi.string(),
send_to: recipient.required(),
body: Joi.string().required(),
},
},
},
handler: async (request, reply) => {
async handler(request, reply) {
try {
const requestStarted = new Date();
const requestStarted = Date.now()
const account = request.auth.credentials;
const {draftId} = request.params;
SendingUtils.validateBase36(draftId, 'draftId')
const {messageId} = request.params;
const sendTo = request.payload.send_to;
if (!Utils.isValidId(messageId)) {
throw new HTTPError(`messageId is not a base-36 integer`, 400)
}
const db = await LocalDatabaseConnector.forAccount(account.id)
const draft = await SendingUtils.findMultiSendDraft(draftId, db)
const {to, cc, bcc} = draft;
const recipients = [].concat(to, cc, bcc);
if (!recipients.find(contact => contact.email === sendTo.email)) {
throw new Errors.HTTPError(
const {Message} = db
const baseMessage = await Message.findMultiSendMessage(messageId)
if (!baseMessage.getRecipients().find(contact => contact.email === sendTo.email)) {
throw new HTTPError(
"Invalid sendTo, not present in message recipients",
400
);
}
const sender = new SendmailClient(account, request.logger);
if (new Date() - requestStarted > SEND_TIMEOUT) {
if (Date.now() - requestStarted > SEND_TIMEOUT_MS) {
// Preemptively time out the request if we got stuck doing database work
// -- we don't want clients to disconnect and then still send the
// message.
reply('Request timeout out.').code(504);
reply('Request timed out.').code(504);
}
const response = await sender.sendCustomBody(draft, request.payload.body, {to: [sendTo]})
const customMessage = Utils.copyModel(Message, baseMessage, {
body: MessageFactory.replaceBodyMessageIds(baseMessage.id, request.payload.body),
})
const sender = new SendmailClient(account, request.logger);
const response = await sender.sendCustom(customMessage, {to: [sendTo]})
reply(response);
} catch (err) {
respondWithError(request, reply, err);
replyWithError(request, reply, err);
}
},
});
@ -158,22 +198,26 @@ module.exports = (server) => {
// and moving it to the user's Sent folder.
server.route({
method: 'DELETE',
path: '/send-multiple/{draftId}',
path: '/send-multiple/{messageId}',
config: {
validate: {
params: {
draftId: Joi.string(),
messageId: Joi.string(),
},
},
},
handler: async (request, reply) => {
async handler(request, reply) {
try {
const account = request.auth.credentials;
const {draftId} = request.params;
SendingUtils.validateBase36(draftId);
const {messageId} = request.params;
if (!Utils.isValidId(messageId)) {
throw new HTTPError(`messageId is not a base-36 integer`, 400)
}
const db = await LocalDatabaseConnector.forAccount(account.id);
const draft = await SendingUtils.findMultiSendDraft(draftId, db);
const {Message} = db
const baseMessage = await Message.findMultiSendMessage(messageId);
// gmail creates sent messages for each one, go through and delete them
if (account.provider === 'gmail') {
@ -181,7 +225,7 @@ module.exports = (server) => {
await db.SyncbackRequest.create({
accountId: account.id,
type: "DeleteSentMessage",
props: { messageId: `${draft.id}@nylas.com` },
props: { headerMessageId: baseMessage.headerMessageId },
});
} catch (err) {
// Even if this fails, we need to finish the multi-send session,
@ -190,19 +234,19 @@ module.exports = (server) => {
}
const sender = new SendmailClient(account, request.logger);
const rawMime = await sender.buildMime(draft);
const rawMime = await sender.buildMime(baseMessage);
await db.SyncbackRequest.create({
accountId: account.id,
type: "SaveSentMessage",
props: {rawMime, messageId: `${draft.id}@nylas.com`},
props: {rawMime, headerMessageId: baseMessage.headerMessageId},
});
await (draft.isSent = true);
const savedDraft = await draft.save();
reply(savedDraft.toJSON());
baseMessage.setIsSent(true)
await baseMessage.save();
reply(baseMessage.toJSON());
} catch (err) {
respondWithError(request, reply, err);
replyWithError(request, reply, err);
}
},
});

View file

@ -1,39 +0,0 @@
const MessageFactory = require('../shared/message-factory')
const Errors = require('../shared/errors')
module.exports = {
findOrCreateMessageFromJSON: async (data, db) => {
const {Message} = db;
const existingMessage = await Message.findById(data.id);
if (existingMessage) {
return existingMessage;
}
return MessageFactory.associateFromJSON(data, db)
},
findMultiSendDraft: async (draftId, db) => {
const draft = await db.Message.findById(draftId)
if (!draft) {
throw new Errors.HTTPError(`Couldn't find multi-send draft ${draftId}`, 400);
}
if (draft.isSent || !draft.isSending) {
throw new Errors.HTTPError(`Message ${draftId} is not a multi-send draft`, 400);
}
return draft;
},
validateRecipientsPresent: (draft) => {
const {to, cc, bcc} = draft;
const recipients = [].concat(to, cc, bcc);
if (recipients.length === 0) {
throw new Errors.HTTPError("No recipients specified", 400);
}
},
validateBase36: (value, name) => {
if (value == null) { return; }
if (isNaN(parseInt(value, 36))) {
throw new Errors.HTTPError(`${name} is not a base-36 integer`, 400)
}
},
}

View file

@ -100,6 +100,7 @@ class SyncWorker {
const conn = new IMAPConnection({
db: this._db,
account: this._account,
settings: Object.assign({}, settings, credentials),
logger: this._logger,
});

View file

@ -10,7 +10,7 @@ class DeleteSentMessageGMAIL extends SyncbackTask {
}
async run(db, imap) {
const {messageId} = this.syncbackRequestObject().props
const {headerMessageId} = this.syncbackRequestObject().props
const trash = await db.Folder.find({where: {role: 'trash'}});
if (!trash) { throw new Error(`Could not find folder with role 'trash'.`) }
@ -26,7 +26,7 @@ class DeleteSentMessageGMAIL extends SyncbackTask {
for (const {folder, deleteFn} of steps) {
const box = await imap.openBox(folder.name);
const uids = await box.search([['HEADER', 'Message-ID', messageId]])
const uids = await box.search([['HEADER', 'Message-ID', headerMessageId]])
for (const uid of uids) {
await deleteFn(box, uid);
}

View file

@ -10,7 +10,7 @@ class SaveSentMessageIMAP extends SyncbackTask {
}
async run(db, imap) {
const {rawMime, messageId} = this.syncbackRequestObject().props;
const {rawMime, headerMessageId} = this.syncbackRequestObject().props;
// Non-gmail
const sentFolder = await db.Folder.find({where: {role: 'sent'}});
@ -23,9 +23,9 @@ class SaveSentMessageIMAP extends SyncbackTask {
const sentLabel = await db.Label.find({where: {role: 'sent'}});
const allMail = await db.Folder.find({where: {role: 'all'}});
if (sentLabel && allMail) {
let box = await imap.openBox(allMail.name);
const box = await imap.openBox(allMail.name);
await box.append(rawMime, {flags: 'SEEN'})
const uids = await box.search([['HEADER', 'Message-ID', messageId]])
const uids = await box.search([['HEADER', 'Message-ID', headerMessageId]])
// There should only be one uid in the array
return box.setLabels(uids[0], '\\Sent');
}

View file

@ -22,7 +22,7 @@ module.exports = (sequelize, Sequelize) => {
},
],
instanceMethods: {
fetch: function fetch({account, db, logger}) {
fetch({account, db, logger}) {
const settings = Object.assign({}, account.connectionSettings, account.decryptedCredentials())
return PromiseUtils.props({
message: this.getMessage(),
@ -44,7 +44,7 @@ module.exports = (sequelize, Sequelize) => {
.finally(() => connection.end())
})
},
toJSON: function toJSON() {
toJSON() {
return {
id: this.id,
object: 'file',

View file

@ -1,5 +1,5 @@
const crypto = require('crypto')
const {DatabaseTypes: {buildJSONColumnOptions}} = require('isomorphic-core');
const {DatabaseTypes: {JSONColumn}} = require('isomorphic-core');
const {formatImapPath} = require('../shared/imap-paths-utils');
module.exports = (sequelize, Sequelize) => {
@ -9,7 +9,7 @@ module.exports = (sequelize, Sequelize) => {
version: Sequelize.INTEGER,
name: Sequelize.STRING,
role: Sequelize.STRING,
syncState: buildJSONColumnOptions('syncState'),
syncState: JSONColumn('syncState', {defaultValue: {}}),
}, {
indexes: [
{

View file

@ -1,11 +1,13 @@
const crypto = require('crypto')
const {PromiseUtils, IMAPConnection} = require('isomorphic-core')
const {DatabaseTypes: {buildJSONColumnOptions, buildJSONARRAYColumnOptions}} = require('isomorphic-core');
const {DatabaseTypes: {JSONColumn, JSONArrayColumn}} = require('isomorphic-core');
const striptags = require('striptags');
const SendingUtils = require('../local-api/sending-utils');
const {HTTPError} = require('../shared/errors');
const SNIPPET_LENGTH = 191;
const getValidateArrayLength = (fieldName, min, max) => {
function getLengthValidator(fieldName, min, max) {
return (stringifiedArr) => {
const arr = JSON.parse(stringifiedArr);
if ((arr.length < min) || (arr.length > max)) {
@ -14,6 +16,12 @@ const getValidateArrayLength = (fieldName, min, max) => {
};
}
function validateRecipientsPresent(message) {
if (message.getRecipients().length === 0) {
throw new HTTPError(`No recipients specified`, 400);
}
}
module.exports = (sequelize, Sequelize) => {
return sequelize.define('message', {
id: { type: Sequelize.STRING(65), primaryKey: true },
@ -22,58 +30,34 @@ module.exports = (sequelize, Sequelize) => {
headerMessageId: Sequelize.STRING,
gMsgId: { type: Sequelize.STRING, allowNull: true },
body: Sequelize.TEXT('long'),
headers: buildJSONColumnOptions('headers'),
headers: JSONColumn('headers'),
subject: Sequelize.STRING(500),
snippet: Sequelize.STRING(255),
date: Sequelize.DATE,
isDraft: Sequelize.BOOLEAN,
isSent: {
type: Sequelize.BOOLEAN,
set: async function set(val) {
if (val) {
this.isDraft = false;
this.date = (new Date()).getTime();
const thread = await this.getThread();
await thread.updateFromMessage(this)
}
this.setDataValue('isSent', val);
},
},
isSent: Sequelize.BOOLEAN,
isSending: Sequelize.BOOLEAN,
unread: Sequelize.BOOLEAN,
starred: Sequelize.BOOLEAN,
processed: Sequelize.INTEGER,
to: buildJSONARRAYColumnOptions('to'),
from: Object.assign(buildJSONARRAYColumnOptions('from'), {
validate: {validateArrayLength1: getValidateArrayLength('Message.from', 1, 1)},
to: JSONArrayColumn('to'),
from: JSONArrayColumn('from', {
validate: {validateArrayLength1: getLengthValidator('Message.from', 1, 1)},
allowNull: true,
}),
cc: buildJSONARRAYColumnOptions('cc'),
bcc: buildJSONARRAYColumnOptions('bcc'),
replyTo: Object.assign(buildJSONARRAYColumnOptions('replyTo'), {
validate: {validateArrayLength1: getValidateArrayLength('Message.replyTo', 0, 1)},
cc: JSONArrayColumn('cc'),
bcc: JSONArrayColumn('bcc'),
replyTo: JSONArrayColumn('replyTo', {
validate: {validateArrayLength1: getLengthValidator('Message.replyTo', 0, 1)},
allowNull: true,
}),
inReplyTo: { type: Sequelize.STRING, allowNull: true},
references: buildJSONARRAYColumnOptions('references'),
references: JSONArrayColumn('references'),
folderImapUID: { type: Sequelize.STRING, allowNull: true},
folderImapXGMLabels: { type: Sequelize.TEXT, allowNull: true},
isSending: {
type: Sequelize.BOOLEAN,
set: function set(val) {
if (val) {
if (this.isSent) {
throw new Error("Cannot mark a sent message as sending");
}
SendingUtils.validateRecipientsPresent(this);
this.isDraft = false;
this.regenerateHeaderMessageId();
}
this.setDataValue('isSending', val);
},
},
uploads: Object.assign(buildJSONARRAYColumnOptions('testFiles'), {
uploads: JSONArrayColumn('uploads', {
validate: {
uploadStructure: function uploadStructure(stringifiedArr) {
uploadStructure(stringifiedArr) {
const arr = JSON.parse(stringifiedArr);
const requiredKeys = ['filename', 'targetPath', 'id']
arr.forEach((upload) => {
@ -93,6 +77,16 @@ module.exports = (sequelize, Sequelize) => {
fields: ['id'],
},
],
hooks: {
beforeUpdate(message) {
// Update the snippet if the body has changed
if (!message.changed('body')) { return; }
const plainText = striptags(message.body);
// consolidate whitespace groups into single spaces and then truncate
message.snippet = plainText.split(/\s+/).join(" ").substring(0, SNIPPET_LENGTH)
},
},
classMethods: {
associate({Message, Folder, Label, File, Thread, MessageLabel}) {
Message.belongsTo(Thread)
@ -100,7 +94,32 @@ module.exports = (sequelize, Sequelize) => {
Message.belongsToMany(Label, {through: MessageLabel})
Message.hasMany(File)
},
requiredAssociationsForJSON: ({Folder, Label}) => {
hash({from = [], to = [], cc = [], bcc = [], date = '', subject = '', headerMessageId = ''} = {}) {
const emails = from.concat(to, cc, bcc)
.map(participant => participant.email)
.sort();
const participants = emails.join('')
const data = `${date}-${subject}-${participants}-${headerMessageId}`;
return crypto.createHash('sha256').update(data, 'utf8').digest('hex');
},
buildHeaderMessageId(id) {
return `<${id}@mailer.nylas.com>`
},
async findMultiSendMessage(messageId) {
const message = await this.findById(messageId)
if (!message) {
throw new HTTPError(`Couldn't find multi-send message ${messageId}`, 400);
}
if (message.isSent || !message.isSending) {
throw new HTTPError(`Message ${messageId} is not a multi-send message`, 400);
}
return message;
},
requiredAssociationsForJSON({Folder, Label}) {
return [
{model: Folder},
{model: Label},
@ -108,12 +127,36 @@ module.exports = (sequelize, Sequelize) => {
},
},
instanceMethods: {
getRecipients() {
const {to, cc, bcc} = this;
return [].concat(to, cc, bcc);
},
async setLabelsFromXGM(xGmLabels, {Label, preloadedLabels} = {}) {
this.folderImapXGMLabels = JSON.stringify(xGmLabels);
const labels = await Label.findXGMLabels(xGmLabels, {preloadedLabels})
return this.setLabels(labels);
},
setIsSent(val) {
if (val) {
this.isDraft = false
this.isSending = false
}
this.isSent = val
},
setIsSending(val) {
if (val) {
if (this.isSent || this.isSending) {
throw new HTTPError('Cannot mark a sent message as sending', 400);
}
validateRecipientsPresent(this);
this.isDraft = false;
}
this.isSending = val
},
fetchRaw({account, db, logger}) {
const settings = Object.assign({}, account.connectionSettings, account.decryptedCredentials())
return PromiseUtils.props({
@ -133,14 +176,6 @@ module.exports = (sequelize, Sequelize) => {
})
},
// The uid in this header is simply the draft id and version concatenated.
// Because this uid identifies the draft on the remote provider, we
// regenerate it on each draft revision so that we can delete the old draft
// and add the new one on the remote.
regenerateHeaderMessageId() {
this.headerMessageId = `<${this.id}-${this.version}@mailer.nylas.com>`
},
toJSON() {
if (this.folderId && !this.folder) {
throw new Error("Message.toJSON called on a message where folder were not eagerly loaded.")
@ -153,7 +188,7 @@ module.exports = (sequelize, Sequelize) => {
return {
id: this.id,
account_id: this.accountId,
object: 'message',
object: this.isDraft ? 'draft' : 'message',
body: this.body,
subject: this.subject,
snippet: this.snippet,
@ -171,16 +206,5 @@ module.exports = (sequelize, Sequelize) => {
};
},
},
hooks: {
beforeUpdate: (message) => {
// Update the snippet if the body has changed
if (!message.changed('body')) { return; }
const plainText = striptags(message.body);
// consolidate whitespace groups into single spaces and then truncate
message.snippet = plainText.split(/\s+/).join(" ").substring(0, SNIPPET_LENGTH)
},
},
});
};

View file

@ -1,4 +1,4 @@
const {DatabaseTypes: {buildJSONColumnOptions}} = require('isomorphic-core');
const {DatabaseTypes: {JSONColumn}} = require('isomorphic-core');
module.exports = (sequelize, Sequelize) => {
return sequelize.define('syncbackRequest', {
@ -8,8 +8,8 @@ module.exports = (sequelize, Sequelize) => {
defaultValue: "NEW",
allowNull: false,
},
error: buildJSONColumnOptions('error'),
props: buildJSONColumnOptions('props'),
error: JSONColumn('error'),
props: JSONColumn('props'),
accountId: { type: Sequelize.STRING, allowNull: false },
}, {
instanceMethods: {

View file

@ -1,4 +1,4 @@
const {DatabaseTypes: {buildJSONARRAYColumnOptions}} = require('isomorphic-core');
const {DatabaseTypes: {JSONArrayColumn}} = require('isomorphic-core');
module.exports = (sequelize, Sequelize) => {
return sequelize.define('thread', {
@ -20,7 +20,7 @@ module.exports = (sequelize, Sequelize) => {
lastMessageDate: Sequelize.DATE,
lastMessageReceivedDate: Sequelize.DATE,
lastMessageSentDate: Sequelize.DATE,
participants: buildJSONARRAYColumnOptions('participants'),
participants: JSONArrayColumn('participants'),
}, {
indexes: [
{ fields: ['id'], unique: true },

View file

@ -11,11 +11,21 @@ function processNewMessage(message, imapMessage) {
const {accountId} = message;
const logger = global.Logger.forAccount({id: accountId}).child({message})
const db = await LocalDatabaseConnector.forAccount(accountId);
const {Message} = db
try {
const existingMessage = await Message.findById(message.id)
if (existingMessage) {
// This is an extremely rare case when 2 or more /new/ messages with
// the exact same headers were queued for creation (same subject,
// participants, timestamp, and message-id header). In this case, we
// will ignore it and report the error
logger.warn({message}, 'MessageProcessor: Encountered 2 new messages with the same id')
return
}
const thread = await detectThread({db, message});
message.threadId = thread.id;
await db.Message.create(message);
await Message.create(message);
await extractFiles({db, message, imapMessage});
await extractContacts({db, message});
logger.info({

View file

@ -1,5 +1,4 @@
const _ = require('underscore');
const cryptography = require('crypto');
/* eslint no-useless-escape: 0 */
const mimelib = require('mimelib');
const encoding = require('encoding');
@ -27,30 +26,6 @@ function extractContacts(input) {
})
}
function getHeadersForId(data) {
let participants = "";
const emails = _.pluck(data.from.concat(data.to, data.cc, data.bcc), 'email');
emails.sort().forEach((email) => {
participants += email
});
return `${data.date}-${data.subject}-${participants}`;
}
function hashForHeaders(headers) {
return cryptography.createHash('sha256').update(headers, 'utf8').digest('hex');
}
function setReplyHeaders(newMessage, prevMessage) {
if (prevMessage.messageIdHeader) {
newMessage.inReplyTo = prevMessage.headerMessageId;
if (prevMessage.references) {
newMessage.references = prevMessage.references.concat(prevMessage.headerMessageId);
} else {
newMessage.references = [prevMessage.messageIdHeader];
}
}
}
/*
Iteratively walk the DOM of this document's <body>, calling the callback on
each node. Skip any nodes and the skipTags set, including their children.
@ -122,7 +97,7 @@ the message, and have to do fun stuff like deal with character sets and
content-transfer-encodings ourselves.
*/
async function parseFromImap(imapMessage, desiredParts, {db, accountId, folder}) {
const {Label} = db
const {Message, Label} = db
const {attributes} = imapMessage
const body = {}
@ -150,7 +125,6 @@ async function parseFromImap(imapMessage, desiredParts, {db, accountId, folder})
}
const parsedMessage = {
id: hashForHeaders(getHeadersForId(parsedHeaders)),
to: extractContacts(parsedHeaders.to),
cc: extractContacts(parsedHeaders.cc),
bcc: extractContacts(parsedHeaders.bcc),
@ -171,6 +145,7 @@ async function parseFromImap(imapMessage, desiredParts, {db, accountId, folder})
gMsgId: parsedHeaders['x-gm-msgid'],
subject: parsedHeaders.subject[0],
}
parsedMessage.id = Message.hash(parsedMessage)
if (!body['text/html'] && body['text/plain']) {
parsedMessage.body = HTMLifyPlaintext(body['text/plain']);
@ -189,48 +164,50 @@ async function parseFromImap(imapMessage, desiredParts, {db, accountId, folder})
return parsedMessage;
}
function fromJSON(db, data) {
// TODO: events, metadata?
const {Message} = db;
const id = hashForHeaders(getHeadersForId(data))
return Message.build({
accountId: data.account_id,
from: data.from,
to: data.to,
cc: data.cc,
bcc: data.bcc,
replyTo: data.reply_to,
subject: data.subject,
body: data.body,
unread: true,
isDraft: data.is_draft,
isSent: false,
version: 0,
date: data.date,
id: id,
uploads: data.uploads,
});
function getReplyHeaders(messageReplyingTo) {
let inReplyTo;
let references;
if (messageReplyingTo.headerMessageId) {
inReplyTo = messageReplyingTo.headerMessageId;
if (messageReplyingTo.references) {
references = messageReplyingTo.references.concat(messageReplyingTo.headerMessageId);
} else {
references = [messageReplyingTo.headerMessageId];
}
}
return {inReplyTo, references}
}
async function associateFromJSON(data, db) {
const {Thread, Message} = db;
const message = fromJSON(db, data);
function replaceBodyMessageIds(messageId, originalBody) {
const env = NylasEnv.config.get('env')
const serverUrl = {
local: 'http:\/\/lvh\.me:5100',
development: 'http:\/\/lvh\.me:5100',
staging: 'https:\/\/n1-staging\.nylas\.com',
production: 'https:\/\/n1\.nylas\.com',
}[env];
const regex = new RegExp(`(${serverUrl}.+?)MESSAGE_ID`, 'g')
return originalBody.replace(regex, `$1${messageId}`)
}
async function buildForSend(db, json) {
const {Thread, Message} = db
let replyToThread;
let replyToMessage;
if (data.thread_id != null) {
if (json.thread_id != null) {
replyToThread = await Thread.find({
where: {id: data.thread_id},
where: {id: json.thread_id},
include: [{
model: Message,
as: 'messages',
attributes: _.without(Object.keys(Message.attributes), 'body'),
attributes: ['id'],
}],
});
}
if (data.reply_to_message_id != null) {
replyToMessage = await Message.findById(data.reply_to_message_id);
if (json.reply_to_message_id != null) {
replyToMessage = await Message.findById(json.reply_to_message_id);
}
if (replyToThread && replyToMessage) {
@ -243,36 +220,48 @@ async function associateFromJSON(data, db) {
}
let thread;
let replyHeaders = {};
if (replyToMessage) {
setReplyHeaders(message, replyToMessage);
thread = await message.getThread();
replyHeaders = getReplyHeaders(replyToMessage);
thread = await replyToMessage.getThread();
} else if (replyToThread) {
thread = replyToThread;
const previousMessages = thread.messages.filter(msg => !msg.isDraft);
if (previousMessages.length > 0) {
const lastMessage = previousMessages[previousMessages.length - 1]
setReplyHeaders(message, lastMessage);
replyHeaders = getReplyHeaders(lastMessage);
}
} else {
thread = Thread.build({
accountId: message.accountId,
subject: message.subject,
firstMessageDate: message.date,
lastMessageDate: message.date,
lastMessageSentDate: message.date,
})
}
const savedMessage = await message.save();
const savedThread = await thread.save();
await savedThread.addMessage(savedMessage);
return savedMessage;
const {inReplyTo, references} = replyHeaders
const message = {
accountId: json.account_id,
threadId: thread ? thread.id : null,
headerMessageId: Message.buildHeaderMessageId(json.client_id),
from: json.from,
to: json.to,
cc: json.cc,
bcc: json.bcc,
references,
inReplyTo,
replyTo: json.reply_to,
subject: json.subject,
body: json.body,
unread: true,
isDraft: json.draft,
isSent: false,
version: 0,
date: new Date(),
uploads: json.uploads,
}
message.id = Message.hash(message)
message.body = replaceBodyMessageIds(message.id, message.body)
return Message.build(message)
}
module.exports = {
buildForSend,
parseFromImap,
extractSnippet,
fromJSON,
associateFromJSON,
replaceBodyMessageIds,
}

View file

@ -3,7 +3,7 @@
const fs = require('fs');
const nodemailer = require('nodemailer');
const mailcomposer = require('mailcomposer');
const {HTTPError} = require('../shared/errors');
const {HTTPError} = require('./errors');
const MAX_RETRIES = 1;
@ -12,6 +12,7 @@ const formatParticipants = (participants) => {
}
class SendmailClient {
constructor(account, logger) {
this._transporter = nodemailer.createTransport(account.smtpConfig());
this._logger = logger;
@ -59,20 +60,20 @@ class SendmailClient {
throw new HTTPError('Sending failed', 500, err);
}
_draftToMsgData(draft) {
_getSendPayload(message) {
const msgData = {};
for (const field of ['from', 'to', 'cc', 'bcc']) {
if (draft[field]) {
msgData[field] = formatParticipants(draft[field])
if (message[field]) {
msgData[field] = formatParticipants(message[field])
}
}
msgData.date = draft.date;
msgData.subject = draft.subject;
msgData.html = draft.body;
msgData.messageId = `${draft.id}@nylas.com`;
msgData.date = message.date;
msgData.subject = message.subject;
msgData.html = message.body;
msgData.messageId = message.headerMessageId;
msgData.attachments = []
for (const upload of draft.uploads) {
for (const upload of message.uploads) {
msgData.attachments.push({
filename: upload.filename,
content: fs.createReadStream(upload.targetPath),
@ -80,62 +81,45 @@ class SendmailClient {
})
}
if (draft.replyTo) {
msgData.replyTo = formatParticipants(draft.replyTo);
if (message.replyTo) {
msgData.replyTo = formatParticipants(message.replyTo);
}
msgData.inReplyTo = draft.inReplyTo;
msgData.references = draft.references;
msgData.headers = draft.headers;
msgData.inReplyTo = message.inReplyTo;
msgData.references = message.references;
msgData.headers = message.headers;
msgData.headers['User-Agent'] = `NylasMailer-K2`
return msgData;
}
_replaceBodyMessageIds(body, id) {
const serverUrl = {
local: 'http:\/\/lvh\.me:5100',
development: 'http:\/\/lvh\.me:5100',
staging: 'https:\/\/n1-staging\.nylas\.com',
production: 'https:\/\/n1\.nylas\.com',
}[process.env];
const regex = new RegExp(`${serverUrl}.+MESSAGE_ID`, 'g')
return body.replace(regex, (match) => {
return match.replace('MESSAGE_ID', id)
})
}
async buildMime(draft) {
const builder = mailcomposer(this._draftToMsgData(draft))
async buildMime(message) {
const payload = this._getSendPayload(message)
const builder = mailcomposer(payload)
const mimeNode = await (new Promise((resolve, reject) => {
builder.build((error, result) => {
builder.build((error, result) => (
error ? reject(error) : resolve(result)
})
))
}));
return mimeNode.toString('ascii')
}
async send(draft) {
if (draft.isSent) {
throw new Error(`Cannot send message ${draft.id}, it has already been sent`);
async send(message) {
if (message.isSent) {
throw new Error(`Cannot send message ${message.id}, it has already been sent`);
}
await this._send(this._draftToMsgData(draft));
await (draft.isSent = true);
await draft.save();
const payload = this._getSendPayload(message)
await this._send(payload);
}
async sendCustomBody(draft, body, recipients) {
const origBody = draft.body;
draft.body = this._replaceBodyMessageIds(body);
async sendCustom(customMessage, recipients) {
const envelope = {};
for (const field of Object.keys(recipients)) {
envelope[field] = recipients[field].map(r => r.email);
}
const raw = await this.buildMime(draft);
const responseOnSuccess = draft.toJSON();
draft.body = origBody;
const raw = await this.buildMime(customMessage);
await this._send({raw, envelope});
return responseOnSuccess;
return customMessage.toJSON();
}
}

View file

@ -0,0 +1,25 @@
module.exports = {
copyModel(Model, model, updates = {}) {
const fields = Object.keys(model.dataValues)
const data = {}
for (const field of fields) {
// We can't just copy over the values directly from `dataValues` because
// they are the raw values, and we would ignore custom getters.
// Rather, we access them from the model instance.
// For example our JSON database type, is simply a string and the custom
// getter parses it into json. We want to get the parsed json, not the
// string
data[field] = model[field]
}
return Model.build(Object.assign({}, data, updates))
},
isValidId(value) {
if (value == null) { return false; }
if (isNaN(parseInt(value, 36))) {
return false
}
return true
},
}