Draper 2da9b502d8
Audio Cog - v2.3.0 (#4446)
* First commit - Bring everything from dev cog minus NSFW support

* Add a toggle for auto deafen

* Add a one off Send to Owners

* aaaaaaa

* Update this to ensure `get_perms` is not called if the API is disabled

* Apply suggestions from code review

Co-authored-by: Vuks <51289041+Vuks69@users.noreply.github.com>

* silence any errors here (in case the API is down, so it doesn't affect audio)

* update the message to tell them to join the Official Red server.

* remove useless stuff, and change DJ check order to ensure the bot doesn't join VC for non-DJs

* ffs

* Update redbot/cogs/audio/core/tasks/startup.py

Co-authored-by: Twentysix <Twentysix26@users.noreply.github.com>

* Aikas Review

* Add #3995 in here

* update

* *sigh*

* lock behind owner

* to help with debugging

* Revert "to help with debugging"

This reverts commit 8cbf17be

* resolve last review

Co-authored-by: Vuks <51289041+Vuks69@users.noreply.github.com>
Co-authored-by: Twentysix <Twentysix26@users.noreply.github.com>
2020-10-12 11:39:39 -07:00

220 lines
6.3 KiB
Python

import asyncio
import contextlib
import logging
import time
from enum import Enum, unique
from typing import MutableMapping
import discord
from redbot.core import commands
# Module-level logger; task_callback reports exceptions raised by tasks through it.
log = logging.getLogger("red.cogs.Audio.task.callback")
class CacheLevel:
    """Bit field describing which caching levels are enabled.

    The level is kept as an integer in ``value``. The constructor clamps it to
    the range ``0`` .. ``0b11111``. The factory methods use bits 0-1 for
    Spotify, bit 2 for YouTube and bits 3-4 for Lavalink.
    """

    __slots__ = ("value",)

    def __init__(self, level=0):
        """Create a level from *level*, clamping it into ``0`` .. ``0b11111``.

        Raises
        ------
        TypeError
            If *level* is not an ``int``.
        """
        if not isinstance(level, int):
            raise TypeError(
                f"Expected int parameter, received {level.__class__.__name__} instead."
            )
        elif level < 0:
            level = 0
        elif level > 0b11111:
            level = 0b11111
        self.value = level

    def __eq__(self, other):
        return isinstance(other, CacheLevel) and self.value == other.value

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # NOTE: instances are mutable (see the property setters), so the hash
        # follows ``value`` and changes whenever a bit is toggled.
        return hash(self.value)

    # Arithmetic works on the raw integer values (it is not a bitwise union);
    # the result is clamped by the constructor. Foreign operands yield
    # ``NotImplemented`` so Python raises its usual ``TypeError`` instead of
    # leaking an ``AttributeError`` from ``other.value``.
    def __add__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return CacheLevel(self.value + other.value)

    def __radd__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return CacheLevel(other.value + self.value)

    def __sub__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return CacheLevel(self.value - other.value)

    def __rsub__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return CacheLevel(other.value - self.value)

    def __str__(self):
        """Return the level as a binary digit string without prefix or padding."""
        return "{0:b}".format(self.value)

    def __format__(self, format_spec):
        """Format the underlying integer using *format_spec*."""
        return "{r:{f}}".format(r=self.value, f=format_spec)

    def __repr__(self):
        return f"<CacheLevel value={self.value}>"

    def is_subset(self, other):
        """Returns ``True`` if self has the same or fewer caching levels as other."""
        return (self.value & other.value) == self.value

    def is_superset(self, other):
        """Returns ``True`` if self has the same or more caching levels as other."""
        return (self.value | other.value) == self.value

    def is_strict_subset(self, other):
        """Returns ``True`` if the caching levels on self are a strict subset of those on other."""
        return self.is_subset(other) and self != other

    def is_strict_superset(self, other):
        """Returns ``True`` if the caching levels on self are a strict superset of those on
        other."""
        return self.is_superset(other) and self != other

    # The ordering operators mirror the set-style methods above, but return
    # ``NotImplemented`` for operands that are not ``CacheLevel`` instances.
    def __le__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return self.is_subset(other)

    def __ge__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return self.is_superset(other)

    def __lt__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return self.is_strict_subset(other)

    def __gt__(self, other):
        if not isinstance(other, CacheLevel):
            return NotImplemented
        return self.is_strict_superset(other)

    @classmethod
    def all(cls):
        """A factory method that creates a :class:`CacheLevel` with max caching level."""
        return cls(0b11111)

    @classmethod
    def none(cls):
        """A factory method that creates a :class:`CacheLevel` with no caching."""
        return cls(0)

    @classmethod
    def set_spotify(cls):
        """A factory method that creates a :class:`CacheLevel` with Spotify caching level."""
        return cls(0b00011)

    @classmethod
    def set_youtube(cls):
        """A factory method that creates a :class:`CacheLevel` with YouTube caching level."""
        return cls(0b00100)

    @classmethod
    def set_lavalink(cls):
        """A factory method that creates a :class:`CacheLevel` with lavalink caching level."""
        return cls(0b11000)

    def _bit(self, index):
        """Return ``True`` if the bit at position *index* of ``value`` is set."""
        return bool((self.value >> index) & 1)

    def _set(self, index, value):
        """Set (``True``) or clear (``False``) the bit at position *index*.

        Raises
        ------
        TypeError
            If *value* is not exactly ``True`` or ``False``.
        """
        if value is True:
            self.value |= 1 << index
        elif value is False:
            self.value &= ~(1 << index)
        else:
            raise TypeError("Value to set for CacheLevel must be a bool.")

    @property
    def lavalink(self):
        """:class:`bool`: Returns ``True`` if the Lavalink caching bit (bit 4) is set."""
        return self._bit(4)

    @lavalink.setter
    def lavalink(self, value):
        self._set(4, value)

    @property
    def youtube(self):
        """:class:`bool`: Returns ``True`` if the YouTube caching bit (bit 2) is set."""
        return self._bit(2)

    @youtube.setter
    def youtube(self, value):
        self._set(2, value)

    @property
    def spotify(self):
        """:class:`bool`: Returns ``True`` if the Spotify caching bit (bit 1) is set.

        Only bit 1 is inspected, although :meth:`set_spotify` sets bits 0 and 1.
        """
        return self._bit(1)

    @spotify.setter
    def spotify(self, value):
        self._set(1, value)
class Notifier:
    """Keep one existing message up to date by editing its embed.

    ``updates`` maps a key to a title template; templates are rendered with
    ``str.format`` using the ``num``, ``total`` and ``seconds`` fields.
    """

    def __init__(
        self, ctx: commands.Context, message: discord.Message, updates: MutableMapping, **kwargs
    ):
        """Store the context, the message to edit and the templates.

        Extra keyword arguments are accepted and ignored.
        """
        self.context = ctx
        self.message = message
        self.updates = updates
        # Resolved lazily from the context the first time an embed is built.
        self.color = None
        # Timestamp (seconds) of the last edit made by notify_user/update_embed.
        self.last_msg_time = 0
        # Minimum number of seconds between two progress edits.
        self.cooldown = 5

    async def _resolve_color(self):
        """Return the embed colour, fetching it from the context on first use."""
        if self.color is None:
            self.color = await self.context.embed_colour()
        return self.color

    async def notify_user(
        self,
        current: int = None,
        total: int = None,
        key: str = None,
        seconds_key: str = None,
        seconds: str = None,
    ):
        """This updates an existing message.

        Based on the message found in :variable:`Notifier.updates` as per the `key` param

        The edit is skipped while the cooldown since the last edit has not
        elapsed, unless ``current == total`` (so the final state always goes
        through). When both ``seconds`` and ``seconds_key`` are given, the
        template stored under ``seconds_key`` is rendered into the footer.
        A message that no longer exists is silently ignored.
        """
        if self.last_msg_time + self.cooldown > time.time() and current != total:
            return
        colour = await self._resolve_color()
        embed2 = discord.Embed(
            colour=colour,
            title=self.updates.get(key, "").format(num=current, total=total, seconds=seconds),
        )
        if seconds and seconds_key:
            embed2.set_footer(text=self.updates.get(seconds_key, "").format(seconds=seconds))
        try:
            await self.message.edit(embed=embed2)
            self.last_msg_time = int(time.time())
        except discord.errors.NotFound:
            pass

    async def update_text(self, text: str):
        """Replace the message's embed with one whose title is *text*.

        Does not touch the cooldown timestamp. A message that no longer exists
        is silently ignored.
        """
        # Resolve the colour here as well: this method may be the first one
        # called, in which case ``self.color`` would still be ``None``.
        colour = await self._resolve_color()
        embed2 = discord.Embed(colour=colour, title=text)
        try:
            await self.message.edit(embed=embed2)
        except discord.errors.NotFound:
            pass

    async def update_embed(self, embed: discord.Embed):
        """Replace the message's embed with *embed* and record the edit time.

        A message that no longer exists is silently ignored.
        """
        try:
            await self.message.edit(embed=embed)
            self.last_msg_time = int(time.time())
        except discord.errors.NotFound:
            pass
@unique
class PlaylistScope(Enum):
    """The scopes a playlist can be stored under, keyed by their string value."""

    GLOBAL = "GLOBALPLAYLIST"
    GUILD = "GUILDPLAYLIST"
    USER = "USERPLAYLIST"

    def __str__(self):
        """Return the member's value rendered as a string."""
        return f"{self.value}"

    @staticmethod
    def list():
        """Return the values of every member, in definition order."""
        return [member.value for member in PlaylistScope]
def task_callback(task: asyncio.Task) -> None:
    """Log the exception a task finished with, if any.

    Nothing is logged when the task was cancelled, has not finished yet, or
    completed without an exception.
    """
    try:
        failure = task.exception()
        if failure:
            log.exception(f"{task.get_name()} raised an Exception", exc_info=failure)
    except (asyncio.CancelledError, asyncio.InvalidStateError):
        # Cancelled or still-pending tasks have no exception to report.
        pass