2016-06-22 08:51:24 +08:00
const _ = require ( 'underscore' ) ;
2016-06-28 07:05:31 +08:00
const Imap = require ( 'imap' ) ;
2016-06-29 09:01:43 +08:00
const { IMAPConnection , PubsubConnector } = require ( 'nylas-core' ) ;
2016-06-23 08:19:48 +08:00
const { Capabilities } = IMAPConnection ;
2016-06-21 05:57:54 +08:00
// Message model fields read and written during IMAP flag synchronization
// (\Seen maps to `unread`, \Flagged maps to `starred`).
const MessageFlagAttributes = ['id', 'categoryUID', 'unread', 'starred']
/**
 * Syncs the messages of a single IMAP folder/label ("category") into the
 * local database. A run consists of:
 *   1. opening the mailbox and verifying UIDVALIDITY,
 *   2. fetching UID ranges never seen before (`_fetchUnsyncedMessages`),
 *   3. scanning already-fetched ranges for flag changes and deletions
 *      (`_runScan` — either a shallow or a deep scan).
 */
class FetchMessagesInCategory {
  /**
   * @param {Object} category - Category model instance whose `syncState`
   *   ({fetchedmin, fetchedmax, uidvalidity, highestmodseq, ...}) persists
   *   progress between runs.
   * @param {Object} options - sync tuning; `deepFolderScan` (ms) controls how
   *   often a full attribute scan runs.
   * @throws {NylasError} when no category is provided.
   */
  constructor(category, options) {
    this._imap = null;
    this._box = null;
    this._db = null;
    this._category = category;
    this._options = options;
    if (!this._category) {
      throw new NylasError("FetchMessagesInCategory requires a category");
    }
  }

  // Human-readable description used in task logging.
  description() {
    return `FetchMessagesInCategory (${this._category.name} - ${this._category.id})\nOptions: ${JSON.stringify(this._options)}`;
  }

  // Lowest UID to include so that at most `count` messages are covered;
  // a falsy count means "no limit", i.e. start from UID 1.
  _getLowerBoundUID(count) {
    return count ? Math.max(1, this._box.uidnext - count) : 1;
  }

  // Clears the category linkage (categoryId + categoryUID) of every message
  // matching `where`, in a single transaction. Detached messages are either
  // re-linked if the server hands out the same UIDs again, or eventually
  // garbage collected. (Shared by UID-invalidity recovery and deletion
  // handling, which previously duplicated this query.)
  _detachMessages(where) {
    const {Message} = this._db;
    return this._db.sequelize.transaction((transaction) =>
      Message.update({
        categoryUID: null,
        categoryId: null,
      }, {transaction, where})
    );
  }

  // UID invalidity means the server has asked us to delete all the UIDs for
  // this folder and start from scratch. Instead of deleting all the messages,
  // we just remove the category ID and UID. We may re-assign the same message
  // the same UID. Otherwise they're eventually garbage collected.
  _recoverFromUIDInvalidity() {
    return this._detachMessages({categoryId: this._category.id});
  }

  /**
   * Diffs remote flag state against local rows and persists the changes.
   * @param {Object} remoteUIDAttributes - map of UID -> {flags, ...} from IMAP.
   * @param {Array} localMessageAttributes - Message rows (flag fields only).
   * @returns {Promise} resolves when every changed row has been saved.
   */
  _updateMessageAttributes(remoteUIDAttributes, localMessageAttributes) {
    const messageAttributesMap = {};
    for (const msg of localMessageAttributes) {
      messageAttributesMap[msg.categoryUID] = msg;
    }

    const createdUIDs = [];
    const changedMessages = [];

    Object.keys(remoteUIDAttributes).forEach((uid) => {
      const msg = messageAttributesMap[uid];
      const flags = remoteUIDAttributes[uid].flags;

      if (!msg) {
        createdUIDs.push(uid);
        return;
      }

      const unread = !flags.includes('\\Seen');
      const starred = flags.includes('\\Flagged');

      if (msg.unread !== unread || msg.starred !== starred) {
        msg.unread = unread;
        msg.starred = starred;
        changedMessages.push(msg);
      }
    });

    console.log(` --- found ${changedMessages.length || 'no'} flag changes`);
    if (createdUIDs.length > 0) {
      console.log(` --- found ${createdUIDs.length} new messages. These will not be processed because we assume that they will be assigned uid = uidnext, and will be picked up in the next sync when we discover unseen messages.`);
    }

    return this._db.sequelize.transaction((transaction) =>
      Promise.all(changedMessages.map(m => m.save({
        fields: MessageFlagAttributes,
        transaction,
      })))
    );
  }

  // Detaches local messages whose UIDs no longer exist in the remote folder.
  _removeDeletedMessages(remoteUIDAttributes, localMessageAttributes) {
    const removedUIDs = localMessageAttributes
      .filter(msg => !remoteUIDAttributes[msg.categoryUID])
      .map(msg => msg.categoryUID);

    console.log(` --- found ${removedUIDs.length} messages no longer in the folder`);

    if (removedUIDs.length === 0) {
      return Promise.resolve();
    }
    return this._detachMessages({
      categoryId: this._category.id,
      categoryUID: removedUIDs,
    });
  }

  /**
   * Breadth-first walks a (possibly nested) BODYSTRUCTURE array and returns
   * the parts worth downloading: text/plain, text/html, and
   * application/pgp-encrypted.
   * @returns {Array<{id: string, mimetype: string}>} may be empty.
   */
  _getDesiredMIMEParts(struct) {
    const desired = [];
    const available = [];
    const unseen = [struct];
    while (unseen.length > 0) {
      const part = unseen.shift();
      if (part instanceof Array) {
        // Multipart containers nest as arrays — flatten them into the queue.
        unseen.push(...part);
      } else {
        const mimetype = `${part.type}/${part.subtype}`;
        available.push(mimetype);
        if (['text/plain', 'text/html', 'application/pgp-encrypted'].includes(mimetype)) {
          desired.push({id: part.partID, mimetype});
        }
      }
    }
    if (desired.length === 0) {
      console.warn(`Could not find good part. Options are: ${available.join(', ')}`);
    }
    return desired;
  }

  /**
   * Fetches the BODYSTRUCTURE of every message in `range`, groups messages by
   * the exact set of MIME parts we want, then fetches headers + those parts
   * in one pass per group and hands each message to `_processMessage`.
   * @param {string} range - an IMAP sequence range such as "100:200".
   * @returns {Promise} resolves when every group has been fetched.
   */
  _fetchMessagesAndQueueForProcessing(range) {
    const uidsByPart = {};

    const $structs = this._box.fetch(range, {struct: true});
    $structs.subscribe(({attributes}) => {
      const desiredParts = this._getDesiredMIMEParts(attributes.struct);
      if (desiredParts.length === 0) {
        return;
      }
      // Messages that need the same part IDs can share a single FETCH.
      const key = JSON.stringify(desiredParts);
      console.log(key);
      uidsByPart[key] = uidsByPart[key] || [];
      uidsByPart[key].push(attributes.uid);
    });

    return $structs.toPromise().then(() => {
      return Promise.each(Object.keys(uidsByPart), (key) => {
        const uids = uidsByPart[key];
        const desiredParts = JSON.parse(key);
        const bodies = ['HEADER'].concat(desiredParts.map(p => p.id));
        console.log(`Fetching parts ${key} for ${uids.length} messages`);

        // note: the order of UIDs in the array doesn't matter, Gmail always
        // returns them in ascending (oldest => newest) order.
        const $body = this._box.fetch(uids, {bodies, struct: true});
        $body.subscribe((msg) => {
          console.log(`Fetched message ${msg.attributes.uid}`);
          msg.body = {};
          for (const {id, mimetype} of desiredParts) {
            msg.body[mimetype] = msg.parts[id];
          }
          this._processMessage(msg);
        });
        return $body.toPromise();
      });
    });
  }

  /**
   * Creates a File row for every BODYSTRUCTURE part that carries a
   * content-disposition (i.e. attachments), recursing into nested multiparts.
   * Returns a promise over all creations so failures surface to the caller
   * (the previous version fired the creates without any error handling,
   * leaking unhandled rejections).
   * @returns {Promise}
   */
  _createFilesFromStruct({message, struct}) {
    const {File} = this._db;
    const pending = [];
    for (const part of struct) {
      if (part.constructor === Array) {
        pending.push(this._createFilesFromStruct({message, struct: part}));
      } else if (part.disposition) {
        let filename = null;
        if (part.disposition.params) {
          filename = part.disposition.params.filename;
        }
        pending.push(
          File.create({
            filename: filename,
            contentId: part.partID,
            contentType: `${part.type}/${part.subtype}`,
            accountId: this._db.accountId,
            size: part.size,
          })
          .then((file) => {
            file.setMessage(message);
            message.addFile(file);
          })
        );
      }
    }
    return Promise.all(pending);
  }

  /**
   * Upserts a Message row from one fetched IMAP message, creates File rows
   * for its attachments, and queues newly-created messages for processing
   * via the pubsub connector.
   * @returns {Promise} resolves when the upsert has been issued.
   */
  _processMessage({attributes, headers, body}) {
    const {Message, accountId} = this._db;
    const hash = Message.hashForHeaders(headers);

    const parsedHeaders = Imap.parseHeader(headers);
    for (const key of ['x-gm-thrid', 'x-gm-msgid', 'x-gm-labels']) {
      parsedHeaders[key] = attributes[key];
    }

    // Message-ID and Subject are optional in real-world mail; guard so a
    // message missing either doesn't crash the whole fetch (previously this
    // threw a TypeError on `undefined[0]`).
    const messageIdHeader = parsedHeaders['message-id'];
    const subjectHeader = parsedHeaders.subject;

    const values = {
      hash: hash,
      accountId: this._db.accountId,
      body: body['text/html'] || body['text/plain'] || body['application/pgp-encrypted'] || '',
      snippet: body['text/plain'] || null,
      unread: !attributes.flags.includes('\\Seen'),
      starred: attributes.flags.includes('\\Flagged'),
      date: attributes.date,
      categoryUID: attributes.uid,
      categoryId: this._category.id,
      headers: parsedHeaders,
      messageId: messageIdHeader ? messageIdHeader[0] : null,
      subject: subjectHeader ? subjectHeader[0] : null,
    };

    return Message.find({where: {hash}}).then((existing) => {
      if (existing) {
        Object.assign(existing, values);
        return existing.save();
      }
      return Message.create(values).then((created) => {
        this._createFilesFromStruct({message: created, struct: attributes.struct});
        PubsubConnector.queueProcessMessage({accountId, messageId: created.id});
      });
    });
  }

  // Opens the category's mailbox; if the server reports a new UIDVALIDITY
  // value, detaches all previously-synced messages before resolving with
  // the opened box. Rejects when the mailbox lacks persistent UIDs.
  _openMailboxAndEnsureValidity() {
    return this._imap.openBox(this._category.name)
    .then((box) => {
      if (box.persistentUIDs === false) {
        return Promise.reject(new NylasError("Mailbox does not support persistentUIDs."));
      }
      if (box.uidvalidity !== this._category.syncState.uidvalidity) {
        return this._recoverFromUIDInvalidity()
        .then(() => Promise.resolve(box));
      }
      return Promise.resolve(box);
    });
  }

  /**
   * Computes which UID ranges have never been fetched and downloads them.
   * On the very first sync only the newest ~150 messages are pulled; on later
   * runs the fetched window extends upward (new mail) and downward (history,
   * 1000 UIDs at a time), updating syncState after each range completes.
   * @returns {Promise}
   */
  _fetchUnsyncedMessages() {
    const savedSyncState = this._category.syncState;
    const isFirstSync = !savedSyncState.fetchedmax;
    const boxUidnext = this._box.uidnext;
    const boxUidvalidity = this._box.uidvalidity;

    const desiredRanges = [];

    console.log(` - Fetching messages. Currently have range: ${savedSyncState.fetchedmin}:${savedSyncState.fetchedmax}`);

    // Todo: In the future, this is where logic should go that limits
    // sync based on number of messages / age of messages.
    if (isFirstSync) {
      const lowerbound = Math.max(1, boxUidnext - 150);
      desiredRanges.push({min: lowerbound, max: boxUidnext});
    } else {
      if (savedSyncState.fetchedmax < boxUidnext) {
        desiredRanges.push({min: savedSyncState.fetchedmax, max: boxUidnext});
      } else {
        console.log(" --- fetchedmax == uidnext, nothing more recent to fetch.");
      }
      if (savedSyncState.fetchedmin > 1) {
        const lowerbound = Math.max(1, savedSyncState.fetchedmin - 1000);
        desiredRanges.push({min: lowerbound, max: savedSyncState.fetchedmin});
      } else {
        console.log(" --- fetchedmin == 1, nothing older to fetch.");
      }
    }

    return Promise.each(desiredRanges, ({min, max}) => {
      console.log(` --- fetching range: ${min}:${max}`);

      return this._fetchMessagesAndQueueForProcessing(`${min}:${max}`).then(() => {
        const {fetchedmin, fetchedmax} = this._category.syncState;
        return this.updateCategorySyncState({
          fetchedmin: fetchedmin ? Math.min(fetchedmin, min) : min,
          fetchedmax: fetchedmax ? Math.max(fetchedmax, max) : max,
          uidvalidity: boxUidvalidity,
          timeFetchedUnseen: Date.now(),
        });
      });
    }).then(() => {
      console.log(` - Fetching messages finished`);
    });
  }

  // Runs either a deep scan (entire fetched range) or a shallow scan (recent
  // changes only), depending on how long ago the last deep scan completed.
  _runScan() {
    const {fetchedmin, fetchedmax} = this._category.syncState;
    if (!fetchedmin || !fetchedmax) {
      throw new Error("Unseen messages must be fetched at least once before the first update/delete scan.");
    }
    return this._shouldRunDeepScan() ? this._runDeepScan() : this._runShallowScan();
  }

  // True when more than `options.deepFolderScan` ms have elapsed since the
  // last deep scan; a never-scanned category always qualifies.
  _shouldRunDeepScan() {
    const {timeDeepScan} = this._category.syncState;
    return Date.now() - (timeDeepScan || 0) > this._options.deepFolderScan;
  }

  // Scans for flag changes using CONDSTORE (HIGHESTMODSEQ / CHANGEDSINCE)
  // when the server supports it, otherwise by re-fetching attributes for the
  // newest ~1000 UIDs.
  _runShallowScan() {
    const {highestmodseq} = this._category.syncState;
    const nextHighestmodseq = this._box.highestmodseq;

    let shallowFetch = null;

    if (this._imap.serverSupports(Capabilities.Condstore)) {
      console.log(` - Shallow attribute scan (using CONDSTORE)`);
      if (nextHighestmodseq === highestmodseq) {
        console.log(" --- highestmodseq matches, nothing more to fetch");
        return Promise.resolve();
      }
      shallowFetch = this._box.fetchUIDAttributes(`1:*`, {changedsince: highestmodseq});
    } else {
      const range = `${this._getLowerBoundUID(1000)}:*`;
      console.log(` - Shallow attribute scan (using range: ${range})`);
      shallowFetch = this._box.fetchUIDAttributes(range);
    }

    return shallowFetch
    .then((remoteUIDAttributes) => (
      this._db.Message.findAll({
        where: {categoryId: this._category.id},
        attributes: MessageFlagAttributes,
      })
      .then((localMessageAttributes) => (
        this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes)
      ))
      .then(() => {
        console.log(` - finished fetching changes to messages`);
        return this.updateCategorySyncState({
          highestmodseq: nextHighestmodseq,
          timeShallowScan: Date.now(),
        });
      })
    ));
  }

  // Re-fetches flag attributes for the entire fetched UID range, applying
  // both flag updates and deletions, then records the scan timestamps.
  _runDeepScan() {
    const {Message} = this._db;
    const {fetchedmin, fetchedmax} = this._category.syncState;
    const range = `${fetchedmin}:${fetchedmax}`;

    console.log(` - Deep attribute scan: fetching attributes in range: ${range}`);

    return this._box.fetchUIDAttributes(range)
    .then((remoteUIDAttributes) => {
      return Message.findAll({
        where: {categoryId: this._category.id},
        attributes: MessageFlagAttributes,
      })
      .then((localMessageAttributes) => (
        Promise.props({
          updates: this._updateMessageAttributes(remoteUIDAttributes, localMessageAttributes),
          deletes: this._removeDeletedMessages(remoteUIDAttributes, localMessageAttributes),
        })
      ))
      .then(() => {
        console.log(` - Deep scan finished.`);
        return this.updateCategorySyncState({
          highestmodseq: this._box.highestmodseq,
          timeDeepScan: Date.now(),
          timeShallowScan: Date.now(),
        });
      });
    });
  }

  // Merges `newState` into the category's syncState and saves it, skipping
  // the write entirely when nothing would change.
  updateCategorySyncState(newState) {
    if (_.isMatch(this._category.syncState, newState)) {
      return Promise.resolve();
    }
    this._category.syncState = Object.assign(this._category.syncState, newState);
    return this._category.save();
  }

  /**
   * Task entry point. Opens the mailbox, fetches never-seen messages, then
   * scans previously-synced ranges for updates and deletions.
   * @param {Object} db - database handle exposing models and `accountId`.
   * @param {Object} imap - an authenticated IMAPConnection.
   * @returns {Promise}
   */
  run(db, imap) {
    this._db = db;
    this._imap = imap;

    return this._openMailboxAndEnsureValidity().then((box) => {
      this._box = box;
      return this._fetchUnsyncedMessages().then(() =>
        this._runScan()
      );
    });
  }
}
// Export the sync task class for use by the task runner.
module.exports = FetchMessagesInCategory;