mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 11:18:54 -05:00
The findcog command errored out if a repo was removed. This change informs the user that the repo is not installed instead of erroring out.
557 lines
19 KiB
Python
557 lines
19 KiB
Python
import asyncio
|
|
import contextlib
|
|
import os
|
|
import shutil
|
|
import sys
|
|
from pathlib import Path
|
|
from sys import path as syspath
|
|
from typing import Tuple, Union, Iterable
|
|
|
|
import discord
|
|
from redbot.core import checks, commands, Config
|
|
from redbot.core.bot import Red
|
|
from redbot.core.data_manager import cog_data_path
|
|
from redbot.core.i18n import Translator, cog_i18n
|
|
from redbot.core.utils.chat_formatting import box, pagify, humanize_list, inline
|
|
from redbot.core.utils.menus import start_adding_reactions
|
|
from redbot.core.utils.predicates import MessagePredicate, ReactionPredicate
|
|
|
|
from . import errors
|
|
from .checks import do_install_agreement
|
|
from .converters import InstalledCog
|
|
from .installable import Installable
|
|
from .log import log
|
|
from .repo_manager import RepoManager, Repo
|
|
|
|
_ = Translator("Downloader", __file__)
|
|
|
|
|
|
@cog_i18n(_)
|
|
class Downloader(commands.Cog):
|
|
def __init__(self, bot: Red):
|
|
super().__init__()
|
|
self.bot = bot
|
|
|
|
self.conf = Config.get_conf(self, identifier=998240343, force_registration=True)
|
|
|
|
self.conf.register_global(installed=[])
|
|
|
|
self.already_agreed = False
|
|
|
|
self.LIB_PATH = cog_data_path(self) / "lib"
|
|
self.SHAREDLIB_PATH = self.LIB_PATH / "cog_shared"
|
|
self.SHAREDLIB_INIT = self.SHAREDLIB_PATH / "__init__.py"
|
|
|
|
self.LIB_PATH.mkdir(parents=True, exist_ok=True)
|
|
self.SHAREDLIB_PATH.mkdir(parents=True, exist_ok=True)
|
|
if not self.SHAREDLIB_INIT.exists():
|
|
with self.SHAREDLIB_INIT.open(mode="w", encoding="utf-8") as _:
|
|
pass
|
|
|
|
if str(self.LIB_PATH) not in syspath:
|
|
syspath.insert(1, str(self.LIB_PATH))
|
|
|
|
self._repo_manager = RepoManager()
|
|
|
|
async def initialize(self):
|
|
await self._repo_manager.initialize()
|
|
|
|
async def cog_install_path(self):
|
|
"""Get the current cog install path.
|
|
|
|
Returns
|
|
-------
|
|
pathlib.Path
|
|
The default cog install path.
|
|
|
|
"""
|
|
return await self.bot.cog_mgr.install_path()
|
|
|
|
async def installed_cogs(self) -> Tuple[Installable]:
|
|
"""Get info on installed cogs.
|
|
|
|
Returns
|
|
-------
|
|
`tuple` of `Installable`
|
|
All installed cogs / shared lib directories.
|
|
|
|
"""
|
|
installed = await self.conf.installed()
|
|
# noinspection PyTypeChecker
|
|
return tuple(Installable.from_json(v, self._repo_manager) for v in installed)
|
|
|
|
async def _add_to_installed(self, cog: Installable):
|
|
"""Mark a cog as installed.
|
|
|
|
Parameters
|
|
----------
|
|
cog : Installable
|
|
The cog to check off.
|
|
|
|
"""
|
|
installed = await self.conf.installed()
|
|
cog_json = cog.to_json()
|
|
|
|
if cog_json not in installed:
|
|
installed.append(cog_json)
|
|
await self.conf.installed.set(installed)
|
|
|
|
async def _remove_from_installed(self, cog: Installable):
|
|
"""Remove a cog from the saved list of installed cogs.
|
|
|
|
Parameters
|
|
----------
|
|
cog : Installable
|
|
The cog to remove.
|
|
|
|
"""
|
|
installed = await self.conf.installed()
|
|
cog_json = cog.to_json()
|
|
|
|
if cog_json in installed:
|
|
installed.remove(cog_json)
|
|
await self.conf.installed.set(installed)
|
|
|
|
async def _reinstall_cogs(self, cogs: Iterable[Installable]) -> Tuple[Installable]:
|
|
"""
|
|
Installs a list of cogs, used when updating.
|
|
:param cogs:
|
|
:return: Any cogs that failed to copy
|
|
"""
|
|
failed = []
|
|
for cog in cogs:
|
|
if not await cog.copy_to(await self.cog_install_path()):
|
|
failed.append(cog)
|
|
|
|
# noinspection PyTypeChecker
|
|
return tuple(failed)
|
|
|
|
async def _reinstall_libraries(self, cogs: Iterable[Installable]) -> Tuple[Installable]:
|
|
"""
|
|
Reinstalls any shared libraries from the repos of cogs that
|
|
were updated.
|
|
:param cogs:
|
|
:return: Any libraries that failed to copy
|
|
"""
|
|
repo_names = set(cog.repo_name for cog in cogs)
|
|
unfiltered_repos = (self._repo_manager.get_repo(r) for r in repo_names)
|
|
repos = filter(lambda r: r is not None, unfiltered_repos)
|
|
|
|
failed = []
|
|
|
|
for repo in repos:
|
|
if not await repo.install_libraries(target_dir=self.SHAREDLIB_PATH):
|
|
failed.extend(repo.available_libraries)
|
|
|
|
# noinspection PyTypeChecker
|
|
return tuple(failed)
|
|
|
|
async def _reinstall_requirements(self, cogs: Iterable[Installable]) -> bool:
|
|
"""
|
|
Reinstalls requirements for given cogs that have been updated.
|
|
Returns a bool that indicates if all requirement installations
|
|
were successful.
|
|
:param cogs:
|
|
:return:
|
|
"""
|
|
|
|
# Reduces requirements to a single list with no repeats
|
|
requirements = set(r for c in cogs for r in c.requirements)
|
|
repo_names = self._repo_manager.get_all_repo_names()
|
|
repos = [(self._repo_manager.get_repo(rn), []) for rn in repo_names]
|
|
|
|
# This for loop distributes the requirements across all repos
|
|
# which will allow us to concurrently install requirements
|
|
for i, req in enumerate(requirements):
|
|
repo_index = i % len(repos)
|
|
repos[repo_index][1].append(req)
|
|
|
|
has_reqs = list(filter(lambda item: len(item[1]) > 0, repos))
|
|
|
|
ret = True
|
|
for repo, reqs in has_reqs:
|
|
for req in reqs:
|
|
# noinspection PyTypeChecker
|
|
ret = ret and await repo.install_raw_requirements([req], self.LIB_PATH)
|
|
return ret
|
|
|
|
@staticmethod
|
|
async def _delete_cog(target: Path):
|
|
"""
|
|
Removes an (installed) cog.
|
|
:param target: Path pointing to an existing file or directory
|
|
:return:
|
|
"""
|
|
if not target.exists():
|
|
return
|
|
|
|
if target.is_dir():
|
|
shutil.rmtree(str(target))
|
|
elif target.is_file():
|
|
os.remove(str(target))
|
|
|
|
@commands.command()
|
|
@checks.is_owner()
|
|
async def pipinstall(self, ctx, *deps: str):
|
|
"""Install a group of dependencies using pip."""
|
|
repo = Repo("", "", "", Path.cwd(), loop=ctx.bot.loop)
|
|
success = await repo.install_raw_requirements(deps, self.LIB_PATH)
|
|
|
|
if success:
|
|
await ctx.send(_("Libraries installed."))
|
|
else:
|
|
await ctx.send(
|
|
_(
|
|
"Some libraries failed to install. Please check"
|
|
" your logs for a complete list."
|
|
)
|
|
)
|
|
|
|
@commands.group()
|
|
@checks.is_owner()
|
|
async def repo(self, ctx):
|
|
"""Repo management commands."""
|
|
pass
|
|
|
|
@repo.command(name="add")
|
|
async def _repo_add(self, ctx, name: str, repo_url: str, branch: str = None):
|
|
"""Add a new repo.
|
|
|
|
The name can only contain characters A-z, numbers and underscores.
|
|
The branch will be the default branch if not specified.
|
|
"""
|
|
agreed = await do_install_agreement(ctx)
|
|
if not agreed:
|
|
return
|
|
try:
|
|
# noinspection PyTypeChecker
|
|
repo = await self._repo_manager.add_repo(name=name, url=repo_url, branch=branch)
|
|
except errors.ExistingGitRepo:
|
|
await ctx.send(_("That git repo has already been added under another name."))
|
|
except errors.CloningError as err:
|
|
await ctx.send(_("Something went wrong during the cloning process."))
|
|
log.exception(
|
|
"Something went wrong whilst cloning %s (to revision: %s)",
|
|
repo_url,
|
|
branch,
|
|
exc_info=err,
|
|
)
|
|
else:
|
|
await ctx.send(_("Repo `{name}` successfully added.").format(name=name))
|
|
if repo.install_msg is not None:
|
|
await ctx.send(repo.install_msg.replace("[p]", ctx.prefix))
|
|
|
|
@repo.command(name="delete", aliases=["remove"], usage="<repo_name>")
|
|
async def _repo_del(self, ctx, repo: Repo):
|
|
"""Remove a repo and its files."""
|
|
await self._repo_manager.delete_repo(repo.name)
|
|
|
|
await ctx.send(
|
|
_("The repo `{repo.name}` has been deleted successfully.").format(repo=repo)
|
|
)
|
|
|
|
@repo.command(name="list")
|
|
async def _repo_list(self, ctx):
|
|
"""List all installed repos."""
|
|
repos = self._repo_manager.get_all_repo_names()
|
|
repos = sorted(repos, key=str.lower)
|
|
joined = _("Installed Repos:\n\n")
|
|
for repo_name in repos:
|
|
repo = self._repo_manager.get_repo(repo_name)
|
|
joined += "+ {}: {}\n".format(repo.name, repo.short or "")
|
|
|
|
for page in pagify(joined, ["\n"], shorten_by=16):
|
|
await ctx.send(box(page.lstrip(" "), lang="diff"))
|
|
|
|
@repo.command(name="info", usage="<repo_name>")
|
|
async def _repo_info(self, ctx, repo: Repo):
|
|
"""Show information about a repo."""
|
|
if repo is None:
|
|
await ctx.send(_("Repo `{repo.name}` not found.").format(repo=repo))
|
|
return
|
|
|
|
msg = _("Information on {repo.name}:\n{description}").format(
|
|
repo=repo, description=repo.description or ""
|
|
)
|
|
await ctx.send(box(msg))
|
|
|
|
@commands.group()
|
|
@checks.is_owner()
|
|
async def cog(self, ctx):
|
|
"""Cog installation management commands."""
|
|
pass
|
|
|
|
@cog.command(name="install", usage="<repo_name> <cog_name>")
|
|
async def _cog_install(self, ctx, repo: Repo, cog_name: str):
|
|
"""Install a cog from the given repo."""
|
|
cog: Installable = discord.utils.get(repo.available_cogs, name=cog_name)
|
|
if cog is None:
|
|
await ctx.send(
|
|
_(
|
|
"Error: there is no cog by the name of `{cog_name}` in the `{repo.name}` repo."
|
|
).format(cog_name=cog_name, repo=repo)
|
|
)
|
|
return
|
|
elif cog.min_python_version > sys.version_info:
|
|
await ctx.send(
|
|
_("This cog requires at least python version {version}, aborting install.").format(
|
|
version=".".join([str(n) for n in cog.min_python_version])
|
|
)
|
|
)
|
|
return
|
|
|
|
if not await repo.install_requirements(cog, self.LIB_PATH):
|
|
await ctx.send(
|
|
_(
|
|
"Failed to install the required libraries for `{cog_name}`: `{libraries}`"
|
|
).format(cog_name=cog.name, libraries=cog.requirements)
|
|
)
|
|
return
|
|
|
|
await repo.install_cog(cog, await self.cog_install_path())
|
|
|
|
await self._add_to_installed(cog)
|
|
|
|
await repo.install_libraries(self.SHAREDLIB_PATH)
|
|
|
|
await ctx.send(_("Cog `{cog_name}` successfully installed.").format(cog_name=cog_name))
|
|
if cog.install_msg is not None:
|
|
await ctx.send(cog.install_msg.replace("[p]", ctx.prefix))
|
|
|
|
@cog.command(name="uninstall", usage="<cog_name>")
|
|
async def _cog_uninstall(self, ctx, cog: InstalledCog):
|
|
"""Uninstall a cog.
|
|
|
|
You may only uninstall cogs which were previously installed
|
|
by Downloader.
|
|
"""
|
|
real_name = cog.name
|
|
|
|
poss_installed_path = (await self.cog_install_path()) / real_name
|
|
if poss_installed_path.exists():
|
|
ctx.bot.unload_extension(real_name)
|
|
await self._delete_cog(poss_installed_path)
|
|
await self._remove_from_installed(cog)
|
|
await ctx.send(
|
|
_("Cog `{cog_name}` was successfully uninstalled.").format(cog_name=real_name)
|
|
)
|
|
else:
|
|
await ctx.send(
|
|
_(
|
|
"That cog was installed but can no longer"
|
|
" be located. You may need to remove it's"
|
|
" files manually if it is still usable."
|
|
" Also make sure you've unloaded the cog"
|
|
" with `{prefix}unload {cog_name}`."
|
|
).format(prefix=ctx.prefix, cog_name=real_name)
|
|
)
|
|
|
|
@cog.command(name="update")
|
|
async def _cog_update(self, ctx, cog_name: InstalledCog = None):
|
|
"""Update all cogs, or one of your choosing."""
|
|
installed_cogs = set(await self.installed_cogs())
|
|
|
|
async with ctx.typing():
|
|
if cog_name is None:
|
|
updated = await self._repo_manager.update_all_repos()
|
|
|
|
else:
|
|
try:
|
|
updated = await self._repo_manager.update_repo(cog_name.repo_name)
|
|
except KeyError:
|
|
# Thrown if the repo no longer exists
|
|
updated = {}
|
|
|
|
updated_cogs = set(cog for repo in updated for cog in repo.available_cogs)
|
|
installed_and_updated = updated_cogs & installed_cogs
|
|
|
|
if installed_and_updated:
|
|
await self._reinstall_requirements(installed_and_updated)
|
|
await self._reinstall_cogs(installed_and_updated)
|
|
await self._reinstall_libraries(installed_and_updated)
|
|
message = _("Cog update completed successfully.")
|
|
|
|
cognames = {c.name for c in installed_and_updated}
|
|
message += _("\nUpdated: ") + humanize_list(tuple(map(inline, cognames)))
|
|
else:
|
|
await ctx.send(_("All installed cogs are already up to date."))
|
|
return
|
|
await ctx.send(message)
|
|
|
|
cognames &= set(ctx.bot.extensions.keys()) # only reload loaded cogs
|
|
if not cognames:
|
|
return await ctx.send(
|
|
_("None of the updated cogs were previously loaded. Update complete.")
|
|
)
|
|
message = _("Would you like to reload the updated cogs?")
|
|
can_react = ctx.channel.permissions_for(ctx.me).add_reactions
|
|
if not can_react:
|
|
message += " (y/n)"
|
|
query: discord.Message = await ctx.send(message)
|
|
if can_react:
|
|
# noinspection PyAsyncCall
|
|
start_adding_reactions(query, ReactionPredicate.YES_OR_NO_EMOJIS, ctx.bot.loop)
|
|
pred = ReactionPredicate.yes_or_no(query, ctx.author)
|
|
event = "reaction_add"
|
|
else:
|
|
pred = MessagePredicate.yes_or_no(ctx)
|
|
event = "message"
|
|
try:
|
|
await ctx.bot.wait_for(event, check=pred, timeout=30)
|
|
except asyncio.TimeoutError:
|
|
await query.delete()
|
|
return
|
|
|
|
if pred.result is True:
|
|
if can_react:
|
|
with contextlib.suppress(discord.Forbidden):
|
|
await query.clear_reactions()
|
|
await ctx.invoke(ctx.bot.get_cog("Core").reload, *cognames)
|
|
else:
|
|
if can_react:
|
|
await query.delete()
|
|
else:
|
|
await ctx.send(_("OK then."))
|
|
|
|
@cog.command(name="list", usage="<repo_name>")
|
|
async def _cog_list(self, ctx, repo: Repo):
|
|
"""List all available cogs from a single repo."""
|
|
installed = await self.installed_cogs()
|
|
installed_str = ""
|
|
if installed:
|
|
installed_str = _("Installed Cogs:\n") + "\n".join(
|
|
[
|
|
"- {}{}".format(i.name, ": {}".format(i.short) if i.short else "")
|
|
for i in installed
|
|
if i.repo_name == repo.name
|
|
]
|
|
)
|
|
cogs = repo.available_cogs
|
|
cogs = _("Available Cogs:\n") + "\n".join(
|
|
[
|
|
"+ {}: {}".format(c.name, c.short or "")
|
|
for c in cogs
|
|
if not (c.hidden or c in installed)
|
|
]
|
|
)
|
|
cogs = cogs + "\n\n" + installed_str
|
|
for page in pagify(cogs, ["\n"], shorten_by=16):
|
|
await ctx.send(box(page.lstrip(" "), lang="diff"))
|
|
|
|
@cog.command(name="info", usage="<repo_name> <cog_name>")
|
|
async def _cog_info(self, ctx, repo: Repo, cog_name: str):
|
|
"""List information about a single cog."""
|
|
cog = discord.utils.get(repo.available_cogs, name=cog_name)
|
|
if cog is None:
|
|
await ctx.send(
|
|
_("There is no cog `{cog_name}` in the repo `{repo.name}`").format(
|
|
cog_name=cog_name, repo=repo
|
|
)
|
|
)
|
|
return
|
|
|
|
msg = _(
|
|
"Information on {cog_name}:\n{description}\n\nRequirements: {requirements}"
|
|
).format(
|
|
cog_name=cog.name,
|
|
description=cog.description or "",
|
|
requirements=", ".join(cog.requirements) or "None",
|
|
)
|
|
await ctx.send(box(msg))
|
|
|
|
async def is_installed(
|
|
self, cog_name: str
|
|
) -> Union[Tuple[bool, Installable], Tuple[bool, None]]:
|
|
"""Check to see if a cog has been installed through Downloader.
|
|
|
|
Parameters
|
|
----------
|
|
cog_name : str
|
|
The name of the cog to check for.
|
|
|
|
Returns
|
|
-------
|
|
`tuple` of (`bool`, `Installable`)
|
|
:code:`(True, Installable)` if the cog is installed, else
|
|
:code:`(False, None)`.
|
|
|
|
"""
|
|
for installable in await self.installed_cogs():
|
|
if installable.name == cog_name:
|
|
return True, installable
|
|
return False, None
|
|
|
|
def format_findcog_info(
|
|
self, command_name: str, cog_installable: Union[Installable, object] = None
|
|
) -> str:
|
|
"""Format a cog's info for output to discord.
|
|
|
|
Parameters
|
|
----------
|
|
command_name : str
|
|
Name of the command which belongs to the cog.
|
|
cog_installable : `Installable` or `object`
|
|
Can be an `Installable` instance or a Cog instance.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
A formatted message for the user.
|
|
|
|
"""
|
|
if isinstance(cog_installable, Installable):
|
|
made_by = ", ".join(cog_installable.author) or _("Missing from info.json")
|
|
repo = self._repo_manager.get_repo(cog_installable.repo_name)
|
|
repo_url = _("Missing from installed repos") if repo is None else repo.url
|
|
cog_name = cog_installable.name
|
|
else:
|
|
made_by = "26 & co."
|
|
repo_url = "https://github.com/Cog-Creators/Red-DiscordBot"
|
|
cog_name = cog_installable.__class__.__name__
|
|
|
|
msg = _("Command: {command}\nMade by: {author}\nRepo: {repo}\nCog name: {cog}")
|
|
|
|
return msg.format(command=command_name, author=made_by, repo=repo_url, cog=cog_name)
|
|
|
|
def cog_name_from_instance(self, instance: object) -> str:
|
|
"""Determines the cog name that Downloader knows from the cog instance.
|
|
|
|
Probably.
|
|
|
|
Parameters
|
|
----------
|
|
instance : object
|
|
The cog instance.
|
|
|
|
Returns
|
|
-------
|
|
str
|
|
The name of the cog according to Downloader..
|
|
|
|
"""
|
|
splitted = instance.__module__.split(".")
|
|
return splitted[-2]
|
|
|
|
@commands.command()
|
|
async def findcog(self, ctx: commands.Context, command_name: str):
|
|
"""Find which cog a command comes from.
|
|
|
|
This will only work with loaded cogs.
|
|
"""
|
|
command = ctx.bot.all_commands.get(command_name)
|
|
|
|
if command is None:
|
|
await ctx.send(_("That command doesn't seem to exist."))
|
|
return
|
|
|
|
# Check if in installed cogs
|
|
cog_name = self.cog_name_from_instance(command.instance)
|
|
installed, cog_installable = await self.is_installed(cog_name)
|
|
if installed:
|
|
msg = self.format_findcog_info(command_name, cog_installable)
|
|
else:
|
|
# Assume it's in a base cog
|
|
msg = self.format_findcog_info(command_name, command.instance)
|
|
|
|
await ctx.send(box(msg))
|