diff --git a/core/json_flusher.py b/core/json_flusher.py index 8788469ee..2c5bd36f3 100644 --- a/core/json_flusher.py +++ b/core/json_flusher.py @@ -17,7 +17,7 @@ _flusher = None class JSONFlusher(JsonIO): - def __init__(self, interval=5, **settings): + def __init__(self, interval=60, **settings): self.interval = interval self._queue = {} self._lock = asyncio.Lock() @@ -40,26 +40,33 @@ class JSONFlusher(JsonIO): pass async def _process_queue(self): + log.debug("The flusher is now active with an interval of {} " + "seconds".format(self.interval)) try: while True: queue = self._queue.copy() self._queue = {} for path, data in queue.items(): - with await self._lock: - try: - await self._threadsafe_save_json(path, - data, - self._json_settings) - except Exception as e: - log.critical("Flusher failed to write: {}" - "".format(e)) + await self._process_file(path, data, self._json_settings) await asyncio.sleep(self.interval) except asyncio.CancelledError: if self._queue: - log.warning("Flusher interrupted with " - "non-empty queue") - else: - log.debug("Flusher shutting down.") + log.debug("Flusher interrupted with non-empty queue. " + "Saving files...") + queue = self._queue.copy() + for path, data in queue.items(): + await self._process_file(path, data, self._json_settings) + else: + log.debug("The queue has been processed.") + + log.debug("Flusher shutting down.") + + async def _process_file(self, path, data, settings): + with await self._lock: + try: + await self._threadsafe_save_json(path, data, settings) + except Exception as e: + log.critical("Flusher failed to write: {}".format(e)) def init_flusher(): diff --git a/main.py b/main.py index d251ecc7b..24e3a1df5 100644 --- a/main.py +++ b/main.py @@ -85,5 +85,10 @@ if __name__ == '__main__': log.critical("Fatal exception", exc_info=e) loop.run_until_complete(red.logout()) finally: + pending = asyncio.Task.all_tasks(loop=red.loop) + gathered = asyncio.gather(*pending, loop=red.loop) + gathered.cancel() + red.loop.run_until_complete(gathered) + gathered.exception() sys.exit(red._shutdown_mode.value)