Michael H b88b5a2601 [V3] Update code standards (black code format pass) (#1650)
* ran black: code formatter against `redbot/` with `-l 99`

* badge
2018-05-14 15:33:24 -04:00

166 lines
6.2 KiB
Python

from itertools import chain, starmap
from pathlib import Path
from datetime import datetime
from redbot.core.bot import Red
from redbot.core.utils.data_converter import DataConverter as dc
from redbot.core.config import Config
class SpecResolver(object):
"""
Resolves Certain things for DataConverter
"""
def __init__(self, path: Path):
self.v2path = path
self.resolved = set()
self.available_core_conversions = {
"Bank Accounts": {
"cfg": ("Bank", None, 384734293238749),
"file": self.v2path / "data" / "economy" / "bank.json",
"converter": self.bank_accounts_conv_spec,
},
"Economy Settings": {
"cfg": ("Economy", "config", 1256844281),
"file": self.v2path / "data" / "economy" / "settings.json",
"converter": self.economy_conv_spec,
},
"Mod Log Cases": {
"cfg": ("ModLog", None, 1354799444),
"file": self.v2path / "data" / "mod" / "modlog.json",
"converter": None, # prevents from showing as available
},
"Filter": {
"cfg": ("Filter", "settings", 4766951341),
"file": self.v2path / "data" / "mod" / "filter.json",
"converter": self.filter_conv_spec,
},
"Past Names": {
"cfg": ("Mod", "settings", 4961522000),
"file": self.v2path / "data" / "mod" / "past_names.json",
"converter": self.past_names_conv_spec,
},
"Past Nicknames": {
"cfg": ("Mod", "settings", 4961522000),
"file": self.v2path / "data" / "mod" / "past_nicknames.json",
"converter": self.past_nicknames_conv_spec,
},
"Custom Commands": {
"cfg": ("CustomCommands", "config", 414589031223512),
"file": self.v2path / "data" / "customcom" / "commands.json",
"converter": self.customcom_conv_spec,
},
}
@property
def available(self):
return sorted(
k
for k, v in self.available_core_conversions.items()
if v["file"].is_file() and v["converter"] is not None and k not in self.resolved
)
def unpack(self, parent_key, parent_value):
"""Unpack one level of nesting in a dictionary"""
try:
items = parent_value.items()
except AttributeError:
yield (parent_key, parent_value)
else:
for key, value in items:
yield (parent_key + (key,), value)
def flatten_dict(self, dictionary: dict):
"""Flatten a nested dictionary structure"""
dictionary = {(key,): value for key, value in dictionary.items()}
while True:
dictionary = dict(chain.from_iterable(starmap(self.unpack, dictionary.items())))
if not any(isinstance(value, dict) for value in dictionary.values()):
break
return dictionary
def apply_scope(self, scope: str, data: dict):
return {(scope,) + k: v for k, v in data.items()}
def bank_accounts_conv_spec(self, data: dict):
flatscoped = self.apply_scope(Config.MEMBER, self.flatten_dict(data))
ret = {}
for k, v in flatscoped.items():
outerkey, innerkey = tuple(k[:-1]), (k[-1],)
if outerkey not in ret:
ret[outerkey] = {}
if innerkey[0] == "created_at":
x = int(datetime.strptime(v, "%Y-%m-%d %H:%M:%S").timestamp())
ret[outerkey].update({innerkey: x})
else:
ret[outerkey].update({innerkey: v})
return ret
def economy_conv_spec(self, data: dict):
flatscoped = self.apply_scope(Config.GUILD, self.flatten_dict(data))
ret = {}
for k, v in flatscoped.items():
outerkey, innerkey = (*k[:-1],), (k[-1],)
if outerkey not in ret:
ret[outerkey] = {}
ret[outerkey].update({innerkey: v})
return ret
def mod_log_cases(self, data: dict):
raise NotImplementedError("This one isn't ready yet")
def filter_conv_spec(self, data: dict):
return {(Config.GUILD, k): {("filter",): v} for k, v in data.items()}
def past_names_conv_spec(self, data: dict):
return {(Config.USER, k): {("past_names",): v} for k, v in data.items()}
def past_nicknames_conv_spec(self, data: dict):
flatscoped = self.apply_scope(Config.MEMBER, self.flatten_dict(data))
ret = {}
for k, v in flatscoped.items():
outerkey, innerkey = (*k[:-1],), (k[-1],)
if outerkey not in ret:
ret[outerkey] = {}
ret[outerkey].update({innerkey: v})
return ret
def customcom_conv_spec(self, data: dict):
flatscoped = self.apply_scope(Config.GUILD, self.flatten_dict(data))
ret = {}
for k, v in flatscoped.items():
outerkey, innerkey = (*k[:-1],), ("commands", k[-1])
if outerkey not in ret:
ret[outerkey] = {}
ccinfo = {
"author": {"id": 42, "name": "Converted from a v2 instance"},
"command": k[-1],
"created_at": "{:%d/%m/%Y %H:%M:%S}".format(datetime.utcnow()),
"editors": [],
"response": v,
}
ret[outerkey].update({innerkey: ccinfo})
return ret
async def convert(self, bot: Red, prettyname: str):
if prettyname not in self.available:
raise NotImplementedError("No Conversion Specs for this")
info = self.available_core_conversions[prettyname]
filepath, converter = info["file"], info["converter"]
(cogname, attr, _id) = info["cfg"]
try:
config = getattr(bot.get_cog(cogname), attr)
except (TypeError, AttributeError):
config = Config.get_conf(None, _id, cog_name=cogname)
try:
items = converter(dc.json_load(filepath))
await dc(config).dict_import(items)
except Exception:
raise
else:
self.resolved.add(prettyname)