mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
* Fix existing tests permanently modifying attributes * Add dataconverter tests file * Fix past nicks spec converter * Update gitignore for dataconverter data files * Add data file for dataconverter test * Simplify fix
175 lines
6.6 KiB
Python
175 lines
6.6 KiB
Python
from datetime import datetime, timezone
from itertools import chain, starmap
from pathlib import Path

from redbot.core.bot import Red
from redbot.core.config import Config
from redbot.core.utils.data_converter import DataConverter as dc
|
|
|
|
|
|
class SpecResolver:
    """
    Resolves Certain things for DataConverter

    Holds a table of known conversions keyed by a human-readable name.  Each
    entry records:

    * ``cfg``: a ``(cogname, attr, identifier)`` tuple used to locate or
      create the target ``Config`` object,
    * ``file``: the JSON file, relative to the path given at construction,
      that holds the data to convert,
    * ``converter``: the bound method that reshapes the loaded JSON into the
      mapping handed to ``DataConverter.dict_import``, or ``None`` when no
      converter is offered for that entry.
    """

    def __init__(self, path: Path):
        """Build the conversion table rooted at *path*.

        Parameters
        ----------
        path : Path
            Root directory of the installation being converted; source files
            are looked up under ``path / "data" / ...``.
        """
        self.v2path = path
        # Names of conversions that `convert` has already completed.
        self.resolved = set()
        self.available_core_conversions = {
            "Bank Accounts": {
                "cfg": ("Bank", None, 384734293238749),
                "file": self.v2path / "data" / "economy" / "bank.json",
                "converter": self.bank_accounts_conv_spec,
            },
            "Economy Settings": {
                "cfg": ("Economy", "config", 1256844281),
                "file": self.v2path / "data" / "economy" / "settings.json",
                "converter": self.economy_conv_spec,
            },
            "Mod Log Cases": {
                "cfg": ("ModLog", None, 1354799444),
                "file": self.v2path / "data" / "mod" / "modlog.json",
                "converter": None,  # prevents from showing as available
            },
            "Filter": {
                "cfg": ("Filter", "settings", 4766951341),
                "file": self.v2path / "data" / "mod" / "filter.json",
                "converter": self.filter_conv_spec,
            },
            "Past Names": {
                "cfg": ("Mod", "settings", 4961522000),
                "file": self.v2path / "data" / "mod" / "past_names.json",
                "converter": self.past_names_conv_spec,
            },
            "Past Nicknames": {
                "cfg": ("Mod", "settings", 4961522000),
                "file": self.v2path / "data" / "mod" / "past_nicknames.json",
                "converter": self.past_nicknames_conv_spec,
            },
            "Custom Commands": {
                "cfg": ("CustomCommands", "config", 414589031223512),
                "file": self.v2path / "data" / "customcom" / "commands.json",
                "converter": self.customcom_conv_spec,
            },
        }

    @property
    def available(self):
        """Sorted names of conversions that can still be run.

        A conversion is listed when its source file exists, it has a
        converter, and it has not already been recorded in ``self.resolved``.
        """
        return sorted(
            name
            for name, info in self.available_core_conversions.items()
            if info["file"].is_file()
            and info["converter"] is not None
            and name not in self.resolved
        )

    def unpack(self, parent_key, parent_value):
        """Unpack one level of nesting in a dictionary

        Yields ``(parent_key + (key,), value)`` for every item of
        *parent_value*.  If *parent_value* has no ``items()`` method it is
        treated as a leaf and ``(parent_key, parent_value)`` is yielded
        unchanged.
        """
        try:
            items = parent_value.items()
        except AttributeError:
            yield (parent_key, parent_value)
        else:
            for key, value in items:
                yield (parent_key + (key,), value)

    def flatten_dict(self, dictionary: dict):
        """Flatten a nested dictionary structure

        Returns a dict whose keys are tuples giving the path to each leaf
        value.  An empty nested dict contributes no entry, because there is
        nothing to unpack from it.
        """
        dictionary = {(key,): value for key, value in dictionary.items()}
        while True:
            # Peel one nesting level per pass until no dict values remain.
            dictionary = dict(chain.from_iterable(starmap(self.unpack, dictionary.items())))
            if not any(isinstance(value, dict) for value in dictionary.values()):
                break
        return dictionary

    def apply_scope(self, scope: str, data: dict):
        """Prefix every tuple key of *data* with *scope*."""
        return {(scope,) + k: v for k, v in data.items()}

    def bank_accounts_conv_spec(self, data: dict):
        """Reshape bank account data for import under ``Config.MEMBER``.

        The input is flattened; every path but its last element becomes the
        outer key and the last element becomes a one-element inner key.
        ``created_at`` values are parsed from ``"%Y-%m-%d %H:%M:%S"`` and
        stored as integer POSIX timestamps.
        """
        flatscoped = self.apply_scope(Config.MEMBER, self.flatten_dict(data))
        ret = {}
        for k, v in flatscoped.items():
            outerkey, innerkey = tuple(k[:-1]), (k[-1],)
            if innerkey[0] == "created_at":
                # NOTE(review): the parsed datetime is naive, so timestamp()
                # interprets it in the host's local timezone.  If the source
                # strings were written in UTC the result is shifted by the
                # local offset -- confirm against the producer of bank.json.
                v = int(datetime.strptime(v, "%Y-%m-%d %H:%M:%S").timestamp())
            ret.setdefault(outerkey, {})[innerkey] = v
        return ret

    def economy_conv_spec(self, data: dict):
        """Reshape economy settings for import under ``Config.GUILD``.

        Values are grouped by everything except the last path element; the
        last element becomes a one-element inner key.
        """
        flatscoped = self.apply_scope(Config.GUILD, self.flatten_dict(data))
        ret = {}
        for k, v in flatscoped.items():
            outerkey, innerkey = tuple(k[:-1]), (k[-1],)
            ret.setdefault(outerkey, {})[innerkey] = v
        return ret

    def mod_log_cases(self, data: dict):
        """Placeholder for mod log conversion; always raises.

        Raises
        ------
        NotImplementedError
            Unconditionally.
        """
        raise NotImplementedError("This one isn't ready yet")

    def filter_conv_spec(self, data: dict):
        """Map each top-level key of *data* to a ``filter`` entry under ``Config.GUILD``."""
        return {(Config.GUILD, k): {("filter",): v} for k, v in data.items()}

    def past_names_conv_spec(self, data: dict):
        """Map each top-level key of *data* to a ``past_names`` entry under ``Config.USER``."""
        return {(Config.USER, k): {("past_names",): v} for k, v in data.items()}

    def past_nicknames_conv_spec(self, data: dict):
        """Reshape nickname history for import under ``Config.MEMBER``.

        Each flattened path becomes the outer key and its value is stored
        under the ``("past_nicks",)`` inner key.
        """
        flatscoped = self.apply_scope(Config.MEMBER, self.flatten_dict(data))
        ret = {}
        for config_identifiers, v2data in flatscoped.items():
            ret.setdefault(config_identifiers, {})[("past_nicks",)] = v2data
        return ret

    def customcom_conv_spec(self, data: dict):
        """Reshape custom commands for import under ``Config.GUILD``.

        Each flattened leaf is treated as a command response, keyed by the
        last path element.  It is wrapped in an info dict with a fixed
        placeholder author and a ``created_at`` string holding the UTC time
        of this conversion.
        """
        flatscoped = self.apply_scope(Config.GUILD, self.flatten_dict(data))
        # One timestamp for the whole run: every command converted together
        # gets the same creation time.  Formatting an aware UTC datetime with
        # these directives gives the same text as the naive utcnow() form.
        created_at = "{:%d/%m/%Y %H:%M:%S}".format(datetime.now(timezone.utc))
        ret = {}
        for k, v in flatscoped.items():
            outerkey, innerkey = tuple(k[:-1]), ("commands", k[-1])
            ccinfo = {
                "author": {"id": 42, "name": "Converted from a v2 instance"},
                "command": k[-1],
                "created_at": created_at,
                "editors": [],
                "response": v,
            }
            ret.setdefault(outerkey, {})[innerkey] = ccinfo
        return ret

    def get_config_object(self, bot, cogname, attr, _id):
        """Return the ``Config`` object a conversion should write to.

        First tries attribute *attr* of ``bot.get_cog(cogname)``.  If that
        raises ``TypeError`` or ``AttributeError`` (for example *attr* is
        ``None``, or the lookup did not return an object carrying that
        attribute), falls back to ``Config.get_conf(None, _id,
        cog_name=cogname)``.
        """
        try:
            config = getattr(bot.get_cog(cogname), attr)
        except (TypeError, AttributeError):
            config = Config.get_conf(None, _id, cog_name=cogname)

        return config

    def get_conversion_info(self, prettyname: str):
        """Look up the table entry for *prettyname*.

        Returns
        -------
        tuple
            ``(filepath, converter, cogname, attr, _id)``.

        Raises
        ------
        KeyError
            If *prettyname* is not in the conversion table.
        """
        info = self.available_core_conversions[prettyname]
        filepath, converter = info["file"], info["converter"]
        (cogname, attr, _id) = info["cfg"]
        return filepath, converter, cogname, attr, _id

    async def convert(self, bot: "Red", prettyname: str, config=None):
        """Run the conversion called *prettyname* and record it as resolved.

        Parameters
        ----------
        bot : Red
            Bot instance used to find the target cog's config when *config*
            is not supplied.
        prettyname : str
            Name of the conversion; must currently be in ``self.available``.
        config : optional
            Config object to import into.  When ``None`` it is obtained from
            ``get_config_object``.

        Raises
        ------
        NotImplementedError
            If *prettyname* is not currently available.

        Any exception raised while loading, converting or importing the data
        propagates unchanged, and in that case the conversion is not marked
        as resolved.
        """
        if prettyname not in self.available:
            raise NotImplementedError("No Conversion Specs for this")

        filepath, converter, cogname, attr, _id = self.get_conversion_info(prettyname)
        if config is None:
            config = self.get_config_object(bot, cogname, attr, _id)

        items = converter(dc.json_load(filepath))
        await dc(config).dict_import(items)
        # Reached only when the import above succeeded.
        self.resolved.add(prettyname)
|