Merge V3/feature/audio into V3/develop (a.k.a. audio refactor) (#3459)

This commit is contained in:
Draper
2020-05-20 21:30:06 +01:00
committed by GitHub
parent ef76affd77
commit 8fa47cb789
53 changed files with 12372 additions and 10144 deletions

View File

@@ -1,36 +1,38 @@
import asyncio
import contextlib
import glob
import logging
import ntpath
import os
import posixpath
import re
from pathlib import Path, PosixPath, WindowsPath
from typing import List, Optional, Union, MutableMapping, Iterator, AsyncIterator
from typing import (
AsyncIterator,
Final,
Iterator,
MutableMapping,
Optional,
Tuple,
Union,
Callable,
Pattern,
)
from urllib.parse import urlparse
import lavalink
from redbot.core.utils import AsyncIter
from redbot.core import Config
from redbot.core.bot import Red
from redbot.core.i18n import Translator
_RE_REMOVE_START: Final[Pattern] = re.compile(r"^(sc|list) ")
_RE_YOUTUBE_TIMESTAMP: Final[Pattern] = re.compile(r"[&|?]t=(\d+)s?")
_RE_YOUTUBE_INDEX: Final[Pattern] = re.compile(r"&index=(\d+)")
_RE_SPOTIFY_URL: Final[Pattern] = re.compile(r"(http[s]?://)?(open.spotify.com)/")
_RE_SPOTIFY_TIMESTAMP: Final[Pattern] = re.compile(r"#(\d+):(\d+)")
_RE_SOUNDCLOUD_TIMESTAMP: Final[Pattern] = re.compile(r"#t=(\d+):(\d+)s?")
_RE_TWITCH_TIMESTAMP: Final[Pattern] = re.compile(r"\?t=(\d+)h(\d+)m(\d+)s")
_PATH_SEPS: Final[Tuple[str, str]] = (posixpath.sep, ntpath.sep)
_config: Optional[Config] = None
_bot: Optional[Red] = None
_localtrack_folder: Optional[str] = None
_ = Translator("Audio", __file__)
_RE_REMOVE_START = re.compile(r"^(sc|list) ")
_RE_YOUTUBE_TIMESTAMP = re.compile(r"&t=(\d+)s?")
_RE_YOUTUBE_INDEX = re.compile(r"&index=(\d+)")
_RE_SPOTIFY_URL = re.compile(r"(http[s]?://)?(open.spotify.com)/")
_RE_SPOTIFY_TIMESTAMP = re.compile(r"#(\d+):(\d+)")
_RE_SOUNDCLOUD_TIMESTAMP = re.compile(r"#t=(\d+):(\d+)s?")
_RE_TWITCH_TIMESTAMP = re.compile(r"\?t=(\d+)h(\d+)m(\d+)s")
_PATH_SEPS = [posixpath.sep, ntpath.sep]
_FULLY_SUPPORTED_MUSIC_EXT = (".mp3", ".flac", ".ogg")
_PARTIALLY_SUPPORTED_MUSIC_EXT = (
_FULLY_SUPPORTED_MUSIC_EXT: Final[Tuple[str, ...]] = (".mp3", ".flac", ".ogg")
_PARTIALLY_SUPPORTED_MUSIC_EXT: Tuple[str, ...] = (
".m3u",
".m4a",
".aac",
@@ -49,7 +51,7 @@ _PARTIALLY_SUPPORTED_MUSIC_EXT = (
# ".voc",
# ".dsf",
)
_PARTIALLY_SUPPORTED_VIDEO_EXT = (
_PARTIALLY_SUPPORTED_VIDEO_EXT: Tuple[str, ...] = (
".mp4",
".mov",
".flv",
@@ -72,25 +74,20 @@ _PARTIALLY_SUPPORTED_VIDEO_EXT = (
_PARTIALLY_SUPPORTED_MUSIC_EXT += _PARTIALLY_SUPPORTED_VIDEO_EXT
def _pass_config_to_dataclasses(config: Config, bot: Red, folder: str):
global _config, _bot, _localtrack_folder
if _config is None:
_config = config
if _bot is None:
_bot = bot
_localtrack_folder = folder
log = logging.getLogger("red.cogs.Audio.audio_dataclasses")
class LocalPath:
"""Local tracks class.
Used to handle system dir trees in a cross system manner. The only use of this class is for
`localtracks`.
Used to handle system dir trees in a cross system manner.
The only use of this class is for `localtracks`.
"""
_all_music_ext = _FULLY_SUPPORTED_MUSIC_EXT + _PARTIALLY_SUPPORTED_MUSIC_EXT
def __init__(self, path, **kwargs):
def __init__(self, path, localtrack_folder, **kwargs):
self._localtrack_folder = localtrack_folder
self._path = path
if isinstance(path, (Path, WindowsPath, PosixPath, LocalPath)):
path = str(path.absolute())
@@ -98,9 +95,8 @@ class LocalPath:
path = str(path)
self.cwd = Path.cwd()
_lt_folder = Path(_localtrack_folder) if _localtrack_folder else self.cwd
_lt_folder = Path(self._localtrack_folder) if self._localtrack_folder else self.cwd
_path = Path(path) if path else self.cwd
if _lt_folder.parts[-1].lower() == "localtracks" and not kwargs.get("forced"):
self.localtrack_folder = _lt_folder
elif kwargs.get("forced"):
@@ -165,46 +161,44 @@ class LocalPath:
return self._path
@classmethod
def joinpath(cls, *args):
modified = cls(None)
def joinpath(cls, localpath, *args):
modified = cls(None, localpath)
modified.path = modified.path.joinpath(*args)
return modified
def rglob(self, pattern, folder=False) -> Iterator[str]:
if folder:
return glob.iglob(f"{self.path}{os.sep}**{os.sep}", recursive=True)
return glob.iglob(f"{glob.escape(self.path)}{os.sep}**{os.sep}", recursive=True)
else:
return glob.iglob(f"{self.path}{os.sep}**{os.sep}{pattern}", recursive=True)
return glob.iglob(
f"{glob.escape(self.path)}{os.sep}**{os.sep}*{pattern}", recursive=True
)
def glob(self, pattern, folder=False) -> Iterator[str]:
if folder:
return glob.iglob(f"{self.path}{os.sep}*{os.sep}", recursive=False)
return glob.iglob(f"{glob.escape(self.path)}{os.sep}*{os.sep}", recursive=False)
else:
return glob.iglob(f"{self.path}{os.sep}*{pattern}", recursive=False)
return glob.iglob(f"{glob.escape(self.path)}{os.sep}*{pattern}", recursive=False)
async def _multiglob(self, pattern: str, folder: bool, method: Callable):
async for rp in AsyncIter(method(pattern)):
rp_local = LocalPath(rp, self._localtrack_folder)
if (
(folder and rp_local.is_dir() and rp_local.exists())
or (not folder and rp_local.suffix in self._all_music_ext and rp_local.is_file())
and rp_local.exists()
):
yield rp_local
async def multiglob(self, *patterns, folder=False) -> AsyncIterator["LocalPath"]:
for p in patterns:
for rp in self.glob(p):
rp = LocalPath(rp)
if folder and rp.is_dir() and rp.exists():
yield rp
await asyncio.sleep(0)
else:
if rp.suffix in self._all_music_ext and rp.is_file() and rp.exists():
yield rp
await asyncio.sleep(0)
async for p in AsyncIter(patterns):
async for path in self._multiglob(p, folder, self.glob):
yield path
async def multirglob(self, *patterns, folder=False) -> AsyncIterator["LocalPath"]:
for p in patterns:
for rp in self.rglob(p):
rp = LocalPath(rp)
if folder and rp.is_dir() and rp.exists():
yield rp
await asyncio.sleep(0)
else:
if rp.suffix in self._all_music_ext and rp.is_file() and rp.exists():
yield rp
await asyncio.sleep(0)
async for p in AsyncIter(patterns):
async for path in self._multiglob(p, folder, self.rglob):
yield path
def __str__(self):
return self.to_string()
@@ -238,7 +232,7 @@ class LocalPath:
if track.path.parent != self.localtrack_folder and track.path.relative_to(
self.path
):
tracks.append(Query.process_input(track))
tracks.append(Query.process_input(track, self._localtrack_folder))
return sorted(tracks, key=lambda x: x.to_string_user().lower())
async def subfolders_in_tree(self):
@@ -247,6 +241,7 @@ class LocalPath:
with contextlib.suppress(ValueError):
if (
f not in return_folders
and f.is_dir()
and f.path != self.localtrack_folder
and f.path.relative_to(self.path)
):
@@ -260,7 +255,7 @@ class LocalPath:
if track.path.parent != self.localtrack_folder and track.path.relative_to(
self.path
):
tracks.append(Query.process_input(track))
tracks.append(Query.process_input(track, self._localtrack_folder))
return sorted(tracks, key=lambda x: x.to_string_user().lower())
async def subfolders(self):
@@ -321,18 +316,14 @@ class LocalPath:
class Query:
"""Query data class.
Use: Query.process_input(query) to generate the Query object.
Use: Query.process_input(query, localtrack_folder) to generate the Query object.
"""
def __init__(self, query: Union[LocalPath, str], **kwargs):
def __init__(self, query: Union[LocalPath, str], local_folder_current_path: Path, **kwargs):
query = kwargs.get("queryforced", query)
self._raw: Union[LocalPath, str] = query
_localtrack: LocalPath = LocalPath(query)
self.track: Union[LocalPath, str] = _localtrack if (
(_localtrack.is_file() or _localtrack.is_dir()) and _localtrack.exists()
) else query
self._local_folder_current_path = local_folder_current_path
_localtrack: LocalPath = LocalPath(query, local_folder_current_path)
self.valid: bool = query != "InvalidQueryPlaceHolderName"
self.is_local: bool = kwargs.get("local", False)
@@ -364,6 +355,15 @@ class Query:
self.is_youtube = False
self.is_soundcloud = True
if (_localtrack.is_file() or _localtrack.is_dir()) and _localtrack.exists():
self.local_track_path: Optional[LocalPath] = _localtrack
self.track: str = str(_localtrack.absolute())
self.is_local: bool = True
self.uri = self.track
else:
self.local_track_path: Optional[LocalPath] = None
self.track: str = str(query)
self.lavalink_query: str = self._get_query()
if self.is_playlist or self.is_album:
@@ -397,14 +397,21 @@ class Query:
return str(self.lavalink_query)
@classmethod
def process_input(cls, query: Union[LocalPath, lavalink.Track, "Query", str], **kwargs):
"""A replacement for :code:`lavalink.Player.load_tracks`. This will try to get a valid
cached entry first if not found or if in valid it will then call the lavalink API.
def process_input(
cls,
query: Union[LocalPath, lavalink.Track, "Query", str],
_local_folder_current_path: Path,
**kwargs,
) -> "Query":
"""
Process the input query into its type
Parameters
----------
query : Union[Query, LocalPath, lavalink.Track, str]
The query string or LocalPath object.
_local_folder_current_path: Path
The Current Local Track folder
Returns
-------
Query
@@ -430,12 +437,13 @@ class Query:
query = query.uri
possible_values.update(dict(**kwargs))
possible_values.update(cls._parse(query, **kwargs))
return cls(query, **possible_values)
possible_values.update(cls._parse(query, _local_folder_current_path, **kwargs))
return cls(query, _local_folder_current_path, **possible_values)
@staticmethod
def _parse(track, **kwargs) -> MutableMapping:
returning = {}
def _parse(track, _local_folder_current_path: Path, **kwargs) -> MutableMapping:
"""Parse a track into all the relevant metadata"""
returning: MutableMapping = {}
if (
type(track) == type(LocalPath)
and (track.is_file() or track.is_dir())
@@ -475,7 +483,7 @@ class Query:
track = _RE_REMOVE_START.sub("", track, 1)
returning["queryforced"] = track
_localtrack = LocalPath(track)
_localtrack = LocalPath(track, _local_folder_current_path)
if _localtrack.exists():
if _localtrack.is_file():
returning["local"] = True
@@ -498,7 +506,7 @@ class Query:
if url_domain in ["youtube.com", "youtu.be"]:
returning["youtube"] = True
_has_index = "&index=" in track
if "&t=" in track:
if "&t=" in track or "?t=" in track:
match = re.search(_RE_YOUTUBE_TIMESTAMP, track)
if match:
returning["start_time"] = int(match.group(1))
@@ -599,7 +607,7 @@ class Query:
def _get_query(self):
if self.is_local:
return self.track.to_string()
return self.local_track_path.to_string()
elif self.is_spotify:
return self.spotify_uri
elif self.is_search and self.is_youtube:
@@ -610,13 +618,13 @@ class Query:
def to_string_user(self):
if self.is_local:
return str(self.track.to_string_user())
return str(self.local_track_path.to_string_user())
return str(self._raw)
@property
def suffix(self):
if self.is_local:
return self.track.suffix
return self.local_track_path.suffix
return None
def __eq__(self, other):