diff --git a/packages/local-sync/src/message-processor/index.js b/packages/local-sync/src/message-processor/index.js index 0d16d9dbc..8e0641bc2 100644 --- a/packages/local-sync/src/message-processor/index.js +++ b/packages/local-sync/src/message-processor/index.js @@ -12,6 +12,9 @@ const LocalDatabaseConnector = require('../shared/local-database-connector'); const MAX_QUEUE_LENGTH = 500 const PROCESSING_DELAY = 0 +const MAX_CPU_USE_ON_AC = 1.0; +const MAX_CPU_USE_ON_BATTERY = 0.10; +const MAX_CHUNK_SIZE = 1; class MessageProcessor { @@ -19,6 +22,15 @@ class MessageProcessor { // The queue is a chain of Promises this._queue = Promise.resolve() this._queueLength = 0 + this._currentChunkSize = 0 + this._currentChunkStart = Date.now(); + this._isBatteryCharging = true; + navigator.getBattery().then((battery) => { + battery.addEventListener('chargingchange', () => { + console.info('charge change', battery.charging); + this._isBatteryCharging = battery.charging + }); + }); } queueLength() { @@ -29,6 +41,19 @@ class MessageProcessor { return this._queueLength >= MAX_QUEUE_LENGTH } + _maxCPUForProcessing() { + if (this._isBatteryCharging) { + return MAX_CPU_USE_ON_AC; + } + return MAX_CPU_USE_ON_BATTERY; + } + + _computeThrottlingTimeout() { + const timeSliceMs = Date.now() - this._currentChunkStart; + const maxCPU = this._maxCPUForProcessing(); + return (timeSliceMs * (1.0 / maxCPU)) - timeSliceMs; + } + /** * @returns Promise that resolves when message has been processed * This promise will never reject, given that this function is meant to be @@ -40,9 +65,21 @@ class MessageProcessor { return new Promise((resolve) => { this._queueLength++ this._queue = this._queue.then(async () => { + if (this._currentChunkSize === 0) { + this._currentChunkStart = Date.now(); + } + this._currentChunkSize++; + await this._processMessage({accountId, folderId, imapMessage, struct, desiredParts}) this._queueLength-- + // Throttle message processing to meter cpu usage + if (this._currentChunkSize === MAX_CHUNK_SIZE) { + const timeout = this._computeThrottlingTimeout(); + await new Promise(r => setTimeout(r, timeout)) + this._currentChunkSize = 0; + } + // To save memory, we reset the Promise chain if the queue reaches a // length of 0, otherwise we will continue referencing the entire chain // of promises that came before @@ -50,9 +87,6 @@ class MessageProcessor { this._queue = Promise.resolve() } resolve() - - // Throttle message processing to meter cpu usage - await new Promise(r => setTimeout(r, PROCESSING_DELAY)) }) }) }