Files
Red-DiscordBot/redbot/_update/updater.py
T
2026-05-13 14:14:43 -08:00

737 lines
30 KiB
Python

import asyncio
import dataclasses
import json
import os
import shutil
import sys
import tarfile
import tempfile
from pathlib import Path
from typing import Any, Dict, List, NoReturn, Optional, Set
import click
from packaging.version import Version
from python_discovery import PythonInfo
from rich.markdown import Markdown
from rich.panel import Panel
from rich.prompt import Confirm, IntPrompt, Prompt
from rich.text import Text
from typing_extensions import Self
from redbot.core.utils._internal_utils import (
AvailableVersion,
detailed_progress,
fetch_available_red_versions,
get_installed_extras,
)
from . import changelog, cmd, common, runner
from .cog_compatibility_checker import CompatibilitySummary
from .tui import ChangelogReaderApp, ChangelogReaderResult
@dataclasses.dataclass
class UpdaterOptions:
    """Settings chosen by the user for a single update run."""

    instances: List[str]
    excluded_instances: Set[str]
    ignore_prefix: bool
    backup_dir: Optional[Path]
    no_backup: bool
    red_version: Optional[Version]
    no_major_updates: bool
    no_full_changelog: bool
    no_cog_compatibility_check: bool
    new_python_interpreter: Optional[PythonInfo]
    update_cogs: Optional[bool]
    force_reinstall: bool
    interactive: bool

    @classmethod
    def from_json_dict(cls, data: Dict[str, Any]) -> Self:
        """Rebuild the options from a dictionary made by `to_json_dict()`.

        Falsy values of the optional fields (`backup_dir`, `red_version`,
        `new_python_interpreter`) are passed through unconverted.
        """

        def _convert(raw: Any, factory: Any) -> Any:
            # Only run the converter on truthy values; hand back anything else as-is.
            return factory(raw) if raw else raw

        raw_backup_dir = data["backup_dir"]
        raw_red_version = data["red_version"]
        return cls(
            instances=data["instances"],
            excluded_instances=set(data["excluded_instances"]),
            ignore_prefix=data["ignore_prefix"],
            backup_dir=_convert(raw_backup_dir, Path),
            no_backup=data["no_backup"],
            red_version=_convert(raw_red_version, Version),
            no_major_updates=data["no_major_updates"],
            no_full_changelog=data["no_full_changelog"],
            no_cog_compatibility_check=data["no_cog_compatibility_check"],
            new_python_interpreter=_convert(
                data["new_python_interpreter"], PythonInfo.from_dict
            ),
            update_cogs=data["update_cogs"],
            force_reinstall=data["force_reinstall"],
            interactive=data["interactive"],
        )

    def to_json_dict(self) -> Dict[str, Any]:
        """Return a dictionary form of the options that `from_json_dict()` accepts."""
        interpreter = self.new_python_interpreter
        payload = dataclasses.asdict(self)
        # Overwrite the fields whose values are a set, a path, a version
        # or an interpreter description with list/str/dict equivalents.
        payload["excluded_instances"] = list(self.excluded_instances)
        payload["backup_dir"] = str(self.backup_dir) if self.backup_dir else self.backup_dir
        payload["red_version"] = (
            str(self.red_version) if self.red_version else self.red_version
        )
        payload["new_python_interpreter"] = interpreter.to_dict() if interpreter else interpreter
        return payload
@dataclasses.dataclass
class UpdaterCompatibilitySummary:
    """Outcome of the cog compatibility check, grouped by how each instance fared."""

    # per-instance results for the instances that were checked successfully
    checked: Dict[str, CompatibilitySummary]
    # names of the instances for which the check failed
    failed: List[str]
    # names of the instances that were not checked
    skipped: List[str]

    @classmethod
    def from_json_dict(cls, data: Dict[str, Any]) -> Self:
        """Rebuild the summary from a dictionary made by `to_json_dict()`."""
        checked: Dict[str, CompatibilitySummary] = {}
        for name, raw_summary in data["checked"].items():
            checked[name] = CompatibilitySummary.from_json_dict(raw_summary)
        return cls(checked=checked, failed=data["failed"], skipped=data["skipped"])

    def to_json_dict(self) -> Dict[str, Any]:
        """Return a dictionary form of the summary that `from_json_dict()` accepts."""
        serialized_checked: Dict[str, Any] = {}
        for name, summary in self.checked.items():
            serialized_checked[name] = summary.to_json_dict()
        return {
            "checked": serialized_checked,
            "failed": self.failed,
            "skipped": self.skipped,
        }
@dataclasses.dataclass
class BackupResults:
checked: List[str]
failed: List[str]
skipped: List[str] = dataclasses.field(default_factory=list)
@classmethod
def from_json_dict(cls, data: Dict[str, List[str]]) -> Self:
return cls(checked=data["checked"], failed=data["failed"], skipped=data["skipped"])
def to_json_dict(self) -> Dict[str, List[str]]:
return dataclasses.asdict(self)
# Sentinel used as the default of `UpdaterMetadata.interpreter_version`.
# `UpdaterMetadata.__post_init__()` recognises it by identity (`is`) and replaces it
# with the version read from `interpreter_info`.
_PYTHON_VERSION_PLACEHOLDER = Version("0.0.dev0")
@dataclasses.dataclass
class UpdaterMetadata:
    """Metadata about the update process."""

    # options specified by the user
    options: UpdaterOptions
    # info about Red version to update to (latest available or latest non-major update)
    latest: AvailableVersion
    latest_major: AvailableVersion
    # info about Red/Python versions that we're updating from
    current_version: Version = dataclasses.field(default_factory=common.get_current_red_version)
    current_python_version: Version = dataclasses.field(
        default_factory=common.get_current_python_version
    )
    # details about the interpreter that will be used for the new venv
    interpreter_info: PythonInfo = dataclasses.field(default_factory=PythonInfo.current_system)
    interpreter_version: Version = _PYTHON_VERSION_PLACEHOLDER
    interpreter_exe: str = ""
    # changelogs for version in (current_version, latest> range
    changelogs: changelog.Changelogs = dataclasses.field(default_factory=dict)
    # cog compatibility check results
    cog_compatibility: Optional[UpdaterCompatibilitySummary] = None
    # backup info
    to_backup: List[str] = dataclasses.field(default_factory=list)
    backup_dir: Optional[Path] = None
    backup_results: Optional[BackupResults] = None

    def __post_init__(self) -> None:
        """Derive `interpreter_version` and `interpreter_exe` when they were not given."""
        # Identity check on purpose: only the untouched default counts as "not given".
        if self.interpreter_version is _PYTHON_VERSION_PLACEHOLDER:
            self.interpreter_version = Version(
                ".".join(map(str, self.interpreter_info.version_info[:3]))
            )
        if not self.interpreter_exe:
            self.interpreter_exe = self.interpreter_info.system_executable

    @classmethod
    def from_json_dict(cls, data: Dict[str, Any]) -> Self:
        """
        Make an instance of this class from a dictionary,
        as returned by the `to_json_dict()` method.

        This aims to maintain backwards compatibility with data generated by
        earlier Red versions as it may be called with such data
        after the last update step.

        ``cog_compatibility`` and ``backup_results`` may be ``None`` in the data
        (`to_json_dict()` writes ``None`` when the respective attribute is unset)
        and are then restored as ``None``.
        """
        backup_dir = data.get("backup_dir")
        raw_cog_compatibility = data["cog_compatibility"]
        raw_backup_results = data["backup_results"]
        return cls(
            options=UpdaterOptions.from_json_dict(data["options"]),
            latest=AvailableVersion.from_json_dict(data["latest"]),
            latest_major=AvailableVersion.from_json_dict(data["latest_major"]),
            current_version=Version(data["current_version"]),
            current_python_version=Version(data["current_python_version"]),
            interpreter_version=Version(data["interpreter_version"]),
            interpreter_info=PythonInfo.from_dict(data["interpreter_info"]),
            interpreter_exe=data["interpreter_exe"],
            changelogs={
                Version(raw_version): changelog.VersionChangelog.from_json_dict(raw_changelog)
                for raw_version, raw_changelog in data["changelogs"].items()
            },
            # These two are legitimately `None` when the matching step did not run,
            # so they must not be fed to the nested `from_json_dict()` blindly.
            cog_compatibility=(
                None
                if raw_cog_compatibility is None
                else UpdaterCompatibilitySummary.from_json_dict(raw_cog_compatibility)
            ),
            to_backup=data["to_backup"],
            backup_dir=backup_dir and Path(backup_dir),
            backup_results=(
                None
                if raw_backup_results is None
                else BackupResults.from_json_dict(raw_backup_results)
            ),
        )

    def to_json_dict(self) -> Dict[str, Any]:
        """Return a dictionary form of the metadata that `from_json_dict()` accepts.

        Unset optional attributes (`cog_compatibility`, `backup_dir`,
        `backup_results`) are written as ``None``.
        """
        return {
            "options": self.options.to_json_dict(),
            "latest": self.latest.to_json_dict(),
            "latest_major": self.latest_major.to_json_dict(),
            "current_version": str(self.current_version),
            "current_python_version": str(self.current_python_version),
            "interpreter_version": str(self.interpreter_version),
            "interpreter_info": self.interpreter_info.to_dict(),
            "interpreter_exe": self.interpreter_exe,
            "changelogs": {str(v): c.to_json_dict() for v, c in self.changelogs.items()},
            "cog_compatibility": self.cog_compatibility and self.cog_compatibility.to_json_dict(),
            "to_backup": self.to_backup,
            "backup_dir": self.backup_dir and str(self.backup_dir),
            "backup_results": self.backup_results and self.backup_results.to_json_dict(),
        }

    @property
    def breaking_update(self) -> bool:
        """Whether the first two release segments differ between current and target version."""
        return self.current_version.release[:2] != self.latest.version.release[:2]
class Updater:
    """Runs the update process for the given options.

    `run()` is the entry point. It gathers metadata, shows changelogs, checks
    the Python requirement and cog compatibility, makes backups and finally
    hands over to the reinstall step.
    """

    # set by `_prepare_metadata()`, which `run()` calls first
    metadata: UpdaterMetadata

    def __init__(self, options: UpdaterOptions) -> None:
        self.options = options
        self.console = common.get_console()

    @property
    def latest(self) -> AvailableVersion:
        """The version that the update targets (`metadata.latest`)."""
        return self.metadata.latest

    @property
    def current_version(self) -> Version:
        """The version that is being updated from (`metadata.current_version`)."""
        return self.metadata.current_version

    async def run(self) -> None:
        """Perform the update.

        Returns early when there is nothing to update to (unless a reinstall is
        forced) or when the user declines the final confirmation prompt.
        """
        await self._prepare_metadata()

        new_version_available = self.current_version < self.latest.version
        if not self.options.force_reinstall and not new_version_available:
            if self.current_version >= self.metadata.latest_major.version:
                common.print_with_prefix_column(
                    common.ICON_SUCCESS,
                    "You are already running the latest available version of Red.",
                )
            else:
                common.print_with_prefix_column(
                    common.ICON_INFO,
                    "There are no non-major updates available.\n",
                    "There is a new major version available: ",
                    Text(str(self.metadata.latest_major.version), style="bold"),
                )
            return

        if new_version_available:
            common.print_with_prefix_column(
                common.ICON_SUCCESS,
                "New version available: ",
                Text(str(self.latest.version), style="bold"),
            )
            await self._show_changelog()

        self._check_python_requires()

        if self.options.no_cog_compatibility_check:
            self.console.print(
                "Will not check cog compatibility"
                " as --no-cog-compatibility-check option was passed."
            )
            if not self.options.no_backup:
                # The list of instances to back up is normally filled in by
                # the compatibility check. With the check skipped, fall back
                # to every instance that was not explicitly excluded so that
                # the backup step still has something to work on.
                self.metadata.to_backup = [
                    instance_name
                    for instance_name in self.options.instances
                    if instance_name not in self.options.excluded_instances
                ]
        else:
            await self._check_cog_compatibility()

        if self.options.no_backup:
            common.print_with_prefix_column(
                common.ICON_INFO, "Will not make backups as --no-backup option was passed."
            )
        else:
            common.print_with_prefix_column(
                common.ICON_INFO,
                "The following instances will be backed up before performing the update: ",
                Text(", ").join(
                    Text(instance_name, style="bold") for instance_name in self.metadata.to_backup
                ),
            )

        if self.metadata.breaking_update:
            self.console.print(
                "[b]Remember that this is a major release and it may have some breaking changes"
                " that the bot or its cogs may be affected by.[/]"
            )
        if self.options.interactive and not Confirm.ask(
            f"Do you want to continue with the update to [b]Red {self.latest.version}[/]?"
        ):
            return
        self.console.print()

        if self.options.no_backup:
            self.console.print("Will not make backups as --no-backup option was passed.")
        else:
            await self._make_backups()

        await self._update_with_fresh_venv()

    async def _prepare_metadata(self) -> None:
        """Fetch the available versions and set `self.metadata` accordingly.

        The target version defaults to the first fetched version and is
        narrowed down by the `red_version` / `no_major_updates` options.

        Raises:
            SystemExit: With code 2 when the requested `red_version` is not newer
                than the current one, conflicts with `no_major_updates` or is not
                among the fetched versions; with code 1 when `no_major_updates` is
                set, no version shares the current version's first two release
                segments and a newer version exists.
        """
        interpreter_info = self.options.new_python_interpreter or PythonInfo.current_system()
        with self.console.status("Checking latest version..."):
            available_versions = await fetch_available_red_versions()

        latest_major = available_versions[0]
        self.metadata = UpdaterMetadata(
            self.options,
            latest=latest_major,
            latest_major=latest_major,
            interpreter_info=interpreter_info,
        )

        if self.options.red_version:
            if self.options.red_version <= self.current_version:
                common.print_with_prefix_column(
                    common.ICON_ERROR, "You can only update to a newer version of Red."
                )
                raise SystemExit(2)
            if (
                self.options.no_major_updates
                and self.options.red_version.release[:2] != self.current_version.release[:2]
            ):
                common.print_with_prefix_column(
                    common.ICON_ERROR,
                    "Updating to the specified version would be a major update"
                    " but --no-major-updates option was specified.",
                )
                raise SystemExit(2)
            for available_version in available_versions:
                if available_version.version == self.options.red_version:
                    break
            else:
                common.print_with_prefix_column(
                    common.ICON_ERROR, "The provided version does not seem to exist."
                )
                raise SystemExit(2)
            self.metadata.latest = available_version
        elif self.options.no_major_updates:
            for available_version in available_versions:
                if available_version.version.release[:2] == self.current_version.release[:2]:
                    self.metadata.latest = available_version
                    break
            else:
                # No version matches the current release series; that is only
                # an error if there actually is something newer to update to.
                if self.current_version < latest_major.version:
                    common.print_with_prefix_column(
                        common.ICON_ERROR,
                        "Could not find any version of Red that would not be a major update.",
                    )
                    raise SystemExit(1)

    async def _show_changelog(self) -> None:
        """Fetch the changelogs for the update range and present them to the user.

        The fetched changelogs are stored in `metadata.changelogs`. They are
        printed inline when not running interactively or when
        `no_full_changelog` is set, and shown in the changelog reader app
        otherwise.

        Raises:
            click.Abort: When the user declines to continue or the reader app
                finishes with `ChangelogReaderResult.QUIT`.
            RuntimeError: When the reader app finishes without a result.
        """
        with self.console.status("Fetching changelogs..."):
            changelogs = await changelog.fetch_changelogs()
        self.metadata.changelogs = changelogs = changelog.get_changelogs_between(
            changelogs, self.current_version, self.latest.version
        )
        common.print_with_prefix_column(common.ICON_SUCCESS, "Changelogs fetched.")

        if not changelogs:
            return

        if not self.options.interactive or self.options.no_full_changelog:
            self.console.print(Panel(Markdown(changelog.render_markdown(changelogs))))
            if self.options.interactive and not Confirm.ask("Do you want to continue?"):
                raise click.Abort()
            return

        first_changelog_version = min(changelogs)
        last_changelog_version = max(changelogs)
        parts = []
        if first_changelog_version == last_changelog_version:
            parts.append(
                "You will now be presented with the changelog for"
                f" [b]Red {first_changelog_version}[/]."
            )
        else:
            parts.append(
                "You will now be presented with the changelogs for"
                f" [b]Red {first_changelog_version}[/]-[b]{last_changelog_version}[/]."
            )
        parts.append(
            f"\n[bold]{common.ICON_WARN}"
            ' Make sure to read through the [green]"Read before updating"[/] section'
            f" before continuing. {common.ICON_WARN}[/bold]\n"
        )
        if self.metadata.breaking_update:
            parts.append(
                f"[bold]{common.ICON_WARN}"
                " Please note that this is a major release and it may have some changes that"
                " your bot or its cogs are affected by.[/bold]\n"
            )
        parts.append(
            "After the changelog is open and you're ready to continue, hit the [b]Q[/] key"
            " to close the changelog and continue the update process.\n\n"
            "Hit the [b]Enter[/] key to view the changelog."
        )
        # `password=True` hides whatever gets typed while waiting for Enter.
        self.console.input(Panel("".join(parts)), password=True)

        viewer = ChangelogReaderApp.from_changelogs(changelogs)
        result = await viewer.run_async()
        if result is None:
            raise RuntimeError("Unexpected state")
        if result is ChangelogReaderResult.QUIT:
            raise click.Abort()
        self.console.print("Changelog has been closed.\n")

    def _check_python_requires(self) -> None:
        """Make sure the interpreter for the new venv satisfies the target's Python requirement.

        Does nothing when `metadata.interpreter_version` is already compatible.
        Otherwise, in interactive mode, the user is asked to pick a compatible
        interpreter (from the ones found on the system or by path) and the
        choice is stored in `metadata.interpreter_exe`, `interpreter_version`
        and `interpreter_info`.

        Raises:
            SystemExit: With code 1 when the interpreter is incompatible and it
                was passed explicitly by the user or the run is not interactive.
        """
        if self.metadata.interpreter_version in self.latest.requires_python:
            return

        if self.options.new_python_interpreter:
            common.print_with_prefix_column(
                common.ICON_ERROR,
                "The latest version of Red requires a different Python version (",
                Text(str(self.latest.requires_python), style="bold"),
                ") from the version of the interpreter passed to with the --new-python-interpreter"
                " option (",
                Text(str(self.metadata.interpreter_version), style="bold"),
                ")",
            )
            raise SystemExit(1)

        common.print_with_prefix_column(
            common.ICON_WARN if self.options.interactive else common.ICON_ERROR,
            "The latest version of Red requires a different Python version (",
            Text(str(self.latest.requires_python), style="bold"),
            ") from the one that you are currently using (",
            Text(str(self.metadata.interpreter_version), style="bold"),
            ")",
            (
                "\nredbot-update will have to recreate the virtual environment"
                " with a compatible version of Python."
                if self.options.interactive
                else ""
            ),
        )
        if not self.options.interactive:
            raise SystemExit(1)

        interpreters = common.search_for_interpreters(self.latest.requires_python)

        def _render_interpreter(interpreter_exe: str, interpreter_version: Version) -> Text:
            return Text.assemble(
                "CPython ",
                (str(interpreter_version), "repr.number"),
                " (",
                (interpreter_exe, "log.path"),
                ")",
            )

        text = Text("Found the following compatible Python interpreters on your system:")
        for idx, (interpreter_exe, interpreter_version, _) in enumerate(interpreters, 1):
            text.append_text(Text(f"\n{idx}. ", style="markdown.item.number"))
            text.append_text(_render_interpreter(interpreter_exe, interpreter_version))
        self.console.print(Panel(text))

        # Keep asking until the user confirms a valid choice.
        while True:
            result = IntPrompt.ask(
                "\nEnter the number of the Python interpreter above that you want to use"
                " or type 0 to input the path to it yourself. Generally, you should choose"
                " the interpreter with the latest version on the above list.\n"
                "Enter your selection",
                default=1,
            )
            if result < 0 or result > len(interpreters):
                self.console.print("[prompt.invalid] This is not a valid choice.")
                continue

            if result == 0:
                response = Prompt.ask(
                    "Please input the path to the Python interpreter that you want to use"
                )
                if not response:
                    self.console.print("[prompt.invalid] No path was provided.")
                    continue
                info = PythonInfo.from_exe(response)
                interpreter_version = Version(info.version_str)
                if (
                    info.implementation != "CPython"
                    or interpreter_version not in self.latest.requires_python
                ):
                    self.console.print(
                        "[prompt.invalid] The provided path points to an incompatible Python"
                        " interpreter. Latest version requires CPython"
                        f" {self.latest.requires_python} but the provided interpreter is"
                        f" {info.implementation} {interpreter_version}."
                    )
                    continue
                self.metadata.interpreter_version = interpreter_version
                self.metadata.interpreter_info = info
                self.metadata.interpreter_exe = info.executable
            else:
                # The listing above is numbered from 1.
                (
                    self.metadata.interpreter_exe,
                    self.metadata.interpreter_version,
                    self.metadata.interpreter_info,
                ) = interpreters[result - 1]

            self.console.print(
                "\n[b]You selected:[/]",
                _render_interpreter(
                    self.metadata.interpreter_exe, self.metadata.interpreter_version
                ),
            )
            if Confirm.ask("Do you want to continue with this choice?"):
                self.console.print()
                break

    async def _check_cog_compatibility(self) -> None:
        """Run the cog compatibility check for each instance and report the results.

        Excluded instances and instances for which the check exits with
        `EXIT_INSTANCE_BACKEND_UNSUPPORTED` or `EXIT_INSTANCE_SITE_PREFIX_MISMATCH`
        count as skipped; any other non-zero exit code counts as a failure.

        Stores the outcome in `metadata.cog_compatibility` and, unless backups
        are disabled, sets `metadata.to_backup` to the checked and failed instances.
        """
        outputs = {}
        checked_instances = {}
        skipped_instances = []
        failed_instances = []
        unsupported_storage_instances = []
        for instance_name in self.options.instances:
            if instance_name in self.options.excluded_instances:
                skipped_instances.append(instance_name)
                continue

            exit_code, stdout, results = await cmd.cog_compatibility.call(
                instance_name,
                red_version=self.latest.version,
                python_version=self.metadata.interpreter_version,
                ignore_prefix=self.options.ignore_prefix,
                return_results=True,
                stdout=asyncio.subprocess.PIPE,
            )
            if exit_code == cmd.cog_compatibility.EXIT_INSTANCE_BACKEND_UNSUPPORTED:
                skipped_instances.append(instance_name)
                unsupported_storage_instances.append(instance_name)
            elif exit_code == cmd.cog_compatibility.EXIT_INSTANCE_SITE_PREFIX_MISMATCH:
                skipped_instances.append(instance_name)
            elif exit_code:
                failed_instances.append(instance_name)
                # Show the output of a failed check right away, marked with a rule below it.
                print(stdout, end="")
                self.console.rule(
                    Text.assemble(
                        "\N{UPWARDS ARROW} " * 3,
                        "Failure for ",
                        (instance_name, "bold"),
                        " instance above",
                        " \N{UPWARDS ARROW}" * 3,
                    ),
                    style="red",
                )
            else:
                assert results is not None
                # Output of successful checks is held back and printed together later.
                outputs[instance_name] = stdout
                checked_instances[instance_name] = results

            if stdout:
                self.console.print()

        self.console.print()

        if not self.options.no_backup:
            self.metadata.to_backup = [*checked_instances, *failed_instances]

        if outputs:
            for instance_name, stdout in outputs.items():
                self.console.rule(Text(instance_name, style="bold"))
                print(stdout, end="")
            self.console.rule()

        common.print_with_prefix_column(
            common.ICON_INFO,
            "Finished checking cog compatibility.",
            (
                "\nThe results for each of the checked instances are shown above."
                if checked_instances
                else ""
            ),
        )
        if failed_instances:
            common.print_with_prefix_column(
                common.ICON_ERROR,
                "Failure occurred while trying to check compatibility for following instances: ",
                Text(", ").join(
                    Text(instance_name, style="bold") for instance_name in failed_instances
                ),
                "\nScroll above to find the errors.",
            )
        if unsupported_storage_instances:
            common.print_with_prefix_column(
                common.ICON_INFO,
                "The following instances were skipped as they use a storage backend that is"
                " not supported by the current Red installation (some requirements are missing): ",
                Text(", ").join(
                    Text(instance_name, style="bold")
                    for instance_name in unsupported_storage_instances
                ),
            )
        if not checked_instances:
            common.print_with_prefix_column(
                common.ICON_INFO,
                "There were no",
                (" other" if failed_instances or unsupported_storage_instances else ""),
                " instances to check cog compatibility for.",
            )
        self.console.print()

        self.metadata.cog_compatibility = UpdaterCompatibilitySummary(
            checked=checked_instances, failed=failed_instances, skipped=skipped_instances
        )

    async def _make_backups(self) -> None:
        """Back up the virtual environment and the instances in `metadata.to_backup`.

        Backups go to `options.backup_dir` or, when that is unset, to a newly
        created temporary directory; the location is stored in
        `metadata.backup_dir`. The virtual environment (`sys.prefix`) is archived
        as ``venv.tar.gz`` and each instance is backed up by running
        ``redbot.setup backup`` in a subprocess. The outcome is stored in
        `metadata.backup_results`.

        Raises:
            SystemExit: With code 1 when any instance backup failed and the run
                is not interactive or the user chooses not to continue.
        """
        self.metadata.backup_dir = backup_dir = self.options.backup_dir or Path(
            tempfile.mkdtemp(prefix="redbot-update-backup-")
        )
        console = common.get_console()
        console.print("Backups will be created at:", Text(str(backup_dir), style="bold"))

        venv_archive = backup_dir / "venv.tar.gz"
        with console.status("Making a backup of the virtual environment directory..."):
            venv_dir = Path(sys.prefix)
            # Collect (source path, path inside the archive) pairs up front
            # so that the compression progress can be tracked per file.
            venv_files = []
            for current_dir, _, filenames in os.walk(venv_dir):
                target_dir = os.path.relpath(current_dir, venv_dir)
                if target_dir == ".":
                    target_dir = ""
                for name in filenames:
                    venv_files.append(
                        (os.path.join(current_dir, name), os.path.join(target_dir, name))
                    )
        with tarfile.open(venv_archive, "w:gz", compresslevel=6) as tar:
            with detailed_progress(unit="files") as progress:
                for src, arcname in progress.track(venv_files, description="Compressing..."):
                    tar.add(src, arcname=arcname, recursive=False)
        console.print(
            "Created a backup of the virtual environment directory at:",
            Text(str(venv_archive), style="bold"),
        )

        checked = []
        failed = []
        instance_backups_dir = backup_dir / "instance_backups"
        instance_backups_dir.mkdir()
        for instance_name in self.metadata.to_backup:
            console.print(
                "Making a backup of the", Text(instance_name, style="bold"), "instance..."
            )
            debug_args = (cmd.arg_names.DEBUG,) * common.get_log_cli_level()
            proc = await asyncio.create_subprocess_exec(
                sys.executable,
                "-m",
                "redbot.setup",
                "backup",
                *debug_args,
                instance_name,
                str(instance_backups_dir),
            )
            # A non-zero exit code of the backup command means failure.
            if await proc.wait():
                failed.append(instance_name)
            else:
                checked.append(instance_name)

        self.metadata.backup_results = BackupResults(checked=checked, failed=failed)
        if self.metadata.cog_compatibility:
            self.metadata.backup_results.skipped.extend(self.metadata.cog_compatibility.skipped)

        if failed:
            common.print_with_prefix_column(
                common.ICON_ERROR,
                "The following instances failed during backup: ",
                Text(", ").join(Text(instance_name, style="bold") for instance_name in failed),
                "\nScroll above to find the errors.",
            )
            # If a backup fails, we cannot allow non-interactive update to continue.
            # The user can choose to use options such as `--no-backup`, `--instance`,
            # and `--exclude-instance` to not have the backup step try to backup something
            # that it can't.
            if not self.options.interactive or not Confirm.ask(
                "Do you want to continue with the update regardless?"
            ):
                raise SystemExit(1)

    async def _update_with_fresh_venv(self) -> NoReturn:
        """Move the current virtual environment aside and request a reinstall.

        Everything in the venv directory (`sys.prefix`), other than the backup
        directory itself and the wrapper executable, is moved into
        `common.OLD_VENV_BACKUP_DIR_NAME` inside it. The metadata is then dumped
        to a temporary JSON file whose path is passed on through the
        `common.INTERNAL_UPDATER_METADATA_ENV_VAR` environment variable, and
        `runner.make_exec_request()` is called with the relocated interpreter.

        NOTE(review): annotated as `NoReturn`, so `runner.make_exec_request()`
        is presumably expected not to return - confirm against `runner`.

        Raises:
            SystemExit: With code 1 when the backup directory already exists.
        """
        console = common.get_console()
        venv_dir = Path(sys.prefix)
        backup_dir = venv_dir / common.OLD_VENV_BACKUP_DIR_NAME
        try:
            backup_dir.mkdir()
        except FileExistsError:
            console.print(
                "Found that a partial backup of a virtual environment from a past failed update"
                " exists at",
                Text(str(backup_dir), style="bold"),
                "\nThe update will not proceed to avoid overriding it. If you are certain that"
                " you don't need to restore anything from it, remove it and try updating again.",
            )
            raise SystemExit(1)

        with console.status("Determining extras to install..."):
            try:
                metadata = await self.latest.fetch_core_metadata()
            except TypeError:
                # Without the target's metadata, keep all of the installed extras.
                extras = get_installed_extras()
            else:
                # Only keep the installed extras that the target version still provides.
                known_extras = metadata.provides_extra or []
                extras = [extra for extra in get_installed_extras() if extra in known_extras]
        console.print("Extras to install have been determined.")

        # The running interpreter is about to be moved along with the venv,
        # so work out where it will end up.
        old_executable = Path(sys.executable)
        rel_executable = old_executable.relative_to(venv_dir)
        new_executable = backup_dir / rel_executable
        wrapper_exe = runner.get_wrapper_executable()
        with console.status("Moving old virtual environment..."):
            for path in venv_dir.iterdir():
                if path == backup_dir or path == wrapper_exe:
                    continue
                path.rename(backup_dir / path.name)
        console.print("Old virtual environment moved.")

        # `delete=False` as the file has to outlive this process' use of it.
        with tempfile.NamedTemporaryFile(
            "w", encoding="utf-8", prefix="redbot-update-metadata-", suffix=".json", delete=False
        ) as metadata_file:
            json.dump(self.metadata.to_json_dict(), metadata_file)

        console.print()
        runner.make_exec_request(
            str(new_executable),
            "reinstall",
            # base executable for venv creation
            self.metadata.interpreter_exe,
            # venv dir
            str(venv_dir),
            # scripts path
            self.metadata.interpreter_info.sysconfig_path("scripts", {"base": str(venv_dir)}),
            # Red dependency specifier
            common.get_red_dependency_specifier(self.latest.version, extras),
            set_env_vars={common.INTERNAL_UPDATER_METADATA_ENV_VAR: metadata_file.name},
        )
def get_updater_metadata() -> UpdaterMetadata:
    """Load the updater metadata from the JSON file named by the internal env var.

    The path is read from the `common.INTERNAL_UPDATER_METADATA_ENV_VAR`
    environment variable.

    Raises:
        KeyError: If the environment variable is not set.
    """
    metadata_path = os.environ[common.INTERNAL_UPDATER_METADATA_ENV_VAR]
    with open(metadata_path, encoding="utf-8") as metadata_file:
        raw_metadata = json.load(metadata_file)
    return UpdaterMetadata.from_json_dict(raw_metadata)