Merge pull request #4008 from Drapersniper/node-managing

Another tiny PR :awesome:
This commit is contained in:
aikaterna 2020-06-23 10:09:53 -07:00 committed by GitHub
commit 7e93803c5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
61 changed files with 4106 additions and 22 deletions

View File

@ -1,2 +1,19 @@
from . import constants as constants
from . import errors as errors
from . import regex as regex
from . import wavelink as wavelink
from ._internal.wavelink.overwrites import (
LoadType as LoadType,
PlayerStatus as PlayerStatus,
RedClient as RedClient,
RedEqualizer as RedEqualizer,
RedNode as RedNode,
RedPlayer as RedPlayer,
RedTrack as RedTrack,
RedTrackPlaylist as RedTrackPlaylist,
Votes as Votes,
)
from ._internal.wavelink.events import QueueEnd as QueueEnd
from ._internal.playlists.enums import PlaylistScope as PlaylistScope
from . import config as config
from .config import _init as _init
from . import _internal as _internal

View File

@ -0,0 +1,6 @@
from . import nodes as nodes
from . import setting_cache as setting_cache
from . import playlists as playlists
from . import sql as sql
from . import wavelink as wavelink
from . import abstract as abstract

View File

@ -0,0 +1,137 @@
from __future__ import annotations
from typing import Any
__all__ = ["CacheLevel"]
class CacheLevel:
__slots__ = ("value",)
def __init__(self, level: int = 0) -> None:
if not isinstance(level, int):
raise TypeError(
f"Expected int parameter, received {level.__class__.__name__} instead."
)
elif level < 0:
level = 0
elif level > 0b11111:
level = 0b11111
self.value = level
def __eq__(self, other: Any) -> bool:
return isinstance(other, CacheLevel) and self.value == other.value
def __ne__(self, other: Any) -> bool:
return not self.__eq__(other)
def __hash__(self) -> int:
return hash(self.value)
def __add__(self, other: CacheLevel) -> CacheLevel:
return CacheLevel(self.value + other.value)
def __radd__(self, other: CacheLevel) -> CacheLevel:
return CacheLevel(other.value + self.value)
def __sub__(self, other: CacheLevel) -> CacheLevel:
return CacheLevel(self.value - other.value)
def __rsub__(self, other: CacheLevel) -> CacheLevel:
return CacheLevel(other.value - self.value)
def __str__(self) -> str:
return "{0:b}".format(self.value)
def __format__(self, format_spec: str) -> str:
return "{r:{f}}".format(r=self.value, f=format_spec)
def __repr__(self) -> str:
return f"<CacheLevel value={self.value}>"
def is_subset(self, other: CacheLevel) -> bool:
"""Returns ``True`` if self has the same or fewer caching levels as other."""
return (self.value & other.value) == self.value
def is_superset(self, other: CacheLevel) -> bool:
"""Returns ``True`` if self has the same or more caching levels as other."""
return (self.value | other.value) == self.value
def is_strict_subset(self, other: CacheLevel) -> bool:
"""Returns ``True`` if the caching level on other are a strict subset of those on self."""
return self.is_subset(other) and self != other
def is_strict_superset(self, other: CacheLevel) -> bool:
"""Returns ``True`` if the caching level on other are a strict superset of those on
self."""
return self.is_superset(other) and self != other
__le__ = is_subset
__ge__ = is_superset
__lt__ = is_strict_subset
__gt__ = is_strict_superset
@classmethod
def all(cls) -> CacheLevel:
"""A factory method that creates a :class:`CacheLevel` with max caching level."""
return cls(0b11111)
@classmethod
def none(cls) -> CacheLevel:
"""A factory method that creates a :class:`CacheLevel` with no caching."""
return cls(0)
@classmethod
def set_spotify(cls) -> CacheLevel:
"""A factory method that creates a :class:`CacheLevel` with Spotify caching level."""
return cls(0b00011)
@classmethod
def set_youtube(cls) -> CacheLevel:
"""A factory method that creates a :class:`CacheLevel` with YouTube caching level."""
return cls(0b00100)
@classmethod
def set_lavalink(cls) -> CacheLevel:
"""A factory method that creates a :class:`CacheLevel` with lavalink caching level."""
return cls(0b11000)
def _bit(self, index: int) -> bool:
return bool((self.value >> index) & 1)
def _set(self, index: int, value: bool) -> None:
if value is True:
self.value |= 1 << index
elif value is False:
self.value &= ~(1 << index)
else:
raise TypeError("Value to set for CacheLevel must be a bool.")
@property
def lavalink(self) -> bool:
""":class:`bool`: Returns ``True`` if a user can deafen other users."""
return self._bit(4)
@lavalink.setter
def lavalink(self, value: bool) -> None:
self._set(4, value)
@property
def youtube(self) -> bool:
""":class:`bool`: Returns ``True`` if a user can move users between other voice
channels."""
return self._bit(2)
@youtube.setter
def youtube(self, value: bool) -> None:
self._set(2, value)
@property
def spotify(self) -> bool:
""":class:`bool`: Returns ``True`` if a user can use voice activation in voice channels."""
return self._bit(1)
@spotify.setter
def spotify(self, value: bool) -> None:
self._set(1, value)

View File

@ -0,0 +1 @@
from .events import AudioAPIEvents

View File

@ -0,0 +1,44 @@
from __future__ import annotations
import copy
import typing
from .managed import managed_lavalink_connect_task_event
from ... import constants, config
if typing.TYPE_CHECKING:
from redbot.core.bot import Red
__all__ = ["start_nodes"]
async def start_nodes(bot: Red, identifier: typing.Optional[str] = None) -> None:
"""Connect and initiate nodes."""
await bot.wait_until_ready()
if identifier:
if bot.wavelink.nodes:
previous = bot.wavelink.nodes.copy()
if node := previous.get(identifier):
await node.destroy()
async with config._config.nodes.all() as node_data:
if identifier in node_data:
node_copy = copy.copy(node_data[identifier])
elif identifier in constants.DEFAULT_COG_LAVALINK_SETTINGS:
node_copy = copy.copy(constants.DEFAULT_COG_LAVALINK_SETTINGS[identifier])
else:
return
node_copy["region"] = bot.wavelink.get_valid_region(node_copy["region"])
await bot.wavelink.initiate_node(**node_copy)
else:
if bot.wavelink.nodes:
previous = bot.wavelink.nodes.copy()
for node in previous.values():
await node.destroy()
use_managed_lavalink = await config.config_cache.managed_lavalink_server.get_global()
if use_managed_lavalink:
await managed_lavalink_attempt_connect(timeout=120)
await managed_lavalink_connect_task_event.wait()
nodes = await config._config.nodes()
for n in nodes.values():
n["region"] = bot.wavelink.get_valid_region(n["region"])
await bot.wavelink.initiate_node(**n)

View File

@ -0,0 +1,120 @@
from __future__ import annotations
import logging
import sys
import traceback
import wavelink
from redbot.core import commands
from redbot.core.bot import Red
from ..wavelink.events import QueueEnd
from ..wavelink.overwrites import RedNode
log = logging.getLogger("red.core.apis.audio.nodes")
__all__ = ["AudioAPIEvents"]
class AudioAPIEvents(commands.Cog, wavelink.WavelinkMixin):
def __init__(self, bot: Red):
self.bot = bot
@wavelink.WavelinkMixin.listener()
async def on_wavelink_error(self, listener, error: Exception):
"""Event dispatched when an error is raised during mixin listener dispatch.
Parameters
------------
listener:
The listener where an exception was raised.
error: Exception
The exception raised when dispatching a mixin listener.
"""
log.warning(f"Ignoring exception in listener {listener}")
traceback.print_exception(type(error), error, error.__traceback__, file=sys.stderr)
@wavelink.WavelinkMixin.listener()
async def on_node_ready(self, node: RedNode):
"""Listener dispatched when a :class:`wavelink.node.Node` is connected and ready.
Parameters
------------
node: Node
The node associated with the listener event.
"""
@wavelink.WavelinkMixin.listener()
async def on_track_start(self, node: RedNode, payload: wavelink.TrackStart):
"""Listener dispatched when a track starts.
Parameters
------------
node: Node
The node associated with the listener event.
payload: TrackStart
The :class:`wavelink.events.TrackStart` payload.
"""
@wavelink.WavelinkMixin.listener()
async def on_track_end(self, node: RedNode, payload: wavelink.TrackEnd):
"""Listener dispatched when a track ends.
Parameters
------------
node: Node
The node associated with the listener event.
payload: TrackEnd
The :class:`wavelink.events.TrackEnd` payload.
"""
await payload.player.do_next()
@wavelink.WavelinkMixin.listener()
async def on_track_stuck(self, node: RedNode, payload: wavelink.TrackStuck):
"""Listener dispatched when a track is stuck.
Parameters
------------
node: Node
The node associated with the listener event.
payload: TrackStuck
The :class:`wavelink.events.TrackStuck` payload.
"""
await payload.player.do_next()
@wavelink.WavelinkMixin.listener()
async def on_track_exception(self, node: RedNode, payload: wavelink.TrackException):
"""Listener dispatched when a track errors.
Parameters
------------
node: Node
The node associated with the listener event.
payload: TrackException
The :class:`wavelink.events.TrackException` payload.
"""
await payload.player.do_next()
@wavelink.WavelinkMixin.listener()
async def on_websocket_closed(self, node: RedNode, payload: wavelink.WebsocketClosed):
"""Listener dispatched when a node websocket is closed by lavalink.
Parameters
------------
node: Node
The node associated with the listener event.
payload: WebsocketClosed
The :class:`wavelink.events.WebsocketClosed` payload.
"""
@wavelink.WavelinkMixin.listener()
async def on_queue_end(self, node: RedNode, payload: QueueEnd):
"""Listener dispatched when a player queue ends.
Parameters
------------
node: Node
The node associated with the listener event.
payload: QueueEnd
The :class:`QueueEnd` payload.
"""

View File

@ -0,0 +1,368 @@
from __future__ import annotations
import asyncio
import contextlib
import itertools
import logging
import platform
import shutil
import sys
import tempfile
import time
from typing import ClassVar, Optional, Tuple, List
import aiohttp
from tqdm import tqdm
from redbot.core import Config
from ... import constants, regex, errors
__all__ = [
"managed_lavalink_connect_task_event",
"get_latest_lavalink_release",
"LavalinkServerManager",
]
log = logging.getLogger("red.core.apis.audio.nodes.managed")
managed_lavalink_connect_task_event = asyncio.Event()
async def get_latest_lavalink_release():
async with aiohttp.ClientSession() as session:
async with session.get(constants.LAVALINK_JAR_ENDPOINT) as result:
data = await result.json()
return (
data.get("name"),
data.get("tag_name"),
next(
(
i.get("browser_download_url")
for i in data.get("assets", [])
if i.get("name") == "Lavalink.jar"
),
None,
),
)
class LavalinkServerManager:
_java_available: ClassVar[Optional[bool]] = None
_java_version: ClassVar[Optional[Tuple[int, int]]] = None
_up_to_date: ClassVar[Optional[bool]] = None
_blacklisted_archs: List[str] = []
_jar_build: ClassVar[int] = constants.JAR_BUILD
_jar_version: ClassVar[str] = constants.JAR_VERSION
_jar_name: ClassVar[str] = f"{constants.JAR_VERSION}_{constants.JAR_BUILD}"
_jar_download_url: ClassVar[str] = constants.LAVALINK_DOWNLOAD_URL
_lavaplayer: ClassVar[Optional[str]] = None
_lavalink_build: ClassVar[Optional[int]] = None
_jvm: ClassVar[Optional[str]] = None
_lavalink_branch: ClassVar[Optional[str]] = None
_buildtime: ClassVar[Optional[str]] = None
_java_exc: ClassVar[str] = "java"
def __init__(self, config: Config) -> None:
self.ready: asyncio.Event = asyncio.Event()
self._proc: Optional[asyncio.subprocess.Process] = None # pylint:disable=no-member
self._monitor_task: Optional[asyncio.Task] = None
self._shutdown: bool = False
self._config: Config = config
@property
def jvm(self) -> Optional[str]:
return self._jvm
@property
def lavaplayer(self) -> Optional[str]:
return self._lavaplayer
@property
def ll_build(self) -> Optional[int]:
return self._lavalink_build
@property
def ll_branch(self) -> Optional[str]:
return self._lavalink_branch
@property
def build_time(self) -> Optional[str]:
return self._buildtime
async def start(self, java_path: str) -> None:
arch_name = platform.machine()
self._java_exc = java_path
if arch_name in self._blacklisted_archs:
raise asyncio.CancelledError(
"You are attempting to run Lavalink audio on an unsupported machine architecture."
)
if (jar_url := await self._config.lavalink.jar_url()) is not None:
self._jar_name = jar_url
self._jar_download_url = jar_url
self._jar_build = await self._config.lavalink.jar_build() or self._jar_build
else:
if await self._config.lavalink.autoupdate():
with contextlib.suppress(Exception):
name, tag, url = await get_latest_lavalink_release()
if name and "_" in name:
tag = name
version, build = name.split("_")
build = int(build)
elif tag and "_" in tag:
name = tag
version, build = name.split("_")
build = int(build)
else:
name = tag = version = build = None
self._jar_name = name or tag or self._jar_name
self._jar_download_url = url or self._jar_download_url
self._jar_build = build or self._jar_build
self._jar_version = version or self._jar_version
if self._proc is not None:
if self._proc.returncode is None:
raise RuntimeError("Internal Lavalink server is already running")
elif self._shutdown:
raise RuntimeError("Server manager has already been used - create another one")
await self.maybe_download_jar()
# Copy the application.yml across.
# For people to customise their Lavalink server configuration they need to run it
# externally
shutil.copyfile(str(constants.BUNDLED_APP_YML), str(constants.LAVALINK_APP_YML))
args = await self._get_jar_args()
self._proc = await asyncio.subprocess.create_subprocess_exec( # pylint:disable=no-member
*args,
cwd=str(constants.LAVALINK_DOWNLOAD_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
log.info("Internal Lavalink server started. PID: %s", self._proc.pid)
try:
await asyncio.wait_for(self._wait_for_launcher(), timeout=120)
except asyncio.TimeoutError:
log.warning("Timeout occurred whilst waiting for internal Lavalink server to be ready")
self._monitor_task = asyncio.create_task(self._monitor())
async def _get_jar_args(self) -> List[str]:
(java_available, java_version) = await self._has_java()
if java_version is None:
raise RuntimeError(
f"`{self._java_exc}` is not a valid java executable in your machine."
)
if not java_available:
raise RuntimeError("You must install Java 11+ for Lavalink to run.")
if java_version >= (14, 0):
raise errors.UnsupportedJavaVersion(version=java_version)
elif java_version >= (13, 0):
extra_flags = []
elif java_version >= (12, 0):
raise errors.UnsupportedJavaVersion(version=java_version)
elif java_version >= (11, 0):
extra_flags = ["-Djdk.tls.client.protocols=TLSv1.2"]
else:
raise errors.UnsupportedJavaVersion(version=java_version)
return [self._java_exc, *extra_flags, "-jar", str(constants.LAVALINK_JAR_FILE)]
async def _has_java(self) -> Tuple[bool, Optional[Tuple[int, int]]]:
if self._java_available is not None:
# Return cached value if we've checked this before
return self._java_available, self._java_version
java_available = shutil.which(self._java_exc) is not None
if not java_available:
self.java_available = False
self.java_version = None
else:
self._java_version = version = await self._get_java_version()
self._java_available = (11, 0) <= version < (12, 0) or (13, 0) <= version < (14, 0)
return self._java_available, self._java_version
async def _get_java_version(self) -> Tuple[int, int]:
"""This assumes we've already checked that java exists."""
_proc: asyncio.subprocess.Process = await asyncio.create_subprocess_exec( # pylint:disable=no-member
self._java_exc,
"-version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
# java -version outputs to stderr
_, err = await _proc.communicate()
version_info: str = err.decode("utf-8")
# We expect the output to look something like:
# $ java -version
# ...
# ... version "MAJOR.MINOR.PATCH[_BUILD]" ...
# ...
# We only care about the major and minor parts though.
lines = version_info.splitlines()
for line in lines:
match = regex.JAVA_VERSION_LINE.search(line)
short_match = regex.JAVA_SHORT_VERSION.search(line)
if match:
return int(match["major"]), int(match["minor"])
elif short_match:
return int(short_match["major"]), 0
raise RuntimeError(f"The output of `{self._java_exc} -version` was unexpected.")
async def _wait_for_launcher(self) -> None:
log.debug("Waiting for Lavalink server to be ready")
lastmessage = 0
for i in itertools.cycle(range(50)):
line = await self._proc.stdout.readline()
if regex.LAVALINK_READY_LINE.search(line):
self.ready.set()
break
if regex.LAVALINK_READY_LINE.search(line):
raise RuntimeError(f"Lavalink failed to start: {line.decode().strip()}")
if self._proc.returncode is not None and lastmessage + 5 < time.perf_counter():
# Avoid Console spam only print once every 2 seconds
lastmessage = time.perf_counter()
log.critical("Internal lavalink server exited early")
if i == 49:
# Sleep after 50 lines to prevent busylooping
await asyncio.sleep(0.1)
async def _monitor(self) -> None:
while self._proc.returncode is None:
await asyncio.sleep(0.5)
# This task hasn't been cancelled - Lavalink was shut down by something else
log.warning("Internal Lavalink jar shutdown unexpectedly")
if not self._has_java_error():
log.info("Restarting internal Lavalink server")
await self.start(self._java_exc)
else:
log.critical(
"Your Java is borked. Please find the hs_err_pid%d.log file"
" in the Audio data folder and report this issue.",
self._proc.pid,
)
def _has_java_error(self) -> bool:
poss_error_file = constants.LAVALINK_DOWNLOAD_DIR / "hs_err_pid{}.log".format(
self._proc.pid
)
return poss_error_file.exists()
async def shutdown(self) -> None:
if self._shutdown is True or self._proc is None:
# For convenience, calling this method more than once or calling it before starting it
# does nothing.
return
log.info("Shutting down internal Lavalink server")
if self._monitor_task is not None:
self._monitor_task.cancel()
self._proc.terminate()
await self._proc.wait()
self._shutdown = True
async def _download_jar(self) -> None:
log.info("Downloading Lavalink.jar...")
async with aiohttp.ClientSession() as session:
async with session.get(self._jar_download_url) as response:
if response.status == 404:
# A 404 means our LAVALINK_DOWNLOAD_URL is invalid, so likely the jar version
# hasn't been published yet
raise errors.LavalinkDownloadFailed(
f"Lavalink jar version {self._jar_name} hasn't been published " f"yet",
response=response,
should_retry=False,
)
elif 400 <= response.status < 600:
# Other bad responses should be raised but we should retry just incase
raise errors.LavalinkDownloadFailed(response=response, should_retry=True)
fd, path = tempfile.mkstemp()
file = open(fd, "wb")
nbytes = 0
with tqdm(
desc="Lavalink.jar",
total=response.content_length,
file=sys.stdout,
unit="B",
unit_scale=True,
miniters=1,
dynamic_ncols=True,
leave=False,
) as progress_bar:
try:
chunk = await response.content.read(1024)
while chunk:
chunk_size = file.write(chunk)
nbytes += chunk_size
progress_bar.update(chunk_size)
chunk = await response.content.read(1024)
file.flush()
finally:
file.close()
shutil.move(path, str(constants.LAVALINK_JAR_FILE), copy_function=shutil.copyfile)
log.info("Successfully downloaded Lavalink.jar (%s bytes written)", format(nbytes, ","))
await self._is_up_to_date()
async def _is_up_to_date(self):
if self._up_to_date is True:
# Return cached value if we've checked this before
return True
args = await self._get_jar_args()
args.append("--version")
_proc = await asyncio.subprocess.create_subprocess_exec( # pylint:disable=no-member
*args,
cwd=str(constants.LAVALINK_DOWNLOAD_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout = (await _proc.communicate())[0]
if (build := regex.LAVALINK_BUILD_LINE.search(stdout)) is None:
# Output is unexpected, suspect corrupted jarfile
return False
if (branch := regex.LAVALINK_BRANCH_LINE.search(stdout)) is None:
# Output is unexpected, suspect corrupted jarfile
return False
if (java := regex.LAVALINK_JAVA_LINE.search(stdout)) is None:
# Output is unexpected, suspect corrupted jarfile
return False
if (lavaplayer := regex.LAVALINK_LAVAPLAYER_LINE.search(stdout)) is None:
# Output is unexpected, suspect corrupted jarfile
return False
if (buildtime := regex.LAVALINK_BUILD_TIME_LINE.search(stdout)) is None:
# Output is unexpected, suspect corrupted jarfile
return False
build = int(build["build"])
date = buildtime["build_time"].decode()
date = date.replace(".", "/")
self._lavalink_build = build
self._lavalink_branch = branch["branch"].decode()
self._jvm = java["jvm"].decode()
self._lavaplayer = lavaplayer["lavaplayer"].decode()
self._buildtime = date
self._up_to_date = build >= self._jar_build
return self._up_to_date
async def maybe_download_jar(self):
if not (constants.LAVALINK_JAR_FILE.exists() and await self._is_up_to_date()):
await self._download_jar()
if not await self._is_up_to_date():
raise errors.LavalinkDownloadFailed(
f"Download of Lavalink build {self.ll_build} from {self.ll_branch} "
f"({self._jar_download_url}) failed, Excepted build {self._jar_build} "
f"But downloaded {self._lavalink_build}",
should_retry=False,
)

View File

@ -0,0 +1 @@
from . import enums

View File

@ -0,0 +1,21 @@
from __future__ import annotations
import enum
from typing import List
__all__ = ["PlaylistScope"]
@enum.unique
class PlaylistScope(enum.Enum):
GLOBAL = "GLOBALPLAYLIST"
GUILD = "GUILDPLAYLIST"
USER = "USERPLAYLIST"
def __str__(self) -> str:
return "{0}".format(self.value)
@staticmethod
def list() -> List[str]: # type: ignore
return list(map(lambda c: c.value, PlaylistScope))

View File

@ -0,0 +1,89 @@
from __future__ import annotations
from redbot.core import Config
from redbot.core.bot import Red
from .autodc import AutoDCManager
from .autoplay import AutoPlayManager
from .blacklist_whitelist import WhitelistBlacklistManager
from .channel_restrict import ChannelRestrictManager
from .country_code import CountryCodeManager
from .daily_global_playlist import DailyGlobalPlaylistManager
from .daily_playlist import DailyPlaylistManager
from .dj_roles import DJRoleManager
from .dj_status import DJStatusManager
from .emptydc import EmptyDCManager
from .emptydc_timer import EmptyDCTimerManager
from .emptypause import EmptyPauseManager
from .emptypause_timer import EmptyPauseTimerManager
from .globaldb import GlobalDBManager
from .globaldb_timeout import GlobalDBTimeoutManager
from .jukebox import JukeboxManager
from .jukebox_price import JukeboxPriceManager
from .local_cache_age import LocalCacheAgeManager
from .local_cache_level import LocalCacheLevelManager
from .localpath import LocalPathManager
from .lyrics import PreferLyricsManager
from .managed_lavalink_auto_update import LavalinkAutoUpdateManager
from .managed_lavalink_server import ManagedLavalinkManager
from .max_track_length import MaxTrackLengthManager
from .notify import NotifyManager
from .persist_queue import PersistentQueueManager
from .repeat import RepeatManager
from .restrict import URLRestrictManager
from .shuffle import ShuffleManager
from .shuffle_bumped import ShuffleBumpedManager
from .status import StatusManager
from .thumbnail import ThumbnailManager
from .vc_restricted import VCRestrictedManager
from .volume import VolumeManager
from .votes_percentage import VotesPercentageManager
from .voting import VotingManager
__all__ = ["SettingCacheManager"]
class SettingCacheManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True) -> None:
self._config: Config = config
self.bot: Red = bot
self.enabled = enable_cache
self.blacklist_whitelist = WhitelistBlacklistManager(bot, config, self.enabled)
self.dj_roles = DJRoleManager(bot, config, self.enabled)
self.dj_status = DJStatusManager(bot, config, self.enabled)
self.daily_playlist = DailyPlaylistManager(bot, config, self.enabled)
self.daily_global_playlist = DailyGlobalPlaylistManager(bot, config, self.enabled)
self.persistent_queue = PersistentQueueManager(bot, config, self.enabled)
self.votes = VotingManager(bot, config, self.enabled)
self.votes_percentage = VotesPercentageManager(bot, config, self.enabled)
self.shuffle = ShuffleManager(bot, config, self.enabled)
self.shuffle_bumped = ShuffleBumpedManager(bot, config, self.enabled)
self.autoplay = AutoPlayManager(bot, config, self.enabled)
self.thumbnail = ThumbnailManager(bot, config, self.enabled)
self.localpath = LocalPathManager(bot, config, self.enabled)
self.disconnect = AutoDCManager(bot, config, self.enabled)
self.empty_dc = EmptyDCManager(bot, config, self.enabled)
self.empty_dc_timer = EmptyDCTimerManager(bot, config, self.enabled)
self.empty_pause = EmptyPauseManager(bot, config, self.enabled)
self.empty_pause_timer = EmptyPauseTimerManager(bot, config, self.enabled)
self.global_api = GlobalDBManager(bot, config, self.enabled)
self.global_api_timeout = GlobalDBTimeoutManager(bot, config, self.enabled)
self.local_cache_level = LocalCacheLevelManager(bot, config, self.enabled)
self.country_code = CountryCodeManager(bot, config, self.enabled)
self.repeat = RepeatManager(bot, config, self.enabled)
self.channel_restrict = ChannelRestrictManager(bot, config, self.enabled)
self.volume = VolumeManager(bot, config, self.enabled)
self.local_cache_age = LocalCacheAgeManager(bot, config, self.enabled)
self.jukebox = JukeboxManager(bot, config, self.enabled)
self.jukebox_price = JukeboxPriceManager(bot, config, self.enabled)
self.max_track_length = MaxTrackLengthManager(bot, config, self.enabled)
self.prefer_lyrics = PreferLyricsManager(bot, config, self.enabled)
self.notify = NotifyManager(bot, config, self.enabled)
self.status = StatusManager(bot, config, self.enabled)
self.url_restrict = URLRestrictManager(bot, config, self.enabled)
self.managed_lavalink_server = ManagedLavalinkManager(bot, config, self.enabled)
self.managed_lavalink_server_auto_update = LavalinkAutoUpdateManager(
bot, config, self.enabled
)
self.vc_restricted = VCRestrictedManager(bot, config, self.enabled)

View File

@ -0,0 +1,58 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class AutoDCManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
self._cached_global: Dict[None, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).disconnect()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).disconnect.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).disconnect.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["disconnect"]
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.emptydc_enabled()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.emptydc_enabled.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.emptydc_enabled.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["disconnect"]
async def get_context_value(self, guild: discord.Guild) -> Optional[bool]:
if (value := await self.get_global()) is True:
return value
return await self.get_guild(guild)

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class AutoPlayManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached:
ret = self._cached[gid]
else:
ret = await self._config.guild_from_id(gid).auto_play()
self._cached[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).auto_play.set(set_to)
self._cached[gid] = set_to
else:
await self._config.guild_from_id(gid).auto_play.clear()
self._cached[gid] = self._config.defaults["GUILD"]["auto_play"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,222 @@
from __future__ import annotations
from typing import Dict, Optional, Set, Union
import discord
from redbot.core import Config
from redbot.core.bot import Red
class WhitelistBlacklistManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_whitelist: Dict[Optional[int], Set[str]] = {}
self._cached_blacklist: Dict[Optional[int], Set[str]] = {}
async def get_whitelist(self, guild: Optional[discord.Guild] = None) -> Set[str]:
ret: Set[str]
gid: Optional[int] = guild.id if guild else None
if self.enable_cache and gid in self._cached_whitelist:
ret = self._cached_whitelist[gid].copy()
else:
if gid is not None:
ret = set(await self._config.guild_from_id(gid).url_keyword_whitelist())
if not ret:
ret = set()
else:
ret = set(await self._config.url_keyword_whitelist())
self._cached_whitelist[gid] = ret.copy()
return {i.lower() for i in ret}
async def add_to_whitelist(self, guild: Optional[discord.Guild], strings: Set[str]) -> None:
gid: Optional[int] = guild.id if guild else None
strings = strings or set()
if not isinstance(strings, set) or any(not isinstance(s, str) for s in strings):
raise TypeError("Whitelisted objects must be a set of strings")
if gid is None:
if gid not in self._cached_whitelist:
self._cached_whitelist[gid] = set(await self._config.url_keyword_whitelist())
for string in strings:
if string not in self._cached_whitelist[gid]:
self._cached_whitelist[gid].add(string)
async with self._config.url_keyword_whitelist() as curr_list:
curr_list.append(string)
else:
if gid not in self._cached_whitelist:
self._cached_whitelist[gid] = set(
await self._config.guild_from_id(gid).url_keyword_whitelist()
)
for string in strings:
if string not in self._cached_whitelist[gid]:
self._cached_whitelist[gid].add(string)
async with self._config.guild_from_id(
gid
).url_keyword_whitelist() as curr_list:
curr_list.append(string)
async def clear_whitelist(self, guild: Optional[discord.Guild] = None) -> None:
gid: Optional[int] = guild.id if guild else None
self._cached_whitelist[gid] = set()
if gid is None:
await self._config.url_keyword_whitelist.clear()
else:
await self._config.guild_from_id(gid).url_keyword_whitelist.clear()
async def remove_from_whitelist(
self, guild: Optional[discord.Guild], strings: Set[str]
) -> None:
gid: Optional[int] = guild.id if guild else None
strings = strings or set()
if not isinstance(strings, set) or any(not isinstance(s, str) for s in strings):
raise TypeError("Whitelisted objects must be a set of strings")
if gid is None:
if gid not in self._cached_whitelist:
self._cached_whitelist[gid] = set(await self._config.url_keyword_whitelist())
for string in strings:
if string in self._cached_whitelist[gid]:
self._cached_whitelist[gid].remove(string)
async with self._config.url_keyword_whitelist() as curr_list:
curr_list.remove(string)
else:
if gid not in self._cached_whitelist:
self._cached_whitelist[gid] = set(
await self._config.guild_from_id(gid).url_keyword_whitelist()
)
for string in strings:
if string in self._cached_whitelist[gid]:
self._cached_whitelist[gid].remove(string)
async with self._config.guild_from_id(
gid
).url_keyword_whitelist() as curr_list:
curr_list.remove(string)
async def get_blacklist(self, guild: Optional[discord.Guild] = None) -> Set[str]:
ret: Set[str]
gid: Optional[int] = guild.id if guild else None
if self.enable_cache and gid in self._cached_blacklist:
ret = self._cached_blacklist[gid].copy()
else:
if gid is not None:
ret = set(await self._config.guild_from_id(gid).url_keyword_blacklist())
if not ret:
ret = set()
else:
ret = set(await self._config.url_keyword_blacklist())
self._cached_blacklist[gid] = ret.copy()
return {i.lower() for i in ret}
async def add_to_blacklist(self, guild: Optional[discord.Guild], strings: Set[str]) -> None:
gid: Optional[int] = guild.id if guild else None
strings = strings or set()
if not isinstance(strings, set) or any(not isinstance(r_or_u, str) for r_or_u in strings):
raise TypeError("Blacklisted objects must be a set of strings")
if gid is None:
if gid not in self._cached_blacklist:
self._cached_blacklist[gid] = set(await self._config.url_keyword_blacklist())
for string in strings:
if string not in self._cached_blacklist[gid]:
self._cached_blacklist[gid].add(string)
async with self._config.url_keyword_blacklist() as curr_list:
curr_list.append(string)
else:
if gid not in self._cached_blacklist:
self._cached_blacklist[gid] = set(
await self._config.guild_from_id(gid).url_keyword_blacklist()
)
for string in strings:
if string not in self._cached_blacklist[gid]:
self._cached_blacklist[gid].add(string)
async with self._config.guild_from_id(
gid
).url_keyword_blacklist() as curr_list:
curr_list.append(string)
async def clear_blacklist(self, guild: Optional[discord.Guild] = None) -> None:
gid: Optional[int] = guild.id if guild else None
self._cached_blacklist[gid] = set()
if gid is None:
await self._config.url_keyword_blacklist.clear()
else:
await self._config.guild_from_id(gid).url_keyword_blacklist.clear()
async def remove_from_blacklist(
self, guild: Optional[discord.Guild], strings: Set[str]
) -> None:
gid: Optional[int] = guild.id if guild else None
strings = strings or set()
if not isinstance(strings, set) or any(not isinstance(r_or_u, str) for r_or_u in strings):
raise TypeError("Blacklisted objects must be a set of strings")
if gid is None:
if gid not in self._cached_blacklist:
self._cached_blacklist[gid] = set(await self._config.url_keyword_blacklist())
for string in strings:
if string in self._cached_blacklist[gid]:
self._cached_blacklist[gid].remove(string)
async with self._config.url_keyword_blacklist() as curr_list:
curr_list.remove(string)
else:
if gid not in self._cached_blacklist:
self._cached_blacklist[gid] = set(
await self._config.guild_from_id(gid).url_keyword_blacklist()
)
for string in strings:
if string in self._cached_blacklist[gid]:
self._cached_blacklist[gid].remove(string)
async with self._config.guild_from_id(
gid
).url_keyword_blacklist() as curr_list:
curr_list.remove(string)
async def allowed_by_whitelist_blacklist(
self, what: Optional[str] = None, *, guild: Optional[Union[discord.Guild, int]] = None,
) -> bool:
if isinstance(guild, int):
guild = self.bot.get_guild(guild)
if global_whitelist := await self.get_whitelist():
if what not in global_whitelist:
return False
else:
# blacklist is only used when whitelist doesn't exist.
global_blacklist = await self.get_blacklist()
if what in global_blacklist:
return False
if guild:
if guild_whitelist := await self.get_whitelist(guild):
if what in guild_whitelist:
return False
else:
guild_blacklist = await self.get_blacklist(guild)
if what in guild_blacklist:
return False
return True
async def get_context_whitelist(
self, guild: Optional[discord.Guild] = None, printable: bool = False
) -> Set[str]:
global_whitelist = await self.get_whitelist()
if printable:
global_whitelist = {f"{s} * Global" for s in global_whitelist}
if guild:
context_whitelist = await self.get_whitelist(guild)
else:
context_whitelist = set()
context_whitelist.update(global_whitelist)
return context_whitelist
async def get_context_blacklist(
self, guild: Optional[discord.Guild] = None, printable: bool = False
) -> Set[str]:
global_blacklist = await self.get_blacklist()
if printable:
global_blacklist = {f"{s} * Global" for s in global_blacklist}
if guild:
context_whitelist = await self.get_blacklist(guild)
else:
context_whitelist = set()
context_whitelist.update(global_blacklist)
return context_whitelist

View File

@ -0,0 +1,120 @@
from __future__ import annotations
from typing import Dict, Optional, Set, Union
import discord
from redbot.core import Config
from redbot.core.bot import Red
class ChannelRestrictManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_vc: Dict[Optional[int], Set[int]] = {}
self._cached_text: Dict[Optional[int], Set[int]] = {}
async def get_context_vc(self, guild: discord.Guild) -> Set[discord.VoiceChannel]:
ret: Set[int]
gid: int = guild.id
if self.enable_cache and gid in self._cached_vc:
ret = self._cached_vc[gid].copy()
else:
ret = await self._config.guild_from_id(gid).whitelisted_vc()
self._cached_vc[gid] = set(ret.copy())
return {vc for cid in ret if (vc := self.bot.get_channel(cid)) is not None}
async def add_to_vc_whitelist(self, guild: discord.Guild, ids: Set[int]) -> None:
gid: int = guild.id
ids = ids or set()
if not isinstance(ids, set) or any(not isinstance(s, int) for s in ids):
raise TypeError("IDs objects must be a set of ints")
if gid not in self._cached_vc:
self._cached_vc[gid] = set(await self._config.guild_from_id(gid).whitelisted_vc())
for cid in ids:
if cid not in self._cached_vc[gid]:
self._cached_vc[gid].add(cid)
async with self._config.guild_from_id(gid).whitelisted_vc() as curr_list:
if cid not in curr_list:
curr_list.append(cid)
async def clear_vc_whitelist(self, guild: discord.Guild) -> None:
gid: int = guild.id
self._cached_vc[gid] = set()
await self._config.guild_from_id(gid).whitelisted_vc.clear()
async def remove_from_vc_whitelist(self, guild: discord.Guild, ids: Set[int]) -> None:
gid: int = guild.id
ids = ids or set()
if not isinstance(ids, set) or any(not isinstance(s, int) for s in ids):
raise TypeError("IDs objects must be a set of ints")
if gid not in self._cached_vc:
self._cached_vc[gid] = set(await self._config.guild_from_id(gid).whitelisted_vc())
for cid in ids:
if cid in self._cached_vc[gid]:
self._cached_vc[gid].remove(cid)
async with self._config.guild_from_id(gid).whitelisted_vc() as curr_list:
if cid in curr_list:
curr_list.remove(cid)
async def get_context_text(self, guild: discord.Guild) -> Set[discord.TextChannel]:
ret: Set[int]
gid: int = guild.id
if self.enable_cache and gid in self._cached_text:
ret = self._cached_text[gid].copy()
else:
ret = await self._config.guild_from_id(gid).whitelisted_text()
self._cached_text[gid] = set(ret.copy())
return {text for cid in ret if (text := self.bot.get_channel(cid)) is not None}
async def add_to_text_whitelist(self, guild: discord.Guild, ids: Set[int]) -> None:
gid: int = guild.id
ids = ids or set()
if not isinstance(ids, set) or any(not isinstance(s, int) for s in ids):
raise TypeError("IDs objects must be a set of ints")
if gid not in self._cached_text:
self._cached_text[gid] = set(await self._config.guild_from_id(gid).whitelisted_text())
for cid in ids:
if cid not in self._cached_text[gid]:
self._cached_text[gid].add(cid)
async with self._config.guild_from_id(gid).whitelisted_text() as curr_list:
if cid not in curr_list:
curr_list.append(cid)
async def clear_text_whitelist(self, guild: discord.Guild) -> None:
gid: int = guild.id
self._cached_text[gid] = set()
await self._config.guild_from_id(gid).whitelisted_text.clear()
async def remove_from_text_whitelist(self, guild: discord.Guild, ids: Set[int]) -> None:
gid: int = guild.id
ids = ids or set()
if not isinstance(ids, set) or any(not isinstance(s, int) for s in ids):
raise TypeError("IDs objects must be a set of ints")
if gid not in self._cached_text:
self._cached_text[gid] = set(await self._config.guild_from_id(gid).whitelisted_text())
for cid in ids:
if cid in self._cached_text[gid]:
self._cached_text[gid].remove(cid)
async with self._config.guild_from_id(gid).whitelisted_text() as curr_list:
if cid in curr_list:
curr_list.remove(cid)
async def allowed_by_whitelist(
self,
what: Union[discord.TextChannel, discord.VoiceChannel],
guild: Union[discord.Guild, int],
) -> bool:
if isinstance(guild, int):
guild = await self.bot.get_guild(guild)
if isinstance(what, discord.VoiceChannel):
if allowed := await self.get_context_vc(guild):
return what in allowed
elif isinstance(what, discord.TextChannel):
if allowed := await self.get_context_text(guild):
return what in allowed
return True

View File

@ -0,0 +1,60 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class CountryCodeManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_user: Dict[int, Optional[str]] = {}
self._cached_guilds: Dict[int, str] = {}
async def get_user(self, user: discord.Member) -> Optional[str]:
ret: Optional[str]
uid: int = user.id
if self.enable_cache and uid in self._cached_user:
ret = self._cached_user[uid]
else:
ret = await self._config.user_from_id(uid).country_code()
self._cached_user[uid] = ret
return ret
async def set_user(self, user: discord.Member, set_to: Optional[str]) -> None:
uid: int = user.id
if set_to is not None:
await self._config.user_from_id(uid).country_code.set(set_to)
self._cached_user[uid] = set_to
else:
await self._config.user_from_id(uid).country_code.clear()
self._cached_user[uid] = self._config.defaults["USER"]["country_code"]
async def get_guild(self, guild: discord.Guild) -> str:
ret: str
gid: int = guild.id
if self.enable_cache and gid in self._cached_guilds:
ret = self._cached_guilds[gid]
else:
ret = await self._config.guild_from_id(gid).country_code()
self._cached_guilds[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[str]) -> None:
gid: int = guild.id
if set_to:
await self._config.guild_from_id(gid).country_code.set(set_to)
self._cached_guilds[gid] = set_to
else:
await self._config.guild_from_id(gid).ignored.clear()
self._cached_user[gid] = self._config.defaults["GUILD"]["country_code"]
async def get_context_value(self, guild: discord.Guild, user: discord.Member,) -> str:
if (code := await self.get_user(user)) is not None:
return code
return await self.get_guild(guild)

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class DailyGlobalPlaylistManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, bool] = {}
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.daily_playlists()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.daily_playlists.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.daily_playlists.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["daily_playlists"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_global()

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class DailyPlaylistManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).daily_playlists()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).daily_playlists.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).daily_playlists.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["daily_playlists"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,73 @@
from __future__ import annotations
from typing import Dict, Optional, Set
import discord
from redbot.core import Config
from redbot.core.bot import Red
class DJRoleManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[Optional[int], Set[int]] = {}
async def get_guild(self, guild: discord.Guild) -> Set[discord.Role]:
ret: Set[int]
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid].copy()
else:
ret = await self._config.guild_from_id(gid).dj_roles()
self._cached_guild[gid] = ret.copy()
return {y for r in ret if (y := guild.get_role(r)) in guild.roles}
async def get_allowed_members(self, guild: discord.Guild) -> Set[discord.Member]:
return {member for role in await self.get_guild(guild) for member in role.members}
async def member_is_dj(self, guild: discord.Guild, member: discord.Member) -> bool:
return member in await self.get_allowed_members(guild)
async def set_guild(
self, guild: discord.Guild, roles: Optional[Set[discord.Role]] = None
) -> None:
gid: int = guild.id
roles = roles or set()
assert isinstance(roles, set)
if not isinstance(roles, set) and any(not isinstance(r, discord.Role) for r in roles):
raise TypeError("Roles must be a set of discord.Role objects")
roles_ids = {y.id for r in roles if (y := guild.get_role(r.id)) in guild.roles}
self._cached_guild[gid] = roles_ids
await self._config.guild_from_id(gid).dj_roles.set(list(roles_ids))
async def add_guild(self, guild: discord.Guild, roles: Set[discord.Role]) -> None:
gid: int = guild.id
assert isinstance(roles, set)
if not isinstance(roles, set) and any(not isinstance(r, discord.Role) for r in roles):
raise TypeError("Roles must be a set of discord.Role objects")
roles = set(sorted(roles, reverse=True))
roles = {y.id for r in roles if (y := guild.get_role(r.id)) in guild.roles}
roles.update(self._cached_guild[gid])
self._cached_guild[gid] = roles
await self._config.guild_from_id(gid).dj_roles.set(list(roles))
async def clear_guild(self, guild: discord.Guild) -> None:
gid: int = guild.id
self._cached_guild[gid] = set()
await self._config.guild_from_id(gid).dj_roles.clear()
async def remove_guild(self, guild: discord.Guild, roles: Set[discord.Role]) -> None:
gid: int = guild.id
if not isinstance(roles, set) or any(not isinstance(r, discord.Role) for r in roles):
raise TypeError("Roles must be a set of discord.Role objects")
if gid not in self._cached_guild:
self._cached_guild[gid] = await self._config.guild_from_id(gid).dj_roles()
for role in roles:
role = role.id
if role in self._cached_guild[gid]:
self._cached_guild[gid].remove(role)
async with self._config.guild_from_id(gid).dj_roles() as curr_list:
curr_list.remove(role)

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class DJStatusManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).dj_enabled()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).dj_enabled.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).dj_enabled.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["dj_enabled"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,58 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class EmptyDCManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
self._cached_global: Dict[None, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).emptydc_enabled()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).emptydc_enabled.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).emptydc_enabled.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["emptydc_enabled"]
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.emptydc_enabled()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.emptydc_enabled.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.emptydc_enabled.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["emptydc_enabled"]
async def get_context_value(self, guild: discord.Guild) -> Optional[bool]:
if (value := await self.get_global()) is True:
return value
return await self.get_guild(guild)

View File

@ -0,0 +1,61 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class EmptyDCTimerManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, int] = {}
self._cached_global: Dict[None, int] = {}
async def get_guild(self, guild: discord.Guild) -> int:
ret: int
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).emptydc_timer()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[int]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).emptydc_timer.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).emptydc_timer.clear()
await self._config.guild_from_id(gid).emptydc_timer()
self._cached_guild[gid] = self._config.defaults["GUILD"]["emptydc_timer"]
async def get_global(self) -> int:
ret: int
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.emptydc_enabled()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.emptydc_enabled.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.emptydc_enabled.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["emptydc_timer"]
async def get_context_value(self, guild: discord.Guild) -> int:
guild_time = await self.get_guild(guild)
global_time = await self.get_global()
if global_time == 0 or global_time >= guild_time:
return guild_time
return global_time

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class EmptyPauseManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).emptypause_enabled()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).emptypause_enabled.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).emptypause_enabled.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["emptypause_enabled"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class EmptyPauseTimerManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, int] = {}
async def get_guild(self, guild: discord.Guild) -> int:
ret: int
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).emptypause_timer()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[int]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).emptypause_timer.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).emptypause_timer.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["emptypause_timer"]
async def get_context_value(self, guild: discord.Guild) -> int:
return await self.get_guild(guild)

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class GlobalDBManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, bool] = {}
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.global_db_enabled()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.global_db_enabled.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.global_db_enabled.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["global_db_enabled"]
async def get_context_value(self, guild: discord.Guild = None) -> bool:
return await self.get_global()

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional, Union
import discord
from redbot.core import Config
from redbot.core.bot import Red
class GlobalDBTimeoutManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, Union[float, int]] = {}
async def get_global(self) -> Union[float, int]:
ret: Union[float, int]
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.global_db_get_timeout()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[Union[float, int]]) -> None:
if set_to is not None:
await self._config.global_db_get_timeout.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.global_db_get_timeout.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["global_db_get_timeout"]
async def get_context_value(self, guild: discord.Guild = None) -> Union[float, int]:
return await self.get_global()

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class JukeboxManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).jukebox()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).jukebox.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).jukebox.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["jukebox"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class JukeboxPriceManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, int] = {}
async def get_guild(self, guild: discord.Guild) -> int:
ret: int
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).jukebox_price()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[int]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).jukebox_price.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).jukebox_price.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["jukebox_price"]
async def get_context_value(self, guild: discord.Guild) -> int:
return await self.get_guild(guild)

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class LocalCacheAgeManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, int] = {}
async def get_global(self) -> int:
ret: int
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.cache_age()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[int]) -> None:
if set_to is not None:
await self._config.global_db_get_timeout.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.global_db_get_timeout.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["cache_age"]
async def get_context_value(self, guild: discord.Guild = None) -> int:
return await self.get_global()

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
from ..abstract import CacheLevel
class LocalCacheLevelManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, int] = {}
async def get_global(self) -> CacheLevel:
ret: int
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.cache_level()
self._cached_global[None] = ret
return CacheLevel(int(ret))
async def set_global(self, set_to: Optional[int]) -> None:
if set_to is not None:
await self._config.cache_level.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.cache_level.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["cache_level"]
async def get_context_value(self, guild: discord.Guild = None) -> CacheLevel:
return await self.get_global()

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from pathlib import Path
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class LocalPathManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, str] = {}
async def get_global(self) -> Path:
ret: str
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.localpath()
self._cached_global[None] = ret
return Path(ret).absolute()
async def set_global(self, set_to: Optional[Path]) -> None:
gid = None
if set_to is not None:
await self._config.localpath.set(set_to)
self._cached_global[gid] = str(set_to.absolute())
else:
await self._config.localpath.clear()
self._cached_global[gid] = self._config.defaults["GLOBAL"]["localpath"]
async def get_context_value(self, guild: discord.Guild = None) -> Path:
return await self.get_global()

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class PreferLyricsManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).prefer_lyrics()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).prefer_lyrics.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).prefer_lyrics.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["prefer_lyrics"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class LavalinkAutoUpdateManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, bool] = {}
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.lavalink.autoupdate()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.lavalink.autoupdate.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.lavalink.autoupdate.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["lavalink"]["autoupdate"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_global()

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class ManagedLavalinkManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, bool] = {}
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.lavalink.use_managed()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.lavalink.use_managed.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.lavalink.use_managed.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["lavalink"]["use_managed"]
async def get_context_value(self, guild: discord.Guild = None) -> bool:
return await self.get_global()

View File

@ -0,0 +1,61 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class MaxTrackLengthManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, int] = {}
self._cached_global: Dict[None, int] = {}
async def get_guild(self, guild: discord.Guild) -> int:
ret: int
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).emptydc_timer()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[int]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).emptydc_timer.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).emptydc_timer.clear()
await self._config.guild_from_id(gid).emptydc_timer()
self._cached_guild[gid] = self._config.defaults["GUILD"]["maxlength"]
async def get_global(self) -> int:
ret: int
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.emptydc_enabled()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.emptydc_enabled.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.emptydc_enabled.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["maxlength"]
async def get_context_value(self, guild: discord.Guild) -> int:
guild_time = await self.get_guild(guild)
global_time = await self.get_global()
if global_time == 0 or global_time >= guild_time:
return guild_time
return global_time

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class NotifyManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).notify()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).notify.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).notify.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["notify"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,58 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class PersistentQueueManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
self._cached_global: Dict[None, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).persist_queue()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).persist_queue.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).persist_queue.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["persist_queue"]
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.persist_queue()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.persist_queue.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.persist_queue.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["persist_queue"]
async def get_context_value(self, guild: discord.Guild) -> Optional[bool]:
if (guild_value := await self.get_guild(guild)) is not None:
return guild_value
return await self.get_global()

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class RepeatManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).repeat()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).repeat.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).repeat.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["repeat"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class URLRestrictManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, bool] = {}
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.restrict()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.restrict.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.restrict.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["restrict"]
async def get_context_value(self, guild: discord.Guild = None) -> bool:
return await self.get_global()

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class ShuffleManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).shuffle()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).shuffle.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).shuffle.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["shuffle"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class ShuffleBumpedManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).shuffle_bumped()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).shuffle_bumped.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).shuffle_bumped.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["shuffle_bumped"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,36 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class StatusManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_global: Dict[None, bool] = {}
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.status()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.status.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.status.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["status"]
async def get_context_value(self, guild: discord.Guild = None) -> bool:
return await self.get_global()

View File

@ -0,0 +1,58 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class ThumbnailManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
self._cached_global: Dict[None, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).thumbnail()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).thumbnail.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).thumbnail.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["thumbnail"]
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_guild:
ret = self._cached_global[None]
else:
ret = await self._config.thumbnail()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.thumbnail.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.thumbnail.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["thumbnail"]
async def get_context_value(self, guild: discord.Guild) -> bool:
if (value := await self.get_global()) is not None:
return value
return await self.get_guild(guild)

View File

@ -0,0 +1,58 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class VCRestrictedManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
self._cached_global: Dict[None, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).vc_restricted()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).vc_restricted.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).vc_restricted.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["vc_restricted"]
async def get_global(self) -> bool:
ret: bool
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.vc_restricted()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[bool]) -> None:
if set_to is not None:
await self._config.vc_restricted.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.vc_restricted.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["vc_restricted"]
async def get_context_value(self, guild: discord.Guild) -> Optional[bool]:
if (value := await self.get_global()) is True:
return value
return await self.get_guild(guild)

View File

@ -0,0 +1,90 @@
from __future__ import annotations
from typing import Dict, Optional, Tuple
import discord
from redbot.core import Config
from redbot.core.bot import Red
class VolumeManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, int] = {}
self._cached_channel: Dict[int, int] = {}
self._cached_global: Dict[None, int] = {}
async def get_guild(self, guild: discord.Guild) -> int:
ret: int
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).volume()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[int]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).volume.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).volume.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["volume"]
async def get_global(self) -> int:
ret: int
if self.enable_cache and None in self._cached_global:
ret = self._cached_global[None]
else:
ret = await self._config.volume()
self._cached_global[None] = ret
return ret
async def set_global(self, set_to: Optional[int]) -> None:
if set_to is not None:
await self._config.volume.set(set_to)
self._cached_global[None] = set_to
else:
await self._config.volume.clear()
self._cached_global[None] = self._config.defaults["GLOBAL"]["volume"]
async def get_channel(self, channel: discord.VoiceChannel) -> int:
ret: int
vid: int = channel.id
if self.enable_cache and vid in self._cached_channel:
ret = self._cached_channel[vid]
else:
ret = await self._config.channel_from_id(vid).volume()
self._cached_channel[vid] = ret
return ret
async def set_channel(self, channel: discord.VoiceChannel, set_to: Optional[int]) -> None:
vid: int = channel.id
if set_to is not None:
await self._config.channel_from_id(vid).volume.set(set_to)
self._cached_channel[vid] = set_to
else:
await self._config.channel_from_id(vid).volume.clear()
self._cached_channel[vid] = self._config.defaults["TEXTCHANNEL"]["volume"]
async def get_context_value(
self, guild: discord.Guild, channel: discord.VoiceChannel = None
) -> int:
global_value = await self.get_global()
guild_value = await self.get_guild(guild)
if channel:
channel_value = await self.get_channel(channel)
else:
channel_value = 1000000
return min(global_value, guild_value, channel_value)
async def get_context_max(self, guild: discord.Guild) -> Tuple[int, int]:
global_value = await self.get_global()
guild_value = await self.get_guild(guild)
return global_value, guild_value

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class VotesPercentageManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, int] = {}
async def get_guild(self, guild: discord.Guild) -> int:
ret: int
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).vote_percent()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[int]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).vote_percent.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).vote_percent.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["vote_percent"]
async def get_context_value(self, guild: discord.Guild) -> float:
return await self.get_guild(guild) / 100

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, Optional
import discord
from redbot.core import Config
from redbot.core.bot import Red
class VotingManager:
def __init__(self, bot: Red, config: Config, enable_cache: bool = True):
self._config: Config = config
self.bot = bot
self.enable_cache = enable_cache
self._cached_guild: Dict[int, bool] = {}
async def get_guild(self, guild: discord.Guild) -> bool:
ret: bool
gid: int = guild.id
if self.enable_cache and gid in self._cached_guild:
ret = self._cached_guild[gid]
else:
ret = await self._config.guild_from_id(gid).vote_enabled()
self._cached_guild[gid] = ret
return ret
async def set_guild(self, guild: discord.Guild, set_to: Optional[bool]) -> None:
gid: int = guild.id
if set_to is not None:
await self._config.guild_from_id(gid).vote_enabled.set(set_to)
self._cached_guild[gid] = set_to
else:
await self._config.guild_from_id(gid).vote_enabled.clear()
self._cached_guild[gid] = self._config.defaults["GUILD"]["vote_enabled"]
async def get_context_value(self, guild: discord.Guild) -> bool:
return await self.get_guild(guild)

View File

@ -0,0 +1,5 @@
from __future__ import annotations
from . import lavalink, persist_queue, playlists, pragmas, spotify, youtube
__all__ = ["pragmas", "lavalink", "persist_queue", "playlists", "spotify", "youtube"]

View File

@ -0,0 +1,150 @@
from __future__ import annotations
from typing import Final
__all__ = [
"DROP_TABLE",
"CREATE_TABLE",
"CREATE_INDEX",
"UPSERT",
"UPDATE",
"QUERY",
"QUERY_ALL",
"QUERY_LAST_FETCHED_RANDOM",
"DELETE_OLD_ENTRIES",
"FETCH_ALL_ENTRIES_GLOBAL",
]
DROP_TABLE: Final[
str
] = """
DROP
TABLE
IF EXISTS
lavalink
;
"""
CREATE_TABLE: Final[
str
] = """
CREATE
TABLE
IF NOT EXISTS
lavalink(
query TEXT,
data JSON,
last_updated INTEGER,
last_fetched INTEGER
)
;
"""
CREATE_INDEX: Final[
str
] = """
CREATE
UNIQUE INDEX
IF NOT EXISTS
idx_lavalink_query
ON
lavalink (
query
)
;
"""
UPSERT: Final[
str
] = """ -- noinspection SqlResolve @ any/"excluded"
INSERT
INTO
lavalink (
query,
data,
last_updated,
last_fetched
)
VALUES
(
:query,
:data,
:last_updated,
:last_fetched
)
ON
CONFLICT (
query
)
DO
UPDATE
SET
data = excluded.data,
last_updated = excluded.last_updated
;
"""
UPDATE: Final[
str
] = """
UPDATE
lavalink
SET
last_fetched=:last_fetched
WHERE
query=:query
;
"""
QUERY: Final[
str
] = """
SELECT
data, last_updated
FROM
lavalink
WHERE
query=:query
AND
last_updated > :maxage
LIMIT 1
;
"""
QUERY_ALL: Final[
str
] = """
SELECT
data, last_updated
FROM
lavalink
;
"""
QUERY_LAST_FETCHED_RANDOM: Final[
str
] = """
SELECT
data, last_updated
FROM
lavalink
WHERE
last_fetched > :day
AND
last_updated > :maxage
LIMIT 100
;
"""
DELETE_OLD_ENTRIES: Final[
str
] = """
DELETE
FROM
lavalink
WHERE
last_updated < :maxage
;
"""
FETCH_ALL_ENTRIES_GLOBAL: Final[
str
] = """
SELECT
query, data
FROM
lavalink
;
"""

View File

@ -0,0 +1,128 @@
from __future__ import annotations
from typing import Final
__all__ = [
"DROP_TABLE",
"CREATE_TABLE",
"CREATE_INDEX",
"PLAYED",
"DELETE_SCHEDULED",
"FETCH_ALL",
"UPSERT",
"BULK_PLAYED",
]
DROP_TABLE: Final[
str
] = """
DROP
TABLE
IF EXISTS
persist_queue
;
"""
CREATE_TABLE: Final[
str
] = """
CREATE
TABLE
IF NOT EXISTS
persist_queue(
guild_id INTEGER NOT NULL,
room_id INTEGER NOT NULL,
track JSON NOT NULL,
played BOOLEAN DEFAULT FALSE,
track_id TEXT NOT NULL,
time INTEGER NOT NULL,
PRIMARY KEY
(
guild_id, room_id, track_id
)
)
;
"""
CREATE_INDEX: Final[
str
] = """
CREATE
INDEX
IF NOT EXISTS
track_index
ON
persist_queue (
guild_id, track_id
)
;
"""
PLAYED: Final[
str
] = """
UPDATE
persist_queue
SET
played = TRUE
WHERE
(
guild_id = :guild_id
AND
track_id = :track_id
)
;
"""
BULK_PLAYED: Final[
str
] = """
UPDATE
persist_queue
SET
played = TRUE
WHERE
guild_id = :guild_id
;
"""
DELETE_SCHEDULED: Final[
str
] = """
DELETE
FROM
persist_queue
WHERE
played = TRUE
;
"""
FETCH_ALL: Final[
str
] = """
SELECT
guild_id, room_id, track
FROM
persist_queue
WHERE
played = FALSE
ORDER BY
time
;
"""
UPSERT: Final[
str
] = """ -- noinspection SqlResolve
INSERT INTO
persist_queue (
guild_id, room_id, track, played, track_id, time
)
VALUES
(
:guild_id, :room_id, :track, :played, :track_id, :time
)
ON
CONFLICT (
guild_id, room_id, track_id
)
DO
UPDATE
SET
time = excluded.time
;
"""

View File

@ -0,0 +1,211 @@
from __future__ import annotations
from typing import Final
__all__ = [
"CREATE_TABLE",
"DELETE",
"DELETE_SCOPE",
"DELETE_SCHEDULED",
"FETCH_ALL",
"FETCH_ALL_WITH_FILTER",
"FETCH_ALL_CONVERTER",
"FETCH",
"UPSERT",
"CREATE_INDEX",
]
CREATE_TABLE: Final[
str
] = """
CREATE
TABLE
IF NOT EXISTS
playlists (
scope_type INTEGER NOT NULL,
playlist_id INTEGER NOT NULL,
playlist_name TEXT NOT NULL,
scope_id INTEGER NOT NULL,
author_id INTEGER NOT NULL,
deleted BOOLEAN DEFAULT FALSE,
playlist_url TEXT,
tracks JSON,
PRIMARY KEY
(
playlist_id, scope_id, scope_type
)
)
;
"""
DELETE: Final[
str
] = """
UPDATE
playlists
SET
deleted = TRUE
WHERE
(
scope_type = :scope_type
AND
playlist_id = :playlist_id
AND
scope_id = :scope_id
)
;
"""
DELETE_SCOPE: Final[
str
] = """
DELETE
FROM
playlists
WHERE
scope_type = :scope_type
;
"""
DELETE_SCHEDULED: Final[
str
] = """
DELETE
FROM
playlists
WHERE
deleted = TRUE
;
"""
FETCH_ALL: Final[
str
] = """
SELECT
playlist_id,
playlist_name,
scope_id,
author_id,
playlist_url,
tracks
FROM
playlists
WHERE
scope_type = :scope_type
AND
scope_id = :scope_id
AND
deleted = FALSE
;
"""
FETCH_ALL_WITH_FILTER: Final[
str
] = """
SELECT
playlist_id,
playlist_name,
scope_id,
author_id,
playlist_url,
tracks
FROM
playlists
WHERE
(
scope_type = :scope_type
AND
scope_id = :scope_id
AND
author_id = :author_id
AND
deleted = FALSE
)
;
"""
FETCH_ALL_CONVERTER: Final[
str
] = """-- noinspection SqlResolveForFile
SELECT
playlist_id,
playlist_name,
scope_id,
author_id,
playlist_url,
tracks
FROM
playlists
WHERE
(
scope_type = :scope_type
AND
(
playlist_id = :playlist_id
OR
LOWER(playlist_name) LIKE "%" || COALESCE(LOWER(:playlist_name), "") || "%"
)
AND
deleted = FALSE
)
;
"""
FETCH: Final[
str
] = """
SELECT
playlist_id,
playlist_name,
scope_id,
author_id,
playlist_url,
tracks
FROM
playlists
WHERE
(
scope_type = :scope_type
AND
playlist_id = :playlist_id
AND
scope_id = :scope_id
AND
deleted = FALSE
)
LIMIT 1
;
"""
UPSERT: Final[
str
] = """ -- noinspection SqlResolve @ any/"excluded"
INSERT
INTO
playlists (
scope_type, playlist_id, playlist_name,
scope_id, author_id, playlist_url, tracks
)
VALUES
(
:scope_type, :playlist_id, :playlist_name, :scope_id, :author_id, :playlist_url, :tracks
)
ON
CONFLICT
(
scope_type, playlist_id, scope_id
)
DO
UPDATE
SET
playlist_name = excluded.playlist_name,
playlist_url = excluded.playlist_url,
tracks = excluded.tracks
;
"""
CREATE_INDEX: Final[
str
] = """
CREATE
INDEX
IF NOT EXISTS name_index
ON
playlists (
scope_type, playlist_id, playlist_name, scope_id
)
;
"""

View File

@ -0,0 +1,48 @@
from __future__ import annotations
from typing import Final
__all__ = [
"SET_temp_store",
"SET_journal_mode",
"SET_read_uncommitted",
"FETCH_user_version",
"SET_user_version",
]
SET_temp_store: Final[
str
] = """
PRAGMA
temp_store = 2
;
"""
SET_journal_mode: Final[
str
] = """
PRAGMA
journal_mode = wal
;
"""
SET_read_uncommitted: Final[
str
] = """
PRAGMA
read_uncommitted = 1
;
"""
FETCH_user_version: Final[
str
] = """
PRAGMA
user_version
;
"""
SET_user_version: Final[
str
] = """
PRAGMA
user_version = 3
;
"""

View File

@ -0,0 +1,146 @@
from __future__ import annotations
from typing import Final
__all__ = [
"DROP_TABLE",
"CREATE_INDEX",
"CREATE_TABLE",
"UPSERT",
"QUERY",
"QUERY_ALL",
"UPDATE",
"DELETE_OLD_ENTRIES",
"QUERY_LAST_FETCHED_RANDOM",
]
DROP_TABLE: Final[
str
] = """
DROP
TABLE
IF EXISTS
spotify
;
"""
CREATE_TABLE: Final[
str
] = """
CREATE
TABLE
IF NOT EXISTS
spotify(
id TEXT,
type TEXT,
uri TEXT,
track_name TEXT,
artist_name TEXT,
song_url TEXT,
track_info TEXT,
last_updated INTEGER,
last_fetched INTEGER
)
;
"""
CREATE_INDEX: Final[
str
] = """
CREATE
UNIQUE INDEX
IF NOT EXISTS
idx_spotify_uri
ON
spotify (
id, type, uri
)
;
"""
UPSERT: Final[
str
] = """ -- noinspection SqlResolve @ any/"excluded"
INSERT
INTO
spotify(
id, type, uri, track_name, artist_name,
song_url, track_info, last_updated, last_fetched
)
VALUES
(
:id, :type, :uri, :track_name, :artist_name,
:song_url, :track_info, :last_updated, :last_fetched
)
ON
CONFLICT
(
id,
type,
uri
)
DO
UPDATE
SET
track_name = excluded.track_name,
artist_name = excluded.artist_name,
song_url = excluded.song_url,
track_info = excluded.track_info,
last_updated = excluded.last_updated;
"""
UPDATE: Final[
str
] = """
UPDATE
spotify
SET
last_fetched=:last_fetched
WHERE
uri=:uri
;
"""
QUERY: Final[
str
] = """
SELECT
track_info, last_updated
FROM
spotify
WHERE
uri=:uri
AND
last_updated > :maxage
LIMIT 1
;
"""
QUERY_ALL: Final[
str
] = """
SELECT
track_info, last_updated
FROM
spotify
;
"""
DELETE_OLD_ENTRIES: Final[
str
] = """
DELETE
FROM
spotify
WHERE
last_updated < :maxage
;
"""
QUERY_LAST_FETCHED_RANDOM: Final[
str
] = """
SELECT
track_info, last_updated
FROM
spotify
WHERE
last_fetched > :day
AND
last_updated > :maxage
LIMIT 100
;
"""

View File

@ -0,0 +1,143 @@
from __future__ import annotations
from typing import Final
__all__ = [
"DROP_TABLE",
"CREATE_TABLE",
"CREATE_INDEX",
"UPSERT",
"UPDATE",
"QUERY",
"QUERY_ALL",
"DELETE_OLD_ENTRIES",
"QUERY_LAST_FETCHED_RANDOM",
]
DROP_TABLE: Final[
str
] = """
DROP
TABLE
IF EXISTS
youtube
;
"""
CREATE_TABLE: Final[
str
] = """
CREATE
TABLE
IF NOT EXISTS
youtube(
id INTEGER PRIMARY KEY AUTOINCREMENT,
track_info TEXT,
youtube_url TEXT,
last_updated INTEGER,
last_fetched INTEGER
)
;
"""
CREATE_INDEX: Final[
str
] = """
CREATE
UNIQUE INDEX
IF NOT EXISTS
idx_youtube_url
ON
youtube (
track_info, youtube_url
)
;
"""
UPSERT: Final[
str
] = """-- noinspection SqlResolveForFile
INSERT INTO
youtube
(
track_info,
youtube_url,
last_updated,
last_fetched
)
VALUES
(
:track_info,
:track_url,
:last_updated,
:last_fetched
)
ON CONFLICT
(
track_info,
youtube_url
)
DO
UPDATE
SET
track_info = excluded.track_info,
last_updated = excluded.last_updated
;
"""
UPDATE: Final[
str
] = """
UPDATE
youtube
SET
last_fetched=:last_fetched
WHERE
track_info=:track
;
"""
QUERY: Final[
str
] = """
SELECT
youtube_url, last_updated
FROM
youtube
WHERE
track_info=:track
AND
last_updated > :maxage
LIMIT 1
;
"""
QUERY_ALL: Final[
str
] = """
SELECT
youtube_url, last_updated
FROM
youtube
;
"""
DELETE_OLD_ENTRIES: Final[
str
] = """
DELETE
FROM
youtube
WHERE
last_updated < :maxage
;
"""
QUERY_LAST_FETCHED_RANDOM: Final[
str
] = """
SELECT
youtube_url, last_updated
FROM
youtube
WHERE
last_fetched > :day
AND
last_updated > :maxage
LIMIT 100
;
"""

View File

@ -1,3 +1,2 @@
from . import constants as constants
from . import events as events
from . import overwrites as overwrites

View File

@ -22,8 +22,7 @@ from discord.http import Route
from redbot.core import commands
from .. import regex
from . import constants
from ... import regex, constants
from .events import QueueEnd
__all__ = [

View File

@ -0,0 +1,41 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Optional
from redbot.core import Config
from . import constants
from ._internal.playlists.enums import PlaylistScope
from ._internal.setting_cache import SettingCacheManager
if TYPE_CHECKING:
from redbot.core.bot import Red
__all__ = ["config_cache", "_init", "_bot_ref"]
_config: Optional[Config] = None
_bot_ref: Optional[Red] = None
config_cache: Optional[SettingCacheManager] = None
async def _init(bot: Red):
global _config
global _bot_ref
global config_cache
_bot_ref = bot
_config = Config.get_conf(None, 2711759130, force_registration=True, cog_name="Audio")
_config.init_custom("EQUALIZER", 1)
_config.init_custom(PlaylistScope.GLOBAL.value, 1)
_config.init_custom(PlaylistScope.GUILD.value, 2)
_config.init_custom(PlaylistScope.USER.value, 2)
_config.register_custom("EQUALIZER", **constants.DEFAULT_COG_EQUALIZER_SETTINGS)
_config.register_custom(PlaylistScope.GLOBAL.value, **constants.DEFAULT_COG_PLAYLISTS_SETTINGS)
_config.register_custom(PlaylistScope.GUILD.value, **constants.DEFAULT_COG_PLAYLISTS_SETTINGS)
_config.register_custom(PlaylistScope.USER.value, **constants.DEFAULT_COG_PLAYLISTS_SETTINGS)
_config.register_guild(**constants.DEFAULT_COG_GUILD_SETTINGS)
_config.register_global(**constants.DEFAULT_COG_GLOBAL_SETTINGS)
_config.register_user(**constants.DEFAULT_COG_USER_SETTINGS)
_config.register_channel(**constants.DEFAULT_COG_CHANNEL_SETTINGS)
config_cache = SettingCacheManager(bot, _config, enable_cache=True)

View File

@ -0,0 +1,282 @@
from __future__ import annotations
import pathlib
from typing import Dict, Final, List, Mapping, Optional, Set, Union
from redbot.core import data_manager, Config
from redbot.core.data_manager import cog_data_path
__all__ = [
"SCHEMA_VERSION",
"ARG_PARSER_SCOPE_HELP",
"ARG_PARSER_USER_HELP",
"ARG_PARSER_GUILD_HELP",
"HUMANIZED_PERMS_MAPPING",
"DEFAULT_COG_PERMISSIONS_SETTINGS",
"DEFAULT_COG_LAVALINK_SETTINGS",
"DEFAULT_COG_GUILD_SETTINGS",
"DEFAULT_COG_GLOBAL_SETTINGS",
"DEFAULT_COG_PLAYLISTS_SETTINGS",
"DEFAULT_COG_USER_SETTINGS",
"DEFAULT_COG_CHANNEL_SETTINGS",
"DEFAULT_COG_EQUALIZER_SETTINGS",
"VALID_GLOBAL_DEFAULTS",
"VALID_GUILD_DEFAULTS",
"LAVALINK_DOWNLOAD_DIR",
"LAVALINK_DOWNLOAD_URL",
"LAVALINK_JAR_ENDPOINT",
"LAVALINK_JAR_FILE",
"BUNDLED_APP_YML",
"LAVALINK_APP_YML",
"JAR_VERSION",
"JAR_BUILD",
]
# Needed here so that `LAVALINK_DOWNLOAD_DIR` doesnt blow up
Config.get_conf(None, 2711759130, force_registration=True, cog_name="Audio")
SCHEMA_VERSION: Final[int] = 4
JAR_VERSION: Final[str] = "3.3.1"
JAR_BUILD: Final[int] = 987
LAVALINK_DOWNLOAD_URL: Final[str] = (
"https://github.com/Cog-Creators/Lavalink-Jars/releases/download/"
f"{JAR_VERSION}_{JAR_BUILD}/"
"Lavalink.jar"
)
LAVALINK_JAR_ENDPOINT: Final[str] = (
"https://api.github.com/repos/Cog-Creators/Lavalink-Jars/releases/latest"
)
LAVALINK_DOWNLOAD_DIR: Final[pathlib.Path] = data_manager.cog_data_path(raw_name="Audio")
LAVALINK_JAR_FILE: Final[pathlib.Path] = LAVALINK_DOWNLOAD_DIR / "Lavalink.jar"
BUNDLED_APP_YML: Final[pathlib.Path] = pathlib.Path(
__file__
).parent.parent / "data" / "application.yml"
LAVALINK_APP_YML: Final[pathlib.Path] = LAVALINK_DOWNLOAD_DIR / "application.yml"
ARG_PARSER_SCOPE_HELP: Final[
str
] = """
Scope must be a valid version of one of the following:
Global
Guild
User
"""
ARG_PARSER_USER_HELP: Final[
str
] = """
Author must be a valid version of one of the following:
User ID
User Mention
User Name#123
"""
ARG_PARSER_GUILD_HELP: Final[
str
] = """
Guild must be a valid version of one of the following:
Guild ID
Exact guild name
"""
HUMANIZED_PERMS_MAPPING: Final[Mapping[str, str]] = {
"create_instant_invite": "Create Instant Invite",
"kick_members": "Kick Members",
"ban_members": "Ban Members",
"administrator": "Administrator",
"manage_channels": "Manage Channels",
"manage_guild": "Manage Server",
"add_reactions": "Add Reactions",
"view_audit_log": "View Audit Log",
"priority_speaker": "Priority Speaker",
"stream": "Go Live",
"read_messages": "Read Text Channels & See Voice Channels",
"send_messages": "Send Messages",
"send_tts_messages": "Send TTS Messages",
"manage_messages": "Manage Messages",
"embed_links": "Embed Links",
"attach_files": "Attach Files",
"read_message_history": "Read Message History",
"mention_everyone": "Mention @everyone, @here, and All Roles",
"external_emojis": "Use External Emojis",
"view_guild_insights": "View Server Insights",
"connect": "Connect",
"speak": "Speak",
"mute_members": "Mute Members",
"deafen_members": "Deafen Members",
"move_members": "Move Members",
"use_voice_activation": "Use Voice Activity",
"change_nickname": "Change Nickname",
"manage_nicknames": "Manage Nicknames",
"manage_roles": "Manage Roles",
"manage_webhooks": "Manage Webhooks",
"manage_emojis": "Manage Emojis",
}
DEFAULT_COG_PERMISSIONS_SETTINGS: Final[Dict[str, bool]] = {
"embed_links": True,
"read_messages": True,
"send_messages": True,
"read_message_history": True,
"add_reactions": True,
}
DEFAULT_COG_LAVALINK_SETTINGS: Final[Dict[str, Dict[str, Union[int, str]]]] = {
"2711759130": {
"host": "localhost",
"port": 2333,
"rest_uri": "http://localhost:2333",
"password": "youshallnotpass",
"identifier": "2711759130",
"region": "",
"shard_id": 1,
"search_only": False,
}
}
DEFAULT_COG_GUILD_SETTINGS: Final[
Dict[str, Union[bool, None, int, str, List, Dict[str, Optional[bool]]]]
] = {
"auto_play": False,
"autoplaylist": {"enabled": False, "id": None, "name": None, "scope": None},
"persist_queue": None,
"disconnect": None,
"dj_enabled": False,
"dj_role": None,
"dj_roles": [],
"daily_playlists": False,
"emptydc_enabled": None,
"emptydc_timer": 0,
"emptypause_enabled": False,
"emptypause_timer": 0,
"jukebox": False,
"jukebox_price": 0,
"maxlength": 0,
"notify": False,
"prefer_lyrics": False,
"repeat": False,
"shuffle": False,
"shuffle_bumped": True,
"thumbnail": False,
"volume": 100,
"vote_enabled": False,
"vote_percent": 51,
"url_keyword_blacklist": [],
"url_keyword_whitelist": [],
"whitelisted_vc": [],
"whitelisted_text": [],
"country_code": "US",
"vc_restricted": True,
}
VALID_GUILD_DEFAULTS: Set[str] = {
"auto_play",
"autoplaylist",
"persist_queue",
"disconnect",
"dj_enabled",
"dj_roles",
"daily_playlists",
"emptydc_enabled",
"emptydc_timer",
"emptypause_enabled",
"emptypause_timer",
"jukebox",
"jukebox_price",
"jukebox_price",
"maxlength",
"notify",
"prefer_lyrics",
"repeat",
"shuffle",
"shuffle_bumped",
"thumbnail",
"volume",
"vote_enabled",
"vote_percent",
"url_keyword_blacklist",
"url_keyword_whitelist",
"whitelisted_vc",
"whitelisted_text",
"country_code",
"vc_restricted",
}
DEFAULT_COG_CHANNEL_SETTINGS: Final[str, Union[None, bool, str, int]] = {"volume": 100}
DEFAULT_COG_GLOBAL_SETTINGS: Final[
Dict[str, Union[None, bool, str, List, int, Dict[str, Dict[str, Union[int, str]]]]]
] = {
"schema_version": 1,
"cache_level": 0,
"cache_age": 365,
"daily_playlists": False,
"global_db_enabled": True,
"global_db_get_timeout": 5,
"status": False,
"volume": 250,
"use_external_lavalink": False,
"restrict": True,
"disconnect": False,
"localpath": str(cog_data_path(raw_name="Audio")),
"persist_queue": None,
"emptydc_enabled": False,
"emptydc_timer": 0,
"thumbnail": None,
"maxlength": 0,
"url_keyword_blacklist": [],
"url_keyword_whitelist": [],
"nodes": {},
"lavalink__jar_url": None,
"lavalink__jar_build": None,
"lavalink__use_managed": True,
"lavalink__autoupdate": False,
"vc_restricted": True,
}
VALID_GLOBAL_DEFAULTS: Set[str] = {
"schema_version",
"cache_level",
"cache_age",
"daily_playlists",
"global_db_enabled",
"global_db_get_timeout",
"status",
"restrict",
"disconnect",
"localpath",
"persist_queue",
"emptydc_enabled",
"thumbnail",
"emptydc_timer",
"maxlength",
"url_keyword_blacklist",
"url_keyword_whitelist",
"nodes",
"lavalink",
"volume",
"vc_restricted",
}
DEFAULT_COG_GLOBAL_SETTINGS.update(DEFAULT_COG_LAVALINK_SETTINGS["2711759130"])
DEFAULT_COG_PLAYLISTS_SETTINGS: Final[Dict[str, Union[None, List]]] = {
"id": None,
"author": None,
"name": None,
"playlist_url": None,
"tracks": [],
}
DEFAULT_COG_EQUALIZER_SETTINGS: Final[Dict[str, Union[Dict, List]]] = {
"eq_bands": [],
"eq_presets": {},
}
DEFAULT_COG_USER_SETTINGS: Final[Dict[str, None]] = {"country_code": None}
REGION_AGGREGATION: Dict[str, str] = {
"dubai": "singapore",
"amsterdam": "europe",
"london": "europe",
"frankfurt": "europe",
"eu-central": "europe",
"eu-west": "europe",
}

View File

@ -0,0 +1,265 @@
from __future__ import annotations
from typing import Any, Tuple
import aiohttp
from redbot.core import commands
from redbot.core.commands import CheckFailure, UserFeedbackCheckFailure
__all__ = [
"NoChannelProvided",
"IncorrectChannelError",
"AudioError",
"AudioPermissionError",
"QueryUnauthorized",
"PlayerError",
"AudioConnectionError",
"LavalinkDownloadFailed",
"TrackEnqueueError",
"PlayListError",
"InvalidPlaylistScope",
"MissingGuild",
"MissingAuthor",
"TooManyMatches",
"NoMatchesFound",
"NotAllowed",
"ApiError",
"SpotifyApiError",
"SpotifyFetchError",
"YouTubeApiError",
"DatabaseError",
"InvalidTableError",
"LocalTrackError",
"InvalidLocalTrack",
"InvalidLocalTrackFolder",
"ArgParserFailure",
"NoPlayerFoundError",
"JukeboxError",
"OutOfCredits",
"UnsupportedJavaVersion",
"ManagedLavalinkError",
"PlayerNotPlayingError",
"PlayerNotConnectedError",
"TextChannelNotAllowedError",
"UserNotInPlayerVCError",
"InvalidEnvironmentError",
]
class NoChannelProvided(commands.CommandError):
"""Error raised when no suitable voice channel was supplied."""
class IncorrectChannelError(commands.CommandError):
"""Error raised when commands are issued outside of the players session channel."""
class AudioError(commands.CommandError):
"""Base exception for errors in the Audio cog."""
class AudioPermissionError(AudioError):
"""Base exception for permissions exceptions in the Audio cog."""
class PlayerError(AudioError):
"""Base exception for errors related to the player."""
class NoPlayerFoundError(PlayerError):
"""No Players found."""
class AudioConnectionError(AudioError):
"""Base Exception for errors raised due to a downloads/upload issue."""
class QueryUnauthorized(PlayerError, AudioPermissionError):
"""Provided an unauthorized query to audio."""
def __init__(self, message: str, *args: Any) -> None:
self.message = message
super().__init__(*args)
class TrackEnqueueError(PlayerError):
"""Unable to play track."""
class PlayListError(PlayerError):
"""Base exception for errors related to playlists."""
class InvalidPlaylistScope(PlayListError):
"""Provided playlist scope is not valid."""
class MissingGuild(PlayListError):
"""Trying to access the Guild scope without a guild."""
class MissingAuthor(PlayListError):
"""Trying to access the User scope without an user id."""
class TooManyMatches(PlayListError):
"""Too many playlist match user input."""
class NoMatchesFound(PlayListError):
"""No entries found for this input."""
class NotAllowed(PlayListError):
"""Too many playlist match user input."""
class ApiError(AudioConnectionError):
"""Base exception for API errors in the Audio cog."""
class SpotifyApiError(ApiError):
"""Base exception for Spotify API errors."""
class SpotifyFetchError(SpotifyApiError):
"""Fetching Spotify data failed."""
def __init__(self, message: str, *args: Any) -> None:
self.message = message
super().__init__(*args)
class YouTubeApiError(ApiError):
"""Base exception for YouTube Data API errors."""
class DatabaseError(AudioError):
"""Base exception for database errors in the Audio cog."""
class InvalidTableError(DatabaseError):
"""Provided table to query is not a valid table."""
class LocalTrackError(AudioError):
"""Base exception for local track errors."""
class InvalidLocalTrack(LocalTrackError):
"""Base exception for local track errors."""
class InvalidLocalTrackFolder(LocalTrackError):
"""Base exception for local track errors."""
class ArgParserFailure(AudioError, UserFeedbackCheckFailure):
"""Raised when parsing an argument fails."""
def __init__(
self, cmd: str, user_input: str, custom_help: str = None, ctx_send_help: bool = False
) -> None:
self.cmd = cmd
self.user_input = user_input
self.send_cmd_help = ctx_send_help
self.custom_help_msg = custom_help
class JukeboxError(TrackEnqueueError):
"""Raised for Jukebox related errors."""
class OutOfCredits(JukeboxError):
"""Raised when the user doesn't have enough money to play tracks."""
class ManagedLavalinkError(AudioError):
"""Exceptions Raised by the Local Lavalink Server Manager."""
class UnsupportedJavaVersion(ManagedLavalinkError):
"""The available Java versions is not Supported."""
def __init__(self, *args: Any, version: Tuple[int, int], **kwargs: Any) -> None:
super().__init__(*args)
self.version = version
class LavalinkDownloadFailed(AudioConnectionError, ManagedLavalinkError):
"""Downloading the Lavalink jar failed.
Attributes
----------
response : aiohttp.ClientResponse
The response from the server to the failed GET request.
should_retry : bool
Whether or not the Audio cog should retry downloading the jar.
"""
def __init__(
self, *args: Any, response: aiohttp.ClientResponse, should_retry: bool = False
) -> None:
super().__init__(*args)
self.response = response
self.should_retry = should_retry
def __repr__(self) -> str:
str_args = [*map(str, self.args), self._response_repr()]
return f"LavalinkDownloadFailed({', '.join(str_args)}"
def __str__(self) -> str:
return f"{super().__str__()} {self._response_repr()}"
def _response_repr(self) -> str:
return f"[{self.response.status} {self.response.reason}]"
class AudioCheckError(CheckFailure):
"""Base Exception for all Errors raised by Audio Checks."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class PlayerNotConnectedError(AudioCheckError):
"""Raised when a command required a Player but its connected."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class PlayerNotPlayingError(AudioCheckError):
"""Raised when a command required a playing Player but there's nothing playing."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class TextChannelNotAllowedError(AudioCheckError):
"""Raised when a command is run in a non whitelisted channel."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class VoiceChannelNotAllowedError(AudioCheckError):
"""Raised when the bot tries to connect to a non whitelisted Voice Channel."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class UserNotInPlayerVCError(AudioCheckError):
"""Raised running commands while not in Player VC."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
class InvalidEnvironmentError(AudioCheckError):
"""Raised when the bot enviroment is not supported."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)

View File

@ -1,14 +0,0 @@
from __future__ import annotations
import typing
__all__ = ["REGION_AGGREGATION"]
REGION_AGGREGATION: typing.Dict[str, str] = {
"dubai": "singapore",
"amsterdam": "europe",
"london": "europe",
"frankfurt": "europe",
"eu-central": "europe",
"eu-west": "europe",
}

View File

@ -3,7 +3,6 @@ import inspect
import logging
import os
import platform
import re
import shutil
import sys
import contextlib
@ -19,7 +18,6 @@ from typing import (
Dict,
NoReturn,
Set,
Coroutine,
TypeVar,
Callable,
Awaitable,
@ -30,10 +28,8 @@ from types import MappingProxyType
import discord
from discord.ext import commands as dpy_commands
from discord.ext.commands import when_mentioned_or
from discord.ext.commands.bot import BotBase
from . import Config, i18n, commands, errors, drivers, modlog, bank
from .apis.audio.wavelink.overwrites import RedClient
from .cog_manager import CogManager, CogManagerUI
from .core_commands import license_info_command, Core
from .data_manager import cog_data_path
@ -192,6 +188,10 @@ class RedBase(
self._permissions_hooks: List[commands.CheckPredicate] = []
self._red_ready = asyncio.Event()
self._red_before_invoke_objs: Set[PreInvokeCoroutine] = set()
from redbot.core.apis.audio import (
RedClient,
) # This is needed to avoid objects not being ready
self.wavelink = RedClient(bot=self)
def get_command(self, name: str) -> Optional[commands.Command]:
@ -587,6 +587,10 @@ class RedBase(
await modlog._init(self)
bank._init()
from .apis import audio # This is needed to avoid objects not being ready
await audio._init(self)
self.add_cog(audio._internal.nodes.AudioAPIEvents(self))
packages = []