mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 11:18:54 -05:00
416 lines
13 KiB
Python
416 lines
13 KiB
Python
import asyncio
|
|
import pathlib
|
|
from collections import namedtuple
|
|
from typing import Any, NamedTuple
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from pytest_mock import MockFixture
|
|
|
|
from redbot.pytest.downloader import *
|
|
|
|
from redbot.cogs.downloader.repo_manager import Installable
|
|
from redbot.cogs.downloader.repo_manager import Candidate, ProcessFormatter, RepoManager, Repo
|
|
from redbot.cogs.downloader.errors import (
|
|
AmbiguousRevision,
|
|
ExistingGitRepo,
|
|
GitException,
|
|
UnknownRevision,
|
|
)
|
|
|
|
|
|
class FakeCompletedProcess(NamedTuple):
    """Minimal stand-in for a finished process, used as the result of a mocked ``Repo._run``."""

    # Exit status the fake process reports.
    returncode: int
    # Captured standard output; empty unless a test supplies it.
    stdout: bytes = b""
    # Captured standard error; empty unless a test supplies it.
    stderr: bytes = b""
|
|
|
|
|
|
def _mock_run(
    mocker: MockFixture, repo: Repo, returncode: int, stdout: bytes = b"", stderr: bytes = b""
):
    """Patch ``repo._run`` so that it returns a canned ``FakeCompletedProcess``.

    The object returned by ``mocker.patch.object`` is handed back so the caller
    can assert on how ``_run`` was invoked.
    """
    canned_result = FakeCompletedProcess(returncode, stdout, stderr)
    return mocker.patch.object(repo, "_run", autospec=True, return_value=canned_result)
|
|
|
|
|
|
def _mock_setup_repo(mocker: MockFixture, repo: Repo, commit: str):
    """Patch ``repo._setup_repo`` with a mock that sets ``repo.commit`` to ``commit`` when called.

    The object returned by ``mocker.patch.object`` is handed back to the caller.
    """

    def _record_commit(*_args, **_kwargs):
        repo.commit = commit
        # Returning DEFAULT tells the mock to fall back to its configured return_value (None).
        return mocker.DEFAULT

    return mocker.patch.object(
        repo,
        "_setup_repo",
        autospec=True,
        side_effect=_record_commit,
        return_value=None,
    )
|
|
|
|
|
|
def test_existing_git_repo(tmp_path):
    """``Repo._existing_git_repo`` reports a ``.git`` directory inside the repo folder."""
    git_dir = tmp_path / "repos" / "squid" / ".git"
    git_dir.mkdir(parents=True, exist_ok=True)

    squid = Repo(
        url="https://github.com/tekulvw/Squid-Plugins",
        name="squid",
        branch="rewrite_cogs",
        commit="6acb5decbb717932e5dc0cda7fca0eff452c47dd",
        folder_path=git_dir.parent,
    )

    found, reported_path = squid._existing_git_repo()

    assert found is True
    assert reported_path == git_dir
|
|
|
|
|
|
# Pair of revision hashes fed to the ``test_is_ancestor`` parametrization below,
# once in (ancestor, descendant) order and once swapped.
ancestor_rev = "c950fc05a540dd76b944719c2a3302da2e2f3090"
descendant_rev = "fb99eb7d2d5bed514efc98fe6686b368f8425745"
|
|
|
|
|
|
@pytest.mark.parametrize(
    ("maybe_ancestor_rev", "descendant_rev", "returncode", "expected"),
    [(ancestor_rev, descendant_rev, 0, True), (descendant_rev, ancestor_rev, 1, False)],
)
async def test_is_ancestor(mocker, repo, maybe_ancestor_rev, descendant_rev, returncode, expected):
    """``Repo.is_ancestor`` returns True for exit code 0 and False for exit code 1."""
    run_mock = _mock_run(mocker, repo, returncode)
    expected_command = ProcessFormatter().format(
        repo.GIT_IS_ANCESTOR,
        path=repo.folder_path,
        maybe_ancestor_rev=maybe_ancestor_rev,
        descendant_rev=descendant_rev,
    )

    result = await repo.is_ancestor(maybe_ancestor_rev, descendant_rev)

    run_mock.assert_called_once_with(expected_command, valid_exit_codes=(0, 1), debug_only=True)
    assert result is expected
|
|
|
|
|
|
async def test_is_ancestor_object_raise(mocker, repo):
    """Exit code 128 with a "Not a valid object name" error raises ``UnknownRevision``."""
    git_error = b"fatal: Not a valid object name invalid1"
    run_mock = _mock_run(mocker, repo, 128, b"", git_error)

    with pytest.raises(UnknownRevision):
        await repo.is_ancestor("invalid1", "invalid2")

    expected_command = ProcessFormatter().format(
        repo.GIT_IS_ANCESTOR,
        path=repo.folder_path,
        maybe_ancestor_rev="invalid1",
        descendant_rev="invalid2",
    )
    run_mock.assert_called_once_with(expected_command, valid_exit_codes=(0, 1), debug_only=True)
|
|
|
|
|
|
async def test_is_ancestor_commit_raise(mocker, repo):
    """Exit code 128 with a "Not a valid commit name" error raises ``UnknownRevision``."""
    missing_commit = "0123456789abcde0123456789abcde0123456789"
    known_commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
    run_mock = _mock_run(
        mocker,
        repo,
        128,
        b"",
        b"fatal: Not a valid commit name 0123456789abcde0123456789abcde0123456789",
    )

    with pytest.raises(UnknownRevision):
        await repo.is_ancestor(missing_commit, known_commit)

    expected_command = ProcessFormatter().format(
        repo.GIT_IS_ANCESTOR,
        path=repo.folder_path,
        maybe_ancestor_rev=missing_commit,
        descendant_rev=known_commit,
    )
    run_mock.assert_called_once_with(expected_command, valid_exit_codes=(0, 1), debug_only=True)
|
|
|
|
|
|
async def test_get_file_update_statuses(mocker, repo):
    """``Repo._get_file_update_statuses`` turns the mocked diff output into a path -> status map."""
    old_rev = "c950fc05a540dd76b944719c2a3302da2e2f3090"
    new_rev = "fb99eb7d2d5bed514efc98fe6686b368f8425745"
    # Canned stdout handed to the parser by the mocked ``_run``.
    diff_output = (
        b"A\x00added_file.txt\x00\t"
        b"M\x00mycog/__init__.py\x00\t"
        b"D\x00sample_file1.txt\x00\t"
        b"D\x00sample_file2.txt\x00\t"
        b"A\x00sample_file3.txt"
    )
    run_mock = _mock_run(mocker, repo, 0, diff_output)

    statuses = await repo._get_file_update_statuses(old_rev, new_rev)

    expected_command = ProcessFormatter().format(
        repo.GIT_DIFF_FILE_STATUS, path=repo.folder_path, old_rev=old_rev, new_rev=new_rev
    )
    run_mock.assert_called_once_with(expected_command)

    assert statuses == {
        "added_file.txt": "A",
        "mycog/__init__.py": "M",
        "sample_file1.txt": "D",
        "sample_file2.txt": "D",
        "sample_file3.txt": "A",
    }
|
|
|
|
|
|
async def test_is_module_modified(mocker, repo):
    """``Repo._is_module_modified`` is True when the mocked status map includes a "mycog/" file."""
    old_rev = "c950fc05a540dd76b944719c2a3302da2e2f3090"
    new_rev = "fb99eb7d2d5bed514efc98fe6686b368f8425745"
    # Lightweight substitute exposing only the ``name`` and ``commit`` fields.
    FakeInstallable = namedtuple("Installable", "name commit")
    module = FakeInstallable("mycog", new_rev)
    file_statuses = {
        "added_file.txt": "A",
        "mycog/__init__.py": "M",
        "sample_file1.txt": "D",
        "sample_file2.txt": "D",
        "sample_file3.txt": "A",
    }
    statuses_mock = mocker.patch.object(
        repo, "_get_file_update_statuses", autospec=True, return_value=file_statuses
    )

    modified = await repo._is_module_modified(module, old_rev)

    statuses_mock.assert_called_once_with(old_rev, new_rev)
    assert modified is True
|
|
|
|
|
|
async def test_get_full_sha1_success(mocker, repo):
    """``Repo.get_full_sha1`` returns the hash that the mocked git call printed to stdout."""
    commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
    run_mock = _mock_run(mocker, repo, 0, commit.encode())

    full_sha1 = await repo.get_full_sha1(commit)

    expected_command = ProcessFormatter().format(
        repo.GIT_GET_FULL_SHA1, path=repo.folder_path, rev=commit
    )
    run_mock.assert_called_once_with(expected_command)
    assert full_sha1 == commit
|
|
|
|
|
|
async def test_get_full_sha1_notfound(mocker, repo):
    """Exit code 128 with only "Needed a single revision" on stderr raises ``UnknownRevision``."""
    rev = "invalid"
    run_mock = _mock_run(mocker, repo, 128, b"", b"fatal: Needed a single revision")

    with pytest.raises(UnknownRevision):
        await repo.get_full_sha1(rev)

    expected_command = ProcessFormatter().format(
        repo.GIT_GET_FULL_SHA1, path=repo.folder_path, rev=rev
    )
    run_mock.assert_called_once_with(expected_command)
|
|
|
|
|
|
async def test_get_full_sha1_ambiguous(mocker, repo):
    """An "ambiguous short SHA1" error raises ``AmbiguousRevision`` carrying parsed candidates."""
    # Canned stderr listing the two candidates expected on the raised exception.
    ambiguous_stderr = (
        b"error: short SHA1 c6f0 is ambiguous\n"
        b"hint: The candidates are:\n"
        b"hint: c6f028f tag ambiguous_tag_66387\n"
        b"hint: c6f0e5e commit 2019-10-24 - Commit ambiguous with tag.\n"
        b"fatal: Needed a single revision"
    )
    run_mock = _mock_run(mocker, repo, 128, b"", ambiguous_stderr)

    with pytest.raises(AmbiguousRevision) as exc_info:
        await repo.get_full_sha1("c6f0")

    expected_command = ProcessFormatter().format(
        repo.GIT_GET_FULL_SHA1, path=repo.folder_path, rev="c6f0"
    )
    run_mock.assert_called_once_with(expected_command)

    assert exc_info.value.candidates == [
        Candidate("c6f028f", "tag", "ambiguous_tag_66387"),
        Candidate("c6f0e5e", "commit", "2019-10-24 - Commit ambiguous with tag."),
    ]
|
|
|
|
|
|
def test_update_available_modules(repo):
    """Only the top-level package is reported as a module; its nested sub-package is not."""
    cog_dir = repo.folder_path / "mycog"
    nested_dir = cog_dir / "submodule"

    cog_dir.mkdir(parents=True)
    (cog_dir / "__init__.py").touch()
    nested_dir.mkdir()
    (nested_dir / "__init__.py").touch()

    found = repo._update_available_modules()

    # The return value and the cached attribute must both equal the single expected module.
    assert (
        found
        == repo.available_modules
        == (Installable(location=cog_dir, repo=repo, commit=repo.commit),)
    )
|
|
|
|
|
|
async def test_checkout(mocker, repo):
    """``Repo._checkout`` runs the checkout command once and leaves ``repo.commit`` updated."""
    commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
    run_mock = _mock_run(mocker, repo, 0)
    _mock_setup_repo(mocker, repo, commit)
    # Create a ".git" directory inside the repo folder before checking out.
    (repo.folder_path / ".git").mkdir()

    await repo._checkout(commit)

    assert repo.commit == commit
    expected_command = ProcessFormatter().format(
        repo.GIT_CHECKOUT, path=repo.folder_path, rev=commit
    )
    run_mock.assert_called_once_with(expected_command)
|
|
|
|
|
|
async def test_checkout_ctx_manager(mocker, repo):
    """``async with repo.checkout(rev)`` checks out ``rev`` on entry and the old commit on exit."""
    commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
    checkout_mock = mocker.patch.object(repo, "_checkout", autospec=True, return_value=None)
    previous_commit = repo.commit

    async with repo.checkout(commit):
        checkout_mock.assert_called_with(commit, force_checkout=False)
        # Reset the mocked return value before the exit path calls ``_checkout`` again.
        checkout_mock.return_value = None

    checkout_mock.assert_called_with(previous_commit, force_checkout=False)
|
|
|
|
|
|
async def test_checkout_await(mocker, repo):
    """Awaiting ``repo.checkout(rev)`` directly calls ``_checkout`` exactly once."""
    commit = "c950fc05a540dd76b944719c2a3302da2e2f3090"
    checkout_mock = mocker.patch.object(repo, "_checkout", autospec=True, return_value=None)

    await repo.checkout(commit)

    checkout_mock.assert_called_once_with(commit, force_checkout=False)
|
|
|
|
|
|
async def test_clone_with_branch(mocker, repo):
    """With a branch set, ``Repo.clone`` runs ``GIT_CLONE`` for that branch."""
    branch = "dont_add_commits"
    commit = "a0ccc2390883c85a361f5a90c72e1b07958939fa"
    repo.branch = branch
    repo.commit = ""
    run_mock = _mock_run(mocker, repo, 0)
    _mock_setup_repo(mocker, repo, commit)

    await repo.clone()

    assert repo.commit == commit
    expected_command = ProcessFormatter().format(
        repo.GIT_CLONE, branch=branch, url=repo.url, folder=repo.folder_path
    )
    run_mock.assert_called_once_with(expected_command)
|
|
|
|
|
|
async def test_clone_without_branch(mocker, repo):
    """With ``repo.branch`` unset, ``Repo.clone`` runs ``GIT_CLONE_NO_BRANCH``."""
    branch = "dont_add_commits"
    commit = "a0ccc2390883c85a361f5a90c72e1b07958939fa"
    repo.branch = None
    repo.commit = ""
    run_mock = _mock_run(mocker, repo, 0)
    _mock_setup_repo(mocker, repo, commit)
    # ``current_branch`` is patched to return a fixed branch name.
    mocker.patch.object(repo, "current_branch", autospec=True, return_value=branch)

    await repo.clone()

    assert repo.commit == commit
    expected_command = ProcessFormatter().format(
        repo.GIT_CLONE_NO_BRANCH, url=repo.url, folder=repo.folder_path
    )
    run_mock.assert_called_once_with(expected_command)
|
|
|
|
|
|
async def test_update(mocker, repo):
    """``Repo.update`` returns ``(old_commit, new_commit)`` and runs ``GIT_PULL`` once."""
    commit_before = repo.commit
    commit_after = "a0ccc2390883c85a361f5a90c72e1b07958939fa"
    run_mock = _mock_run(mocker, repo, 0)
    _mock_setup_repo(mocker, repo, commit_after)
    mocker.patch.object(repo, "latest_commit", autospec=True, return_value=commit_before)
    mocker.patch.object(repo, "hard_reset", autospec=True, return_value=None)

    commits = await repo.update()

    assert commits == (commit_before, commit_after)
    expected_command = ProcessFormatter().format(repo.GIT_PULL, path=repo.folder_path)
    run_mock.assert_called_once_with(expected_command)
|
|
|
|
|
|
# old tests
|
|
|
|
|
|
async def test_add_repo(monkeypatch, repo_manager):
    """Adding a repo with ``_run`` and ``current_commit`` faked yields no available modules."""
    monkeypatch.setattr("redbot.cogs.downloader.repo_manager.Repo._run", fake_run_noprint)
    monkeypatch.setattr(
        "redbot.cogs.downloader.repo_manager.Repo.current_commit", fake_current_commit
    )

    added_repo = await repo_manager.add_repo(
        url="https://github.com/tekulvw/Squid-Plugins",
        name="squid",
        branch="rewrite_cogs",
    )

    assert added_repo.available_modules == ()
|
|
|
|
|
|
async def test_lib_install_requirements(monkeypatch, library_installable, repo, tmpdir):
    """``Repo.install_libraries`` reports one installed library and no failures."""
    monkeypatch.setattr("redbot.cogs.downloader.repo_manager.Repo._run", fake_run_noprint)
    monkeypatch.setattr(
        "redbot.cogs.downloader.repo_manager.Repo.available_libraries", (library_installable,)
    )

    requirements_dir = Path(str(tmpdir)) / "cog_data_path" / "lib"
    shared_libs_dir = requirements_dir / "cog_shared"
    shared_libs_dir.mkdir(parents=True, exist_ok=True)

    installed, failed = await repo.install_libraries(
        target_dir=shared_libs_dir, req_target_dir=requirements_dir
    )

    assert len(installed) == 1
    assert len(failed) == 0
|
|
|
|
|
|
async def test_remove_repo(monkeypatch, repo_manager):
    """A repo can be looked up after ``add_repo`` and is gone after ``delete_repo``."""
    monkeypatch.setattr("redbot.cogs.downloader.repo_manager.Repo._run", fake_run_noprint)
    monkeypatch.setattr(
        "redbot.cogs.downloader.repo_manager.Repo.current_commit", fake_current_commit
    )

    await repo_manager.add_repo(
        url="https://github.com/tekulvw/Squid-Plugins",
        name="squid",
        branch="rewrite_cogs",
    )
    assert repo_manager.get_repo("squid") is not None

    await repo_manager.delete_repo("squid")
    assert repo_manager.get_repo("squid") is None
|
|
|
|
|
|
async def test_existing_repo(mocker, repo_manager):
    """``add_repo`` raises ``ExistingGitRepo`` when ``does_repo_exist`` reports True."""
    exists_mock = mocker.MagicMock(return_value=True)
    repo_manager.does_repo_exist = exists_mock

    with pytest.raises(ExistingGitRepo):
        await repo_manager.add_repo("http://test.com", "test")

    repo_manager.does_repo_exist.assert_called_once_with("test")
|
|
|
|
|
|
def test_tree_url_parse(repo_manager):
    """``_parse_url`` strips a GitHub ``/tree/<branch>`` suffix; an explicit branch wins."""
    cases = [
        {
            "input": ("https://github.com/Tobotimus/Tobo-Cogs", None),
            "expected": ("https://github.com/Tobotimus/Tobo-Cogs", None),
        },
        {
            "input": ("https://github.com/Tobotimus/Tobo-Cogs", "V3"),
            "expected": ("https://github.com/Tobotimus/Tobo-Cogs", "V3"),
        },
        {
            "input": ("https://github.com/Tobotimus/Tobo-Cogs/tree/V3", None),
            "expected": ("https://github.com/Tobotimus/Tobo-Cogs", "V3"),
        },
        {
            "input": ("https://github.com/Tobotimus/Tobo-Cogs/tree/V3", "V4"),
            "expected": ("https://github.com/Tobotimus/Tobo-Cogs", "V4"),
        },
    ]

    for case in cases:
        url, branch = case["input"]
        assert case["expected"] == repo_manager._parse_url(url, branch)
|
|
|
|
|
|
def test_tree_url_non_github(repo_manager):
    """``_parse_url`` returns these non-GitHub URLs and branches unchanged."""
    cases = [
        {
            "input": ("https://gitlab.com/Tobotimus/Tobo-Cogs", None),
            "expected": ("https://gitlab.com/Tobotimus/Tobo-Cogs", None),
        },
        {
            "input": ("https://my.usgs.gov/bitbucket/scm/Tobotimus/Tobo-Cogs", "V3"),
            "expected": ("https://my.usgs.gov/bitbucket/scm/Tobotimus/Tobo-Cogs", "V3"),
        },
    ]

    for case in cases:
        url, branch = case["input"]
        assert case["expected"] == repo_manager._parse_url(url, branch)
|