diff --git a/bazarr/main.py b/bazarr/main.py index a364039ab..7139903d3 100644 --- a/bazarr/main.py +++ b/bazarr/main.py @@ -197,9 +197,9 @@ def proxy(protocol, url): if settings.general.getboolean('use_sonarr'): - sonarr_signalr_client.start() + gevent.Greenlet.spawn(sonarr_signalr_client.start) if settings.general.getboolean('use_radarr'): - radarr_signalr_client.start() + gevent.Greenlet.spawn(radarr_signalr_client.start) if __name__ == "__main__": diff --git a/bazarr/signalr_client.py b/bazarr/signalr_client.py index 4c7665e0a..9242a01fb 100644 --- a/bazarr/signalr_client.py +++ b/bazarr/signalr_client.py @@ -38,16 +38,16 @@ class SonarrSignalrClient: 'consider upgrading.') return - logging.debug('BAZARR connecting to Sonarr SignalR feed...') + logging.info('BAZARR trying to connect to Sonarr SignalR feed...') self.configure() - while not self.connection.is_open: + while not self.connection.started: try: self.connection.start() except ConnectionError: gevent.sleep(5) except json.decoder.JSONDecodeError: - logging.error('BAZARR cannot parse JSON returned by SignalR feed. Take a look at: ' - 'https://forums.sonarr.tv/t/signalr-problem/5785/3') + logging.error("BAZARR cannot parse JSON returned by SignalR feed. This is a known issue when Sonarr " + "doesn't have write permission to it's /config/xdg directory.") self.stop() logging.info('BAZARR SignalR client for Sonarr is connected and waiting for events.') if not args.dev: @@ -64,13 +64,16 @@ class SonarrSignalrClient: def restart(self): if self.connection: - if self.connection.is_open: - self.stop(log=False) + if self.connection.started: + try: + self.stop(log=False) + except: + self.connection.started = False if settings.general.getboolean('use_sonarr'): self.start() def exception_handler(self, type, exception, traceback): - logging.error('BAZARR connection to Sonarr SignalR feed has been lost. Reconnecting...') + logging.error('BAZARR connection to Sonarr SignalR feed has been lost.') self.restart() def configure(self): @@ -94,8 +97,12 @@ class RadarrSignalrClient: def start(self): self.configure() - logging.debug('BAZARR connecting to Radarr SignalR feed...') - self.connection.start() + logging.info('BAZARR trying to connect to Radarr SignalR feed...') + while self.connection.transport.state.value not in [0, 1, 2]: + try: + self.connection.start() + except ConnectionError: + gevent.sleep(5) def stop(self): logging.info('BAZARR SignalR client for Radarr is now disconnected.') @@ -133,8 +140,8 @@ class RadarrSignalrClient: "max_attempts": None }).build() self.connection.on_open(self.on_connect_handler) - self.connection.on_reconnect(lambda: logging.info('BAZARR SignalR client for Radarr connection as been lost. ' - 'Trying to reconnect...')) + self.connection.on_reconnect(lambda: logging.error('BAZARR SignalR client for Radarr connection as been lost. ' + 'Trying to reconnect...')) self.connection.on_close(lambda: logging.debug('BAZARR SignalR client for Radarr is disconnected.')) self.connection.on_error(self.exception_handler) self.connection.on("receiveMessage", dispatcher) diff --git a/libs/signalr/__init__.py b/libs/signalr/__init__.py index 7742eeb58..3d155c5c6 100644 --- a/libs/signalr/__init__.py +++ b/libs/signalr/__init__.py @@ -1,3 +1,8 @@ +from gevent import monkey + +monkey.patch_socket() +monkey.patch_ssl() + from ._connection import Connection -__version__ = '0.0.12' +__version__ = '0.0.7' diff --git a/libs/signalr/_connection.py b/libs/signalr/_connection.py index 6471ba670..377606f99 100644 --- a/libs/signalr/_connection.py +++ b/libs/signalr/_connection.py @@ -1,6 +1,6 @@ import json +import gevent import sys -from threading import Thread from signalr.events import EventHook from signalr.hubs import Hub from signalr.transports import AutoTransport @@ -15,16 +15,14 @@ class Connection: self.qs = {} self.__send_counter = -1 self.token = None - self.id = None self.data = None self.received = EventHook() self.error = EventHook() self.starting = EventHook() self.stopping = EventHook() self.exception = EventHook() - self.is_open = False self.__transport = AutoTransport(session, self) - self.__listener_thread = None + self.__greenlet = None self.started = False def handle_error(**kwargs): @@ -50,32 +48,27 @@ class Connection: negotiate_data = self.__transport.negotiate() self.token = negotiate_data['ConnectionToken'] - self.id = negotiate_data['ConnectionId'] listener = self.__transport.start() def wrapped_listener(): - while self.is_open: - try: - listener() - except: - self.exception.fire(*sys.exc_info()) - self.is_open = False + try: + listener() + gevent.sleep() + except: + self.exception.fire(*sys.exc_info()) - self.is_open = True - self.__listener_thread = Thread(target=wrapped_listener) - self.__listener_thread.start() + self.__greenlet = gevent.spawn(wrapped_listener) self.started = True def wait(self, timeout=30): - Thread.join(self.__listener_thread, timeout) + gevent.joinall([self.__greenlet], timeout) def send(self, data): self.__transport.send(data) def close(self): - self.is_open = False - self.__listener_thread.join() + gevent.kill(self.__greenlet) self.__transport.close() def register_hub(self, name): diff --git a/libs/signalr/transports/_sse_transport.py b/libs/signalr/transports/_sse_transport.py index 7faaf936a..63d978643 100644 --- a/libs/signalr/transports/_sse_transport.py +++ b/libs/signalr/transports/_sse_transport.py @@ -12,16 +12,11 @@ class ServerSentEventsTransport(Transport): return 'serverSentEvents' def start(self): - connect_url = self._get_url('connect') - self.__response = iter(sseclient.SSEClient(connect_url, session=self._session)) + self.__response = sseclient.SSEClient(self._get_url('connect'), session=self._session) self._session.get(self._get_url('start')) def _receive(): - try: - notification = next(self.__response) - except StopIteration: - return - else: + for notification in self.__response: if notification.data != 'initialized': self._handle_notification(notification.data) diff --git a/libs/signalr/transports/_transport.py b/libs/signalr/transports/_transport.py index af62672fd..c0d0d4278 100644 --- a/libs/signalr/transports/_transport.py +++ b/libs/signalr/transports/_transport.py @@ -1,12 +1,13 @@ from abc import abstractmethod import json import sys -import threading + if sys.version_info[0] < 3: from urllib import quote_plus else: from urllib.parse import quote_plus +import gevent class Transport: @@ -47,7 +48,7 @@ class Transport: if len(message) > 0: data = json.loads(message) self._connection.received.fire(**data) - #thread.sleep() #TODO: investigate if we should sleep here + gevent.sleep() def _get_url(self, action, **kwargs): args = kwargs.copy() diff --git a/libs/signalr/transports/_ws_transport.py b/libs/signalr/transports/_ws_transport.py index 4d9a80ad1..14fefa6cc 100644 --- a/libs/signalr/transports/_ws_transport.py +++ b/libs/signalr/transports/_ws_transport.py @@ -1,6 +1,7 @@ import json import sys +import gevent if sys.version_info[0] < 3: from urlparse import urlparse, urlunparse @@ -38,14 +39,14 @@ class WebSocketsTransport(Transport): self._session.get(self._get_url('start')) def _receive(): - notification = self.ws.recv() - self._handle_notification(notification) + for notification in self.ws: + self._handle_notification(notification) return _receive def send(self, data): self.ws.send(json.dumps(data)) - #thread.sleep() #TODO: inveistage if we should sleep here or not + gevent.sleep() def close(self): self.ws.close() diff --git a/libs/version.txt b/libs/version.txt index 5c04c9373..ff635d2c7 100644 --- a/libs/version.txt +++ b/libs/version.txt @@ -30,7 +30,7 @@ rarfile=3.0 rebulk=3.0.1 requests=2.18.4 semver=2.13.0 -signalr-client-threads=0.0.12 <-- Modified to work with Sonarr +signalr-client=0.0.7 <-- Modified to work with Sonarr and added exception handler signalrcore=0.9.2 <-- https://github.com/mandrewcito/signalrcore/pull/60 and 61 SimpleConfigParser=0.1.0 <-- modified version: do not update!!! six=1.11.0