diff --git a/core/bot.py b/core/bot.py index a37421d46..17efd4136 100644 --- a/core/bot.py +++ b/core/bot.py @@ -65,12 +65,12 @@ class Red(commands.Bot): """Lists packages present in the cogs the folder""" return os.listdir("cogs") - def save_packages_status(self): + async def save_packages_status(self): loaded = [] for package in self.extensions: if package.startswith("cogs."): loaded.append(package) - self.db.set_global("packages", loaded) + await self.db.set_global("packages", loaded) class ExitCodes(Enum): diff --git a/core/json_flusher.py b/core/json_flusher.py deleted file mode 100644 index 2c5bd36f3..000000000 --- a/core/json_flusher.py +++ /dev/null @@ -1,82 +0,0 @@ -import asyncio -import logging -from core.json_io import JsonIO, PRETTY - -# This is where individual cogs can queue low priority writes to files -# -# Only the last queued write to a file actually gets executed. -# This helps considerably in reducing the total writes (especially in poorly -# coded cogs that would otherwise hammer the system with them) -# -# The flusher is used by the DB helpers in autosave mode -# -# The JSONFlusher class is supposed to be instanced only once, at boot - -log = logging.getLogger("red") -_flusher = None - - -class JSONFlusher(JsonIO): - def __init__(self, interval=60, **settings): - self.interval = interval - self._queue = {} - self._lock = asyncio.Lock() - self._json_settings = settings.pop("json_settings", PRETTY) - self._loop = asyncio.get_event_loop() - self.task = self._loop.create_task(self._process_queue()) - - def add_to_queue(self, path, data): - """Schedules a json file for later write - - Calling this function multiple times with the same path will - result in only the last one getting scheduled""" - self._queue[path] = data - - def remove_from_queue(self, path): - """Removes json file from the writing queue""" - try: - del self._queue[path] - except: - pass - - async def _process_queue(self): - log.debug("The flusher is now active with an interval of {} " - "seconds".format(self.interval)) - try: - while True: - queue = self._queue.copy() - self._queue = {} - for path, data in queue.items(): - await self._process_file(path, data, self._json_settings) - await asyncio.sleep(self.interval) - except asyncio.CancelledError: - if self._queue: - log.debug("Flusher interrupted with non-empty queue. " - "Saving files...") - queue = self._queue.copy() - for path, data in queue.items(): - await self._process_file(path, data, self._json_settings) - else: - log.debug("The queue has been processed.") - - log.debug("Flusher shutting down.") - - async def _process_file(self, path, data, settings): - with await self._lock: - try: - await self._threadsafe_save_json(path, data, settings) - except Exception as e: - log.critical("Flusher failed to write: {}".format(e)) - - -def init_flusher(): - """Instances the flusher and initializes its task""" - global _flusher - _flusher = JSONFlusher() - - -def get_flusher(): - """Returns the global flusher instance""" - if _flusher is None: - raise RuntimeError("The flusher has not been initialized.") - return _flusher diff --git a/core/json_io.py b/core/json_io.py index 18b8e8a61..823c2acb6 100644 --- a/core/json_io.py +++ b/core/json_io.py @@ -16,9 +16,8 @@ MINIFIED = {"sort_keys": True, "separators": (',', ':')} class JsonIO: - """Basic functions for atomic saving / loading of json files - - This is inherited by the flusher and db helpers""" + """Basic functions for atomic saving / loading of json files""" + _lock = asyncio.Lock() def _save_json(self, path, data, settings=PRETTY): log.debug("Saving file {}".format(path)) @@ -31,7 +30,8 @@ class JsonIO: async def _threadsafe_save_json(self, path, data, settings=PRETTY): loop = asyncio.get_event_loop() func = functools.partial(self._save_json, path, data, settings) - await loop.run_in_executor(None, func) + with await self._lock: + await loop.run_in_executor(None, func) def _load_json(self, path): log.debug("Reading file {}".format(path)) @@ -43,4 +43,5 @@ class JsonIO: loop = asyncio.get_event_loop() func = functools.partial(self._load_json, path) task = loop.run_in_executor(None, func) - return await asyncio.wait_for(task) + with await self._lock: + return await asyncio.wait_for(task) diff --git a/core/owner.py b/core/owner.py index f040aa471..c066fd033 100644 --- a/core/owner.py +++ b/core/owner.py @@ -27,7 +27,7 @@ class Owner: await ctx.send("Failed to load package. Check your console or " "logs for details.") else: - ctx.bot.save_packages_status() + await ctx.bot.save_packages_status() await ctx.send("Done.") @commands.group() @@ -39,7 +39,7 @@ class Owner: if cog_name in ctx.bot.extensions: ctx.bot.unload_extension(cog_name) - ctx.bot.save_packages_status() + await ctx.bot.save_packages_status() await ctx.send("Done.") else: await ctx.send("That extension is not loaded.") @@ -60,7 +60,7 @@ class Owner: await ctx.send("Failed to reload package. Check your console or " "logs for details.") else: - ctx.bot.save_packages_status() + await ctx.bot.save_packages_status() await ctx.send("Done.") def refresh_modules(self, module): diff --git a/core/utils/helpers.py b/core/utils/helpers.py index 3d4dea118..472b0d605 100644 --- a/core/utils/helpers.py +++ b/core/utils/helpers.py @@ -1,8 +1,9 @@ import os import discord +import asyncio +import functools from collections import defaultdict from core.json_io import JsonIO -from core import json_flusher GLOBAL_KEY = '__global__' @@ -20,18 +21,15 @@ class JsonDB(JsonIO): create_dirs: bool=False If True, it will create any missing directory leading to the file you want to create - autosave: bool=False - If True, any change to the "database" will be queued to the - flusher and scheduled for a later write default_value: Optional=None Same behaviour as a defaultdict """ + def __init__(self, file_path, **kwargs): create_dirs = kwargs.pop("create_dirs", False) default_value = kwargs.pop("default_value", SENTINEL) self.autosave = kwargs.pop("autosave", False) self.path = file_path - self._flusher = json_flusher.get_flusher() file_exists = os.path.isfile(file_path) @@ -48,49 +46,51 @@ class JsonDB(JsonIO): self._data = self._load_json(file_path) else: self._data = {} - self._save() + self._blocking_save() if default_value is not SENTINEL: def _get_default(): return default_value self._data = defaultdict(_get_default, self._data) - def set(self, key, value): + self._loop = asyncio.get_event_loop() + self._task = functools.partial(self._threadsafe_save_json, self._data) + + async def set(self, key, value): """Sets a DB's entry""" self._data[key] = value - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await self.save() def get(self, key, default=None): """Returns a DB's entry""" return self._data.get(key, default) - def remove(self, key): + async def remove(self, key): """Removes a DB's entry""" del self._data[key] - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await self.save() - def pop(self, key, default=None): + async def pop(self, key, default=None): """Removes and returns a DB's entry""" - return self._data.pop(key, default) + value = self._data.pop(key, default) + await self.save() + return value - def wipe(self): + async def wipe(self): """Wipes DB""" self._data = {} - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await self.save() def all(self): """Returns all DB's data""" return self._data - def _save(self): + def _blocking_save(self): """Using this should be avoided. Let's stick to threadsafe saves""" self._save_json(self.path, self._data) async def save(self): - self._flusher.remove_from_queue(self.path) + """Threadsafe save to file""" await self._threadsafe_save_json(self.path, self._data) def __contains__(self, key): @@ -118,15 +118,14 @@ class JsonGuildDB(JsonDB): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - def set(self, guild, key, value): + async def set(self, guild, key, value): """Sets a guild's entry""" if not isinstance(guild, discord.Guild): raise TypeError('Can only set guild data') if str(guild.id) not in self._data: self._data[str(guild.id)] = {} self._data[str(guild.id)][key] = value - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await self.save() def get(self, guild, key, default=None): """Returns a guild's entry""" @@ -136,23 +135,22 @@ class JsonGuildDB(JsonDB): return default return self._data[str(guild.id)].get(key, default) - def remove(self, guild, key): + async def remove(self, guild, key): """Removes a guild's entry""" if not isinstance(guild, discord.Guild): raise TypeError('Can only remove guild data') if str(guild.id) not in self._data: raise KeyError('Guild data is not present') del self._data[str(guild.id)][key] - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await self.save() - def pop(self, guild, key, default=None): + async def pop(self, guild, key, default=None): """Removes and returns a guild's entry""" if not isinstance(guild, discord.Guild): raise TypeError('Can only remove guild data') - return self._data.get(str(guild.id), {}).pop(key, default) - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + value = self._data.get(str(guild.id), {}).pop(key, default) + await self.save() + return value def get_all(self, guild, default): """Returns all entries of a guild""" @@ -160,21 +158,18 @@ class JsonGuildDB(JsonDB): raise TypeError('Can only get guild data') return self._data.get(str(guild.id), default) - def remove_all(self, guild): + async def remove_all(self, guild): """Removes all entries of a guild""" if not isinstance(guild, discord.Guild): raise TypeError('Can only remove guilds') - super().remove(str(guild.id)) - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await super().remove(str(guild.id)) - def set_global(self, key, value): + async def set_global(self, key, value): """Sets a global value""" if GLOBAL_KEY not in self._data: self._data[GLOBAL_KEY] = {} self._data[GLOBAL_KEY][key] = value - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await self.save() def get_global(self, key, default=None): """Gets a global value""" @@ -183,18 +178,17 @@ class JsonGuildDB(JsonDB): return self._data[GLOBAL_KEY].get(key, default) - def remove_global(self, key): + async def remove_global(self, key): """Removes a global value""" if GLOBAL_KEY not in self._data: self._data[GLOBAL_KEY] = {} del self._data[key] - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + await self.save() - def pop_global(self, key, default=None): + async def pop_global(self, key, default=None): """Removes and returns a global value""" if GLOBAL_KEY not in self._data: self._data[GLOBAL_KEY] = {} - return self._data.pop(key, default) - if self.autosave: - self._flusher.add_to_queue(self.path, self._data) + value = self._data[GLOBAL_KEY].pop(key, default) + await self.save() + return value diff --git a/main.py b/main.py index 24e3a1df5..6f6bb5972 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,6 @@ from core.bot import Red, ExitCodes from core.global_checks import init_global_checks from core.events import init_events -from core.json_flusher import init_flusher from core.settings import parse_cli_flags import asyncio import discord @@ -53,7 +52,6 @@ def init_loggers(cli_flags): if __name__ == '__main__': cli_flags = parse_cli_flags() log = init_loggers(cli_flags) - init_flusher() description = "Red v3 - Alpha" red = Red(cli_flags, description=description, pm_help=None) init_global_checks(red)