2018-03-25 23:09:17 +08:00
import utils from './utils.js' ;
2019-10-20 16:00:18 +08:00
import toastService from "./toast.js" ;
2019-12-10 06:07:45 +08:00
import server from "./server.js" ;
2017-11-29 06:52:47 +08:00
2018-07-25 03:43:15 +08:00
// element into which handleMessage() writes the number of outstanding syncs reported by the backend
const $outstandingSyncsCount = $("#outstanding-syncs-count");

// handlers receiving every consumed batch of sync rows
const allSyncMessageHandlers = [];
// handlers receiving only the sync rows whose sourceId differs from glob.sourceId
const outsideSyncMessageHandlers = [];
// handlers receiving every parsed websocket message, regardless of its type
const messageHandlers = [];

// current WebSocket connection, assigned at startup and replaced by sendPing() on reconnect
let ws;

// id of the last sync row pushed into syncDataQueue (reported to the backend in pings)
let lastAcceptedSyncId = window.glob.maxSyncIdAtLoad;
// id of the last sync row which has been passed through the sync handlers
let lastProcessedSyncId = window.glob.maxSyncIdAtLoad;

// timestamp (ms) of startup or of the last received 'sync' message, used by sendPing() to detect a lost connection
let lastPingTs;

// sync rows received from the backend which have not been consumed yet
let syncDataQueue = [];
2018-03-26 01:08:58 +08:00
2018-03-25 23:09:17 +08:00
/**
 * Logs an error to the browser console (with a stack trace) and, when the
 * websocket connection is open, reports it to the backend as well.
 *
 * @param {string} message - error description
 */
function logError(message) {
    console.log(utils.now(), message); // needs to be separate from .trace()
    console.trace();

    // ws is not assigned yet during early startup, and sending on a non-open socket would throw
    if (ws && ws.readyState === ws.OPEN) {
        ws.send(JSON.stringify({
            type: 'log-error',
            error: message
        }));
    }
}
2017-12-18 02:46:18 +08:00
2018-03-26 09:16:57 +08:00
/**
 * Registers a handler which is called with every parsed websocket message,
 * whatever its type.
 *
 * @param {function(object): void} messageHandler - callback receiving the message
 */
function subscribeToMessages(messageHandler) {
    messageHandlers.push(messageHandler);
}
2019-08-07 04:39:27 +08:00
/**
 * Registers a handler for sync rows which originated outside of this
 * frontend instance (rows whose sourceId differs from glob.sourceId).
 *
 * @param {function(object[]): *} messageHandler - callback receiving the batch of sync rows
 */
function subscribeToOutsideSyncMessages(messageHandler) {
    outsideSyncMessageHandlers.push(messageHandler);
}
/**
 * Registers a handler which receives every consumed batch of sync rows,
 * including the rows produced by this frontend instance.
 *
 * @param {function(object[]): *} messageHandler - callback receiving the batch of sync rows
 */
function subscribeToAllSyncMessages(messageHandler) {
    allSyncMessageHandlers.push(messageHandler);
}
2019-10-20 23:49:58 +08:00
// used to serialize sync operations - holds the promise of the running consumeSyncData() call, null when idle
let consumeQueuePromise = null;

/**
 * WebSocket "message" event handler.
 *
 * Passes the parsed message to all generic subscribers and then handles the
 * message types known to this module:
 *  - 'sync': queues the received rows and consumes them, one consumer at a time
 *  - 'sync-hash-check-failed' / 'consistency-checks-failed': shows an error toast
 *
 * @param {{data: string}} event - websocket message event carrying a JSON payload
 * @returns {Promise<void>} resolved once the sync rows of this message (if any) have been consumed
 */
async function handleMessage(event) {
    const message = JSON.parse(event.data);

    for (const messageHandler of messageHandlers) {
        try {
            messageHandler(message);
        }
        catch (e) {
            // a faulty subscriber must not prevent the other subscribers and the sync processing from running
            logError(`Message handler failed with ${e.message}: ${e.stack}`);
        }
    }

    if (message.type === 'sync') {
        const syncRows = message.data;

        lastPingTs = Date.now();

        $outstandingSyncsCount.html(message.outstandingSyncs);

        if (syncRows.length > 0) {
            console.debug(utils.now(), "Sync data: ", syncRows);

            syncDataQueue.push(...syncRows);

            // we set lastAcceptedSyncId even before sync processing and send ping so that backend can start sending more updates
            lastAcceptedSyncId = Math.max(lastAcceptedSyncId, syncRows[syncRows.length - 1].id);
            sendPing();

            // first wait for all the preceding consumers to finish
            while (consumeQueuePromise) {
                await consumeQueuePromise;
            }

            try {
                // it's my turn so start it up
                consumeQueuePromise = consumeSyncData();

                await consumeQueuePromise;
            }
            finally {
                // finish and set to null to signal somebody else can pick it up
                consumeQueuePromise = null;
            }
        }
    }
    else if (message.type === 'sync-hash-check-failed') {
        toastService.showError("Sync check failed!", 60000);
    }
    else if (message.type === 'consistency-checks-failed') {
        toastService.showError("Consistency checks failed! See logs for details.", 50 * 60000);
    }
}
2019-10-20 23:49:58 +08:00
// pending waitForSyncId() requests: {desiredSyncId, resolvePromise, start} where start is the registration time in ms
let syncIdReachedListeners = [];

/**
 * Returns a promise which resolves once the given sync id has been processed
 * by the frontend.
 *
 * @param {number} desiredSyncId - sync id to wait for
 * @returns {Promise<void>} already resolved when the id has been processed,
 *          otherwise pending until checkSyncIdListeners() resolves it
 */
function waitForSyncId(desiredSyncId) {
    if (desiredSyncId <= lastProcessedSyncId) {
        return Promise.resolve();
    }

    return new Promise(resolve => {
        syncIdReachedListeners.push({
            desiredSyncId,
            resolvePromise: resolve,
            start: Date.now()
        });
    });
}
2019-12-10 06:07:45 +08:00
/**
 * Waits until the frontend has processed the highest sync id reported by
 * server.getMaxKnownSyncId().
 *
 * @returns {Promise<void>} see waitForSyncId()
 */
function waitForMaxKnownSyncId() {
    const maxKnownSyncId = server.getMaxKnownSyncId();

    return waitForSyncId(maxKnownSyncId);
}
2019-10-29 02:45:36 +08:00
/**
 * Resolves every waitForSyncId() listener whose desired sync id has already
 * been processed, keeps the remaining ones registered and logs those which
 * have been waiting for more than a minute.
 */
function checkSyncIdListeners() {
    const stillWaiting = [];

    for (const listener of syncIdReachedListeners) {
        if (listener.desiredSyncId <= lastProcessedSyncId) {
            listener.resolvePromise();
        }
        else {
            stillWaiting.push(listener);
        }
    }

    syncIdReachedListeners = stillWaiting;

    const now = Date.now();

    // report only listeners waiting for more than 60 seconds - the previous condition
    // (now > start - 60000) was always true, so every pending listener got logged on each check
    syncIdReachedListeners
        .filter(listener => now - listener.start > 60000)
        .forEach(listener => console.log(` Waiting for syncId ${listener.desiredSyncId} while current is ${lastProcessedSyncId} for ${Math.floor((now - listener.start) / 1000)} s `));
}
2019-12-17 05:00:44 +08:00
/**
 * Invokes a sync handler and shields the caller from its failures: any error
 * thrown (or rejected) by the handler is logged to the console and swallowed.
 *
 * @param {function(object[]): *} syncHandler - handler to invoke
 * @param {object[]} syncData - sync rows passed to the handler
 * @returns {Promise<*>} the handler's result, or undefined when the handler failed
 */
async function runSafely(syncHandler, syncData) {
    let outcome;

    try {
        outcome = await syncHandler(syncData);
    }
    catch (err) {
        console.log(` Sync handler failed with ${err.message} : ${err.stack} `);
    }

    return outcome;
}
2019-10-20 23:49:58 +08:00
/**
 * Takes everything currently waiting in syncDataQueue, passes it to the
 * registered sync handlers and moves lastProcessedSyncId forward. Afterwards
 * (also when the queue was empty) the waitForSyncId() listeners are checked.
 *
 * @returns {Promise<void>}
 */
async function consumeSyncData() {
    if (syncDataQueue.length > 0) {
        // take over the whole queue, rows arriving from now on belong to the next consumer
        const allSyncData = syncDataQueue;
        syncDataQueue = [];

        const outsideSyncData = allSyncData.filter(sync => sync.sourceId !== glob.sourceId);

        try {
            // the update process should be synchronous as a whole but individual handlers can run in parallel
            await Promise.all([
                ...allSyncMessageHandlers.map(syncHandler => runSafely(syncHandler, allSyncData)),
                ...outsideSyncMessageHandlers.map(syncHandler => runSafely(syncHandler, outsideSyncData))
            ]);
        }
        catch (e) {
            // runSafely() already catches and logs failures of individual handlers,
            // so this branch is only a safety net for errors raised outside of it
            logError(` Encountered error ${e.message} , reloading frontend. `);

            // if there's an error in updating the frontend then the easy option to recover is to reload the frontend completely
            utils.reloadApp();
        }

        lastProcessedSyncId = Math.max(lastProcessedSyncId, allSyncData[allSyncData.length - 1].id);
    }

    checkSyncIdListeners();
}
2018-03-25 23:09:17 +08:00
/**
 * Opens a new WebSocket connection to the host and path the page was loaded
 * from and wires it to handleMessage().
 *
 * @returns {WebSocket} the newly created (still connecting) socket
 */
function connectWebSocket() {
    const loc = window.location;

    // use wss for secure messaging
    const scheme = loc.protocol === "https:" ? "wss:" : "ws:";
    const webSocketUri = `${scheme}//${loc.host}${loc.pathname}`;

    // named "socket" so that it does not shadow the module-level "ws"
    const socket = new WebSocket(webSocketUri);
    socket.onopen = () => console.debug(utils.now(), ` Connected to server ${webSocketUri} with WebSocket `);
    socket.onmessage = handleMessage;
    // we're not handling ws.onclose here because reconnection is done in sendPing()

    return socket;
}
2019-12-03 05:27:06 +08:00
/**
 * Keep-alive tick: reports the last accepted sync id to the backend over an
 * open socket, or opens a new connection when the current one is closed or
 * closing. Nothing is sent while the socket is still connecting.
 *
 * @returns {Promise<void>}
 */
async function sendPing() {
    // lastPingTs is refreshed whenever a 'sync' message arrives, so a long silence means the backend is unreachable
    if (Date.now() - lastPingTs > 30000) {
        console.log(utils.now(), "Lost websocket connection to the backend");
    }

    if (ws.readyState === ws.OPEN) {
        ws.send(JSON.stringify({
            type: 'ping',
            lastSyncId: lastAcceptedSyncId
        }));
    }
    else if (ws.readyState === ws.CLOSED || ws.readyState === ws.CLOSING) {
        console.log(utils.now(), "WS closed or closing, trying to reconnect");

        ws = connectWebSocket();
    }
}
2018-03-26 01:08:58 +08:00
// Startup: scheduled with a zero delay, so it runs once the currently executing script has finished.
// Opens the websocket connection and then calls sendPing() every second, which also takes care of reconnecting.
setTimeout(() => {
    ws = connectWebSocket();

    // starting point for the lost-connection check in sendPing()
    lastPingTs = Date.now();

    setInterval(sendPing, 1000);
}, 0);
2017-12-02 11:28:22 +08:00
2019-10-26 04:20:14 +08:00
// Shows a persistent "sync in progress" toast while the backend reports that a sync pull is running
// and closes it shortly after the backend reports that the pull has finished.
subscribeToMessages(message => {
    if (message.type === 'sync-pull-in-progress') {
        toastService.showPersistent({
            id: 'sync',
            title: "Sync status",
            message: "Sync update in progress",
            icon: "refresh"
        });
    }
    else if (message.type === 'sync-pull-finished') {
        // this gives user a chance to see the toast in case of fast sync finish
        setTimeout(() => toastService.closePersistent('sync'), 1000);
    }
});
2018-03-25 23:09:17 +08:00
// public API of the websocket module
export default {
    logError,
    subscribeToMessages,
    subscribeToAllSyncMessages,
    subscribeToOutsideSyncMessages,
    waitForSyncId,
    waitForMaxKnownSyncId
};