[V3 Cog/Data Manager] Bundled Cog Data (#1063)

* Refactor find_spec out of core_commands

* Fix version error when not installed

* initial

* Fix find_cogs call

* Enable copying

* Add helper method for cog creators

* Add warning

* My dpy skillz need work
This commit is contained in:
Will
2017-10-27 20:06:47 -04:00
committed by GitHub
parent 77e29ff43b
commit 09b3642559
5 changed files with 227 additions and 35 deletions

View File

@@ -1,6 +1,9 @@
import sys
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List
import hashlib
import shutil
import logging
import appdirs
@@ -10,7 +13,10 @@ if TYPE_CHECKING:
from . import Config
__all__ = ['load_basic_configuration', 'cog_data_path', 'core_data_path',
'storage_details', 'storage_type']
'load_bundled_data', 'bundled_data_path', 'storage_details',
'storage_type']
log = logging.getLogger("red.data_manager")
jsonio = None
basic_config = None
@@ -108,6 +114,151 @@ def core_data_path() -> Path:
return core_path.resolve()
def _find_data_files(init_location: str) -> (Path, List[Path]):
"""
Discovers all files in the bundled data folder of an installed cog.
Parameters
----------
init_location
Returns
-------
(pathlib.Path, list of pathlib.Path)
"""
init_file = Path(init_location)
if not init_file.is_file():
return []
package_folder = init_file.parent.resolve() / 'data'
if not package_folder.is_dir():
return []
all_files = list(package_folder.rglob("*"))
return package_folder, [p.resolve()
for p in all_files
if p.is_file()]
def _compare_and_copy(to_copy: List[Path], bundled_data_dir: Path, cog_data_dir: Path):
"""
Filters out files from ``to_copy`` that already exist, and are the
same, in ``data_dir``. The files that are different are copied into
``data_dir``.
Parameters
----------
to_copy : list of pathlib.Path
bundled_data_dir : pathlib.Path
cog_data_dir : pathlib.Path
"""
def hash_bytestr_iter(bytesiter, hasher, ashexstr=False):
for block in bytesiter:
hasher.update(block)
return hasher.hexdigest() if ashexstr else hasher.digest()
def file_as_blockiter(afile, blocksize=65536):
with afile:
block = afile.read(blocksize)
while len(block) > 0:
yield block
block = afile.read(blocksize)
lookup = {p: cog_data_dir.joinpath(p.relative_to(bundled_data_dir))
for p in to_copy}
for orig, poss_existing in lookup.items():
if not poss_existing.is_file():
poss_existing.parent.mkdir(exist_ok=True, parents=True)
exists_checksum = None
else:
exists_checksum = hash_bytestr_iter(file_as_blockiter(
poss_existing.open('rb')), hashlib.sha256())
orig_checksum = ...
if exists_checksum is not None:
orig_checksum = hash_bytestr_iter(file_as_blockiter(
orig.open('rb')), hashlib.sha256())
if exists_checksum != orig_checksum:
shutil.copy(str(orig), str(poss_existing))
log.debug("Copying {} to {}".format(
orig, poss_existing
))
def load_bundled_data(cog_instance, init_location: str):
"""
This function copies (and overwrites) data from the ``data/`` folder
of the installed cog.
.. important::
This function MUST be called from the ``setup()`` function of your
cog.
Examples
--------
>>> from redbot.core import data_manager
>>>
>>> def setup(bot):
>>> cog = MyCog()
>>> data_manager.load_bundled_data(cog, __file__)
>>> bot.add_cog(cog)
Parameters
----------
cog_instance
An instance of your cog class.
init_location : str
The ``__file__`` attribute of the file where your ``setup()``
function exists.
"""
bundled_data_folder, to_copy = _find_data_files(init_location)
cog_data_folder = cog_data_path(cog_instance) / 'bundled_data'
_compare_and_copy(to_copy, bundled_data_folder, cog_data_folder)
def bundled_data_path(cog_instance) -> Path:
"""
The "data" directory that has been copied from installed cogs.
.. important::
You should *NEVER* write to this directory. Data manager will
overwrite files in this directory each time `load_bundled_data`
is called. You should instead write to the directory provided by
`cog_data_path`.
Parameters
----------
cog_instance
Returns
-------
pathlib.Path
Path object to the bundled data folder.
Raises
------
FileNotFoundError
If no bundled data folder exists or if it hasn't been loaded yet.
"""
bundled_path = cog_data_path(cog_instance) / 'bundled_data'
if not bundled_path.is_dir():
raise FileNotFoundError("No such directory {}".format(
bundled_path
))
return bundled_path
def storage_type() -> str:
"""Gets the storage type as a string.