from itertools import chain, starmap from pathlib import Path from datetime import datetime from redbot.core.bot import Red from redbot.core.utils.data_converter import DataConverter as dc from redbot.core.config import Config class SpecResolver(object): """ Resolves Certain things for DataConverter """ def __init__(self, path: Path): self.v2path = path self.resolved = set() self.available_core_conversions = { "Bank Accounts": { "cfg": ("Bank", None, 384734293238749), "file": self.v2path / "data" / "economy" / "bank.json", "converter": self.bank_accounts_conv_spec, }, "Economy Settings": { "cfg": ("Economy", "config", 1256844281), "file": self.v2path / "data" / "economy" / "settings.json", "converter": self.economy_conv_spec, }, "Mod Log Cases": { "cfg": ("ModLog", None, 1354799444), "file": self.v2path / "data" / "mod" / "modlog.json", "converter": None, # prevents from showing as available }, "Filter": { "cfg": ("Filter", "settings", 4766951341), "file": self.v2path / "data" / "mod" / "filter.json", "converter": self.filter_conv_spec, }, "Past Names": { "cfg": ("Mod", "settings", 4961522000), "file": self.v2path / "data" / "mod" / "past_names.json", "converter": self.past_names_conv_spec, }, "Past Nicknames": { "cfg": ("Mod", "settings", 4961522000), "file": self.v2path / "data" / "mod" / "past_nicknames.json", "converter": self.past_nicknames_conv_spec, }, "Custom Commands": { "cfg": ("CustomCommands", "config", 414589031223512), "file": self.v2path / "data" / "customcom" / "commands.json", "converter": self.customcom_conv_spec, }, } @property def available(self): return sorted( k for k, v in self.available_core_conversions.items() if v["file"].is_file() and v["converter"] is not None and k not in self.resolved ) def unpack(self, parent_key, parent_value): """Unpack one level of nesting in a dictionary""" try: items = parent_value.items() except AttributeError: yield (parent_key, parent_value) else: for key, value in items: yield (parent_key + (key,), value) def flatten_dict(self, dictionary: dict): """Flatten a nested dictionary structure""" dictionary = {(key,): value for key, value in dictionary.items()} while True: dictionary = dict(chain.from_iterable(starmap(self.unpack, dictionary.items()))) if not any(isinstance(value, dict) for value in dictionary.values()): break return dictionary def apply_scope(self, scope: str, data: dict): return {(scope,) + k: v for k, v in data.items()} def bank_accounts_conv_spec(self, data: dict): flatscoped = self.apply_scope(Config.MEMBER, self.flatten_dict(data)) ret = {} for k, v in flatscoped.items(): outerkey, innerkey = tuple(k[:-1]), (k[-1],) if outerkey not in ret: ret[outerkey] = {} if innerkey[0] == "created_at": x = int(datetime.strptime(v, "%Y-%m-%d %H:%M:%S").timestamp()) ret[outerkey].update({innerkey: x}) else: ret[outerkey].update({innerkey: v}) return ret def economy_conv_spec(self, data: dict): flatscoped = self.apply_scope(Config.GUILD, self.flatten_dict(data)) ret = {} for k, v in flatscoped.items(): outerkey, innerkey = (*k[:-1],), (k[-1],) if outerkey not in ret: ret[outerkey] = {} ret[outerkey].update({innerkey: v}) return ret def mod_log_cases(self, data: dict): raise NotImplementedError("This one isn't ready yet") def filter_conv_spec(self, data: dict): return {(Config.GUILD, k): {("filter",): v} for k, v in data.items()} def past_names_conv_spec(self, data: dict): return {(Config.USER, k): {("past_names",): v} for k, v in data.items()} def past_nicknames_conv_spec(self, data: dict): flatscoped = self.apply_scope(Config.MEMBER, self.flatten_dict(data)) ret = {} for config_identifiers, v2data in flatscoped.items(): if config_identifiers not in ret: ret[config_identifiers] = {} ret[config_identifiers].update({("past_nicks",): v2data}) return ret def customcom_conv_spec(self, data: dict): flatscoped = self.apply_scope(Config.GUILD, self.flatten_dict(data)) ret = {} for k, v in flatscoped.items(): outerkey, innerkey = (*k[:-1],), ("commands", k[-1]) if outerkey not in ret: ret[outerkey] = {} ccinfo = { "author": {"id": 42, "name": "Converted from a v2 instance"}, "command": k[-1], "created_at": "{:%d/%m/%Y %H:%M:%S}".format(datetime.utcnow()), "editors": [], "response": v, } ret[outerkey].update({innerkey: ccinfo}) return ret def get_config_object(self, bot, cogname, attr, _id): try: config = getattr(bot.get_cog(cogname), attr) except (TypeError, AttributeError): config = Config.get_conf(None, _id, cog_name=cogname) return config def get_conversion_info(self, prettyname: str): info = self.available_core_conversions[prettyname] filepath, converter = info["file"], info["converter"] (cogname, attr, _id) = info["cfg"] return filepath, converter, cogname, attr, _id async def convert(self, bot: Red, prettyname: str, config=None): if prettyname not in self.available: raise NotImplementedError("No Conversion Specs for this") filepath, converter, cogname, attr, _id = self.get_conversion_info(prettyname) if config is None: config = self.get_config_object(bot, cogname, attr, _id) try: items = converter(dc.json_load(filepath)) await dc(config).dict_import(items) except Exception: raise else: self.resolved.add(prettyname)