Red-DiscordBot/cogs/downloader/repo_manager.py
Will 3d76f3a787 [Core V3] Make the bot data path configurable (#879)
* Initial commit

* Fix sentry

* Make cog manager install path work relative to the bot's dir

* Fix downloader to save data relative to the defined data folder

* Fix sentry test

* Fix downloader tests

* Change logfile location

* Add another line to codeowners

* Basic tests

* Fix versioning

* Add in FutureWarning for config file changes

* Add reference to issue
2017-08-20 15:49:51 -04:00

561 lines
18 KiB
Python

import asyncio
import json
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Tuple, MutableMapping, Union
from subprocess import run as sp_run, PIPE
from sys import executable
import pkgutil
import shutil
import functools
from discord.ext import commands
from core import Config
from core import data_manager
from .errors import *
from .installable import Installable, InstallableType
from .log import log
from .json_mixins import RepoJSONMixin
class Repo(RepoJSONMixin):
    """
    A git repository of installable modules living at ``folder_path``.

    Provides coroutines wrapping the git and pip command lines needed to
    clone and update the working copy and to install what it contains.
    Every external command goes through :py:meth:`_run`, which executes
    it on a single-worker thread pool while holding a per-repo lock.
    """

    # Command templates. They are split on whitespace *before* the
    # placeholders are substituted (see ``_format_command``), so every
    # placeholder must stay inside a single whitespace-delimited token.
    GIT_CLONE = "git clone -b {branch} {url} {folder}"
    GIT_CLONE_NO_BRANCH = "git clone {url} {folder}"

    GIT_CURRENT_BRANCH = "git -C {path} rev-parse --abbrev-ref HEAD"

    GIT_LATEST_COMMIT = "git -C {path} rev-parse {branch}"

    GIT_HARD_RESET = "git -C {path} reset --hard origin/{branch} -q"

    GIT_PULL = "git -C {path} pull -q --ff-only"

    GIT_DIFF_FILE_STATUS = ("git -C {path} diff --no-commit-id --name-status"
                            " {old_hash} {new_hash}")

    GIT_LOG = ("git -C {path} log --relative-date --reverse {old_hash}.."
               " {relative_file_path}")

    PIP_INSTALL = "{python} -m pip install -U -t {target_dir} {reqs}"

    def __init__(self, name: str, url: str, branch: str, folder_path: Path,
                 available_modules: Tuple[Installable]=(), loop: asyncio.AbstractEventLoop=None):
        """
        :param name: Internal name of the repo.
        :param url: URL handed to ``git clone``.
        :param branch: Branch to clone. When ``None`` the clone is done
            without ``-b`` and the branch is read back afterwards
            (see :py:meth:`clone`).
        :param folder_path: Directory of the working copy. It is created
            (including parents) if it does not exist yet.
        :param available_modules: Modules already known for this repo.
        :param loop: Event loop whose executor API is used to run
            commands. Defaults to ``asyncio.get_event_loop()``.
        """
        self.url = url
        self.branch = branch

        self.name = name

        self.folder_path = folder_path
        self.folder_path.mkdir(parents=True, exist_ok=True)

        super().__init__(self.folder_path)

        self.available_modules = available_modules

        # One worker + one lock: commands for a repo never overlap.
        self._executor = ThreadPoolExecutor(1)

        self._repo_lock = asyncio.Lock()

        self._loop = loop
        if self._loop is None:
            self._loop = asyncio.get_event_loop()

    @classmethod
    async def convert(cls, ctx: commands.Context, argument: str):
        """
        Command argument converter: looks up a repo by name through the
        loaded "Downloader" cog.

        :param ctx: Invocation context; only ``ctx.bot`` is used.
        :param argument: Repo name as typed by the user.
        :return: The matching repo object.
        :raises commands.CommandError: If the Downloader cog is not loaded.
        :raises commands.BadArgument: If no repo has that name.
        """
        downloader_cog = ctx.bot.get_cog("Downloader")
        if downloader_cog is None:
            raise commands.CommandError("No Downloader cog found.")

        # noinspection PyProtectedMember
        repo_manager = downloader_cog._repo_manager
        poss_repo = repo_manager.get_repo(argument)
        if poss_repo is None:
            raise commands.BadArgument("Repo by the name {} does not exist.".format(argument))
        return poss_repo

    @staticmethod
    def _format_command(template: str, **fields) -> list:
        """
        Turns a command template into an argument list.

        The template is tokenised first and the placeholders are filled
        in per token, so a substituted value that contains whitespace
        (for example a data path with a space in it) remains a single
        argument instead of being torn apart.

        :param template: One of the command template class attributes.
        :param fields: Values for the template's placeholders.
        :return: Argument list suitable for ``subprocess.run``.
        """
        return [token.format(**fields) for token in template.split()]

    def _existing_git_repo(self) -> Tuple[bool, Path]:
        """
        :return: ``(exists, git_path)`` where ``git_path`` is the
            ``.git`` entry inside the repo folder.
        """
        git_path = self.folder_path / '.git'
        return git_path.exists(), git_path

    def _ensure_git_repo(self) -> None:
        """
        Guard used by every operation that needs an existing clone.

        :raises MissingGitRepo: If the folder holds no ``.git`` entry.
        """
        exists, _ = self._existing_git_repo()
        if not exists:
            raise MissingGitRepo(
                "A git repo does not exist at path: {}".format(self.folder_path)
            )

    async def _get_file_update_statuses(
            self, old_hash: str, new_hash: str) -> MutableMapping[str, str]:
        """
        Gets the file update status letters for each changed file between
            the two hashes.
        :param old_hash: Pre-update
        :param new_hash: Post-update
        :return: Mapping of filename -> status_letter (empty when the
            diff reports no changes)
        :raises GitDiffError: If git exits with a non-zero status.
        """
        p = await self._run(
            self._format_command(
                self.GIT_DIFF_FILE_STATUS,
                path=self.folder_path,
                old_hash=old_hash,
                new_hash=new_hash
            )
        )

        if p.returncode != 0:
            raise GitDiffError("Git diff failed for repo at path:"
                               " {}".format(self.folder_path))

        ret = {}

        for line in p.stdout.strip().decode().split('\n'):
            # An empty diff still yields one empty string from split();
            # it must not become a bogus '' -> '' entry.
            if not line.strip():
                continue
            # TODO: filter these filenames by ones in self.available_modules
            status, _, filepath = line.partition('\t')
            ret[filepath] = status

        return ret

    async def _get_commit_notes(self, old_commit_hash: str,
                                relative_file_path: str) -> str:
        """
        Gets the commit notes from git log.
        :param old_commit_hash: Point in time to start getting messages
        :param relative_file_path: Path relative to the repo folder of the file
            to get messages for.
        :return: Git commit note log
        :raises GitException: If git exits with a non-zero status.
        """
        p = await self._run(
            self._format_command(
                self.GIT_LOG,
                path=self.folder_path,
                old_hash=old_commit_hash,
                relative_file_path=relative_file_path
            )
        )

        if p.returncode != 0:
            raise GitException("An exception occurred while executing git log on"
                               " this repo: {}".format(self.folder_path))

        return p.stdout.decode().strip()

    def _update_available_modules(self) -> Tuple[Installable]:
        """
        Updates the available modules attribute for this repo.

        Only the top level of the repo folder is scanned
        (``pkgutil.iter_modules``): nothing is imported and each name
        found corresponds directly to an entry of the folder.
        :return: Tuple of available modules.
        """
        self.available_modules = tuple(
            Installable(location=self.folder_path / name)
            for _finder, name, _is_pkg
            in pkgutil.iter_modules(path=[str(self.folder_path)])
        )

        return self.available_modules

    async def _run(self, *args, **kwargs):
        """
        Runs ``subprocess.run(*args, stdout=PIPE, **kwargs)`` in this
        repo's executor while holding the repo lock.

        The child gets a copy of the current environment with
        ``GIT_TERMINAL_PROMPT=0`` so git does not stop to prompt on the
        terminal. Any ``env`` passed by the caller is replaced.
        :return: The object returned by ``subprocess.run``.
        """
        env = os.environ.copy()
        env['GIT_TERMINAL_PROMPT'] = '0'
        kwargs['env'] = env
        async with self._repo_lock:
            return await self._loop.run_in_executor(
                self._executor,
                functools.partial(sp_run, *args, stdout=PIPE, **kwargs)
            )

    async def clone(self) -> Tuple[Installable]:
        """
        Clones a new repo.
        :return: Tuple of available modules from this repo.
        :raises ExistingGitRepo: If the folder already contains ``.git``.
        :raises CloningError: If ``git clone`` exits non-zero.
        """
        exists, path = self._existing_git_repo()
        if exists:
            raise ExistingGitRepo(
                "A git repo already exists at path: {}".format(path)
            )

        if self.branch is not None:
            command = self._format_command(
                self.GIT_CLONE,
                branch=self.branch,
                url=self.url,
                folder=self.folder_path
            )
        else:
            command = self._format_command(
                self.GIT_CLONE_NO_BRANCH,
                url=self.url,
                folder=self.folder_path
            )

        p = await self._run(command)

        if p.returncode != 0:
            raise CloningError("Error when running git clone.")

        # No branch requested: record whichever one git checked out.
        if self.branch is None:
            self.branch = await self.current_branch()

        self._read_info_file()

        return self._update_available_modules()

    async def current_branch(self) -> str:
        """
        Determines the current branch using git commands.
        :return: Current branch name
        :raises MissingGitRepo: If nothing has been cloned here.
        :raises GitException: If git exits with a non-zero status.
        """
        self._ensure_git_repo()

        p = await self._run(
            self._format_command(
                self.GIT_CURRENT_BRANCH,
                path=self.folder_path
            )
        )

        if p.returncode != 0:
            raise GitException("Could not determine current branch"
                               " at path: {}".format(self.folder_path))

        return p.stdout.decode().strip()

    async def current_commit(self, branch: str=None) -> str:
        """
        Determines the current commit hash of the repo.
        :param branch: Override for repo's branch attribute
        :return: Commit hash string
        :raises MissingGitRepo: If nothing has been cloned here.
        :raises CurrentHashError: If git exits with a non-zero status.
        """
        if branch is None:
            branch = self.branch

        self._ensure_git_repo()

        p = await self._run(
            self._format_command(
                self.GIT_LATEST_COMMIT,
                path=self.folder_path,
                branch=branch
            )
        )

        if p.returncode != 0:
            raise CurrentHashError("Unable to determine old commit hash.")

        return p.stdout.decode().strip()

    async def hard_reset(self, branch: str=None) -> None:
        """
        Performs a hard reset on the current repo.
        :param branch: Override for repo branch attribute.
        :raises MissingGitRepo: If nothing has been cloned here.
        :raises HardResetError: If git exits with a non-zero status.
        """
        if branch is None:
            branch = self.branch

        self._ensure_git_repo()

        p = await self._run(
            self._format_command(
                self.GIT_HARD_RESET,
                path=self.folder_path,
                branch=branch
            )
        )

        if p.returncode != 0:
            raise HardResetError("Some error occurred when trying to"
                                 " execute a hard reset on the repo at"
                                 " the following path: {}".format(self.folder_path))

    async def update(self) -> Tuple[str, str]:
        """
        Updates the current branch of this repo.

        The working copy is hard-reset to ``origin/<branch>`` before
        pulling, so local modifications are discarded.
        :return: tuple of (old commit hash, new commit hash)
        :rtype: tuple
        :raises UpdateError: If ``git pull`` exits non-zero.
        """
        curr_branch = await self.current_branch()
        old_commit = await self.current_commit(branch=curr_branch)

        await self.hard_reset(branch=curr_branch)

        p = await self._run(
            self._format_command(
                self.GIT_PULL,
                path=self.folder_path
            )
        )

        if p.returncode != 0:
            raise UpdateError("Git pull returned a non zero exit code"
                              " for the repo located at path: {}".format(self.folder_path))

        new_commit = await self.current_commit(branch=curr_branch)

        self._update_available_modules()
        self._read_info_file()

        return old_commit, new_commit

    async def install_cog(self, cog: Installable, target_dir: Path) -> bool:
        """
        Copies a cog to the target directory.
        :param Installable cog: Cog to install.
        :param pathlib.Path target_dir: Directory to install the cog in.
        :return: Installation success status.
        :rtype: bool
        :raises DownloaderException: If the cog is not one of
            :py:attr:`available_cogs`.
        :raises ValueError: If the target is missing or not a directory.
        """
        if cog not in self.available_cogs:
            raise DownloaderException("That cog does not exist in this repo")

        # Existence must be tested first: is_dir() is also False for a
        # missing path, which would hide the more precise message.
        if not target_dir.exists():
            raise ValueError("That target directory does not exist.")

        if not target_dir.is_dir():
            raise ValueError("That target directory is not actually a directory.")

        return await cog.copy_to(target_dir=target_dir)

    async def install_libraries(self, target_dir: Path, libraries: Tuple[Installable]=()) -> bool:
        """
        Copies all shared libraries (or a given subset) to the target
            directory.
        :param pathlib.Path target_dir: Directory to install shared libraries to.
        :param tuple(Installable) libraries: A subset of available libraries.
        :return: Status of all installs.
        :rtype: bool
        :raises ValueError: If a requested library is not in this repo.
        """
        if libraries:
            if not all(i in self.available_libraries for i in libraries):
                raise ValueError("Some given libraries are not available in this repo.")
        else:
            libraries = self.available_libraries

        # copy_to is a coroutine (it is awaited in install_cog); each
        # call has to be awaited or nothing is copied at all. Every
        # library is attempted even if an earlier one reports failure.
        results = []
        for lib in libraries:
            results.append(await lib.copy_to(target_dir=target_dir))
        return all(results)

    async def install_requirements(self, cog: Installable, target_dir: Path) -> bool:
        """
        Installs the requirements defined by the requirements
            attribute on the cog object and puts them in the given
            target directory.
        :param Installable cog: Cog for which to install requirements.
        :param pathlib.Path target_dir: Path to which to install requirements.
        :return: Status of requirements install.
        :rtype: bool
        :raises ValueError: If the target is not an existing directory.
        """
        if not target_dir.is_dir():
            raise ValueError("Target directory is not a directory.")
        # NOTE(review): after the check above this is a no-op; kept so a
        # missing target keeps raising ValueError as callers may expect.
        target_dir.mkdir(parents=True, exist_ok=True)

        return await self.install_raw_requirements(cog.requirements, target_dir)

    async def install_raw_requirements(self, requirements: Tuple[str], target_dir: Path) -> bool:
        """
        Installs a list of requirements using pip and places them into
            the given target directory.
        :param tuple(str) requirements: List of requirement names to install via pip.
        :param pathlib.Path target_dir: Directory to install requirements to.
        :return: Status of all requirements install.
        :rtype: bool
        """
        if not requirements:
            return True

        # TODO: Check and see if any of these modules are already available

        # Built token by token so an interpreter or target path that
        # contains spaces stays one argument; the requirement list is
        # expanded in place of its placeholder.
        command = []
        for token in self.PIP_INSTALL.split():
            if token == "{reqs}":
                command.extend(" ".join(requirements).split())
            else:
                command.append(
                    token.format(python=executable, target_dir=target_dir)
                )

        p = await self._run(command)

        if p.returncode != 0:
            log.error("Something went wrong when installing"
                      " the following requirements:"
                      " {}".format(", ".join(requirements)))
            return False
        return True

    @property
    def available_cogs(self) -> Tuple[Installable]:
        """
        Returns a list of available cogs (not shared libraries and not hidden).
        :rtype: tuple(Installable)
        """
        # noinspection PyTypeChecker
        return tuple(
            m for m in self.available_modules
            if m.type == InstallableType.COG and not m.hidden
        )

    @property
    def available_libraries(self) -> Tuple[Installable]:
        """
        Returns a list of available shared libraries in this repo.
        :rtype: tuple(Installable)
        """
        # noinspection PyTypeChecker
        return tuple(
            m for m in self.available_modules
            if m.type == InstallableType.SHARED_LIBRARY
        )

    def to_json(self):
        """
        Serialises the repo to a plain mapping.

        ``folder_path`` is stored as path parts relative to the current
        working directory when it lies beneath it. Otherwise the path's
        own parts are stored unchanged; :py:meth:`from_json` joins them
        onto the working directory, which leaves an absolute path as is.
        :return: Mapping with the keys ``url``, ``name``, ``branch``,
            ``folder_path`` and ``available_modules``.
        """
        try:
            folder_parts = self.folder_path.relative_to(Path.cwd()).parts
        except ValueError:
            # Not located under the working directory (e.g. a data
            # folder configured elsewhere): keep the path as given.
            folder_parts = self.folder_path.parts

        return {
            "url": self.url,
            "name": self.name,
            "branch": self.branch,
            "folder_path": folder_parts,
            "available_modules": [m.to_json() for m in self.available_modules]
        }

    @classmethod
    def from_json(cls, data):
        """
        Rebuilds a repo from the mapping produced by :py:meth:`to_json`.

        :param data: Mapping with the keys written by ``to_json``.
        :return: New instance of this class.
        """
        # noinspection PyTypeChecker
        return cls(data['name'], data['url'], data['branch'],
                   Path.cwd() / Path(*data['folder_path']),
                   tuple(Installable.from_json(m) for m in data['available_modules']))
class RepoManager:
    """
    Registry of the repos known to Downloader.

    Repos are kept in memory keyed by name and mirrored to the ``repos``
    value of the downloader's config object on every change.
    """

    def __init__(self, downloader_config: Config):
        """
        :param downloader_config: Config object whose ``repos`` value
            stores the serialised repos.
        """
        self.downloader_config = downloader_config

        # str_name: Repo
        self._repos = {}

        # Fill self._repos from config before the constructor returns.
        asyncio.get_event_loop().run_until_complete(
            self._load_repos(set=True)
        )

    @property
    def repos_folder(self) -> Path:
        """
        Folder named ``repos`` inside this object's cog data path.
        """
        return data_manager.cog_data_path(self) / 'repos'

    def does_repo_exist(self, name: str) -> bool:
        """
        :param name: Repo name to look for.
        :return: ``True`` if a repo is registered under that name.
        """
        return name in self._repos

    @staticmethod
    def validate_and_normalize_repo_name(name: str) -> str:
        """
        Checks that a name is a valid Python identifier and lower-cases it.
        :param name: Proposed repo name.
        :return: The lower-cased name.
        :raises InvalidRepoName: If the name is not an identifier.
        """
        if name.isidentifier():
            return name.lower()
        raise InvalidRepoName("Not a valid Python variable name.")

    async def add_repo(self, url: str, name: str, branch: str="master") -> Repo:
        """
        Registers a new repo, clones it and saves the registry.
        :param url: URL of git repo to clone.
        :param name: Internal name of repo.
        :param branch: Branch to clone.
        :return: New repo object representing cloned repo.
        :rtype: Repo
        :raises InvalidRepoName: If the name is invalid or already used.
        """
        name = self.validate_and_normalize_repo_name(name)
        if self.does_repo_exist(name):
            raise InvalidRepoName(
                "That repo name you provided already exists."
                " Please choose another."
            )

        # noinspection PyTypeChecker
        new_repo = Repo(
            url=url,
            name=name,
            branch=branch,
            folder_path=self.repos_folder / name
        )
        await new_repo.clone()

        # Only registered (and persisted) once the clone succeeded.
        self._repos[name] = new_repo
        await self._save_repos()

        return new_repo

    def get_repo(self, name: str) -> Union[Repo, None]:
        """
        Looks up a repo by name.
        :param name: Repo name
        :return: Repo object or ``None`` if repo does not exist.
        :rtype: Union[Repo, None]
        """
        return self._repos.get(name)

    def get_all_repo_names(self) -> Tuple[str]:
        """
        Returns the names of every registered repo.
        :rtype: tuple(str)
        """
        # noinspection PyTypeChecker
        return tuple(self._repos)

    async def delete_repo(self, name: str):
        """
        Removes a repo's folder from disk, drops it from the registry
        and saves the registry.
        :param name: Name of the repo to delete.
        :raises MissingGitRepo: If the repo does not exist.
        """
        target = self.get_repo(name)
        if target is None:
            raise MissingGitRepo("There is no repo with the name {}".format(name))

        shutil.rmtree(str(target.folder_path))

        # Tolerates the entry having disappeared in the meantime.
        self._repos.pop(name, None)

        await self._save_repos()

    async def update_all_repos(self) -> MutableMapping[Repo, Tuple[str, str]]:
        """
        Calls :py:meth:`Repo.update` on all repos, one after another,
        then saves the registry.
        :return:
            A mapping of :py:class:`Repo` objects that received new commits to a tuple containing old and
                new commit hashes.
        """
        changed = {}
        for repo in self._repos.values():
            old_hash, new_hash = await repo.update()
            if old_hash != new_hash:
                changed[repo] = (old_hash, new_hash)

        await self._save_repos()
        return changed

    async def _load_repos(self, set=False) -> MutableMapping[str, Repo]:
        """
        Builds repo objects from the config's stored ``repos`` value.
        :param set: When true, the result also replaces the in-memory
            registry.
        :return: Mapping of repo name -> Repo
        """
        stored = await self.downloader_config.repos()

        loaded = {}
        for repo_name, repo_data in stored.items():
            loaded[repo_name] = Repo.from_json(repo_data)

        if set:
            self._repos = loaded
        return loaded

    async def _save_repos(self):
        """
        Writes the JSON form of every registered repo to the config's
        ``repos`` value.
        """
        serialised = {}
        for repo_name, repo in self._repos.items():
            serialised[repo_name] = repo.to_json()
        await self.downloader_config.repos.set(serialised)