[Core] Removed flusher, helpers now save on changes

After some considerations, while the flusher was an interesting experiment, it doesn't seem to be reliable enough. It's been removed in favor of the helpers autosaving on any change in a threadsafe way.
This commit is contained in:
Twentysix 2017-04-29 19:25:10 +02:00
parent bd341f1875
commit 9fc4e54ece
6 changed files with 48 additions and 137 deletions

View File

@ -65,12 +65,12 @@ class Red(commands.Bot):
"""Lists packages present in the cogs the folder""" """Lists packages present in the cogs the folder"""
return os.listdir("cogs") return os.listdir("cogs")
def save_packages_status(self): async def save_packages_status(self):
loaded = [] loaded = []
for package in self.extensions: for package in self.extensions:
if package.startswith("cogs."): if package.startswith("cogs."):
loaded.append(package) loaded.append(package)
self.db.set_global("packages", loaded) await self.db.set_global("packages", loaded)
class ExitCodes(Enum): class ExitCodes(Enum):

View File

@ -1,82 +0,0 @@
import asyncio
import logging
from core.json_io import JsonIO, PRETTY
# This is where individual cogs can queue low priority writes to files
#
# Only the last queued write to a file actually gets executed.
# This helps considerably in reducing the total writes (especially in poorly
# coded cogs that would otherwise hammer the system with them)
#
# The flusher is used by the DB helpers in autosave mode
#
# The JSONFlusher class is supposed to be instanced only once, at boot
log = logging.getLogger("red")
_flusher = None
class JSONFlusher(JsonIO):
def __init__(self, interval=60, **settings):
self.interval = interval
self._queue = {}
self._lock = asyncio.Lock()
self._json_settings = settings.pop("json_settings", PRETTY)
self._loop = asyncio.get_event_loop()
self.task = self._loop.create_task(self._process_queue())
def add_to_queue(self, path, data):
"""Schedules a json file for later write
Calling this function multiple times with the same path will
result in only the last one getting scheduled"""
self._queue[path] = data
def remove_from_queue(self, path):
"""Removes json file from the writing queue"""
try:
del self._queue[path]
except:
pass
async def _process_queue(self):
log.debug("The flusher is now active with an interval of {} "
"seconds".format(self.interval))
try:
while True:
queue = self._queue.copy()
self._queue = {}
for path, data in queue.items():
await self._process_file(path, data, self._json_settings)
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
if self._queue:
log.debug("Flusher interrupted with non-empty queue. "
"Saving files...")
queue = self._queue.copy()
for path, data in queue.items():
await self._process_file(path, data, self._json_settings)
else:
log.debug("The queue has been processed.")
log.debug("Flusher shutting down.")
async def _process_file(self, path, data, settings):
with await self._lock:
try:
await self._threadsafe_save_json(path, data, settings)
except Exception as e:
log.critical("Flusher failed to write: {}".format(e))
def init_flusher():
"""Instances the flusher and initializes its task"""
global _flusher
_flusher = JSONFlusher()
def get_flusher():
"""Returns the global flusher instance"""
if _flusher is None:
raise RuntimeError("The flusher has not been initialized.")
return _flusher

View File

@ -16,9 +16,8 @@ MINIFIED = {"sort_keys": True, "separators": (',', ':')}
class JsonIO: class JsonIO:
"""Basic functions for atomic saving / loading of json files """Basic functions for atomic saving / loading of json files"""
_lock = asyncio.Lock()
This is inherited by the flusher and db helpers"""
def _save_json(self, path, data, settings=PRETTY): def _save_json(self, path, data, settings=PRETTY):
log.debug("Saving file {}".format(path)) log.debug("Saving file {}".format(path))
@ -31,6 +30,7 @@ class JsonIO:
async def _threadsafe_save_json(self, path, data, settings=PRETTY): async def _threadsafe_save_json(self, path, data, settings=PRETTY):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
func = functools.partial(self._save_json, path, data, settings) func = functools.partial(self._save_json, path, data, settings)
with await self._lock:
await loop.run_in_executor(None, func) await loop.run_in_executor(None, func)
def _load_json(self, path): def _load_json(self, path):
@ -43,4 +43,5 @@ class JsonIO:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
func = functools.partial(self._load_json, path) func = functools.partial(self._load_json, path)
task = loop.run_in_executor(None, func) task = loop.run_in_executor(None, func)
with await self._lock:
return await asyncio.wait_for(task) return await asyncio.wait_for(task)

View File

@ -27,7 +27,7 @@ class Owner:
await ctx.send("Failed to load package. Check your console or " await ctx.send("Failed to load package. Check your console or "
"logs for details.") "logs for details.")
else: else:
ctx.bot.save_packages_status() await ctx.bot.save_packages_status()
await ctx.send("Done.") await ctx.send("Done.")
@commands.group() @commands.group()
@ -39,7 +39,7 @@ class Owner:
if cog_name in ctx.bot.extensions: if cog_name in ctx.bot.extensions:
ctx.bot.unload_extension(cog_name) ctx.bot.unload_extension(cog_name)
ctx.bot.save_packages_status() await ctx.bot.save_packages_status()
await ctx.send("Done.") await ctx.send("Done.")
else: else:
await ctx.send("That extension is not loaded.") await ctx.send("That extension is not loaded.")
@ -60,7 +60,7 @@ class Owner:
await ctx.send("Failed to reload package. Check your console or " await ctx.send("Failed to reload package. Check your console or "
"logs for details.") "logs for details.")
else: else:
ctx.bot.save_packages_status() await ctx.bot.save_packages_status()
await ctx.send("Done.") await ctx.send("Done.")
def refresh_modules(self, module): def refresh_modules(self, module):

View File

@ -1,8 +1,9 @@
import os import os
import discord import discord
import asyncio
import functools
from collections import defaultdict from collections import defaultdict
from core.json_io import JsonIO from core.json_io import JsonIO
from core import json_flusher
GLOBAL_KEY = '__global__' GLOBAL_KEY = '__global__'
@ -20,18 +21,15 @@ class JsonDB(JsonIO):
create_dirs: bool=False create_dirs: bool=False
If True, it will create any missing directory leading to If True, it will create any missing directory leading to
the file you want to create the file you want to create
autosave: bool=False
If True, any change to the "database" will be queued to the
flusher and scheduled for a later write
default_value: Optional=None default_value: Optional=None
Same behaviour as a defaultdict Same behaviour as a defaultdict
""" """
def __init__(self, file_path, **kwargs): def __init__(self, file_path, **kwargs):
create_dirs = kwargs.pop("create_dirs", False) create_dirs = kwargs.pop("create_dirs", False)
default_value = kwargs.pop("default_value", SENTINEL) default_value = kwargs.pop("default_value", SENTINEL)
self.autosave = kwargs.pop("autosave", False) self.autosave = kwargs.pop("autosave", False)
self.path = file_path self.path = file_path
self._flusher = json_flusher.get_flusher()
file_exists = os.path.isfile(file_path) file_exists = os.path.isfile(file_path)
@ -48,49 +46,51 @@ class JsonDB(JsonIO):
self._data = self._load_json(file_path) self._data = self._load_json(file_path)
else: else:
self._data = {} self._data = {}
self._save() self._blocking_save()
if default_value is not SENTINEL: if default_value is not SENTINEL:
def _get_default(): def _get_default():
return default_value return default_value
self._data = defaultdict(_get_default, self._data) self._data = defaultdict(_get_default, self._data)
def set(self, key, value): self._loop = asyncio.get_event_loop()
self._task = functools.partial(self._threadsafe_save_json, self._data)
async def set(self, key, value):
"""Sets a DB's entry""" """Sets a DB's entry"""
self._data[key] = value self._data[key] = value
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data)
def get(self, key, default=None): def get(self, key, default=None):
"""Returns a DB's entry""" """Returns a DB's entry"""
return self._data.get(key, default) return self._data.get(key, default)
def remove(self, key): async def remove(self, key):
"""Removes a DB's entry""" """Removes a DB's entry"""
del self._data[key] del self._data[key]
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data)
def pop(self, key, default=None): async def pop(self, key, default=None):
"""Removes and returns a DB's entry""" """Removes and returns a DB's entry"""
return self._data.pop(key, default) value = self._data.pop(key, default)
await self.save()
return value
def wipe(self): async def wipe(self):
"""Wipes DB""" """Wipes DB"""
self._data = {} self._data = {}
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data)
def all(self): def all(self):
"""Returns all DB's data""" """Returns all DB's data"""
return self._data return self._data
def _save(self): def _blocking_save(self):
"""Using this should be avoided. Let's stick to threadsafe saves""" """Using this should be avoided. Let's stick to threadsafe saves"""
self._save_json(self.path, self._data) self._save_json(self.path, self._data)
async def save(self): async def save(self):
self._flusher.remove_from_queue(self.path) """Threadsafe save to file"""
await self._threadsafe_save_json(self.path, self._data) await self._threadsafe_save_json(self.path, self._data)
def __contains__(self, key): def __contains__(self, key):
@ -118,15 +118,14 @@ class JsonGuildDB(JsonDB):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def set(self, guild, key, value): async def set(self, guild, key, value):
"""Sets a guild's entry""" """Sets a guild's entry"""
if not isinstance(guild, discord.Guild): if not isinstance(guild, discord.Guild):
raise TypeError('Can only set guild data') raise TypeError('Can only set guild data')
if str(guild.id) not in self._data: if str(guild.id) not in self._data:
self._data[str(guild.id)] = {} self._data[str(guild.id)] = {}
self._data[str(guild.id)][key] = value self._data[str(guild.id)][key] = value
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data)
def get(self, guild, key, default=None): def get(self, guild, key, default=None):
"""Returns a guild's entry""" """Returns a guild's entry"""
@ -136,23 +135,22 @@ class JsonGuildDB(JsonDB):
return default return default
return self._data[str(guild.id)].get(key, default) return self._data[str(guild.id)].get(key, default)
def remove(self, guild, key): async def remove(self, guild, key):
"""Removes a guild's entry""" """Removes a guild's entry"""
if not isinstance(guild, discord.Guild): if not isinstance(guild, discord.Guild):
raise TypeError('Can only remove guild data') raise TypeError('Can only remove guild data')
if str(guild.id) not in self._data: if str(guild.id) not in self._data:
raise KeyError('Guild data is not present') raise KeyError('Guild data is not present')
del self._data[str(guild.id)][key] del self._data[str(guild.id)][key]
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data)
def pop(self, guild, key, default=None): async def pop(self, guild, key, default=None):
"""Removes and returns a guild's entry""" """Removes and returns a guild's entry"""
if not isinstance(guild, discord.Guild): if not isinstance(guild, discord.Guild):
raise TypeError('Can only remove guild data') raise TypeError('Can only remove guild data')
return self._data.get(str(guild.id), {}).pop(key, default) value = self._data.get(str(guild.id), {}).pop(key, default)
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data) return value
def get_all(self, guild, default): def get_all(self, guild, default):
"""Returns all entries of a guild""" """Returns all entries of a guild"""
@ -160,21 +158,18 @@ class JsonGuildDB(JsonDB):
raise TypeError('Can only get guild data') raise TypeError('Can only get guild data')
return self._data.get(str(guild.id), default) return self._data.get(str(guild.id), default)
def remove_all(self, guild): async def remove_all(self, guild):
"""Removes all entries of a guild""" """Removes all entries of a guild"""
if not isinstance(guild, discord.Guild): if not isinstance(guild, discord.Guild):
raise TypeError('Can only remove guilds') raise TypeError('Can only remove guilds')
super().remove(str(guild.id)) await super().remove(str(guild.id))
if self.autosave:
self._flusher.add_to_queue(self.path, self._data)
def set_global(self, key, value): async def set_global(self, key, value):
"""Sets a global value""" """Sets a global value"""
if GLOBAL_KEY not in self._data: if GLOBAL_KEY not in self._data:
self._data[GLOBAL_KEY] = {} self._data[GLOBAL_KEY] = {}
self._data[GLOBAL_KEY][key] = value self._data[GLOBAL_KEY][key] = value
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data)
def get_global(self, key, default=None): def get_global(self, key, default=None):
"""Gets a global value""" """Gets a global value"""
@ -183,18 +178,17 @@ class JsonGuildDB(JsonDB):
return self._data[GLOBAL_KEY].get(key, default) return self._data[GLOBAL_KEY].get(key, default)
def remove_global(self, key): async def remove_global(self, key):
"""Removes a global value""" """Removes a global value"""
if GLOBAL_KEY not in self._data: if GLOBAL_KEY not in self._data:
self._data[GLOBAL_KEY] = {} self._data[GLOBAL_KEY] = {}
del self._data[key] del self._data[key]
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data)
def pop_global(self, key, default=None): async def pop_global(self, key, default=None):
"""Removes and returns a global value""" """Removes and returns a global value"""
if GLOBAL_KEY not in self._data: if GLOBAL_KEY not in self._data:
self._data[GLOBAL_KEY] = {} self._data[GLOBAL_KEY] = {}
return self._data.pop(key, default) value = self._data[GLOBAL_KEY].pop(key, default)
if self.autosave: await self.save()
self._flusher.add_to_queue(self.path, self._data) return value

View File

@ -1,7 +1,6 @@
from core.bot import Red, ExitCodes from core.bot import Red, ExitCodes
from core.global_checks import init_global_checks from core.global_checks import init_global_checks
from core.events import init_events from core.events import init_events
from core.json_flusher import init_flusher
from core.settings import parse_cli_flags from core.settings import parse_cli_flags
import asyncio import asyncio
import discord import discord
@ -53,7 +52,6 @@ def init_loggers(cli_flags):
if __name__ == '__main__': if __name__ == '__main__':
cli_flags = parse_cli_flags() cli_flags = parse_cli_flags()
log = init_loggers(cli_flags) log = init_loggers(cli_flags)
init_flusher()
description = "Red v3 - Alpha" description = "Red v3 - Alpha"
red = Red(cli_flags, description=description, pm_help=None) red = Red(cli_flags, description=description, pm_help=None)
init_global_checks(red) init_global_checks(red)