From a234fc1e028c45e66ec7ee13bafd7354834bcf86 Mon Sep 17 00:00:00 2001 From: Jakub Kuczys Date: Wed, 13 May 2026 21:18:17 +0200 Subject: [PATCH] Add `redbot-setup restore` cli command (#6709) --- docs/backup_red.rst | 146 ++++++-- redbot/core/_downloader/__init__.py | 156 +++++++++ redbot/core/utils/_internal_utils.py | 128 +++++-- redbot/setup.py | 483 ++++++++++++++++++++++++++- 4 files changed, 866 insertions(+), 47 deletions(-) diff --git a/docs/backup_red.rst b/docs/backup_red.rst index 4003b29b0..87cb4f7dc 100644 --- a/docs/backup_red.rst +++ b/docs/backup_red.rst @@ -4,32 +4,140 @@ Backing Up and Restoring Red ============================ -Red can be backed up and restored to any device as long as it is a supported operating system. See page: :ref:`end-user-guarantees`. +Red can be backed up and restored to any system as long as it is a supported per our `end-user-guarantees`. +The system it's restored to can be different from the system that was backed up. -Backup steps are to be done in order and carefully to avoid any issues. +.. note:: + + Some 3rd-party cogs may not support all systems that Core Red supports and such cogs may therefore not work, + if restored to an unsupported system. This does not affect cogs that do not impose additional restrictions. + +.. contents:: + :local: + :depth: 2 + +Creating backups +**************** + +Windows +------- + +To make a backup, perform the following steps: -#. Take note of the installed cogs with ``[p]cogs``; and cog repositories with ``[p]load downloader``, then ``[p]repo list`` (``[p]`` is your bot's prefix). #. Stop the bot, ideally with ``[p]shutdown``. -#. Activate your venv, and run ``redbot-setup backup ``, replacing ```` with the name of your instance. -#. Copy your backup file to the new machine/location. -#. Extract the file to a location of your choice (remember the full path and make sure that the user you are going to install/run Red under can access this path). -#. :ref:`Install Red ` as normal on the new machine/location. -#. Run ``redbot-setup`` in your venv to create a new instance, using the path you remembered above as your data path. -#. Start your new instance. -#. Re-add the cog repositories using the same names as before. -#. Do ``[p]cog update``. -#. Re-add any cogs that were not re-installed (you may have to uninstall them first as Downloader may think they are still installed). +#. Activate your venv. - .. note:: + .. prompt:: batch - The config (data) from cogs has been saved, but not the code itself. + "%userprofile%\redenv\Scripts\activate.bat" +#. Backup your Red instance with the following command: - .. tip:: + .. prompt:: batch + :prompts: (redenv) C:\\> - You can fix permissions (if needed) on your directory using: + redbot-setup backup - .. code-block:: bash + .. attention:: - sudo chown -R : ~/.local + Replace ```` with the name of the instance you want to backup. +#. The command will create a backup file for you and show you the path to it. - Replace ```` with your actual username. \ No newline at end of file +.. tip:: + + If you want to backup your instance to a custom folder, + you can run the ``redbot-setup backup`` command as shown below, + replacing ``C:\path\to\backup\folder`` with the path to the folder that + you want to backup your instance to: + + .. prompt:: batch + :prompts: (redenv) C:\\> + + redbot-setup backup C:\path\to\backup\folder + +Linux & Mac +----------- + +To make a backup, perform the following steps: + +#. Stop the bot, ideally with ``[p]shutdown``. +#. Activate your venv. + + .. prompt:: bash + + source ~/redenv/bin/activate +#. Backup your Red instance with the following command: + + .. prompt:: bash + :prompts: (redenv) $ + + redbot-setup backup + + .. attention:: + + Replace ```` with the name of the instance you want to backup. +#. The command will create a backup file for you and show you the path to it. + +.. tip:: + + If you want to backup your instance to a custom folder, + you can run the ``redbot-setup backup`` command as shown below, + replacing ``/path/to/backup/folder`` with the path to the folder that + you want to backup your instance to: + + .. prompt:: bash + :prompts: (redenv) $ + + redbot-setup backup /path/to/backup/folder + +Restoring backups +***************** + +Windows +------- + +To restore a backup, perform the following steps: + +#. `Install Red ` on the new machine/location, skipping the ``redbot-setup`` step. +#. Activate your venv. + + .. prompt:: batch + + "%userprofile%\redenv\Scripts\activate.bat" +#. Restore your Red instance with the following command: + + .. prompt:: batch + :prompts: (redenv) C:\\> + + redbot-setup restore C:\path\to\backup\file.tar.gz + + .. attention:: + + Replace ``C:\path\to\backup\file.tar.gz`` with the path to the backup file + that you want to restore from. + +#. The command will guide you through the restore process. + +Linux & Mac +----------- + +To restore a backup, perform the following steps: + +#. `Install Red ` on the new machine/location, skipping the ``redbot-setup`` step. +#. Activate your venv. + + .. prompt:: bash + + source ~/redenv/bin/activate +#. Restore your Red instance with the following command: + + .. prompt:: bash + :prompts: (redenv) $ + + redbot-setup restore /path/to/backup/file.tar.gz + + .. attention:: + + Replace ``/path/to/backup/file.tar.gz`` with the path to the backup file + that you want to restore from. + +#. The command will guide you through the restore process. diff --git a/redbot/core/_downloader/__init__.py b/redbot/core/_downloader/__init__.py index 71129e710..fecb20bbd 100644 --- a/redbot/core/_downloader/__init__.py +++ b/redbot/core/_downloader/__init__.py @@ -12,6 +12,7 @@ from __future__ import annotations import contextlib import dataclasses +import json import os import shutil import sys @@ -37,6 +38,7 @@ import discord from redbot.core import commands, Config, version_info as red_version_info from redbot.core._cog_manager import CogManager from redbot.core.data_manager import cog_data_path +from redbot.core.utils._internal_utils import detailed_progress from . import errors from .log import log @@ -864,3 +866,157 @@ class CogUnavailableError(Exception): self.repo_name = repo_name self.cog_name = cog_name super().__init__(f"Couldn't find cog {cog_name!r} in {repo_name!r}") + + +async def _restore_from_backup() -> None: + """Restore cogs using `repos.json` in cog's data path. + + Used by `redbot-setup restore` cli command. + """ + with _repo_manager.repos_folder.with_name("repos.json").open(encoding="utf-8") as fp: + raw_repos = json.load(fp) + + with detailed_progress(unit="repos") as progress: + task_id = progress.add_task("Adding repos") + for repo_data in progress.track(raw_repos, task_id=task_id): + repo_url = repo_data["url"] + repo_name = repo_data["name"] + repo_branch = repo_data["branch"] + progress.update(task_id, description=f"Adding {repo_name!r} repo") + try: + await _repo_manager.add_repo(repo_url, repo_name, repo_branch) + except errors.ExistingGitRepo: + # this should not be possible + log.error( + "Failed to add repo %r (url: %r branch: %r) as it seems that" + " one with this name already exists.", + repo_name, + repo_url, + repo_branch, + ) + except errors.AuthenticationError: + log.error( + "Failed to add repo %r (url: %r branch: %r) due to authentication failure." + " This may also mean that repository no longer exists at that URL.", + repo_name, + repo_url, + repo_branch, + ) + except (errors.CloningError, OSError): + log.error( + "Failed to add repo %r (url: %r branch: %r) due to a cloning error.", + repo_name, + repo_url, + repo_branch, + ) + + _installed_cogs = await installed_cogs() + await _config.installed_cogs.clear() + await _config.installed_libraries.clear() + + cogs_to_reinstall = [] + cogs_without_repo: Dict[str, List[str]] = defaultdict(list) + for cog in _installed_cogs: + if cog.repo is None: + cogs_without_repo[cog._json_repo_name].append(cog.name) + else: + cogs_to_reinstall.append(cog) + for repo_name, cogs in cogs_without_repo.items(): + log.error( + "The backup does not contain metadata about repo %r," + " the following cogs cannot be reinstalled automatically: %s", + repo_name, + ", ".join(cogs), + ) + + with detailed_progress(unit="cogs") as progress: + task_id = progress.add_task("Installing cogs") + cog: Installable + for cog in progress.track(cogs_to_reinstall, task_id=task_id): + progress.update(task_id, description=f"Installing {cog.name!r} cog") + if not cog.commit: + last_cog_occurrence = await cog.repo.get_last_module_occurrence(cog.name) + if last_cog_occurrence is not None and not last_cog_occurrence.disabled: + cog = last_cog_occurrence + log.warning( + "The commit that %r cog was installed from is unknown" + " - will try to reinstall from latest commit where it's still available." + ) + else: + log.error( + "The commit that %r cog was installed from is unknown" + " and it could not be found in the repo." + ) + continue + + try: + install_result = await install_cogs(cog.repo, cog.commit, [cog.name]) + except errors.UnknownRevision: + log.error( + "The commit that %r cog was installed from (%s)" + " could not be found in the repo.", + cog.name, + cog.commit, + ) + continue + + # we know we've only tried to install one cog so we can be specific in error messages + + # below 3 should technically never happen with valid config but just in case... + if install_result.unavailable_cogs: + log.error("Could not find %r cog in %r repo", cog.name, cog.repo.name) + if install_result.already_installed: + log.error( + "Failed to reinstall %r cog from %r repo as it seems that" + " this cog is already installed.", + cog.name, + cog.repo.name, + ) + if install_result.name_already_used: + log.error( + "Failed to reinstall %r cog from %r repo as it seems that" + " a cog with the same name is already installed from a different repo.", + cog.name, + cog.repo.name, + ) + + if install_result.incompatible_python_version: + log.error( + "Failed to reinstall %r cog from %r repo because the instance is" + " being restored into Red running with a lower Python version." + " The minimum Python version required by this cog is %s", + cog.name, + cog.repo.name, + ".".join(map(str, cog.min_python_version)), + ) + if install_result.incompatible_bot_version: + log.error( + "Failed to reinstall %r cog from %r repo because the instance is" + " being restored into a Red version that the cog does not support." + " The minimum version required by this cog is %s%s.", + cog.name, + cog.repo.name, + cog.min_bot_version, + ( + "" + if cog.min_bot_version > cog.max_bot_version + else f"and maximum allowed is: {cog.max_bot_version}" + ), + ) + + if install_result.failed_cogs: + log.info("Failed to reinstall %r cog", cog.name) + if install_result.failed_reqs: + log.error( + "Failed to reinstall %r cog from %r repo" + " because the following requirements could not be reinstalled: %s", + cog.name, + cog.repo.name, + ", ".join(install_result.failed_reqs), + ) + if install_result.failed_libs: + log.error( + "Failed to reinstall shared libraries for %r cog from %r repo: %s", + cog.repo.name, + ", ".join([lib.name for lib in install_result.failed_libs]), + ) diff --git a/redbot/core/utils/_internal_utils.py b/redbot/core/utils/_internal_utils.py index cd70a760a..70ee11298 100644 --- a/redbot/core/utils/_internal_utils.py +++ b/redbot/core/utils/_internal_utils.py @@ -9,9 +9,12 @@ import os import re import shutil import tarfile +import time import warnings from datetime import datetime +from io import BytesIO from pathlib import Path +from tarfile import TarInfo from typing import ( AsyncIterable, AsyncIterator, @@ -24,6 +27,7 @@ from typing import ( Optional, Union, TypeVar, + TypedDict, TYPE_CHECKING, Tuple, cast, @@ -33,8 +37,9 @@ import aiohttp import discord from packaging.requirements import Requirement import rapidfuzz -from rich.progress import ProgressColumn -from rich.progress_bar import ProgressBar +import rich.progress +from rich.console import Console +from rich.text import Text from red_commons.logging import VERBOSE, TRACE from redbot import VersionInfo @@ -58,6 +63,8 @@ __all__ = ( "fetch_latest_red_version_info", "deprecated_removed", "RichIndefiniteBarColumn", + "RichSpeedColumn", + "detailed_progress", "cli_level_to_log_level", ) @@ -216,7 +223,27 @@ async def format_fuzzy_results( return "Perhaps you wanted one of these? " + box("\n".join(lines), lang="vhdl") +def _tar_addfile_from_string(tar: tarfile.TarFile, name: str, string: str) -> None: + encoded = string.encode("utf-8") + fp = BytesIO(encoded) + + # TarInfo needs `mtime` and `size` + # https://stackoverflow.com/q/53306000 + tar_info = tarfile.TarInfo(name) + tar_info.mtime = time.time() + tar_info.size = len(encoded) + + tar.addfile(tar_info, fp) + + +class BackupDetails(TypedDict): + backup_version: int + + async def create_backup(dest: Path = Path.home()) -> Optional[Path]: + # version of backup + BACKUP_VERSION = 2 + data_path = Path(data_manager.core_data_path().parent) if not data_path.exists(): return None @@ -226,13 +253,20 @@ async def create_backup(dest: Path = Path.home()) -> Optional[Path]: backup_fpath = dest / f"redv3_{data_manager.instance_name()}_{timestr}.tar.gz" to_backup = [] + # we need trailing separator to not exclude files and folders that only start with these names exclusions = [ "__pycache__", + # Lavalink will be downloaded on Audio load "Lavalink.jar", - os.path.join("Downloader", "lib"), - os.path.join("CogManager", "cogs"), - os.path.join("RepoManager", "repos"), - os.path.join("Audio", "logs"), + # cogs and repos installed through Downloader can be reinstalled using restore command + os.path.join("Downloader", "lib", ""), + os.path.join("CogManager", "cogs", ""), + os.path.join("RepoManager", "repos", ""), + os.path.join("Audio", "logs", ""), + # these files are created during backup so we exclude them from data path backup + os.path.join("RepoManager", "repos.json"), + "instance.json", + "backup_details.json", ] # Avoiding circular imports @@ -243,19 +277,42 @@ async def create_backup(dest: Path = Path.home()) -> Optional[Path]: repo_output = [] for repo in repo_mgr.repos: repo_output.append({"url": repo.url, "name": repo.name, "branch": repo.branch}) - repos_file = data_path / "cogs" / "RepoManager" / "repos.json" - with repos_file.open("w") as fs: - json.dump(repo_output, fs, indent=4) - instance_file = data_path / "instance.json" - with instance_file.open("w") as fs: - json.dump({data_manager.instance_name(): data_manager.basic_config}, fs, indent=4) - for f in data_path.glob("**/*"): - if not any(ex in str(f) for ex in exclusions) and f.is_file(): - to_backup.append(f) - with tarfile.open(str(backup_fpath), "w:gz") as tar: - for f in to_backup: - tar.add(str(f), arcname=str(f.relative_to(data_path)), recursive=False) + with rich.progress.Progress( + rich.progress.SpinnerColumn(), + rich.progress.TextColumn("[progress.description]{task.description}"), + RichIndefiniteBarColumn(), + rich.progress.TextColumn("{task.completed} files processed"), + rich.progress.TimeElapsedColumn(), + ) as progress: + for f in progress.track( + data_path.glob("**/*"), description="Preparing files for backup..." + ): + if not any(ex in str(f) for ex in exclusions) and f.is_file(): + to_backup.append(f) + + backup_details: BackupDetails = { + "backup_version": BACKUP_VERSION, + } + + with tarfile.open(str(backup_fpath), "w:gz", dereference=True) as tar: + with detailed_progress(unit="files") as progress: + progress_tracker = progress.track(to_backup, description="Compressing data") + for f in progress_tracker: + tar.add(str(f), arcname=str(f.relative_to(data_path)), recursive=False) + + # add repos backup + repos_data = json.dumps(repo_output, indent=4) + _tar_addfile_from_string(tar, "cogs/RepoManager/repos.json", repos_data) + + # add instance's original data + instance_data = json.dumps( + {data_manager.instance_name(): data_manager.basic_config}, indent=4 + ) + _tar_addfile_from_string(tar, "instance.json", instance_data) + + # add info about backup version + _tar_addfile_from_string(tar, "backup_details.json", json.dumps(backup_details)) return backup_fpath @@ -367,10 +424,10 @@ def deprecated_removed( ) -class RichIndefiniteBarColumn(ProgressColumn): - def render(self, task): - return ProgressBar( - pulse=task.completed < task.total, +class RichIndefiniteBarColumn(rich.progress.ProgressColumn): + def render(self, task: rich.progress.Task) -> rich.progress.ProgressBar: + return rich.progress.ProgressBar( + pulse=task.completed < task.total if task.total is not None else True, animation_time=task.get_time(), width=40, total=task.total, @@ -378,6 +435,33 @@ class RichIndefiniteBarColumn(ProgressColumn): ) +class RichSpeedColumn(rich.progress.ProgressColumn): + def __init__(self, *, unit: str) -> None: + self.unit = unit + super().__init__() + + def render(self, task: rich.progress.Task) -> Text: + speed = task.finished_speed or task.speed + if speed is None: + return Text("?", style="progress.data.speed") + return Text(f"{int(speed)} {self.unit}/s", style="progress.data.speed") + + +def detailed_progress(*, unit: str, console: Optional[Console] = None) -> rich.progress.Progress: + return rich.progress.Progress( + rich.progress.SpinnerColumn(), + rich.progress.TextColumn("[progress.description]{task.description}"), + rich.progress.BarColumn(bar_width=None), + RichSpeedColumn(unit=unit), + rich.progress.TaskProgressColumn(), + rich.progress.TextColumn("eta"), + rich.progress.TimeRemainingColumn(), + rich.progress.TextColumn("elapsed"), + rich.progress.TimeElapsedColumn(), + console=console, + ) + + def cli_level_to_log_level(level: int) -> int: if level == 0: log_level = logging.INFO diff --git a/redbot/setup.py b/redbot/setup.py index 6d7102542..a75083272 100644 --- a/redbot/setup.py +++ b/redbot/setup.py @@ -1,27 +1,35 @@ +from __future__ import annotations + from redbot import _early_init # this needs to be called as early as possible _early_init() import asyncio +import functools import json import logging +import os import sys import re +import tarfile from copy import deepcopy from pathlib import Path -from typing import Dict, Any, Optional, Union +from typing import Any, Dict, IO, List, NoReturn, Optional, Set, Tuple, Union import click import redbot.logging from redbot.core._cli import confirm from redbot.core.utils._internal_utils import ( + BackupDetails, safe_delete, create_backup as red_create_backup, cli_level_to_log_level, + detailed_progress, ) -from redbot.core import config, data_manager +from redbot.core import config, data_manager, _downloader +from redbot.core._cog_manager import CogManager from redbot.core._config import migrate from redbot.core._cli import ExitCodes, asyncio_run from redbot.core.data_manager import appdir, config_dir, config_file @@ -58,10 +66,14 @@ def save_config(name, data, remove=False): json.dump(_config, fs, indent=4) +def get_default_data_path(instance_name: str) -> Path: + return Path(appdir.user_data_dir) / "data" / instance_name + + def get_data_dir(*, instance_name: str, data_path: Optional[Path], interactive: bool) -> str: if data_path is not None: return str(data_path.resolve()) - default_data_path = Path(appdir.user_data_dir) / "data" / instance_name + default_data_path = get_default_data_path(instance_name) if not interactive: return str(default_data_path.resolve()) @@ -99,7 +111,7 @@ def get_data_dir(*, instance_name: str, data_path: Optional[Path], interactive: " You may need to create the directory and set proper permissions" " for it manually before it can be used as the data directory." ) - sys.exit(ExitCodes.INVALID_CLI_USAGE) + continue print(f"You have chosen {str(data_path)!r} to be your data directory.") if click.confirm("Please confirm", default=True): @@ -270,12 +282,15 @@ def get_target_backend(backend: str) -> BackendType: async def do_migration( - current_backend: BackendType, target_backend: BackendType + current_backend: BackendType, + target_backend: BackendType, + new_storage_details: Optional[dict] = None, ) -> Dict[str, Any]: cur_driver_cls = get_driver_class_include_old(current_backend) new_driver_cls = get_driver_class(target_backend) cur_storage_details = data_manager.storage_details() - new_storage_details = new_driver_cls.get_config_details() + if new_storage_details is None: + new_storage_details = new_driver_cls.get_config_details() await cur_driver_cls.initialize(**cur_storage_details) await new_driver_cls.initialize(**new_storage_details) @@ -372,6 +387,379 @@ async def remove_instance_interaction() -> None: await remove_instance(selected, interactive=True) +def open_file_from_tar(tar: tarfile.TarFile, arcname: str) -> Optional[IO[bytes]]: + try: + fp = tar.extractfile(arcname) + except (KeyError, tarfile.StreamError): + return None + return fp + + +class RestoreInfo: + STORAGE_BACKENDS = { + BackendType.JSON: "JSON", + BackendType.POSTGRES: "PostgreSQL", + BackendType.MONGOV1: "MongoDB (unavailable)", + BackendType.MONGO: "MongoDB (unavailable)", + } + + def __init__( + self, + tar: tarfile.TarFile, + backup_details: BackupDetails, + name: str, + data_path: Path, + storage_type: BackendType, + storage_details: dict, + restore_downloader: Optional[bool] = None, + ): + self.tar = tar + self.backup_details = backup_details + self.backup_version = backup_details["backup_version"] + self.name = name + self._data_path = data_path + self.storage_type = storage_type + self.storage_details = storage_details + self._restore_downloader: Optional[bool] = restore_downloader + self._data_path_ensure_result: Optional[bool] = None + + @classmethod + def from_tar( + cls, tar: tarfile.TarFile, *, restore_downloader: Optional[bool] = None + ) -> RestoreInfo: + instance_name, raw_data = cls.get_instance_from_backup(tar) + backup_details = cls.get_backup_details(tar) + + return cls( + tar=tar, + backup_details=backup_details, + name=instance_name, + data_path=Path(raw_data["DATA_PATH"]), + storage_type=BackendType(raw_data["STORAGE_TYPE"]), + storage_details=raw_data["STORAGE_DETAILS"], + restore_downloader=restore_downloader, + ) + + @staticmethod + def get_instance_from_backup(tar: tarfile.TarFile) -> Tuple[str, dict]: + if (fp := open_file_from_tar(tar, "instance.json")) is None: + print("This isn't a valid backup file!") + sys.exit(1) + with fp: + return json.load(fp).popitem() + + @staticmethod + def get_backup_details(tar: tarfile.TarFile) -> BackupDetails: + if (fp := open_file_from_tar(tar, "backup_details.json")) is None: + # backup version 1 doesn't have the details file + return {"backup_version": 1} + with fp: + backup_details = json.load(fp) + backup_version = backup_details.get("backup_version") + if not isinstance(backup_version, int): + print("This does not appear to be a valid backup.") + sys.exit(1) + if backup_version > 2: + print("This backup was created using newer version of Red. Update Red to restore it.") + sys.exit(1) + return backup_details + + @property + def data_path(self) -> Path: + return self._data_path + + @data_path.setter + def data_path(self, value: Path) -> None: + self._data_path_ensure_result = None + self._data_path = value + + @property + def name_used(self) -> bool: + return self.name in instance_list + + def ensure_data_path(self) -> bool: + if self._data_path_ensure_result is not None: + return self._data_path_ensure_result + if self.data_path.is_absolute(): + try: + # try making the dir since that's most reliant access check, if path does not exist + self.data_path.mkdir(parents=True, exist_ok=True) + except OSError: + self._data_path_ensure_result = False + else: + # if path exists, mkdir above is a no-op so we still have to check for write access + self._data_path_ensure_result = os.access(self.data_path, os.W_OK) + else: + # if path is not absolute, it's not valid on the current OS, e.g. + # Path('D:\\data').is_absolute() is False on Linux/macOS + # Path('/some/path').is_absolute() is False on Windows + self._data_path_ensure_result = False + return self._data_path_ensure_result + + @property + def data_path_not_empty(self) -> bool: + if not self.ensure_data_path(): + return True + try: + return next(self.data_path.glob("*"), None) is not None + except OSError: + return True + + @property + def backend_unavailable(self) -> bool: + return self.storage_type in (BackendType.MONGOV1, BackendType.MONGO) + + @functools.cached_property + def can_restore_downloader(self) -> bool: + return "cogs/RepoManager/repos.json" in self.all_tar_member_names + + @functools.cached_property + def restore_downloader(self) -> bool: + if self._restore_downloader is not None: + return self.can_restore_downloader + return self.can_restore_downloader and click.confirm( + "Do you want to restore 3rd-party repos and cogs installed through Downloader?", + default=True, + ) + + @functools.cached_property + def all_tar_members(self) -> List[tarfile.TarInfo]: + return self.tar.getmembers() + + @functools.cached_property + def all_tar_member_names(self) -> List[str]: + return [tarinfo.name for tarinfo in self.all_tar_members] + + def get_tar_members_to_extract(self) -> List[tarfile.TarInfo]: + ignored_members: Set[str] = {"backup_details.json", "instance.json"} + if not self.restore_downloader: + ignored_members |= { + "cogs/RepoManager/repos.json", + "cogs/RepoManager/settings.json", + "cogs/Downloader/settings.json", + } + return [member for member in self.all_tar_members if member.name not in ignored_members] + + def print_instance_data(self) -> None: + print("\nWhen the instance was backed up, it was using these settings:") + print(" Original instance name:", self.name) + print(" Original data path:", self.data_path) + print(" Original storage backend:", self.STORAGE_BACKENDS[self.storage_type]) + self.print_storage_details() + + def print_storage_details(self, *, original: bool = True) -> None: + if self.storage_type is BackendType.POSTGRES: + if original: + print(" Original storage details:") + else: + print(" Storage details:") + for key in ("host", "port", "database", "user"): + print(f" - DB {key}:", self.storage_details[key]) + print(" - DB password: ***") + + def ask_for_changes(self, *, interactive: bool) -> None: + if interactive: + self._ask_for_optional_changes() + self._ask_for_required_changes(interactive=interactive) + + def _ask_for_optional_changes(self) -> None: + if click.confirm("\nWould you like to change anything?"): + if not self.name_used and click.confirm("Do you want to use different instance name?"): + self._ask_for_name() + if not self.data_path_not_empty and click.confirm( + "Do you want to use different data path?" + ): + self._ask_for_data_path() + if not self.backend_unavailable and click.confirm( + "Do you want to use different storage backend or change storage details?" + ): + self._ask_for_storage() + + @staticmethod + def _error_and_exit(message: str) -> NoReturn: + print(f"ERROR: {message}") + sys.exit(1) + + @staticmethod + def _warning(message: str) -> None: + print(f"WARNING: {message}") + + @staticmethod + def _info(message: str) -> None: + print(f"INFO: {message}") + + def _ask_for_required_changes(self, interactive: bool) -> None: + p = self._warning if interactive else self._error_and_exit + if self.name_used: + p("Original instance name is already used by a different instance.") + p("Continuing will overwrite the existing instance config.") + if click.confirm("Do you want to use different instance name?", default=True): + self._ask_for_name() + if not self.ensure_data_path(): + p( + "Original data path can't be used as it cannot be written to by the current user." + " You have to choose a different path." + ) + self._ask_for_data_path() + elif self.data_path_not_empty: + p( + "Original data path can't be used as it's not empty." + " You have to choose a different path." + ) + self._ask_for_data_path() + if self.backend_unavailable: + p( + "Original storage backend is no longer available in Red." + " You have to choose a different backend." + ) + self._ask_for_storage() + + def _ask_for_name(self) -> None: + self.name = get_name("") + + def _ask_for_data_path(self) -> None: + while True: + self.data_path = Path( + get_data_dir(instance_name=self.name, data_path=None, interactive=True) + ) + if not self.ensure_data_path(): + print("Given path can't be used as it cannot be written to by the current user.") + elif self.data_path_not_empty: + print("Given path can't be used as it's not empty.") + else: + return + + def _ask_for_storage(self) -> None: + self.storage_type = get_storage_type(None, interactive=True) + driver_cls = get_driver_class(self.storage_type) + self.storage_details = driver_cls.get_config_details() + + def extractall(self) -> None: + to_extract = self.get_tar_members_to_extract() + with detailed_progress(unit="files") as progress: + progress_tracker = progress.track(to_extract, description="Extracting data") + # tar.errorlevel == 0 so errors are printed to stderr + self.tar.extractall(path=self.data_path, members=progress_tracker) + + def get_basic_config(self, use_json: bool = False) -> dict: + default_dirs = deepcopy(data_manager.basic_config_default) + default_dirs["DATA_PATH"] = str(self.data_path) + if use_json: + default_dirs["STORAGE_TYPE"] = BackendType.JSON.value + default_dirs["STORAGE_DETAILS"] = {} + else: + default_dirs["STORAGE_TYPE"] = self.storage_type.value + default_dirs["STORAGE_DETAILS"] = self.storage_details + return default_dirs + + async def restore_data(self) -> None: + self.extractall() + + # data in backup file is using json + save_config(self.name, self.get_basic_config(use_json=True)) + data_manager.load_basic_configuration(self.name) + + if self.storage_type is not BackendType.JSON: + await do_migration(BackendType.JSON, self.storage_type, self.storage_details) + save_config(self.name, self.get_basic_config()) + data_manager.load_basic_configuration(self.name) + + if self.restore_downloader: + driver_cls = get_driver_class(self.storage_type) + await driver_cls.initialize(**self.storage_details) + try: + await _downloader._init_without_bot(CogManager()) + await _downloader._restore_from_backup() + finally: + await driver_cls.teardown() + elif self.backup_version == 1: + self._info( + "Downloader's data isn't included in the backup file" + " - this backup was created with Red 3.5.24 or older." + ) + elif not self.can_restore_downloader: + self._warning("Downloader's data isn't included in the backup file.") + + async def run( + self, + *, + interactive: bool, + instance_name: str = "", + data_path: Optional[Path] = None, + backend: Optional[BackendType] = None, + use_sane_default_data_path: bool = False, + ) -> None: + storage_details = {} + if backend: + driver_cls = get_driver_class(backend) + storage_details = driver_cls.get_config_details() + print("\n---") + self.print_instance_data() + + if use_sane_default_data_path: + data_path = get_default_data_path(instance_name or self.name) + if instance_name or data_path or backend: + print("\nThe following settings have been overridden with command options:") + if instance_name: + self.name = instance_name + print(" Instance name:", instance_name) + if data_path: + self.data_path = data_path + print(" Data path:", data_path) + if backend: + self.storage_type = backend + self.storage_details = storage_details + print(" Storage backend:", self.STORAGE_BACKENDS[backend]) + self.print_storage_details(original=False) + + self.ask_for_changes(interactive=interactive) + await self.restore_data() + + print("Restore process has been completed.") + + +async def restore_instance( + backup_path: Path, + *, + interactive: bool, + skip_downloader_restore: bool, + instance_name: str, + data_path: Optional[Path], + use_sane_default_data_path: bool = False, + backend: Optional[str], +) -> None: + try: + tar = tarfile.open(backup_path) + except tarfile.ReadError: + print( + "We couldn't open the given backup file. Make sure that you're passing correct file." + ) + return + + print("Hello! This command will guide you through restore process.") + if interactive: + restore_downloader = False if skip_downloader_restore else None + else: + restore_downloader = not skip_downloader_restore + with tar: + # The filter functionality exists on Python 3.11.4+. + # We'll use the value consistent with the 3.11's default + # since there's no reason we shouldn't trust the archive + # that we generated ourselves. + tar.extraction_filter = getattr(tarfile, "fully_trusted_filter", None) + restore_info = RestoreInfo.from_tar( + tar, + restore_downloader=restore_downloader, + ) + await restore_info.run( + interactive=interactive, + instance_name=instance_name, + data_path=data_path, + use_sane_default_data_path=use_sane_default_data_path, + backend=get_target_backend(backend) if backend else None, + ) + + @click.group(invoke_without_command=True) @click.option( "--debug", @@ -561,6 +949,89 @@ def backup(instance: str, destination_folder: Path) -> None: asyncio_run(create_backup(instance, destination_folder)) +@cli.command() +@click.argument( + "backup_file", + type=click.Path(file_okay=True, resolve_path=True, readable=True, path_type=Path), + metavar="", +) +@click.option( + "--no-prompt", + "interactive", + is_flag=True, + default=True, + help="Don't ask for user input during the process. Most of the values", +) +@click.option( + "--no-restore-downloader", + "skip_downloader_restore", + is_flag=True, + default=False, + help="Skip restoring of 3rd-party repos and cogs installed through Downloader.", +) +@click.option( + "--instance-name", + type=str, + default="", + help=( + "Name of the new instance. By default, the name stored in the backup will be used" + " and, if the --no-prompt option was not specified, you will be able to change this" + " before restoring" + ), +) +@click.option( + "--data-path", + type=click.Path(exists=False, dir_okay=True, file_okay=False, writable=True, path_type=Path), + default=None, + help=( + "Data path of the new instance. If this option and --no-prompt are omitted," + " you will be asked for this." + ), +) +@click.option( + "--use-sane-default-data-path", + is_flag=True, + default=False, + help=( + "Use the sane default data path derived from the instance name instead of using data path" + " from the backup or specifying --data-path option." + ), +) +@click.option( + "--backend", + type=click.Choice(["json", "postgres"]), + default=None, + help=( + "Choose a backend type for the new instance." + " By default, the backend of the backed up instance will be used" + " and, if the --no-prompt option was not specified, you will be able to change this" + " before restoring.\n" + "Note: Choosing PostgreSQL will prevent the setup from being completely non-interactive." + ), +) +def restore( + backup_file: Path, + interactive: bool, + skip_downloader_restore: bool, + instance_name: str, + data_path: Optional[Path], + use_sane_default_data_path: bool, + backend: Optional[str], +) -> None: + """Restore instance.""" + asyncio.run( + restore_instance( + backup_file, + interactive=interactive, + skip_downloader_restore=skip_downloader_restore, + instance_name=instance_name, + data_path=data_path, + use_sane_default_data_path=use_sane_default_data_path, + backend=backend, + ) + ) + + def run_cli(): # Setuptools entry point script stuff... try: