[Core] Properly end tasks and process flusher's queue on quit

This commit is contained in:
Twentysix 2017-04-28 18:13:02 +02:00
parent b113a94c52
commit bd341f1875
2 changed files with 25 additions and 13 deletions

View File

@ -17,7 +17,7 @@ _flusher = None
class JSONFlusher(JsonIO):
def __init__(self, interval=5, **settings):
def __init__(self, interval=60, **settings):
self.interval = interval
self._queue = {}
self._lock = asyncio.Lock()
@ -40,26 +40,33 @@ class JSONFlusher(JsonIO):
pass
async def _process_queue(self):
log.debug("The flusher is now active with an interval of {} "
"seconds".format(self.interval))
try:
while True:
queue = self._queue.copy()
self._queue = {}
for path, data in queue.items():
with await self._lock:
try:
await self._threadsafe_save_json(path,
data,
self._json_settings)
except Exception as e:
log.critical("Flusher failed to write: {}"
"".format(e))
await self._process_file(path, data, self._json_settings)
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
if self._queue:
log.warning("Flusher interrupted with "
"non-empty queue")
else:
log.debug("Flusher shutting down.")
log.debug("Flusher interrupted with non-empty queue. "
"Saving files...")
queue = self._queue.copy()
for path, data in queue.items():
await self._process_file(path, data, self._json_settings)
else:
log.debug("The queue has been processed.")
log.debug("Flusher shutting down.")
async def _process_file(self, path, data, settings):
with await self._lock:
try:
await self._threadsafe_save_json(path, data, settings)
except Exception as e:
log.critical("Flusher failed to write: {}".format(e))
def init_flusher():

View File

@ -85,5 +85,10 @@ if __name__ == '__main__':
log.critical("Fatal exception", exc_info=e)
loop.run_until_complete(red.logout())
finally:
pending = asyncio.Task.all_tasks(loop=red.loop)
gathered = asyncio.gather(*pending, loop=red.loop)
gathered.cancel()
red.loop.run_until_complete(gathered)
gathered.exception()
sys.exit(red._shutdown_mode.value)