Red-DiscordBot/redbot/cogs/audio/audio_dataclasses.py
Draper 7bdd177713 [3.2.3][Audio] Correct an unsupported LoadType (#3337)
* Limit Playlists

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* logging improvements

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* logging improvements

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* sigh

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* chore

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>
2020-01-12 18:35:49 -05:00

638 lines
23 KiB
Python

import ntpath
import os
import posixpath
import re
from pathlib import Path, PosixPath, WindowsPath
from typing import List, Optional, Union, MutableMapping
from urllib.parse import urlparse
import lavalink
from redbot.core import Config
from redbot.core.bot import Red
from redbot.core.i18n import Translator
_config: Optional[Config] = None
_bot: Optional[Red] = None
_localtrack_folder: Optional[str] = None
_ = Translator("Audio", __file__)
_RE_REMOVE_START = re.compile(r"^(sc|list) ")
_RE_YOUTUBE_TIMESTAMP = re.compile(r"&t=(\d+)s?")
_RE_YOUTUBE_INDEX = re.compile(r"&index=(\d+)")
_RE_SPOTIFY_URL = re.compile(r"(http[s]?://)?(open.spotify.com)/")
_RE_SPOTIFY_TIMESTAMP = re.compile(r"#(\d+):(\d+)")
_RE_SOUNDCLOUD_TIMESTAMP = re.compile(r"#t=(\d+):(\d+)s?")
_RE_TWITCH_TIMESTAMP = re.compile(r"\?t=(\d+)h(\d+)m(\d+)s")
_PATH_SEPS = [posixpath.sep, ntpath.sep]
_FULLY_SUPPORTED_MUSIC_EXT = (".mp3", ".flac", ".ogg")
_PARTIALLY_SUPPORTED_MUSIC_EXT = (
".m3u",
".m4a",
".aac",
".ra",
".wav",
".opus",
".wma",
".ts",
".au",
# These do not work
# ".mid",
# ".mka",
# ".amr",
# ".aiff",
# ".ac3",
# ".voc",
# ".dsf",
)
_PARTIALLY_SUPPORTED_VIDEO_EXT = (
".mp4",
".mov",
".flv",
".webm",
".mkv",
".wmv",
".3gp",
".m4v",
".mk3d", # https://github.com/Devoxin/lavaplayer
".mka", # https://github.com/Devoxin/lavaplayer
".mks", # https://github.com/Devoxin/lavaplayer
# These do not work
# ".vob",
# ".mts",
# ".avi",
# ".mpg",
# ".mpeg",
# ".swf",
)
_PARTIALLY_SUPPORTED_MUSIC_EXT += _PARTIALLY_SUPPORTED_VIDEO_EXT
def _pass_config_to_dataclasses(config: Config, bot: Red, folder: str):
global _config, _bot, _localtrack_folder
if _config is None:
_config = config
if _bot is None:
_bot = bot
_localtrack_folder = folder
class LocalPath:
"""Local tracks class.
Used to handle system dir trees in a cross system manner. The only use of this class is for
`localtracks`.
"""
_all_music_ext = _FULLY_SUPPORTED_MUSIC_EXT + _PARTIALLY_SUPPORTED_MUSIC_EXT
def __init__(self, path, **kwargs):
self._path = path
if isinstance(path, (Path, WindowsPath, PosixPath, LocalPath)):
path = str(path.absolute())
elif path is not None:
path = str(path)
self.cwd = Path.cwd()
_lt_folder = Path(_localtrack_folder) if _localtrack_folder else self.cwd
_path = Path(path) if path else self.cwd
if _lt_folder.parts[-1].lower() == "localtracks" and not kwargs.get("forced"):
self.localtrack_folder = _lt_folder
elif kwargs.get("forced"):
if _path.parts[-1].lower() == "localtracks":
self.localtrack_folder = _path
else:
self.localtrack_folder = _path / "localtracks"
else:
self.localtrack_folder = _lt_folder / "localtracks"
try:
_path = Path(path)
_path.relative_to(self.localtrack_folder)
self.path = _path
except (ValueError, TypeError):
for sep in _PATH_SEPS:
if path and path.startswith(f"localtracks{sep}{sep}"):
path = path.replace(f"localtracks{sep}{sep}", "", 1)
elif path and path.startswith(f"localtracks{sep}"):
path = path.replace(f"localtracks{sep}", "", 1)
self.path = self.localtrack_folder.joinpath(path) if path else self.localtrack_folder
try:
if self.path.is_file():
parent = self.path.parent
else:
parent = self.path
self.parent = Path(parent)
except OSError:
self.parent = None
@property
def name(self):
return str(self.path.name)
@property
def suffix(self):
return str(self.path.suffix)
def is_dir(self):
try:
return self.path.is_dir()
except OSError:
return False
def exists(self):
try:
return self.path.exists()
except OSError:
return False
def is_file(self):
try:
return self.path.is_file()
except OSError:
return False
def absolute(self):
try:
return self.path.absolute()
except OSError:
return self._path
@classmethod
def joinpath(cls, *args):
modified = cls(None)
modified.path = modified.path.joinpath(*args)
return modified
def multiglob(self, *patterns):
paths = []
for p in patterns:
paths.extend(list(self.path.glob(p)))
for p in self._filtered(paths):
yield p
def multirglob(self, *patterns):
paths = []
for p in patterns:
paths.extend(list(self.path.rglob(p)))
for p in self._filtered(paths):
yield p
def _filtered(self, paths: List[Path]):
for p in paths:
if p.suffix in self._all_music_ext:
yield p
def __str__(self):
return self.to_string()
def to_string(self):
try:
return str(self.path.absolute())
except OSError:
return str(self._path)
def to_string_user(self, arg: str = None):
string = str(self.absolute()).replace(
(str(self.localtrack_folder.absolute()) + os.sep) if arg is None else arg, ""
)
chunked = False
while len(string) > 145 and os.sep in string:
string = string.split(os.sep, 1)[-1]
chunked = True
if chunked:
string = f"...{os.sep}{string}"
return string
def tracks_in_tree(self):
tracks = []
for track in self.multirglob(*[f"*{ext}" for ext in self._all_music_ext]):
if track.exists() and track.is_file() and track.parent != self.localtrack_folder:
tracks.append(Query.process_input(LocalPath(str(track.absolute()))))
return sorted(tracks, key=lambda x: x.to_string_user().lower())
def subfolders_in_tree(self):
files = list(self.multirglob(*[f"*{ext}" for ext in self._all_music_ext]))
folders = []
for f in files:
if f.exists() and f.parent not in folders and f.parent != self.localtrack_folder:
folders.append(f.parent)
return_folders = []
for folder in folders:
if folder.exists() and folder.is_dir():
return_folders.append(LocalPath(str(folder.absolute())))
return sorted(return_folders, key=lambda x: x.to_string_user().lower())
def tracks_in_folder(self):
tracks = []
for track in self.multiglob(*[f"*{ext}" for ext in self._all_music_ext]):
if track.exists() and track.is_file() and track.parent != self.localtrack_folder:
tracks.append(Query.process_input(LocalPath(str(track.absolute()))))
return sorted(tracks, key=lambda x: x.to_string_user().lower())
def subfolders(self):
files = list(self.multiglob(*[f"*{ext}" for ext in self._all_music_ext]))
folders = []
for f in files:
if f.exists() and f.parent not in folders and f.parent != self.localtrack_folder:
folders.append(f.parent)
return_folders = []
for folder in folders:
if folder.exists() and folder.is_dir():
return_folders.append(LocalPath(str(folder.absolute())))
return sorted(return_folders, key=lambda x: x.to_string_user().lower())
def __eq__(self, other):
if not isinstance(other, LocalPath):
return NotImplemented
return self.path._cparts == other.path._cparts
def __hash__(self):
try:
return self._hash
except AttributeError:
self._hash = hash(tuple(self.path._cparts))
return self._hash
def __lt__(self, other):
if not isinstance(other, LocalPath):
return NotImplemented
return self.path._cparts < other.path._cparts
def __le__(self, other):
if not isinstance(other, LocalPath):
return NotImplemented
return self.path._cparts <= other.path._cparts
def __gt__(self, other):
if not isinstance(other, LocalPath):
return NotImplemented
return self.path._cparts > other.path._cparts
def __ge__(self, other):
if not isinstance(other, LocalPath):
return NotImplemented
return self.path._cparts >= other.path._cparts
class Query:
"""Query data class.
Use: Query.process_input(query) to generate the Query object.
"""
def __init__(self, query: Union[LocalPath, str], **kwargs):
query = kwargs.get("queryforced", query)
self._raw: Union[LocalPath, str] = query
_localtrack: LocalPath = LocalPath(query)
self.track: Union[LocalPath, str] = _localtrack if (
(_localtrack.is_file() or _localtrack.is_dir()) and _localtrack.exists()
) else query
self.valid: bool = query != "InvalidQueryPlaceHolderName"
self.is_local: bool = kwargs.get("local", False)
self.is_spotify: bool = kwargs.get("spotify", False)
self.is_youtube: bool = kwargs.get("youtube", False)
self.is_soundcloud: bool = kwargs.get("soundcloud", False)
self.is_bandcamp: bool = kwargs.get("bandcamp", False)
self.is_vimeo: bool = kwargs.get("vimeo", False)
self.is_mixer: bool = kwargs.get("mixer", False)
self.is_twitch: bool = kwargs.get("twitch", False)
self.is_other: bool = kwargs.get("other", False)
self.is_playlist: bool = kwargs.get("playlist", False)
self.is_album: bool = kwargs.get("album", False)
self.is_search: bool = kwargs.get("search", False)
self.is_stream: bool = kwargs.get("stream", False)
self.single_track: bool = kwargs.get("single", False)
self.id: Optional[str] = kwargs.get("id", None)
self.invoked_from: Optional[str] = kwargs.get("invoked_from", None)
self.local_name: Optional[str] = kwargs.get("name", None)
self.search_subfolders: bool = kwargs.get("search_subfolders", False)
self.spotify_uri: Optional[str] = kwargs.get("uri", None)
self.uri: Optional[str] = kwargs.get("url", None)
self.is_url: bool = kwargs.get("is_url", False)
self.start_time: int = kwargs.get("start_time", 0)
self.track_index: Optional[int] = kwargs.get("track_index", None)
if self.invoked_from == "sc search":
self.is_youtube = False
self.is_soundcloud = True
self.lavalink_query: str = self._get_query()
if self.is_playlist or self.is_album:
self.single_track = False
self._hash = hash(
(
self.valid,
self.is_local,
self.is_spotify,
self.is_youtube,
self.is_soundcloud,
self.is_bandcamp,
self.is_vimeo,
self.is_mixer,
self.is_twitch,
self.is_other,
self.is_playlist,
self.is_album,
self.is_search,
self.is_stream,
self.single_track,
self.id,
self.spotify_uri,
self.start_time,
self.track_index,
self.uri,
)
)
def __str__(self):
return str(self.lavalink_query)
@classmethod
def process_input(cls, query: Union[LocalPath, lavalink.Track, "Query", str], **kwargs):
"""A replacement for :code:`lavalink.Player.load_tracks`. This will try to get a valid
cached entry first if not found or if in valid it will then call the lavalink API.
Parameters
----------
query : Union[Query, LocalPath, lavalink.Track, str]
The query string or LocalPath object.
Returns
-------
Query
Returns a parsed Query object.
"""
if not query:
query = "InvalidQueryPlaceHolderName"
possible_values = {}
if isinstance(query, str):
query = query.strip("<>")
while "ytsearch:" in query:
query = query.replace("ytsearch:", "")
while "scsearch:" in query:
query = query.replace("scsearch:", "")
elif isinstance(query, Query):
for key, val in kwargs.items():
setattr(query, key, val)
return query
elif isinstance(query, lavalink.Track):
possible_values["stream"] = query.is_stream
query = query.uri
possible_values.update(dict(**kwargs))
possible_values.update(cls._parse(query, **kwargs))
return cls(query, **possible_values)
@staticmethod
def _parse(track, **kwargs) -> MutableMapping:
returning = {}
if (
type(track) == type(LocalPath)
and (track.is_file() or track.is_dir())
and track.exists()
):
returning["local"] = True
returning["name"] = track.name
if track.is_file():
returning["single"] = True
elif track.is_dir():
returning["album"] = True
else:
track = str(track)
if track.startswith("spotify:"):
returning["spotify"] = True
if ":playlist:" in track:
returning["playlist"] = True
elif ":album:" in track:
returning["album"] = True
elif ":track:" in track:
returning["single"] = True
_id = track.split(":", 2)[-1]
_id = _id.split("?")[0]
returning["id"] = _id
if "#" in _id:
match = re.search(_RE_SPOTIFY_TIMESTAMP, track)
if match:
returning["start_time"] = (int(match.group(1)) * 60) + int(match.group(2))
returning["uri"] = track
return returning
if track.startswith("sc ") or track.startswith("list "):
if track.startswith("sc "):
returning["invoked_from"] = "sc search"
returning["soundcloud"] = True
elif track.startswith("list "):
returning["invoked_from"] = "search list"
track = _RE_REMOVE_START.sub("", track, 1)
returning["queryforced"] = track
_localtrack = LocalPath(track)
if _localtrack.exists():
if _localtrack.is_file():
returning["local"] = True
returning["single"] = True
returning["name"] = _localtrack.name
return returning
elif _localtrack.is_dir():
returning["album"] = True
returning["local"] = True
returning["name"] = _localtrack.name
return returning
try:
query_url = urlparse(track)
if all([query_url.scheme, query_url.netloc, query_url.path]):
returning["url"] = track
returning["is_url"] = True
url_domain = ".".join(query_url.netloc.split(".")[-2:])
if not query_url.netloc:
url_domain = ".".join(query_url.path.split("/")[0].split(".")[-2:])
if url_domain in ["youtube.com", "youtu.be"]:
returning["youtube"] = True
_has_index = "&index=" in track
if "&t=" in track:
match = re.search(_RE_YOUTUBE_TIMESTAMP, track)
if match:
returning["start_time"] = int(match.group(1))
if _has_index:
match = re.search(_RE_YOUTUBE_INDEX, track)
if match:
returning["track_index"] = int(match.group(1)) - 1
if all(k in track for k in ["&list=", "watch?"]):
returning["track_index"] = 0
returning["playlist"] = True
returning["single"] = False
elif all(x in track for x in ["playlist?"]):
returning["playlist"] = not _has_index
returning["single"] = _has_index
elif any(k in track for k in ["list="]):
returning["track_index"] = 0
returning["playlist"] = True
returning["single"] = False
else:
returning["single"] = True
elif url_domain == "spotify.com":
returning["spotify"] = True
if "/playlist/" in track:
returning["playlist"] = True
elif "/album/" in track:
returning["album"] = True
elif "/track/" in track:
returning["single"] = True
val = re.sub(_RE_SPOTIFY_URL, "", track).replace("/", ":")
if "user:" in val:
val = val.split(":", 2)[-1]
_id = val.split(":", 1)[-1]
_id = _id.split("?")[0]
if "#" in _id:
_id = _id.split("#")[0]
match = re.search(_RE_SPOTIFY_TIMESTAMP, track)
if match:
returning["start_time"] = (int(match.group(1)) * 60) + int(
match.group(2)
)
returning["id"] = _id
returning["uri"] = f"spotify:{val}"
elif url_domain == "soundcloud.com":
returning["soundcloud"] = True
if "#t=" in track:
match = re.search(_RE_SOUNDCLOUD_TIMESTAMP, track)
if match:
returning["start_time"] = (int(match.group(1)) * 60) + int(
match.group(2)
)
if "/sets/" in track:
if "?in=" in track:
returning["single"] = True
else:
returning["playlist"] = True
else:
returning["single"] = True
elif url_domain == "bandcamp.com":
returning["bandcamp"] = True
if "/album/" in track:
returning["album"] = True
else:
returning["single"] = True
elif url_domain == "vimeo.com":
returning["vimeo"] = True
elif url_domain in ["mixer.com", "beam.pro"]:
returning["mixer"] = True
elif url_domain == "twitch.tv":
returning["twitch"] = True
if "?t=" in track:
match = re.search(_RE_TWITCH_TIMESTAMP, track)
if match:
returning["start_time"] = (
(int(match.group(1)) * 60 * 60)
+ (int(match.group(2)) * 60)
+ int(match.group(3))
)
if not any(x in track for x in ["/clip/", "/videos/"]):
returning["stream"] = True
else:
returning["other"] = True
returning["single"] = True
else:
if kwargs.get("soundcloud", False):
returning["soundcloud"] = True
else:
returning["youtube"] = True
returning["search"] = True
returning["single"] = True
except Exception:
returning["search"] = True
returning["youtube"] = True
returning["single"] = True
return returning
def _get_query(self):
if self.is_local:
return self.track.to_string()
elif self.is_spotify:
return self.spotify_uri
elif self.is_search and self.is_youtube:
return f"ytsearch:{self.track}"
elif self.is_search and self.is_soundcloud:
return f"scsearch:{self.track}"
return self.track
def to_string_user(self):
if self.is_local:
return str(self.track.to_string_user())
return str(self._raw)
@property
def suffix(self):
if self.is_local:
return self.track.suffix
return None
def __eq__(self, other):
if not isinstance(other, Query):
return NotImplemented
return self.to_string_user() == other.to_string_user()
def __hash__(self):
try:
return self._hash
except AttributeError:
self._hash = hash(
(
self.valid,
self.is_local,
self.is_spotify,
self.is_youtube,
self.is_soundcloud,
self.is_bandcamp,
self.is_vimeo,
self.is_mixer,
self.is_twitch,
self.is_other,
self.is_playlist,
self.is_album,
self.is_search,
self.is_stream,
self.single_track,
self.id,
self.spotify_uri,
self.start_time,
self.track_index,
self.uri,
)
)
return self._hash
def __lt__(self, other):
if not isinstance(other, Query):
return NotImplemented
return self.to_string_user() < other.to_string_user()
def __le__(self, other):
if not isinstance(other, Query):
return NotImplemented
return self.to_string_user() <= other.to_string_user()
def __gt__(self, other):
if not isinstance(other, Query):
return NotImplemented
return self.to_string_user() > other.to_string_user()
def __ge__(self, other):
if not isinstance(other, Query):
return NotImplemented
return self.to_string_user() >= other.to_string_user()