mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
331 lines
11 KiB
Python
331 lines
11 KiB
Python
import os
|
|
import shutil
|
|
from typing import Tuple
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
from pathlib import Path
|
|
from sys import path as syspath
|
|
|
|
from core import Config
|
|
from core.bot import Red
|
|
from core import checks
|
|
from core.utils.chat_formatting import box
|
|
|
|
from .repo_manager import RepoManager, Repo
|
|
from .installable import Installable
|
|
from .converters import RepoName, InstalledCog
|
|
from .log import log
|
|
from .errors import CloningError, ExistingGitRepo
|
|
from .checks import install_agreement
|
|
|
|
|
|
class Downloader:
    # Cog that manages git repos of installable cogs: adding/removing repos,
    # installing/uninstalling/updating cogs, and keeping their requirements
    # and shared libraries in place.
    #
    # NOTE: documented with comments rather than a class docstring on purpose;
    # discord.py may surface cog/command docstrings in the bot's help output,
    # so adding one here would change user-visible text.

    # Directory installed cogs are copied into.
    COG_PATH = Path.cwd() / "cogs"
    # Directory pip-style requirements are installed into (added to sys.path).
    LIB_PATH = Path.cwd() / "lib"
    # Package directory that repo-provided shared libraries are copied into.
    SHAREDLIB_PATH = LIB_PATH / "cog_shared"
    SHAREDLIB_INIT = SHAREDLIB_PATH / "__init__.py"

    def __init__(self, bot: Red):
        """
        Set up config defaults, create the library directories and make
        ``LIB_PATH`` importable.
        :param bot: The bot instance this cog is attached to.
        """
        self.bot = bot

        self.conf = Config.get_conf(self, unique_identifier=998240343,
                                    force_registration=True)

        self.conf.register_global(
            repos={},
            installed=[]
        )

        self.already_agreed = False

        self.LIB_PATH.mkdir(parents=True, exist_ok=True)
        self.SHAREDLIB_PATH.mkdir(parents=True, exist_ok=True)
        if not self.SHAREDLIB_INIT.exists():
            # An empty __init__.py makes the shared-lib directory a package.
            self.SHAREDLIB_INIT.touch()

        if str(self.LIB_PATH) not in syspath:
            syspath.insert(1, str(self.LIB_PATH))

        self._repo_manager = RepoManager(self.conf)

    @property
    def installed_cogs(self) -> Tuple[Installable, ...]:
        """
        Returns every cog recorded as installed, rebuilt from the JSON
        entries saved under the ``installed`` config key.
        :return: Tuple of installed cogs
        """
        installed = self.conf.installed()
        # noinspection PyTypeChecker
        return tuple(Installable.from_json(v) for v in installed)

    async def _add_to_installed(self, cog: Installable):
        """
        Marks a cog as installed. Does nothing if it is already recorded.
        :param cog: Cog to record
        :return:
        """
        installed = self.conf.installed()
        cog_json = cog.to_json()

        if cog_json not in installed:
            installed.append(cog_json)
            await self.conf.set("installed", installed)

    async def _remove_from_installed(self, cog: Installable):
        """
        Removes a cog from the saved list of installed cogs. Does nothing
        if it is not recorded.
        :param cog: Cog to forget
        :return:
        """
        installed = self.conf.installed()
        cog_json = cog.to_json()

        if cog_json in installed:
            installed.remove(cog_json)
            await self.conf.set("installed", installed)

    async def _reinstall_cogs(self, cogs: Tuple[Installable, ...]) -> Tuple[Installable, ...]:
        """
        Copies each given cog into ``COG_PATH``, used when updating.
        :param cogs: Cogs to copy
        :return: Any cogs that failed to copy
        """
        failed = []
        for cog in cogs:
            if not await cog.copy_to(self.COG_PATH):
                failed.append(cog)

        # noinspection PyTypeChecker
        return tuple(failed)

    async def _reinstall_libraries(self, cogs: Tuple[Installable, ...]) -> Tuple[Installable, ...]:
        """
        Reinstalls any shared libraries from the repos of cogs that
        were updated.
        :param cogs: Cogs whose repos should have their libraries reinstalled
        :return: Any libraries that failed to copy
        """
        repo_names = set(cog.repo_name for cog in cogs)
        unfiltered_repos = (self._repo_manager.get_repo(r) for r in repo_names)
        # get_repo may yield None (e.g. the repo is no longer registered).
        repos = [r for r in unfiltered_repos if r is not None]

        failed = []

        for repo in repos:
            if not await repo.install_libraries(target_dir=self.SHAREDLIB_PATH):
                failed.extend(repo.available_libraries)

        # noinspection PyTypeChecker
        return tuple(failed)

    async def _reinstall_requirements(self, cogs: Tuple[Installable, ...]) -> bool:
        """
        Reinstalls requirements for given cogs that have been updated.
        Every requirement is attempted even if an earlier one fails.
        :param cogs: Cogs whose requirements should be installed
        :return: True if all requirement installations were successful
            (or there was nothing to install), False otherwise
        """
        # Reduces requirements to a single collection with no repeats
        requirements = set(r for c in cogs for r in c.requirements)
        if not requirements:
            return True

        repo_names = self._repo_manager.get_all_repo_names()
        candidates = (self._repo_manager.get_repo(rn) for rn in repo_names)
        # Skip names that no longer resolve to a repo, as
        # _reinstall_libraries does.
        repos = [(repo, []) for repo in candidates if repo is not None]
        if not repos:
            # No repo object is available to run the installation through;
            # bail out instead of dividing by zero below.
            return False

        # Distribute the requirements round-robin across all repos. The
        # installs below still run one after another.
        for i, req in enumerate(requirements):
            repos[i % len(repos)][1].append(req)

        ret = True
        for repo, reqs in repos:
            for req in reqs:
                # Await first, then combine: `ret and await ...` would
                # short-circuit and silently skip every requirement after
                # the first failure.
                # noinspection PyTypeChecker
                installed_ok = await repo.install_raw_requirements([req, ], self.LIB_PATH)
                ret = ret and bool(installed_ok)
        return ret

    @staticmethod
    async def _delete_cog(target: Path):
        """
        Removes an (installed) cog. Does nothing if the target is missing.
        :param target: Path pointing to an existing file or directory
        :return:
        """
        if not target.exists():
            return

        if target.is_dir():
            shutil.rmtree(str(target))
        elif target.is_file():
            os.remove(str(target))

    @commands.group()
    @checks.is_owner()
    async def repo(self, ctx):
        """
        Command group for managing Downloader repos.
        """
        if ctx.invoked_subcommand is None:
            await self.bot.send_cmd_help(ctx)

    @repo.command(name="add")
    @install_agreement()
    async def _repo_add(self, ctx, name: RepoName, repo_url: str, branch: str=None):
        """
        Add a new repo to Downloader.
        :param name: Name that must follow python variable naming rules and
            contain only characters A-Z and numbers 0-9 and _
        :param repo_url: Clone url for the cog repo
        :param branch: Specify branch if you want it to be different than
            repo default.
        """
        try:
            # noinspection PyTypeChecker
            await self._repo_manager.add_repo(
                name=name,
                url=repo_url,
                branch=branch
            )
        except ExistingGitRepo:
            await ctx.send("That git repo has already been added under another name.")
        except CloningError:
            await ctx.send("Something went wrong during the cloning process.")
            log.exception("Something went wrong during the cloning process.")
        else:
            await ctx.send("Repo `{}` successfully added.".format(name))

    @repo.command(name="del")
    async def _repo_del(self, ctx, repo_name: Repo):
        """
        Removes a repo from Downloader and its' files.
        :param repo_name: Repo name in Downloader
        """
        await self._repo_manager.delete_repo(repo_name.name)

        await ctx.send("The repo `{}` has been deleted successfully.".format(repo_name.name))

    @repo.command(name="list")
    async def _repo_list(self, ctx):
        """
        Lists all installed repos.
        """
        repos = self._repo_manager.get_all_repo_names()
        joined = "Installed Repos:\n" + "\n".join(["+ " + r for r in repos])

        await ctx.send(box(joined, lang="diff"))

    @commands.group()
    @checks.is_owner()
    async def cog(self, ctx):
        """
        Command group for managing installable Cogs.
        """
        if ctx.invoked_subcommand is None:
            await self.bot.send_cmd_help(ctx)

    @cog.command(name="install")
    async def _cog_install(self, ctx, repo_name: Repo, cog_name: str):
        """
        Installs a cog from the given repo.
        :param repo_name:
        :param cog_name: Cog name available from `[p]cog list <repo_name>`
        """
        cog = discord.utils.get(repo_name.available_cogs, name=cog_name)
        if cog is None:
            await ctx.send("Error, there is no cog by the name of"
                           " `{}` in the `{}` repo.".format(cog_name, repo_name.name))
            return

        # Requirements go first so a cog is never copied in without them.
        if not await repo_name.install_requirements(cog, self.LIB_PATH):
            await ctx.send("Failed to install the required libraries for"
                           " `{}`: `{}`".format(cog.name, cog.requirements))
            return

        await repo_name.install_cog(cog, self.COG_PATH)

        await self._add_to_installed(cog)

        await repo_name.install_libraries(self.SHAREDLIB_PATH)

        await ctx.send("`{}` cog successfully installed.".format(cog_name))

    @cog.command(name="uninstall")
    async def _cog_uninstall(self, ctx, cog_name: InstalledCog):
        """
        Allows you to uninstall cogs that were previously installed
        through Downloader.
        :param cog_name:
        """
        # noinspection PyUnresolvedReferences,PyProtectedMember
        real_name = cog_name.name

        poss_installed_path = self.COG_PATH / real_name
        if poss_installed_path.exists():
            await self._delete_cog(poss_installed_path)
            # noinspection PyTypeChecker
            await self._remove_from_installed(cog_name)
            await ctx.send("`{}` was successfully removed.".format(real_name))
        else:
            await ctx.send("That cog was installed but can no longer"
                           " be located. You may need to remove it's"
                           " files manually if it is still usable.")

    @cog.command(name="update")
    async def _cog_update(self, ctx, cog_name: InstalledCog=None):
        """
        Updates all cogs or one of your choosing.
        :param cog_name:
        """
        if cog_name is not None:
            # Previously this path did nothing and still reported success.
            # TODO: implement once a per-repo update call is available on
            # the repo manager; only update_all_repos is used in this cog.
            await ctx.send("Updating a single cog is not supported yet;"
                           " run this command without a cog name to"
                           " update all cogs.")
            return

        updated = await self._repo_manager.update_all_repos()
        installed_cogs = set(self.installed_cogs)
        updated_cogs = set(cog for repo in updated.keys() for cog in repo.available_cogs)

        # Only cogs that are both installed and belong to an updated repo
        # need to be refreshed.
        installed_and_updated = tuple(updated_cogs & installed_cogs)

        requirements_ok = await self._reinstall_requirements(installed_and_updated)
        failed_cogs = await self._reinstall_cogs(installed_and_updated)
        failed_libs = await self._reinstall_libraries(installed_and_updated)

        # Report what actually happened instead of always claiming success.
        problems = []
        if not requirements_ok:
            problems.append("some requirements could not be installed")
        if failed_cogs:
            problems.append("failed to copy cogs: {}".format(
                ", ".join("`{}`".format(c.name) for c in failed_cogs)))
        if failed_libs:
            problems.append("{} shared libraries failed to copy".format(
                len(failed_libs)))

        if problems:
            await ctx.send("Cog update finished with errors: {}.".format(
                "; ".join(problems)))
        else:
            await ctx.send("Cog update completed successfully.")

    @cog.command(name="list")
    async def _cog_list(self, ctx, repo_name: Repo):
        """
        Lists all available cogs from a single repo.
        :param repo_name: Repo name available from `[p]repo list`
        """
        cogs = repo_name.available_cogs
        cogs = "Available Cogs:\n" + "\n".join(
            ["+ {}: {}".format(c.name, c.short or "") for c in cogs])

        await ctx.send(box(cogs, lang="diff"))

    @cog.command(name="info")
    async def _cog_info(self, ctx, repo_name: Repo, cog_name: str):
        """
        Lists information about a single cog.
        :param repo_name:
        :param cog_name:
        """
        cog = discord.utils.get(repo_name.available_cogs, name=cog_name)
        if cog is None:
            await ctx.send("There is no cog `{}` in the repo `{}`".format(
                cog_name, repo_name.name
            ))
            return

        msg = "Information on {}:\n{}".format(cog.name, cog.description or "")
        await ctx.send(box(msg))