mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-21 10:17:59 -05:00
[Audio] One PR to rule them all, One PR to find them, One PR to bring them all, and in the darkness bind them (all-in-one pr) (#2904)
* More changes Signed-off-by: Guy <guyreis96@gmail.com> * Fixed auto play defaulting to playlist Signed-off-by: Guy <guyreis96@gmail.com> * Localtrack fix Signed-off-by: Guy <guyreis96@gmail.com> * Updated deps .. since for some reason aiosqlite is not being auto installed for everyone Signed-off-by: Guy <guyreis96@gmail.com> * Yupo Signed-off-by: Guy <guyreis96@gmail.com> * Fixed a crash in [p]now Signed-off-by: Guy <guyreis96@gmail.com> * Fixed crash on playlist save Signed-off-by: Guy <guyreis96@gmail.com> * Debugging Commit Signed-off-by: Guy <guyreis96@gmail.com> * Yet more prints Signed-off-by: Guy <guyreis96@gmail.com> * Even more spammy debug Signed-off-by: Guy <guyreis96@gmail.com> * Debugging commit + NEw Dispatches Signed-off-by: Guy <guyreis96@gmail.com> * Debugging commit + NEw Dispatches Signed-off-by: Guy <guyreis96@gmail.com> * Fixed localpath checks Signed-off-by: Guy <guyreis96@gmail.com> * more fixes for Localpaths Signed-off-by: Guy <guyreis96@gmail.com> * Spelling mistake on method Signed-off-by: Guy <guyreis96@gmail.com> * Fixed Crash on event handler Signed-off-by: Guy <guyreis96@gmail.com> * Fixed Crash on local search Signed-off-by: Guy <guyreis96@gmail.com> * Reduced fuzzy match percentage threshold for local tracks to account for nested folders Signed-off-by: Guy <guyreis96@gmail.com> * Fixed a crash on queue end Signed-off-by: Guy <guyreis96@gmail.com> * Sigh ... Removed a duplicate dispatch Signed-off-by: Guy <guyreis96@gmail.com> * Sigh i removed this before ... Signed-off-by: Guy <guyreis96@gmail.com> * Reorder dispatch signatures so all 3 new dispatch have matching signature Signed-off-by: Guy <guyreis96@gmail.com> * Formatting Signed-off-by: Guy <guyreis96@gmail.com> * Edited Error Event to support localtracks Signed-off-by: Guy <guyreis96@gmail.com> * Fix a Crash on track crash :awesome: Signed-off-by: Guy <guyreis96@gmail.com> * Yikes soo much spam Signed-off-by: Guy <guyreis96@gmail.com> * Remove spam and improve existance check Signed-off-by: Guy <guyreis96@gmail.com> * Repeat and Auto-play are mutually exclusive now Signed-off-by: Guy <guyreis96@gmail.com> * DEBUGS for Preda Signed-off-by: Guy <guyreis96@gmail.com> * Vimeo tracks can be from both these domains "vimeo.com", "beam.pro" Signed-off-by: Guy <guyreis96@gmail.com> * I mean Mixer can be from those 2 domains .... Signed-off-by: Guy <guyreis96@gmail.com> * Fixed `search sc` command Signed-off-by: Guy <guyreis96@gmail.com> * Run everything though lints. rename localtracks module to dataclasses Clear lock on errors Signed-off-by: Draper <guyreis96@gmail.com> * Try to speed up long playlist loading Signed-off-by: Draper <guyreis96@gmail.com> * Im an idiot Signed-off-by: Draper <guyreis96@gmail.com> * Im an idiot Signed-off-by: Draper <guyreis96@gmail.com> * Added logging for writes Signed-off-by: Draper <guyreis96@gmail.com> * Fix crash on cog reload Signed-off-by: Draper <guyreis96@gmail.com> * Fix for runtimewarning ? Signed-off-by: Draper <guyreis96@gmail.com> * Fix for Local Track cache Signed-off-by: Draper <guyreis96@gmail.com> * Remove broken tracks from queue on exception Theoretically do not auto play if track stop reason is Stopped or cleanup Signed-off-by: Draper <guyreis96@gmail.com> * Previous commit was a fluke ... ignore it Signed-off-by: Draper <guyreis96@gmail.com> * Change from cleanup to Replaced Signed-off-by: Draper <guyreis96@gmail.com> * Fixed AttributeError: 'Track' object has no attribute 'info'. [p]skip will only work for autoplay is there a track being played. Fixed Console spam if query saving failed in the background while reloading bot. Autoplay now respect [p]stop command Signed-off-by: Guy <guyreis96@gmail.com> * Black formatting Fix Issue with auto play working when there is songs in the queue Stop notifying queue ended if autoplay is on Signed-off-by: Guy <guyreis96@gmail.com> * Fixed a crash on track load timeout Signed-off-by: Guy <guyreis96@gmail.com> * [p]playlist start will now show the playlist name in embed body Improved Logic for handling broken tracks when repeat is on. Signed-off-by: Draper <guyreis96@gmail.com> * Enqueue tracks as soon as we have the youtube URL .... This basically changes how spotify urls are handled Need to test saving spotify playlist Need to test loading a spotify playlist from file Need to test enqueuing a spotify playlist Signed-off-by: Draper <guyreis96@gmail.com> * Updated a track whrn enqueuing spotify playlist Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Debug Signed-off-by: Draper <guyreis96@gmail.com> * Revert spotify_enqueue changes Signed-off-by: Draper <guyreis96@gmail.com> * Revert spotify_enqueue changes Signed-off-by: Draper <guyreis96@gmail.com> * Allow to set Lavalink jar version from Environment vars Signed-off-by: Draper <guyreis96@gmail.com> * Allow to set Lavalink jar version from Environment vars Signed-off-by: Draper <guyreis96@gmail.com> * Fix for a crash on Equalizer, Merge Spotify_enqueue changes and revert manager changes Signed-off-by: Draper <guyreis96@gmail.com> * Break playlist enqueue after 10 consecutive failures Signed-off-by: Draper <guyreis96@gmail.com> * Auto DC, is not compatible with Auto Play Signed-off-by: Draper <guyreis96@gmail.com> * Make notifier aware of guild its being called for Signed-off-by: Draper <guyreis96@gmail.com> * Type checking Signed-off-by: Draper <guyreis96@gmail.com> * Remove lock from 2 exits that i didn't before Signed-off-by: Draper <guyreis96@gmail.com> * Fixed TypeError: spotify_enqueue() got an unexpected keyword argument 'notify' Signed-off-by: Guy <guyreis96@gmail.com> * Reorder toggles to alphabetical order Signed-off-by: Guy <guyreis96@gmail.com> * Update Query to handle spotify URIs Signed-off-by: Guy <guyreis96@gmail.com> * update database Signed-off-by: Guy <guyreis96@gmail.com> * Dont say tracks enqued on invalid link Make autop lay a mod only setting Signed-off-by: Draper <guyreis96@gmail.com> * Dont say tracks enqued on invalid spotify link Signed-off-by: Draper <guyreis96@gmail.com> * Set default age to 365 days Signed-off-by: Draper <guyreis96@gmail.com> * Allow Audio mods to set auto play playlists. Save playlists songs to cache when migrating Signed-off-by: Guy <guyreis96@gmail.com> * Black formatting Signed-off-by: Guy <guyreis96@gmail.com> * [p]eq cooldown is not triggered is player check fails (i.e if nothing is currently playing) Adding and removing reaction is no longer a blocking action Signed-off-by: Guy <guyreis96@gmail.com> * changelog for non blocking reaction handles Signed-off-by: Guy <guyreis96@gmail.com> * Show auto dc and auto play settings by default Signed-off-by: Guy <guyreis96@gmail.com> * lint is being a bitch Signed-off-by: Guy <guyreis96@gmail.com> * lint changes Signed-off-by: Draper <guyreis96@gmail.com> * stop caching local tracks Signed-off-by: Draper <guyreis96@gmail.com> * List of Lavalink.Tracks natively added to Playlist Objects Signed-off-by: Draper <guyreis96@gmail.com> * Fix UX changes and should fix autoplay Signed-off-by: Draper <guyreis96@gmail.com> * Fixed Skip x number of tracks Signed-off-by: Draper <guyreis96@gmail.com> * Lint changes Signed-off-by: Draper <guyreis96@gmail.com> * Remvoe dead code Signed-off-by: Draper <guyreis96@gmail.com> * Update playlist embed formatting to reflect Preda's suggestions Signed-off-by: Draper <guyreis96@gmail.com> * Update change logs Signed-off-by: Draper <guyreis96@gmail.com> * Add `async with ctx.typing():` to queue and to local folder Signed-off-by: Draper <guyreis96@gmail.com> * Stop queuing now when queue is empty with [p]queue Signed-off-by: Draper <guyreis96@gmail.com> * fix ctx.typing() Signed-off-by: Draper <guyreis96@gmail.com> * fix ctx.typing() Signed-off-by: Draper <guyreis96@gmail.com> * Part 1 Signed-off-by: Draper <guyreis96@gmail.com> * Dont check local track author and name if title is Unknown Signed-off-by: Guy <guyreis96@gmail.com> * Makes auto play more random Signed-off-by: Guy <guyreis96@gmail.com> * Fixes local play Fixed missing format Signed-off-by: Guy <guyreis96@gmail.com> * Query.process_input accept lavalink.Track objects Signed-off-by: Draper <guyreis96@gmail.com> * docstrings Signed-off-by: Draper <guyreis96@gmail.com> * Add TODO for timestamp support Signed-off-by: Draper <guyreis96@gmail.com> * Improve autoplay from cache logic (possibly slightly slower but more efficient overall) Signed-off-by: Draper <guyreis96@gmail.com> * Add My Lavalink PR as a dependency Remember to remove this .... The PR will bump it to 0.3.2 Signed-off-by: Draper <guyreis96@gmail.com> * Add My Lavalink PR as a dependency Remember to remove this .... The PR will bump it to 0.3.2 Signed-off-by: Draper <guyreis96@gmail.com> * Add My Lavalink PR as a dependency Remember to remove this .... The PR will bump it to 0.3.2 Signed-off-by: Draper <guyreis96@gmail.com> * Compile all regex at runtime Signed-off-by: Draper <guyreis96@gmail.com> * Fixes local play Fixed missing format Signed-off-by: Guy <guyreis96@gmail.com> * Revert Dep error Signed-off-by: Guy <guyreis96@gmail.com> * black Signed-off-by: Guy <guyreis96@gmail.com> * Fixed attribute error Signed-off-by: Guy <guyreis96@gmail.com> * add `self.bot.dispatch("audio_disconnect", ctx.guild)` dispatch when the player is disconnected Signed-off-by: Guy <guyreis96@gmail.com> * Removed shuffle lock on skip Signed-off-by: Guy <guyreis96@gmail.com> * Better logic for auto seek (timestamps) Signed-off-by: Guy <guyreis96@gmail.com> * Better logic for auto seek (timestamps) Signed-off-by: Guy <guyreis96@gmail.com> * Fixes timestamps on spotify tracks Signed-off-by: Guy <guyreis96@gmail.com> * Add ctx typing to playlist enqueue Signed-off-by: Guy <guyreis96@gmail.com> * Fix Deps Signed-off-by: Guy <guyreis96@gmail.com> * Black formatting + Using new lavalink methods for shuffling Signed-off-by: Guy <guyreis96@gmail.com> * remove ctx.typing from playlist start Signed-off-by: Guy <guyreis96@gmail.com> * Fixes typerror when enqueuing spotify playlists Signed-off-by: Guy <guyreis96@gmail.com> * Fix keyerror Signed-off-by: Guy <guyreis96@gmail.com> * black formatting, + embed for [p]audioset cache as I forgot it before Signed-off-by: Guy <guyreis96@gmail.com> * Fix Error on playlist upload Signed-off-by: Guy <guyreis96@gmail.com> * Fix Text help for bump Signed-off-by: Guy <guyreis96@gmail.com> * Allow track bumping while shuffle is on Signed-off-by: Guy <guyreis96@gmail.com> * Edit bump embed to be consistent with other embed Hyperlink tracks and removed dynamic title Signed-off-by: Guy <guyreis96@gmail.com> * Black Signed-off-by: Guy <guyreis96@gmail.com> * Errors not printing fix? Signed-off-by: Guy <guyreis96@gmail.com> * Errors not printing fix? Signed-off-by: Guy <guyreis96@gmail.com> * Track enqueued footer now shows correct track position when shuffle is on Signed-off-by: Guy <guyreis96@gmail.com> * Update changelogs Signed-off-by: Guy <guyreis96@gmail.com> * Fix is_owner check in audioset settings Signed-off-by: Guy <guyreis96@gmail.com> * Changelogs Signed-off-by: Guy <guyreis96@gmail.com> * Dont store searches with no results in cache, fix malformated playlist to cache upon settings migration Signed-off-by: Guy <guyreis96@gmail.com> * _clear_lock_on_error > Needs to be reviewed to see if it has been done correctly Signed-off-by: Guy <guyreis96@gmail.com> * _clear_lock_on_error > Needs to be reviewed to see if it has been done correctly Signed-off-by: Guy <guyreis96@gmail.com> * Fix Query search so that it works with absolute paths for localtracks Signed-off-by: Guy <guyreis96@gmail.com> * Extra error if lavalink is set to external and the query is a localtrack and nothing is found Signed-off-by: Guy <guyreis96@gmail.com> * Black Signed-off-by: Guy <guyreis96@gmail.com> * More detailed error message Signed-off-by: Guy <guyreis96@gmail.com> * [p]seek and [p]skip can be used by user if they are the song requester while DJ mode is enabled, if votes are disabled. , [p]queue shuffle can be used to shuffle the queue manually. and [p]queue clean self can be used to remove all songs you requested from the queue. Signed-off-by: Guy <guyreis96@gmail.com> * black Signed-off-by: Guy <guyreis96@gmail.com> * All the fixes + a `should_auto_play` dispatch for the tech savy peeps Signed-off-by: Guy <guyreis96@gmail.com> * Spellchecker + Pythonic changes Signed-off-by: Guy <guyreis96@gmail.com> * NO spam for logs Signed-off-by: Guy <guyreis96@gmail.com> * Pass Current voice channel to `red_audio_should_auto_play` dispatch Signed-off-by: Guy <guyreis96@gmail.com> * Black Signed-off-by: Guy <guyreis96@gmail.com> * playlist upload also updates cache in the background Signed-off-by: Guy <guyreis96@gmail.com> * playlist upload also updates cache in the background Signed-off-by: Guy <guyreis96@gmail.com> * Add scope to playlist picker Signed-off-by: Guy <guyreis96@gmail.com> * Delete Playlist picker message once something is selected Signed-off-by: Guy <guyreis96@gmail.com> * OCD Fix Signed-off-by: Guy <guyreis96@gmail.com> * Facepalm Signed-off-by: Guy <guyreis96@gmail.com> * Fix a Potential crash Signed-off-by: Guy <guyreis96@gmail.com> * Update my stupidity Signed-off-by: Guy <guyreis96@gmail.com> * Auto Pause + Skip tracks already in playlist upon playlist append + a command to remove duplicated tracks from playlist Signed-off-by: Guy <guyreis96@gmail.com> * Fix DJ mode when Role is deleted - Credits go to Neuro Assassin#4779 Fix an issue where auto play MAY not trigger Signed-off-by: Guy <guyreis96@gmail.com> * Change log to Neuro Assassin#4779 fix Signed-off-by: Guy <guyreis96@gmail.com> * Black Signed-off-by: Guy <guyreis96@gmail.com> * Dont auto pause manual pauses Signed-off-by: Guy <guyreis96@gmail.com> * Adds `[p]autoplay` that can be run by mods or higher Signed-off-by: Guy <guyreis96@gmail.com> * 🤦 Signed-off-by: Guy <guyreis96@gmail.com> * 2x 🤦 Signed-off-by: Guy <guyreis96@gmail.com> * Fixed wrong import Signed-off-by: Guy <guyreis96@gmail.com> * Added Autoplay notify Signed-off-by: Guy <guyreis96@gmail.com> * Added Autoplay notify Signed-off-by: Guy <guyreis96@gmail.com> * Black Signed-off-by: Guy <guyreis96@gmail.com> * Store Track object as prev song instead of URI Signed-off-by: Guy <guyreis96@gmail.com> * Black why do u hate me Signed-off-by: Guy <guyreis96@gmail.com> * Fix command name Signed-off-by: Guy <guyreis96@gmail.com> * Fix Autoplay notify Signed-off-by: Guy <guyreis96@gmail.com> * Fix missing await and TypeError, Thanks Flame Signed-off-by: Guy <guyreis96@gmail.com> * Add a list of tracks to show as a menu Signed-off-by: Guy <guyreis96@gmail.com> * adds the `[p]genre` command which uses the Spotify and Youtube API Signed-off-by: Guy <guyreis96@gmail.com> * Enqueue Playlists from genre command Signed-off-by: Guy <guyreis96@gmail.com> * Pretify `[p]genre` Signed-off-by: Guy <guyreis96@gmail.com> * Fix a Typo and correct jukebox charge order Signed-off-by: Guy <guyreis96@gmail.com> * Add genre command to error handling Signed-off-by: Guy <guyreis96@gmail.com> * Type checking Signed-off-by: Guy <guyreis96@gmail.com> * Update naming scheme for `[p]genre` Signed-off-by: Guy <guyreis96@gmail.com> * Black why do you hate me Signed-off-by: Guy <guyreis96@gmail.com> * Fixed `[p]local start` Playlist picker auto selects if theres just 1 playlist found `[p]queue cleanself` added Signed-off-by: Guy <guyreis96@gmail.com> * *sigh* back compatibility with old localtrack paths Signed-off-by: Guy <guyreis96@gmail.com> * *sigh* back compatibility with old localtrack paths, even more Signed-off-by: Guy <guyreis96@gmail.com> * *sigh* back compatibility with old localtrack paths Even more Signed-off-by: Guy <guyreis96@gmail.com> * Fixes localtracks in playlist info command Signed-off-by: Guy <guyreis96@gmail.com> * Debug Local Strings Signed-off-by: Guy <guyreis96@gmail.com> * Debug Local Strings Signed-off-by: Guy <guyreis96@gmail.com> * Fixes `[p]playlist info` for local tracks + fixed error in `[p]remove` Signed-off-by: Guy <guyreis96@gmail.com> * Black Signed-off-by: Guy <guyreis96@gmail.com> * Fixes formatting in `[p]playlist info` Signed-off-by: Guy <guyreis96@gmail.com> * Fix an issue with User Scope playlists were not being deleted Signed-off-by: Guy <guyreis96@gmail.com> * Typechecking Signed-off-by: Guy <guyreis96@gmail.com> * Black Signed-off-by: Guy <guyreis96@gmail.com> * Fix the logic of `delegate_autoplay` Signed-off-by: Guy <guyreis96@gmail.com> * Fix a Crash on Load due to type hinting Signed-off-by: Guy <guyreis96@gmail.com> * Fix a Crash on Load due to type hintingBlack + fix order of `red_audio_should_auto_play` Signed-off-by: Guy <guyreis96@gmail.com> * Add `red_audio_initialized` dispatch so that ownership of auto play can be maintained after a reload Signed-off-by: Guy <guyreis96@gmail.com> * Check if the current owner is loaded before raising an error Signed-off-by: Guy <guyreis96@gmail.com> * Fixes the Existence Check in `delegate_autoplay` Signed-off-by: Guy <guyreis96@gmail.com> * Turns `own_autoplay` in a property of Audio and improves `delegate_autoplay` Thanks Sinbad! Signed-off-by: Guy <guyreis96@gmail.com> * Fix for Localtracks playlists Signed-off-by: Guy <guyreis96@gmail.com> * When disconnecting send `Disconnecting...` Fix Stop after a skip Fix UX discrepancy on Playlist IDs Fixed Exception when theres a track error Signed-off-by: Guy <guyreis96@gmail.com> * add `on_red_audio_unload` dispatch Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Fix a crash on track start where `player.current` can be none? Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Missing new line Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Allow `--author` for playlist to be used to filter playlist for an specific author. Plus a few bugfixes for UX Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Rename `remdupe` to `dedupe` Make global scope always be referenced as Global add missing backwards quotes around the Playlist ID for 1 string Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Towncrier entries for dep changes Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Remove track index when shuffle is on Fix Progress bar for livestreams Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Trigger autoplay on `QUEUE_END` event instead of `TRACK_END` Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Can't reproduce Ians bug but here a safeguard agaisnt it just in case Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Fixes 2 Messages that had the wrong formatting Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * standerdize playlist naming scheme Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Fix `[p]autoplay` message when Notify is enabled Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * y u h8 me black Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Fix an issue with `[p]audioset localpath` where the localtracks folder was incorrect Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Pythonic formatting Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Ugh Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Fix a typo Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Silently try to delete messages + fixes error Ian found with `[p]genre` Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * sigh black Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Add humanize_number usage correctly Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Bump RLL to 0.4.0 Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Update changelog entries Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Change `bot.db` to new API's added by #2967 Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Additional reformatting Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Remove PyCharm noise + Fixes a few Pycharm warnings Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Rework `index` parsing for youtube urls Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Addess Aika's review Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Fix a potential crash, saves guild ID to playlists to avoid an scheme change in the future Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Add handling for Python installs without sqlite3. Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Address Flame's review Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Fix ma stupidity Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Address Aika's latest review. 1. Update docstring for `[p]playlist rename`. 2. Fix punctuation for playlist matching. 3. `[p]playlist update` now respect playlist management perms 4. Playlist management errors now shows playlist name, id and scope where possible 5. Remove duplicated code and dead code. Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com> * Pluralize string Signed-off-by: guyre <27962761+drapersniper@users.noreply.github.com>
This commit is contained in:
1131
redbot/cogs/audio/apis.py
Normal file
1131
redbot/cogs/audio/apis.py
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
37
redbot/cogs/audio/checks.py
Normal file
37
redbot/cogs/audio/checks.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from redbot.core import Config, commands
|
||||
|
||||
from .apis import HAS_SQL
|
||||
|
||||
_config = None
|
||||
|
||||
|
||||
def _pass_config_to_checks(config: Config):
|
||||
global _config
|
||||
if _config is None:
|
||||
_config = config
|
||||
|
||||
|
||||
def roomlocked():
|
||||
"""Deny the command if the bot has been room locked."""
|
||||
|
||||
async def predicate(ctx: commands.Context):
|
||||
if ctx.guild is None:
|
||||
return False
|
||||
if await ctx.bot.is_mod(member=ctx.author):
|
||||
return True
|
||||
|
||||
room_id = await _config.guild(ctx.guild).room_lock()
|
||||
if room_id is None or ctx.channel.id == room_id:
|
||||
return True
|
||||
return False
|
||||
|
||||
return commands.check(predicate)
|
||||
|
||||
|
||||
def can_have_caching():
|
||||
"""Check to disable Caching commands if SQLite is not avaliable."""
|
||||
|
||||
async def predicate(ctx: commands.Context):
|
||||
return HAS_SQL
|
||||
|
||||
return commands.check(predicate)
|
||||
495
redbot/cogs/audio/converters.py
Normal file
495
redbot/cogs/audio/converters.py
Normal file
@@ -0,0 +1,495 @@
|
||||
import argparse
|
||||
import functools
|
||||
from typing import Optional, Tuple, Union
|
||||
|
||||
import discord
|
||||
|
||||
from redbot.core import Config, commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.i18n import Translator
|
||||
|
||||
from .playlists import PlaylistScope, standardize_scope
|
||||
|
||||
_ = Translator("Audio", __file__)
|
||||
|
||||
__all__ = [
|
||||
"ComplexScopeParser",
|
||||
"PlaylistConverter",
|
||||
"ScopeParser",
|
||||
"LazyGreedyConverter",
|
||||
"standardize_scope",
|
||||
"get_lazy_converter",
|
||||
"get_playlist_converter",
|
||||
]
|
||||
|
||||
_config = None
|
||||
_bot = None
|
||||
|
||||
_SCOPE_HELP = """
|
||||
Scope must be a valid version of one of the following:
|
||||
Global
|
||||
Guild
|
||||
User
|
||||
"""
|
||||
_USER_HELP = """
|
||||
Author must be a valid version of one of the following:
|
||||
User ID
|
||||
User Mention
|
||||
User Name#123
|
||||
"""
|
||||
_GUILD_HELP = """
|
||||
Guild must be a valid version of one of the following:
|
||||
Guild ID
|
||||
Exact guild name
|
||||
"""
|
||||
|
||||
|
||||
def _pass_config_to_converters(config: Config, bot: Red):
|
||||
global _config, _bot
|
||||
if _config is None:
|
||||
_config = config
|
||||
if _bot is None:
|
||||
_bot = bot
|
||||
|
||||
|
||||
class PlaylistConverter(commands.Converter):
|
||||
async def convert(self, ctx: commands.Context, arg: str) -> dict:
|
||||
global_scope = await _config.custom(PlaylistScope.GLOBAL.value).all()
|
||||
guild_scope = await _config.custom(PlaylistScope.GUILD.value).all()
|
||||
user_scope = await _config.custom(PlaylistScope.USER.value).all()
|
||||
user_matches = [
|
||||
(uid, pid, pdata)
|
||||
for uid, data in user_scope.items()
|
||||
for pid, pdata in data.items()
|
||||
if arg == pid or arg.lower() in pdata.get("name", "").lower()
|
||||
]
|
||||
guild_matches = [
|
||||
(gid, pid, pdata)
|
||||
for gid, data in guild_scope.items()
|
||||
for pid, pdata in data.items()
|
||||
if arg == pid or arg.lower() in pdata.get("name", "").lower()
|
||||
]
|
||||
global_matches = [
|
||||
(None, pid, pdata)
|
||||
for pid, pdata in global_scope.items()
|
||||
if arg == pid or arg.lower() in pdata.get("name", "").lower()
|
||||
]
|
||||
if not user_matches and not guild_matches and not global_matches:
|
||||
raise commands.BadArgument(_("Could not match '{}' to a playlist.").format(arg))
|
||||
|
||||
return {
|
||||
PlaylistScope.GLOBAL.value: global_matches,
|
||||
PlaylistScope.GUILD.value: guild_matches,
|
||||
PlaylistScope.USER.value: user_matches,
|
||||
"arg": arg,
|
||||
}
|
||||
|
||||
|
||||
class NoExitParser(argparse.ArgumentParser):
|
||||
def error(self, message):
|
||||
raise commands.BadArgument()
|
||||
|
||||
|
||||
class ScopeParser(commands.Converter):
|
||||
async def convert(
|
||||
self, ctx: commands.Context, argument: str
|
||||
) -> Tuple[str, discord.User, Optional[discord.Guild], bool]:
|
||||
target_scope: str = PlaylistScope.GUILD.value
|
||||
target_user: Optional[Union[discord.Member, discord.User]] = ctx.author
|
||||
target_guild: Optional[discord.Guild] = ctx.guild
|
||||
specified_user = False
|
||||
|
||||
argument = argument.replace("—", "--")
|
||||
|
||||
command, *arguments = argument.split(" -- ")
|
||||
if arguments:
|
||||
argument = " -- ".join(arguments)
|
||||
else:
|
||||
command = None
|
||||
|
||||
parser = NoExitParser(description="Playlist Scope Parsing.", add_help=False)
|
||||
|
||||
parser.add_argument("--scope", nargs="*", dest="scope", default=[])
|
||||
parser.add_argument("--guild", nargs="*", dest="guild", default=[])
|
||||
parser.add_argument("--server", nargs="*", dest="guild", default=[])
|
||||
parser.add_argument("--author", nargs="*", dest="author", default=[])
|
||||
parser.add_argument("--user", nargs="*", dest="author", default=[])
|
||||
parser.add_argument("--member", nargs="*", dest="author", default=[])
|
||||
|
||||
if not command:
|
||||
parser.add_argument("command", nargs="*")
|
||||
|
||||
try:
|
||||
vals = vars(parser.parse_args(argument.split()))
|
||||
except Exception as exc:
|
||||
raise commands.BadArgument() from exc
|
||||
|
||||
if vals["scope"]:
|
||||
scope_raw = " ".join(vals["scope"]).strip()
|
||||
scope = scope_raw.upper().strip()
|
||||
valid_scopes = PlaylistScope.list() + [
|
||||
"GLOBAL",
|
||||
"GUILD",
|
||||
"AUTHOR",
|
||||
"USER",
|
||||
"SERVER",
|
||||
"MEMBER",
|
||||
"BOT",
|
||||
]
|
||||
if scope not in valid_scopes:
|
||||
raise commands.ArgParserFailure("--scope", scope_raw, custom_help=_SCOPE_HELP)
|
||||
target_scope = standardize_scope(scope)
|
||||
elif "--scope" in argument and not vals["scope"]:
|
||||
raise commands.ArgParserFailure("--scope", "Nothing", custom_help=_SCOPE_HELP)
|
||||
|
||||
is_owner = await ctx.bot.is_owner(ctx.author)
|
||||
guild = vals.get("guild", None) or vals.get("server", None)
|
||||
if is_owner and guild:
|
||||
target_guild = None
|
||||
guild_raw = " ".join(guild).strip()
|
||||
if guild_raw.isnumeric():
|
||||
guild_raw = int(guild_raw)
|
||||
try:
|
||||
target_guild = ctx.bot.get_guild(guild_raw)
|
||||
except Exception:
|
||||
target_guild = None
|
||||
guild_raw = str(guild_raw)
|
||||
if target_guild is None:
|
||||
try:
|
||||
target_guild = await commands.GuildConverter.convert(ctx, guild_raw)
|
||||
except Exception:
|
||||
target_guild = None
|
||||
if target_guild is None:
|
||||
try:
|
||||
target_guild = await ctx.bot.fetch_guild(guild_raw)
|
||||
except Exception:
|
||||
target_guild = None
|
||||
if target_guild is None:
|
||||
raise commands.ArgParserFailure("--guild", guild_raw, custom_help=_GUILD_HELP)
|
||||
elif not is_owner and (guild or any(x in argument for x in ["--guild", "--server"])):
|
||||
raise commands.BadArgument("You cannot use `--guild`")
|
||||
elif any(x in argument for x in ["--guild", "--server"]):
|
||||
raise commands.ArgParserFailure("--guild", "Nothing", custom_help=_GUILD_HELP)
|
||||
|
||||
author = vals.get("author", None) or vals.get("user", None) or vals.get("member", None)
|
||||
if author:
|
||||
target_user = None
|
||||
user_raw = " ".join(author).strip()
|
||||
if user_raw.isnumeric():
|
||||
user_raw = int(user_raw)
|
||||
try:
|
||||
target_user = ctx.bot.get_user(user_raw)
|
||||
except Exception:
|
||||
target_user = None
|
||||
user_raw = str(user_raw)
|
||||
if target_user is None:
|
||||
member_converter = commands.MemberConverter()
|
||||
user_converter = commands.UserConverter()
|
||||
try:
|
||||
target_user = await member_converter.convert(ctx, user_raw)
|
||||
except Exception:
|
||||
try:
|
||||
target_user = await user_converter.convert(ctx, user_raw)
|
||||
except Exception:
|
||||
target_user = None
|
||||
if target_user is None:
|
||||
try:
|
||||
target_user = await ctx.bot.fetch_user(user_raw)
|
||||
except Exception:
|
||||
target_user = None
|
||||
if target_user is None:
|
||||
raise commands.ArgParserFailure("--author", user_raw, custom_help=_USER_HELP)
|
||||
else:
|
||||
specified_user = True
|
||||
elif any(x in argument for x in ["--author", "--user", "--member"]):
|
||||
raise commands.ArgParserFailure("--scope", "Nothing", custom_help=_USER_HELP)
|
||||
|
||||
return target_scope, target_user, target_guild, specified_user
|
||||
|
||||
|
||||
class ComplexScopeParser(commands.Converter):
|
||||
async def convert(
|
||||
self, ctx: commands.Context, argument: str
|
||||
) -> Tuple[
|
||||
str,
|
||||
discord.User,
|
||||
Optional[discord.Guild],
|
||||
bool,
|
||||
str,
|
||||
discord.User,
|
||||
Optional[discord.Guild],
|
||||
bool,
|
||||
]:
|
||||
|
||||
target_scope: str = PlaylistScope.GUILD.value
|
||||
target_user: Optional[Union[discord.Member, discord.User]] = ctx.author
|
||||
target_guild: Optional[discord.Guild] = ctx.guild
|
||||
specified_target_user = False
|
||||
|
||||
source_scope: str = PlaylistScope.GUILD.value
|
||||
source_user: Optional[Union[discord.Member, discord.User]] = ctx.author
|
||||
source_guild: Optional[discord.Guild] = ctx.guild
|
||||
specified_source_user = False
|
||||
|
||||
argument = argument.replace("—", "--")
|
||||
|
||||
command, *arguments = argument.split(" -- ")
|
||||
if arguments:
|
||||
argument = " -- ".join(arguments)
|
||||
else:
|
||||
command = None
|
||||
|
||||
parser = NoExitParser(description="Playlist Scope Parsing.", add_help=False)
|
||||
|
||||
parser.add_argument("--to-scope", nargs="*", dest="to_scope", default=[])
|
||||
parser.add_argument("--to-guild", nargs="*", dest="to_guild", default=[])
|
||||
parser.add_argument("--to-server", nargs="*", dest="to_server", default=[])
|
||||
parser.add_argument("--to-author", nargs="*", dest="to_author", default=[])
|
||||
parser.add_argument("--to-user", nargs="*", dest="to_user", default=[])
|
||||
parser.add_argument("--to-member", nargs="*", dest="to_member", default=[])
|
||||
|
||||
parser.add_argument("--from-scope", nargs="*", dest="from_scope", default=[])
|
||||
parser.add_argument("--from-guild", nargs="*", dest="from_guild", default=[])
|
||||
parser.add_argument("--from-server", nargs="*", dest="from_server", default=[])
|
||||
parser.add_argument("--from-author", nargs="*", dest="from_author", default=[])
|
||||
parser.add_argument("--from-user", nargs="*", dest="from_user", default=[])
|
||||
parser.add_argument("--from-member", nargs="*", dest="from_member", default=[])
|
||||
|
||||
if not command:
|
||||
parser.add_argument("command", nargs="*")
|
||||
|
||||
try:
|
||||
vals = vars(parser.parse_args(argument.split()))
|
||||
except Exception as exc:
|
||||
raise commands.BadArgument() from exc
|
||||
|
||||
is_owner = await ctx.bot.is_owner(ctx.author)
|
||||
valid_scopes = PlaylistScope.list() + [
|
||||
"GLOBAL",
|
||||
"GUILD",
|
||||
"AUTHOR",
|
||||
"USER",
|
||||
"SERVER",
|
||||
"MEMBER",
|
||||
"BOT",
|
||||
]
|
||||
|
||||
if vals["to_scope"]:
|
||||
to_scope_raw = " ".join(vals["to_scope"]).strip()
|
||||
to_scope = to_scope_raw.upper().strip()
|
||||
if to_scope not in valid_scopes:
|
||||
raise commands.ArgParserFailure(
|
||||
"--to-scope", to_scope_raw, custom_help=_SCOPE_HELP
|
||||
)
|
||||
target_scope = standardize_scope(to_scope)
|
||||
elif "--to-scope" in argument and not vals["to_scope"]:
|
||||
raise commands.ArgParserFailure("--to-scope", "Nothing", custom_help=_SCOPE_HELP)
|
||||
|
||||
if vals["from_scope"]:
|
||||
from_scope_raw = " ".join(vals["from_scope"]).strip()
|
||||
from_scope = from_scope_raw.upper().strip()
|
||||
|
||||
if from_scope not in valid_scopes:
|
||||
raise commands.ArgParserFailure(
|
||||
"--from-scope", from_scope_raw, custom_help=_SCOPE_HELP
|
||||
)
|
||||
source_scope = standardize_scope(from_scope)
|
||||
elif "--from-scope" in argument and not vals["to_scope"]:
|
||||
raise commands.ArgParserFailure("--to-scope", "Nothing", custom_help=_SCOPE_HELP)
|
||||
|
||||
to_guild = vals.get("to_guild", None) or vals.get("to_server", None)
|
||||
if is_owner and to_guild:
|
||||
target_guild = None
|
||||
to_guild_raw = " ".join(to_guild).strip()
|
||||
if to_guild_raw.isnumeric():
|
||||
to_guild_raw = int(to_guild_raw)
|
||||
try:
|
||||
target_guild = ctx.bot.get_guild(to_guild_raw)
|
||||
except Exception:
|
||||
target_guild = None
|
||||
to_guild_raw = str(to_guild_raw)
|
||||
if target_guild is None:
|
||||
try:
|
||||
target_guild = await commands.GuildConverter.convert(ctx, to_guild_raw)
|
||||
except Exception:
|
||||
target_guild = None
|
||||
if target_guild is None:
|
||||
try:
|
||||
target_guild = await ctx.bot.fetch_guild(to_guild_raw)
|
||||
except Exception:
|
||||
target_guild = None
|
||||
if target_guild is None:
|
||||
raise commands.ArgParserFailure(
|
||||
"--to-guild", to_guild_raw, custom_help=_GUILD_HELP
|
||||
)
|
||||
elif not is_owner and (
|
||||
to_guild or any(x in argument for x in ["--to-guild", "--to-server"])
|
||||
):
|
||||
raise commands.BadArgument("You cannot use `--to-server`")
|
||||
elif any(x in argument for x in ["--to-guild", "--to-server"]):
|
||||
raise commands.ArgParserFailure("--to-server", "Nothing", custom_help=_GUILD_HELP)
|
||||
|
||||
from_guild = vals.get("from_guild", None) or vals.get("from_server", None)
|
||||
if is_owner and from_guild:
|
||||
source_guild = None
|
||||
from_guild_raw = " ".join(from_guild).strip()
|
||||
if from_guild_raw.isnumeric():
|
||||
from_guild_raw = int(from_guild_raw)
|
||||
try:
|
||||
source_guild = ctx.bot.get_guild(from_guild_raw)
|
||||
except Exception:
|
||||
source_guild = None
|
||||
from_guild_raw = str(from_guild_raw)
|
||||
if source_guild is None:
|
||||
try:
|
||||
source_guild = await commands.GuildConverter.convert(ctx, from_guild_raw)
|
||||
except Exception:
|
||||
source_guild = None
|
||||
if source_guild is None:
|
||||
try:
|
||||
source_guild = await ctx.bot.fetch_guild(from_guild_raw)
|
||||
except Exception:
|
||||
source_guild = None
|
||||
if source_guild is None:
|
||||
raise commands.ArgParserFailure(
|
||||
"--from-guild", from_guild_raw, custom_help=_GUILD_HELP
|
||||
)
|
||||
elif not is_owner and (
|
||||
from_guild or any(x in argument for x in ["--from-guild", "--from-server"])
|
||||
):
|
||||
raise commands.BadArgument("You cannot use `--from-server`")
|
||||
elif any(x in argument for x in ["--from-guild", "--from-server"]):
|
||||
raise commands.ArgParserFailure("--from-server", "Nothing", custom_help=_GUILD_HELP)
|
||||
|
||||
to_author = (
|
||||
vals.get("to_author", None) or vals.get("to_user", None) or vals.get("to_member", None)
|
||||
)
|
||||
if to_author:
|
||||
target_user = None
|
||||
to_user_raw = " ".join(to_author).strip()
|
||||
if to_user_raw.isnumeric():
|
||||
to_user_raw = int(to_user_raw)
|
||||
try:
|
||||
source_user = ctx.bot.get_user(to_user_raw)
|
||||
except Exception:
|
||||
source_user = None
|
||||
to_user_raw = str(to_user_raw)
|
||||
if target_user is None:
|
||||
member_converter = commands.MemberConverter()
|
||||
user_converter = commands.UserConverter()
|
||||
try:
|
||||
target_user = await member_converter.convert(ctx, to_user_raw)
|
||||
except Exception:
|
||||
try:
|
||||
target_user = await user_converter.convert(ctx, to_user_raw)
|
||||
except Exception:
|
||||
target_user = None
|
||||
if target_user is None:
|
||||
try:
|
||||
target_user = await ctx.bot.fetch_user(to_user_raw)
|
||||
except Exception:
|
||||
target_user = None
|
||||
if target_user is None:
|
||||
raise commands.ArgParserFailure("--to-author", to_user_raw, custom_help=_USER_HELP)
|
||||
else:
|
||||
specified_target_user = True
|
||||
elif any(x in argument for x in ["--to-author", "--to-user", "--to-member"]):
|
||||
raise commands.ArgParserFailure("--to-user", "Nothing", custom_help=_USER_HELP)
|
||||
|
||||
from_author = (
|
||||
vals.get("from_author", None)
|
||||
or vals.get("from_user", None)
|
||||
or vals.get("from_member", None)
|
||||
)
|
||||
if from_author:
|
||||
source_user = None
|
||||
from_user_raw = " ".join(from_author).strip()
|
||||
if from_user_raw.isnumeric():
|
||||
from_user_raw = int(from_user_raw)
|
||||
try:
|
||||
target_user = ctx.bot.get_user(from_user_raw)
|
||||
except Exception:
|
||||
source_user = None
|
||||
from_user_raw = str(from_user_raw)
|
||||
if source_user is None:
|
||||
member_converter = commands.MemberConverter()
|
||||
user_converter = commands.UserConverter()
|
||||
try:
|
||||
source_user = await member_converter.convert(ctx, from_user_raw)
|
||||
except Exception:
|
||||
try:
|
||||
source_user = await user_converter.convert(ctx, from_user_raw)
|
||||
except Exception:
|
||||
source_user = None
|
||||
if source_user is None:
|
||||
try:
|
||||
source_user = await ctx.bot.fetch_user(from_user_raw)
|
||||
except Exception:
|
||||
source_user = None
|
||||
if source_user is None:
|
||||
raise commands.ArgParserFailure(
|
||||
"--from-author", from_user_raw, custom_help=_USER_HELP
|
||||
)
|
||||
else:
|
||||
specified_source_user = True
|
||||
elif any(x in argument for x in ["--from-author", "--from-user", "--from-member"]):
|
||||
raise commands.ArgParserFailure("--from-user", "Nothing", custom_help=_USER_HELP)
|
||||
|
||||
return (
|
||||
source_scope,
|
||||
source_user,
|
||||
source_guild,
|
||||
specified_source_user,
|
||||
target_scope,
|
||||
target_user,
|
||||
target_guild,
|
||||
specified_target_user,
|
||||
)
|
||||
|
||||
|
||||
class LazyGreedyConverter(commands.Converter):
|
||||
def __init__(self, splitter: str):
|
||||
self.splitter_Value = splitter
|
||||
|
||||
async def convert(self, ctx: commands.Context, argument: str) -> str:
|
||||
full_message = ctx.message.content.partition(f" {argument} ")
|
||||
if len(full_message) == 1:
|
||||
full_message = (
|
||||
(argument if argument not in full_message else "") + " " + full_message[0]
|
||||
)
|
||||
elif len(full_message) > 1:
|
||||
full_message = (
|
||||
(argument if argument not in full_message else "") + " " + full_message[-1]
|
||||
)
|
||||
greedy_output = (" " + full_message.replace("—", "--")).partition(
|
||||
f" {self.splitter_Value}"
|
||||
)[0]
|
||||
return f"{greedy_output}".strip()
|
||||
|
||||
|
||||
def get_lazy_converter(splitter: str) -> type:
|
||||
"""
|
||||
Returns a typechecking safe `LazyGreedyConverter` suitable for use with discord.py.
|
||||
"""
|
||||
|
||||
class PartialMeta(type(LazyGreedyConverter)):
|
||||
__call__ = functools.partialmethod(type(LazyGreedyConverter).__call__, splitter)
|
||||
|
||||
class ValidatedConverter(LazyGreedyConverter, metaclass=PartialMeta):
|
||||
pass
|
||||
|
||||
return ValidatedConverter
|
||||
|
||||
|
||||
def get_playlist_converter() -> type:
|
||||
"""
|
||||
Returns a typechecking safe `PlaylistConverter` suitable for use with discord.py.
|
||||
"""
|
||||
|
||||
class PartialMeta(type(PlaylistConverter)):
|
||||
__call__ = functools.partialmethod(type(PlaylistConverter).__call__)
|
||||
|
||||
class ValidatedConverter(PlaylistConverter, metaclass=PartialMeta):
|
||||
pass
|
||||
|
||||
return ValidatedConverter
|
||||
486
redbot/cogs/audio/dataclasses.py
Normal file
486
redbot/cogs/audio/dataclasses.py
Normal file
@@ -0,0 +1,486 @@
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path, PosixPath, WindowsPath
|
||||
from typing import List, Optional, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import lavalink
|
||||
|
||||
from redbot.core import Config
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.i18n import Translator
|
||||
|
||||
_config: Optional[Config] = None
|
||||
_bot: Optional[Red] = None
|
||||
_localtrack_folder: Optional[str] = None
|
||||
_ = Translator("Audio", __file__)
|
||||
_remove_start = re.compile(r"^(sc|list) ")
|
||||
_re_youtube_timestamp = re.compile(r"&t=(\d+)s?")
|
||||
_re_youtube_index = re.compile(r"&index=(\d+)")
|
||||
_re_spotify_url = re.compile(r"(http[s]?://)?(open.spotify.com)/")
|
||||
_re_spotify_timestamp = re.compile(r"#(\d+):(\d+)")
|
||||
_re_soundcloud_timestamp = re.compile(r"#t=(\d+):(\d+)s?")
|
||||
_re_twitch_timestamp = re.compile(r"\?t=(\d+)h(\d+)m(\d+)s")
|
||||
|
||||
|
||||
def _pass_config_to_dataclasses(config: Config, bot: Red, folder: str):
|
||||
global _config, _bot, _localtrack_folder
|
||||
if _config is None:
|
||||
_config = config
|
||||
if _bot is None:
|
||||
_bot = bot
|
||||
_localtrack_folder = folder
|
||||
|
||||
|
||||
class ChdirClean(object):
|
||||
def __init__(self, directory):
|
||||
self.old_dir = os.getcwd()
|
||||
self.new_dir = directory
|
||||
self.cwd = None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, _type, value, traceback):
|
||||
self.chdir_out()
|
||||
return isinstance(value, OSError)
|
||||
|
||||
def chdir_in(self):
|
||||
self.cwd = Path(self.new_dir)
|
||||
os.chdir(self.new_dir)
|
||||
|
||||
def chdir_out(self):
|
||||
self.cwd = Path(self.old_dir)
|
||||
os.chdir(self.old_dir)
|
||||
|
||||
|
||||
class LocalPath(ChdirClean):
|
||||
"""
|
||||
Local tracks class.
|
||||
Used to handle system dir trees in a cross system manner.
|
||||
The only use of this class is for `localtracks`.
|
||||
"""
|
||||
|
||||
_supported_music_ext = (".mp3", ".flac", ".ogg")
|
||||
|
||||
def __init__(self, path, **kwargs):
|
||||
self._path = path
|
||||
if isinstance(path, (Path, WindowsPath, PosixPath, LocalPath)):
|
||||
path = str(path.absolute())
|
||||
elif path is not None:
|
||||
path = str(path)
|
||||
|
||||
self.cwd = Path.cwd()
|
||||
_lt_folder = Path(_localtrack_folder) if _localtrack_folder else self.cwd
|
||||
_path = Path(path) if path else self.cwd
|
||||
|
||||
if _lt_folder.parts[-1].lower() == "localtracks" and not kwargs.get("forced"):
|
||||
self.localtrack_folder = _lt_folder
|
||||
elif kwargs.get("forced"):
|
||||
if _path.parts[-1].lower() == "localtracks":
|
||||
self.localtrack_folder = _path
|
||||
else:
|
||||
self.localtrack_folder = _path / "localtracks"
|
||||
else:
|
||||
self.localtrack_folder = _lt_folder / "localtracks"
|
||||
|
||||
try:
|
||||
_path = Path(path)
|
||||
_path.relative_to(self.localtrack_folder)
|
||||
self.path = _path
|
||||
except (ValueError, TypeError):
|
||||
if path and path.startswith("localtracks//"):
|
||||
path = path.replace("localtracks//", "", 1)
|
||||
elif path and path.startswith("localtracks/"):
|
||||
path = path.replace("localtracks/", "", 1)
|
||||
self.path = self.localtrack_folder.joinpath(path) if path else self.localtrack_folder
|
||||
|
||||
try:
|
||||
if self.path.is_file():
|
||||
parent = self.path.parent
|
||||
else:
|
||||
parent = self.path
|
||||
super().__init__(str(parent.absolute()))
|
||||
|
||||
self.parent = Path(parent)
|
||||
except OSError:
|
||||
self.parent = None
|
||||
|
||||
self.cwd = Path.cwd()
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return str(self.path.name)
|
||||
|
||||
def is_dir(self):
|
||||
try:
|
||||
return self.path.is_dir()
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def exists(self):
|
||||
try:
|
||||
return self.path.exists()
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def is_file(self):
|
||||
try:
|
||||
return self.path.is_file()
|
||||
except OSError:
|
||||
return False
|
||||
|
||||
def absolute(self):
|
||||
try:
|
||||
return self.path.absolute()
|
||||
except OSError:
|
||||
return self._path
|
||||
|
||||
@classmethod
|
||||
def joinpath(cls, *args):
|
||||
modified = cls(None)
|
||||
modified.path = modified.path.joinpath(*args)
|
||||
return modified
|
||||
|
||||
def multiglob(self, *patterns):
|
||||
paths = []
|
||||
for p in patterns:
|
||||
paths.extend(list(self.path.glob(p)))
|
||||
for p in self._filtered(paths):
|
||||
yield p
|
||||
|
||||
def multirglob(self, *patterns):
|
||||
paths = []
|
||||
for p in patterns:
|
||||
paths.extend(list(self.path.rglob(p)))
|
||||
|
||||
for p in self._filtered(paths):
|
||||
yield p
|
||||
|
||||
def _filtered(self, paths: List[Path]):
|
||||
for p in paths:
|
||||
if p.suffix in self._supported_music_ext:
|
||||
yield p
|
||||
|
||||
def __str__(self):
|
||||
return str(self.path.absolute())
|
||||
|
||||
def to_string(self):
|
||||
try:
|
||||
return str(self.path.absolute())
|
||||
except OSError:
|
||||
return str(self._path)
|
||||
|
||||
def to_string_hidden(self, arg: str = None):
|
||||
string = str(self.absolute()).replace(
|
||||
(str(self.localtrack_folder.absolute()) + os.sep) if arg is None else arg, ""
|
||||
)
|
||||
chunked = False
|
||||
while len(string) > 145 and os.sep in string:
|
||||
string = string.split(os.sep, 1)[-1]
|
||||
chunked = True
|
||||
|
||||
if chunked:
|
||||
string = f"...{os.sep}{string}"
|
||||
return string
|
||||
|
||||
def tracks_in_tree(self):
|
||||
tracks = []
|
||||
for track in self.multirglob(*[f"*{ext}" for ext in self._supported_music_ext]):
|
||||
if track.exists() and track.is_file() and track.parent != self.localtrack_folder:
|
||||
tracks.append(Query.process_input(LocalPath(str(track.absolute()))))
|
||||
return tracks
|
||||
|
||||
def subfolders_in_tree(self):
|
||||
files = list(self.multirglob(*[f"*{ext}" for ext in self._supported_music_ext]))
|
||||
folders = []
|
||||
for f in files:
|
||||
if f.exists() and f.parent not in folders and f.parent != self.localtrack_folder:
|
||||
folders.append(f.parent)
|
||||
return_folders = []
|
||||
for folder in folders:
|
||||
if folder.exists() and folder.is_dir():
|
||||
return_folders.append(LocalPath(str(folder.absolute())))
|
||||
return return_folders
|
||||
|
||||
def tracks_in_folder(self):
|
||||
tracks = []
|
||||
for track in self.multiglob(*[f"*{ext}" for ext in self._supported_music_ext]):
|
||||
if track.exists() and track.is_file() and track.parent != self.localtrack_folder:
|
||||
tracks.append(Query.process_input(LocalPath(str(track.absolute()))))
|
||||
return tracks
|
||||
|
||||
def subfolders(self):
|
||||
files = list(self.multiglob(*[f"*{ext}" for ext in self._supported_music_ext]))
|
||||
folders = []
|
||||
for f in files:
|
||||
if f.exists() and f.parent not in folders and f.parent != self.localtrack_folder:
|
||||
folders.append(f.parent)
|
||||
return_folders = []
|
||||
for folder in folders:
|
||||
if folder.exists() and folder.is_dir():
|
||||
return_folders.append(LocalPath(str(folder.absolute())))
|
||||
return return_folders
|
||||
|
||||
|
||||
class Query:
|
||||
"""
|
||||
Query data class.
|
||||
Use: Query.process_input(query) to generate the Query object.
|
||||
"""
|
||||
|
||||
def __init__(self, query: Union[LocalPath, str], **kwargs):
|
||||
query = kwargs.get("queryforced", query)
|
||||
self._raw: Union[LocalPath, str] = query
|
||||
|
||||
_localtrack: LocalPath = LocalPath(query)
|
||||
|
||||
self.track: Union[LocalPath, str] = _localtrack if (
|
||||
(_localtrack.is_file() or _localtrack.is_dir()) and _localtrack.exists()
|
||||
) else query
|
||||
|
||||
self.valid: bool = query != "InvalidQueryPlaceHolderName"
|
||||
self.is_local: bool = kwargs.get("local", False)
|
||||
self.is_spotify: bool = kwargs.get("spotify", False)
|
||||
self.is_youtube: bool = kwargs.get("youtube", False)
|
||||
self.is_soundcloud: bool = kwargs.get("soundcloud", False)
|
||||
self.is_bandcamp: bool = kwargs.get("bandcamp", False)
|
||||
self.is_vimeo: bool = kwargs.get("vimeo", False)
|
||||
self.is_mixer: bool = kwargs.get("mixer", False)
|
||||
self.is_twitch: bool = kwargs.get("twitch", False)
|
||||
self.is_other: bool = kwargs.get("other", False)
|
||||
self.is_playlist: bool = kwargs.get("playlist", False)
|
||||
self.is_album: bool = kwargs.get("album", False)
|
||||
self.is_search: bool = kwargs.get("search", False)
|
||||
self.is_stream: bool = kwargs.get("stream", False)
|
||||
self.single_track: bool = kwargs.get("single", False)
|
||||
self.id: Optional[str] = kwargs.get("id", None)
|
||||
self.invoked_from: Optional[str] = kwargs.get("invoked_from", None)
|
||||
self.local_name: Optional[str] = kwargs.get("name", None)
|
||||
self.search_subfolders: bool = kwargs.get("search_subfolders", False)
|
||||
self.spotify_uri: Optional[str] = kwargs.get("uri", None)
|
||||
|
||||
self.start_time: int = kwargs.get("start_time", 0)
|
||||
self.track_index: Optional[int] = kwargs.get("track_index", None)
|
||||
|
||||
if self.invoked_from == "sc search":
|
||||
self.is_youtube = False
|
||||
self.is_soundcloud = True
|
||||
|
||||
self.lavalink_query: str = self._get_query()
|
||||
|
||||
if self.is_playlist or self.is_album:
|
||||
self.single_track = False
|
||||
|
||||
def __str__(self):
|
||||
return str(self.lavalink_query)
|
||||
|
||||
@classmethod
|
||||
def process_input(cls, query: Union[LocalPath, lavalink.Track, "Query", str], **kwargs):
|
||||
"""
|
||||
A replacement for :code:`lavalink.Player.load_tracks`.
|
||||
This will try to get a valid cached entry first if not found or if in valid
|
||||
it will then call the lavalink API.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
query : Union[Query, LocalPath, lavalink.Track, str]
|
||||
The query string or LocalPath object.
|
||||
Returns
|
||||
-------
|
||||
Query
|
||||
Returns a parsed Query object.
|
||||
"""
|
||||
if not query:
|
||||
query = "InvalidQueryPlaceHolderName"
|
||||
possible_values = dict()
|
||||
|
||||
if isinstance(query, str):
|
||||
query = query.strip("<>")
|
||||
|
||||
elif isinstance(query, Query):
|
||||
for key, val in kwargs.items():
|
||||
setattr(query, key, val)
|
||||
return query
|
||||
elif isinstance(query, lavalink.Track):
|
||||
possible_values["stream"] = query.is_stream
|
||||
query = query.uri
|
||||
|
||||
possible_values.update(dict(**kwargs))
|
||||
possible_values.update(cls._parse(query, **kwargs))
|
||||
return cls(query, **possible_values)
|
||||
|
||||
@staticmethod
|
||||
def _parse(track, **kwargs):
|
||||
returning = {}
|
||||
if (
|
||||
type(track) == type(LocalPath)
|
||||
and (track.is_file() or track.is_dir())
|
||||
and track.exists()
|
||||
):
|
||||
returning["local"] = True
|
||||
returning["name"] = track.name
|
||||
if track.is_file():
|
||||
returning["single"] = True
|
||||
elif track.is_dir():
|
||||
returning["album"] = True
|
||||
else:
|
||||
track = str(track)
|
||||
if track.startswith("spotify:"):
|
||||
returning["spotify"] = True
|
||||
if ":playlist:" in track:
|
||||
returning["playlist"] = True
|
||||
elif ":album:" in track:
|
||||
returning["album"] = True
|
||||
elif ":track:" in track:
|
||||
returning["single"] = True
|
||||
_id = track.split(":", 2)[-1]
|
||||
_id = _id.split("?")[0]
|
||||
returning["id"] = _id
|
||||
if "#" in _id:
|
||||
match = re.search(_re_spotify_timestamp, track)
|
||||
if match:
|
||||
returning["start_time"] = (int(match.group(1)) * 60) + int(match.group(2))
|
||||
returning["uri"] = track
|
||||
return returning
|
||||
if track.startswith("sc ") or track.startswith("list "):
|
||||
if track.startswith("sc "):
|
||||
returning["invoked_from"] = "sc search"
|
||||
returning["soundcloud"] = True
|
||||
elif track.startswith("list "):
|
||||
returning["invoked_from"] = "search list"
|
||||
track = _remove_start.sub("", track, 1)
|
||||
returning["queryforced"] = track
|
||||
|
||||
_localtrack = LocalPath(track)
|
||||
if _localtrack.exists():
|
||||
if _localtrack.is_file():
|
||||
returning["local"] = True
|
||||
returning["single"] = True
|
||||
returning["name"] = _localtrack.name
|
||||
return returning
|
||||
elif _localtrack.is_dir():
|
||||
returning["album"] = True
|
||||
returning["local"] = True
|
||||
returning["name"] = _localtrack.name
|
||||
return returning
|
||||
try:
|
||||
query_url = urlparse(track)
|
||||
if all([query_url.scheme, query_url.netloc, query_url.path]):
|
||||
url_domain = ".".join(query_url.netloc.split(".")[-2:])
|
||||
if not query_url.netloc:
|
||||
url_domain = ".".join(query_url.path.split("/")[0].split(".")[-2:])
|
||||
if url_domain in ["youtube.com", "youtu.be"]:
|
||||
returning["youtube"] = True
|
||||
_has_index = "&index=" in track
|
||||
if "&t=" in track:
|
||||
match = re.search(_re_youtube_timestamp, track)
|
||||
if match:
|
||||
returning["start_time"] = int(match.group(1))
|
||||
if _has_index:
|
||||
match = re.search(_re_youtube_index, track)
|
||||
if match:
|
||||
returning["track_index"] = int(match.group(1)) - 1
|
||||
|
||||
if all(k in track for k in ["&list=", "watch?"]):
|
||||
returning["track_index"] = 0
|
||||
returning["playlist"] = True
|
||||
returning["single"] = False
|
||||
elif all(x in track for x in ["playlist?"]):
|
||||
returning["playlist"] = True if not _has_index else False
|
||||
returning["single"] = True if _has_index else False
|
||||
else:
|
||||
returning["single"] = True
|
||||
elif url_domain == "spotify.com":
|
||||
returning["spotify"] = True
|
||||
if "/playlist/" in track:
|
||||
returning["playlist"] = True
|
||||
elif "/album/" in track:
|
||||
returning["album"] = True
|
||||
elif "/track/" in track:
|
||||
returning["single"] = True
|
||||
val = re.sub(_re_spotify_url, "", track).replace("/", ":")
|
||||
if "user:" in val:
|
||||
val = val.split(":", 2)[-1]
|
||||
_id = val.split(":", 1)[-1]
|
||||
_id = _id.split("?")[0]
|
||||
|
||||
if "#" in _id:
|
||||
_id = _id.split("#")[0]
|
||||
match = re.search(_re_spotify_timestamp, track)
|
||||
if match:
|
||||
returning["start_time"] = (int(match.group(1)) * 60) + int(
|
||||
match.group(2)
|
||||
)
|
||||
|
||||
returning["id"] = _id
|
||||
returning["uri"] = f"spotify:{val}"
|
||||
elif url_domain == "soundcloud.com":
|
||||
returning["soundcloud"] = True
|
||||
if "#t=" in track:
|
||||
match = re.search(_re_soundcloud_timestamp, track)
|
||||
if match:
|
||||
returning["start_time"] = (int(match.group(1)) * 60) + int(
|
||||
match.group(2)
|
||||
)
|
||||
if "/sets/" in track:
|
||||
if "?in=" in track:
|
||||
returning["single"] = True
|
||||
else:
|
||||
returning["playlist"] = True
|
||||
else:
|
||||
returning["single"] = True
|
||||
elif url_domain == "bandcamp.com":
|
||||
returning["bandcamp"] = True
|
||||
if "/album/" in track:
|
||||
returning["album"] = True
|
||||
else:
|
||||
returning["single"] = True
|
||||
elif url_domain == "vimeo.com":
|
||||
returning["vimeo"] = True
|
||||
elif url_domain in ["mixer.com", "beam.pro"]:
|
||||
returning["mixer"] = True
|
||||
elif url_domain == "twitch.tv":
|
||||
returning["twitch"] = True
|
||||
if "?t=" in track:
|
||||
match = re.search(_re_twitch_timestamp, track)
|
||||
if match:
|
||||
returning["start_time"] = (
|
||||
(int(match.group(1)) * 60 * 60)
|
||||
+ (int(match.group(2)) * 60)
|
||||
+ int(match.group(3))
|
||||
)
|
||||
|
||||
if not any(x in track for x in ["/clip/", "/videos/"]):
|
||||
returning["stream"] = True
|
||||
else:
|
||||
returning["other"] = True
|
||||
returning["single"] = True
|
||||
else:
|
||||
if kwargs.get("soundcloud", False):
|
||||
returning["soundcloud"] = True
|
||||
else:
|
||||
returning["youtube"] = True
|
||||
returning["search"] = True
|
||||
returning["single"] = True
|
||||
except Exception:
|
||||
returning["search"] = True
|
||||
returning["youtube"] = True
|
||||
returning["single"] = True
|
||||
return returning
|
||||
|
||||
def _get_query(self):
|
||||
if self.is_local:
|
||||
return self.track.to_string()
|
||||
elif self.is_spotify:
|
||||
return self.spotify_uri
|
||||
elif self.is_search and self.is_youtube:
|
||||
return f"ytsearch:{self.track}"
|
||||
elif self.is_search and self.is_soundcloud:
|
||||
return f"scsearch:{self.track}"
|
||||
return self.track
|
||||
|
||||
def to_string_user(self):
|
||||
if self.is_local:
|
||||
return str(self.track.to_string_hidden())
|
||||
return str(self._raw)
|
||||
@@ -5,7 +5,7 @@
|
||||
class Equalizer:
|
||||
def __init__(self):
|
||||
self._band_count = 15
|
||||
self.bands = [0.0 for x in range(self._band_count)]
|
||||
self.bands = [0.0 for _ in range(self._band_count)]
|
||||
|
||||
def set_gain(self, band: int, gain: float):
|
||||
if band < 0 or band >= self._band_count:
|
||||
@@ -27,14 +27,11 @@ class Equalizer:
|
||||
gains = [1.0, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0.0, -0.1, -0.2, -0.25]
|
||||
|
||||
for gain in gains:
|
||||
prefix = " "
|
||||
|
||||
prefix = ""
|
||||
if gain > 0:
|
||||
prefix = "+"
|
||||
elif gain == 0:
|
||||
prefix = " "
|
||||
else:
|
||||
prefix = ""
|
||||
|
||||
block += f"{prefix}{gain:.2f} | "
|
||||
|
||||
|
||||
@@ -31,3 +31,67 @@ class LavalinkDownloadFailed(AudioError, RuntimeError):
|
||||
|
||||
def _response_repr(self) -> str:
|
||||
return f"[{self.response.status} {self.response.reason}]"
|
||||
|
||||
|
||||
class PlayListError(AudioError):
|
||||
"""Base exception for errors related to playlists."""
|
||||
|
||||
|
||||
class InvalidPlaylistScope(PlayListError):
|
||||
"""Provided playlist scope is not valid."""
|
||||
|
||||
|
||||
class MissingGuild(PlayListError):
|
||||
"""Trying to access the Guild scope without a guild."""
|
||||
|
||||
|
||||
class MissingAuthor(PlayListError):
|
||||
"""Trying to access the User scope without an user id."""
|
||||
|
||||
|
||||
class TooManyMatches(PlayListError):
|
||||
"""Too many playlist match user input."""
|
||||
|
||||
|
||||
class NotAllowed(PlayListError):
|
||||
"""Too many playlist match user input."""
|
||||
|
||||
|
||||
class ApiError(AudioError):
|
||||
"""Base exception for API errors in the Audio cog."""
|
||||
|
||||
|
||||
class SpotifyApiError(ApiError):
|
||||
"""Base exception for Spotify API errors."""
|
||||
|
||||
|
||||
class SpotifyFetchError(SpotifyApiError):
|
||||
"""Fetching Spotify data failed."""
|
||||
|
||||
def __init__(self, message, *args):
|
||||
self.message = message
|
||||
super().__init__(*args)
|
||||
|
||||
|
||||
class YouTubeApiError(ApiError):
|
||||
"""Base exception for YouTube Data API errors."""
|
||||
|
||||
|
||||
class DatabaseError(AudioError):
|
||||
"""Base exception for database errors in the Audio cog."""
|
||||
|
||||
|
||||
class InvalidTableError(DatabaseError):
|
||||
"""Provided table to query is not a valid table."""
|
||||
|
||||
|
||||
class LocalTrackError(AudioError):
|
||||
"""Base exception for local track errors."""
|
||||
|
||||
|
||||
class InvalidLocalTrack(LocalTrackError):
|
||||
"""Base exception for local track errors."""
|
||||
|
||||
|
||||
class InvalidLocalTrackFolder(LocalTrackError):
|
||||
"""Base exception for local track errors."""
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
import itertools
|
||||
import pathlib
|
||||
import platform
|
||||
import shutil
|
||||
import asyncio
|
||||
import asyncio.subprocess # disables for # https://github.com/PyCQA/pylint/issues/1469
|
||||
import itertools
|
||||
import logging
|
||||
import pathlib
|
||||
import platform
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Optional, Tuple, ClassVar, List
|
||||
import time
|
||||
from typing import ClassVar, List, Optional, Tuple
|
||||
|
||||
import aiohttp
|
||||
from tqdm import tqdm
|
||||
@@ -39,7 +40,6 @@ class ServerManager:
|
||||
_java_available: ClassVar[Optional[bool]] = None
|
||||
_java_version: ClassVar[Optional[Tuple[int, int]]] = None
|
||||
_up_to_date: ClassVar[Optional[bool]] = None
|
||||
|
||||
_blacklisted_archs = []
|
||||
|
||||
def __init__(self) -> None:
|
||||
@@ -154,12 +154,15 @@ class ServerManager:
|
||||
|
||||
async def _wait_for_launcher(self) -> None:
|
||||
log.debug("Waiting for Lavalink server to be ready")
|
||||
lastmessage = 0
|
||||
for i in itertools.cycle(range(50)):
|
||||
line = await self._proc.stdout.readline()
|
||||
if READY_LINE_RE.search(line):
|
||||
self.ready.set()
|
||||
break
|
||||
if self._proc.returncode is not None:
|
||||
if self._proc.returncode is not None and lastmessage + 2 < time.time():
|
||||
# Avoid Console spam only print once every 2 seconds
|
||||
lastmessage = time.time()
|
||||
log.critical("Internal lavalink server exited early")
|
||||
if i == 49:
|
||||
# Sleep after 50 lines to prevent busylooping
|
||||
|
||||
428
redbot/cogs/audio/playlists.py
Normal file
428
redbot/cogs/audio/playlists.py
Normal file
@@ -0,0 +1,428 @@
|
||||
from collections import namedtuple
|
||||
from enum import Enum, unique
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import discord
|
||||
import lavalink
|
||||
|
||||
from redbot.core import Config, commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.i18n import Translator
|
||||
from redbot.core.utils.chat_formatting import humanize_list
|
||||
from .errors import InvalidPlaylistScope, MissingAuthor, MissingGuild, NotAllowed
|
||||
|
||||
_config = None
|
||||
_bot = None
|
||||
|
||||
__all__ = [
|
||||
"Playlist",
|
||||
"PlaylistScope",
|
||||
"get_playlist",
|
||||
"get_all_playlist",
|
||||
"create_playlist",
|
||||
"reset_playlist",
|
||||
"delete_playlist",
|
||||
"humanize_scope",
|
||||
"standardize_scope",
|
||||
"FakePlaylist",
|
||||
]
|
||||
|
||||
FakePlaylist = namedtuple("Playlist", "author scope")
|
||||
|
||||
_ = Translator("Audio", __file__)
|
||||
|
||||
|
||||
@unique
|
||||
class PlaylistScope(Enum):
|
||||
GLOBAL = "GLOBALPLAYLIST"
|
||||
GUILD = "GUILDPLAYLIST"
|
||||
USER = "USERPLAYLIST"
|
||||
|
||||
def __str__(self):
|
||||
return "{0}".format(self.value)
|
||||
|
||||
@staticmethod
|
||||
def list():
|
||||
return list(map(lambda c: c.value, PlaylistScope))
|
||||
|
||||
|
||||
def _pass_config_to_playlist(config: Config, bot: Red):
|
||||
global _config, _bot
|
||||
if _config is None:
|
||||
_config = config
|
||||
if _bot is None:
|
||||
_bot = bot
|
||||
|
||||
|
||||
def standardize_scope(scope) -> str:
|
||||
scope = scope.upper()
|
||||
valid_scopes = ["GLOBAL", "GUILD", "AUTHOR", "USER", "SERVER", "MEMBER", "BOT"]
|
||||
|
||||
if scope in PlaylistScope.list():
|
||||
return scope
|
||||
elif scope not in valid_scopes:
|
||||
raise InvalidPlaylistScope(
|
||||
f'"{scope}" is not a valid playlist scope.'
|
||||
f" Scope needs to be one of the following: {humanize_list(valid_scopes)}"
|
||||
)
|
||||
|
||||
if scope in ["GLOBAL", "BOT"]:
|
||||
scope = PlaylistScope.GLOBAL.value
|
||||
elif scope in ["GUILD", "SERVER"]:
|
||||
scope = PlaylistScope.GUILD.value
|
||||
elif scope in ["USER", "MEMBER", "AUTHOR"]:
|
||||
scope = PlaylistScope.USER.value
|
||||
|
||||
return scope
|
||||
|
||||
|
||||
def humanize_scope(scope, ctx=None, the=None):
|
||||
|
||||
if scope == PlaylistScope.GLOBAL.value:
|
||||
return ctx or _("the ") if the else "" + "Global"
|
||||
elif scope == PlaylistScope.GUILD.value:
|
||||
return ctx.name if ctx else _("the ") if the else "" + "Server"
|
||||
elif scope == PlaylistScope.USER.value:
|
||||
return str(ctx) if ctx else _("the ") if the else "" + "User"
|
||||
|
||||
|
||||
def _prepare_config_scope(
|
||||
scope, author: Union[discord.abc.User, int] = None, guild: discord.Guild = None
|
||||
):
|
||||
scope = standardize_scope(scope)
|
||||
|
||||
if scope == PlaylistScope.GLOBAL.value:
|
||||
config_scope = [PlaylistScope.GLOBAL.value]
|
||||
elif scope == PlaylistScope.USER.value:
|
||||
if author is None:
|
||||
raise MissingAuthor("Invalid author for user scope.")
|
||||
config_scope = [PlaylistScope.USER.value, str(getattr(author, "id", author))]
|
||||
else:
|
||||
if guild is None:
|
||||
raise MissingGuild("Invalid guild for guild scope.")
|
||||
config_scope = [PlaylistScope.GUILD.value, str(getattr(guild, "id", guild))]
|
||||
return config_scope
|
||||
|
||||
|
||||
class Playlist:
|
||||
"""A single playlist."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
bot: Red,
|
||||
scope: str,
|
||||
author: int,
|
||||
playlist_id: int,
|
||||
name: str,
|
||||
playlist_url: Optional[str] = None,
|
||||
tracks: Optional[List[dict]] = None,
|
||||
guild: Union[discord.Guild, int, None] = None,
|
||||
):
|
||||
self.bot = bot
|
||||
self.guild = guild
|
||||
self.scope = standardize_scope(scope)
|
||||
self.config_scope = _prepare_config_scope(self.scope, author, guild)
|
||||
self.author = author
|
||||
self.guild_id = (
|
||||
getattr(guild, "id", guild) if self.scope == PlaylistScope.GLOBAL.value else None
|
||||
)
|
||||
self.id = playlist_id
|
||||
self.name = name
|
||||
self.url = playlist_url
|
||||
self.tracks = tracks or []
|
||||
self.tracks_obj = [lavalink.Track(data=track) for track in self.tracks]
|
||||
|
||||
async def edit(self, data: dict):
|
||||
"""
|
||||
Edits a Playlist.
|
||||
Parameters
|
||||
----------
|
||||
data: dict
|
||||
The attributes to change.
|
||||
"""
|
||||
# Disallow ID editing
|
||||
if "id" in data:
|
||||
raise NotAllowed("Playlist ID cannot be edited.")
|
||||
|
||||
for item in list(data.keys()):
|
||||
setattr(self, item, data[item])
|
||||
|
||||
await _config.custom(*self.config_scope, str(self.id)).set(self.to_json())
|
||||
|
||||
def to_json(self) -> dict:
|
||||
"""Transform the object to a dict.
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
The playlist in the form of a dict.
|
||||
"""
|
||||
data = dict(
|
||||
id=self.id,
|
||||
author=self.author,
|
||||
guild=self.guild_id,
|
||||
name=self.name,
|
||||
playlist_url=self.url,
|
||||
tracks=self.tracks,
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
@classmethod
|
||||
async def from_json(cls, bot: Red, scope: str, playlist_number: int, data: dict, **kwargs):
|
||||
"""Get a Playlist object from the provided information.
|
||||
Parameters
|
||||
----------
|
||||
bot: Red
|
||||
The bot's instance. Needed to get the target user.
|
||||
scope:str
|
||||
The custom config scope. One of 'GLOBALPLAYLIST', 'GUILDPLAYLIST' or 'USERPLAYLIST'.
|
||||
playlist_number: int
|
||||
The playlist's number.
|
||||
data: dict
|
||||
The JSON representation of the playlist to be gotten.
|
||||
**kwargs
|
||||
Extra attributes for the Playlist instance which override values
|
||||
in the data dict. These should be complete objects and not
|
||||
IDs, where possible.
|
||||
Returns
|
||||
-------
|
||||
Playlist
|
||||
The playlist object for the requested playlist.
|
||||
Raises
|
||||
------
|
||||
`InvalidPlaylistScope`
|
||||
Passing a scope that is not supported.
|
||||
`MissingGuild`
|
||||
Trying to access the Guild scope without a guild.
|
||||
`MissingAuthor`
|
||||
Trying to access the User scope without an user id.
|
||||
"""
|
||||
guild = data.get("guild") or kwargs.get("guild")
|
||||
author = data.get("author")
|
||||
playlist_id = data.get("id") or playlist_number
|
||||
name = data.get("name", "Unnamed")
|
||||
playlist_url = data.get("playlist_url", None)
|
||||
tracks = data.get("tracks", [])
|
||||
|
||||
return cls(
|
||||
bot=bot,
|
||||
guild=guild,
|
||||
scope=scope,
|
||||
author=author,
|
||||
playlist_id=playlist_id,
|
||||
name=name,
|
||||
playlist_url=playlist_url,
|
||||
tracks=tracks,
|
||||
)
|
||||
|
||||
|
||||
async def get_playlist(
|
||||
playlist_number: int,
|
||||
scope: str,
|
||||
bot: Red,
|
||||
guild: Union[discord.Guild, int] = None,
|
||||
author: Union[discord.abc.User, int] = None,
|
||||
) -> Playlist:
|
||||
"""
|
||||
Gets the playlist with the associated playlist number.
|
||||
Parameters
|
||||
----------
|
||||
playlist_number: int
|
||||
The playlist number for the playlist to get.
|
||||
scope: str
|
||||
The custom config scope. One of 'GLOBALPLAYLIST', 'GUILDPLAYLIST' or 'USERPLAYLIST'.
|
||||
guild: discord.Guild
|
||||
The guild to get the playlist from if scope is GUILDPLAYLIST.
|
||||
author: int
|
||||
The ID of the user to get the playlist from if scope is USERPLAYLIST.
|
||||
bot: Red
|
||||
The bot's instance.
|
||||
Returns
|
||||
-------
|
||||
Playlist
|
||||
The playlist associated with the playlist number.
|
||||
Raises
|
||||
------
|
||||
`RuntimeError`
|
||||
If there is no playlist for the specified number.
|
||||
`InvalidPlaylistScope`
|
||||
Passing a scope that is not supported.
|
||||
`MissingGuild`
|
||||
Trying to access the Guild scope without a guild.
|
||||
`MissingAuthor`
|
||||
Trying to access the User scope without an user id.
|
||||
"""
|
||||
playlist_data = await _config.custom(
|
||||
*_prepare_config_scope(scope, author, guild), str(playlist_number)
|
||||
).all()
|
||||
if not playlist_data["id"]:
|
||||
raise RuntimeError(f"That playlist does not exist for the following scope: {scope}")
|
||||
return await Playlist.from_json(
|
||||
bot, scope, playlist_number, playlist_data, guild=guild, author=author
|
||||
)
|
||||
|
||||
|
||||
async def get_all_playlist(
|
||||
scope: str,
|
||||
bot: Red,
|
||||
guild: Union[discord.Guild, int] = None,
|
||||
author: Union[discord.abc.User, int] = None,
|
||||
specified_user: bool = False,
|
||||
) -> List[Playlist]:
|
||||
"""
|
||||
Gets all playlist for the specified scope.
|
||||
Parameters
|
||||
----------
|
||||
scope: str
|
||||
The custom config scope. One of 'GLOBALPLAYLIST', 'GUILDPLAYLIST' or 'USERPLAYLIST'.
|
||||
guild: discord.Guild
|
||||
The guild to get the playlist from if scope is GUILDPLAYLIST.
|
||||
author: int
|
||||
The ID of the user to get the playlist from if scope is USERPLAYLIST.
|
||||
bot: Red
|
||||
The bot's instance
|
||||
specified_user:bool
|
||||
Whether or not user ID was passed as an argparse.
|
||||
Returns
|
||||
-------
|
||||
list
|
||||
A list of all playlists for the specified scope
|
||||
Raises
|
||||
------
|
||||
`InvalidPlaylistScope`
|
||||
Passing a scope that is not supported.
|
||||
`MissingGuild`
|
||||
Trying to access the Guild scope without a guild.
|
||||
`MissingAuthor`
|
||||
Trying to access the User scope without an user id.
|
||||
"""
|
||||
playlists = await _config.custom(*_prepare_config_scope(scope, author, guild)).all()
|
||||
if specified_user:
|
||||
user_id = getattr(author, "id", author)
|
||||
return [
|
||||
await Playlist.from_json(
|
||||
bot, scope, playlist_number, playlist_data, guild=guild, author=author
|
||||
)
|
||||
for playlist_number, playlist_data in playlists.items()
|
||||
if user_id == playlist_data.get("author")
|
||||
]
|
||||
else:
|
||||
return [
|
||||
await Playlist.from_json(
|
||||
bot, scope, playlist_number, playlist_data, guild=guild, author=author
|
||||
)
|
||||
for playlist_number, playlist_data in playlists.items()
|
||||
]
|
||||
|
||||
|
||||
async def create_playlist(
|
||||
ctx: commands.Context,
|
||||
scope: str,
|
||||
playlist_name: str,
|
||||
playlist_url: Optional[str] = None,
|
||||
tracks: Optional[List[dict]] = None,
|
||||
author: Optional[discord.User] = None,
|
||||
guild: Optional[discord.Guild] = None,
|
||||
) -> Optional[Playlist]:
|
||||
"""
|
||||
Creates a new Playlist.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx: commands.Context
|
||||
The context in which the play list is being created.
|
||||
scope: str
|
||||
The custom config scope. One of 'GLOBALPLAYLIST', 'GUILDPLAYLIST' or 'USERPLAYLIST'.
|
||||
playlist_name: str
|
||||
The name of the new playlist.
|
||||
playlist_url:str
|
||||
the url of the new playlist.
|
||||
tracks: List[dict]
|
||||
A list of tracks to add to the playlist.
|
||||
author: discord.User
|
||||
The Author of the playlist.
|
||||
If provided it will create a playlist under this user.
|
||||
This is only required when creating a playlist in User scope.
|
||||
guild: discord.Guild
|
||||
The guild to create this playlist under.
|
||||
This is only used when creating a playlist in the Guild scope
|
||||
|
||||
Raises
|
||||
------
|
||||
`InvalidPlaylistScope`
|
||||
Passing a scope that is not supported.
|
||||
`MissingGuild`
|
||||
Trying to access the Guild scope without a guild.
|
||||
`MissingAuthor`
|
||||
Trying to access the User scope without an user id.
|
||||
"""
|
||||
|
||||
playlist = Playlist(
|
||||
ctx.bot, scope, author.id, ctx.message.id, playlist_name, playlist_url, tracks, ctx.guild
|
||||
)
|
||||
|
||||
await _config.custom(*_prepare_config_scope(scope, author, guild), str(ctx.message.id)).set(
|
||||
playlist.to_json()
|
||||
)
|
||||
return playlist
|
||||
|
||||
|
||||
async def reset_playlist(
|
||||
scope: str,
|
||||
guild: Union[discord.Guild, int] = None,
|
||||
author: Union[discord.abc.User, int] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Wipes all playlists for the specified scope.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scope: str
|
||||
The custom config scope. One of 'GLOBALPLAYLIST', 'GUILDPLAYLIST' or 'USERPLAYLIST'.
|
||||
guild: discord.Guild
|
||||
The guild to get the playlist from if scope is GUILDPLAYLIST.
|
||||
author: int
|
||||
The ID of the user to get the playlist from if scope is USERPLAYLIST.
|
||||
|
||||
Raises
|
||||
------
|
||||
`InvalidPlaylistScope`
|
||||
Passing a scope that is not supported.
|
||||
`MissingGuild`
|
||||
Trying to access the Guild scope without a guild.
|
||||
`MissingAuthor`
|
||||
Trying to access the User scope without an user id.
|
||||
"""
|
||||
await _config.custom(*_prepare_config_scope(scope, author, guild)).clear()
|
||||
|
||||
|
||||
async def delete_playlist(
|
||||
scope: str,
|
||||
playlist_id: Union[str, int],
|
||||
guild: discord.Guild,
|
||||
author: Union[discord.abc.User, int] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Deletes the specified playlist.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
scope: str
|
||||
The custom config scope. One of 'GLOBALPLAYLIST', 'GUILDPLAYLIST' or 'USERPLAYLIST'.
|
||||
playlist_id: Union[str, int]
|
||||
The ID of the playlist.
|
||||
guild: discord.Guild
|
||||
The guild to get the playlist from if scope is GUILDPLAYLIST.
|
||||
author: int
|
||||
The ID of the user to get the playlist from if scope is USERPLAYLIST.
|
||||
|
||||
Raises
|
||||
------
|
||||
`InvalidPlaylistScope`
|
||||
Passing a scope that is not supported.
|
||||
`MissingGuild`
|
||||
Trying to access the Guild scope without a guild.
|
||||
`MissingAuthor`
|
||||
Trying to access the User scope without an user id.
|
||||
"""
|
||||
await _config.custom(*_prepare_config_scope(scope, author, guild), str(playlist_id)).clear()
|
||||
425
redbot/cogs/audio/utils.py
Normal file
425
redbot/cogs/audio/utils.py
Normal file
@@ -0,0 +1,425 @@
|
||||
import asyncio
|
||||
import contextlib
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from typing import NoReturn
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import discord
|
||||
import lavalink
|
||||
|
||||
from redbot.core import Config, commands
|
||||
from redbot.core.bot import Red
|
||||
from . import dataclasses
|
||||
|
||||
from .converters import _pass_config_to_converters
|
||||
|
||||
from .playlists import _pass_config_to_playlist
|
||||
|
||||
__all__ = [
|
||||
"pass_config_to_dependencies",
|
||||
"track_limit",
|
||||
"queue_duration",
|
||||
"draw_time",
|
||||
"dynamic_time",
|
||||
"match_url",
|
||||
"clear_react",
|
||||
"match_yt_playlist",
|
||||
"remove_react",
|
||||
"get_description",
|
||||
"track_creator",
|
||||
"time_convert",
|
||||
"url_check",
|
||||
"userlimit",
|
||||
"is_allowed",
|
||||
"CacheLevel",
|
||||
"Notifier",
|
||||
]
|
||||
_re_time_converter = re.compile(r"(?:(\d+):)?([0-5]?[0-9]):([0-5][0-9])")
|
||||
re_yt_list_playlist = re.compile(
|
||||
r"^(https?://)?(www\.)?(youtube\.com|youtu\.?be)(/playlist\?).*(list=)(.*)(&|$)"
|
||||
)
|
||||
|
||||
_config = None
|
||||
_bot = None
|
||||
|
||||
|
||||
def pass_config_to_dependencies(config: Config, bot: Red, localtracks_folder: str):
|
||||
global _bot, _config
|
||||
_bot = bot
|
||||
_config = config
|
||||
_pass_config_to_playlist(config, bot)
|
||||
_pass_config_to_converters(config, bot)
|
||||
dataclasses._pass_config_to_dataclasses(config, bot, localtracks_folder)
|
||||
|
||||
|
||||
def track_limit(track, maxlength):
|
||||
try:
|
||||
length = round(track.length / 1000)
|
||||
except AttributeError:
|
||||
length = round(track / 1000)
|
||||
|
||||
if maxlength < length <= 900000000000000: # livestreams return 9223372036854775807ms
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
async def is_allowed(guild: discord.Guild, query: str):
|
||||
query = query.lower().strip()
|
||||
whitelist = set(await _config.guild(guild).url_keyword_whitelist())
|
||||
if whitelist:
|
||||
return any(i in query for i in whitelist)
|
||||
blacklist = set(await _config.guild(guild).url_keyword_blacklist())
|
||||
return not any(i in query for i in blacklist)
|
||||
|
||||
|
||||
async def queue_duration(ctx):
|
||||
player = lavalink.get_player(ctx.guild.id)
|
||||
duration = []
|
||||
for i in range(len(player.queue)):
|
||||
if not player.queue[i].is_stream:
|
||||
duration.append(player.queue[i].length)
|
||||
queue_dur = sum(duration)
|
||||
if not player.queue:
|
||||
queue_dur = 0
|
||||
try:
|
||||
if not player.current.is_stream:
|
||||
remain = player.current.length - player.position
|
||||
else:
|
||||
remain = 0
|
||||
except AttributeError:
|
||||
remain = 0
|
||||
queue_total_duration = remain + queue_dur
|
||||
return queue_total_duration
|
||||
|
||||
|
||||
async def draw_time(ctx):
|
||||
player = lavalink.get_player(ctx.guild.id)
|
||||
paused = player.paused
|
||||
pos = player.position
|
||||
dur = player.current.length
|
||||
sections = 12
|
||||
loc_time = round((pos / dur) * sections)
|
||||
bar = "\N{BOX DRAWINGS HEAVY HORIZONTAL}"
|
||||
seek = "\N{RADIO BUTTON}"
|
||||
if paused:
|
||||
msg = "\N{DOUBLE VERTICAL BAR}"
|
||||
else:
|
||||
msg = "\N{BLACK RIGHT-POINTING TRIANGLE}"
|
||||
for i in range(sections):
|
||||
if i == loc_time:
|
||||
msg += seek
|
||||
else:
|
||||
msg += bar
|
||||
return msg
|
||||
|
||||
|
||||
def dynamic_time(seconds):
|
||||
m, s = divmod(seconds, 60)
|
||||
h, m = divmod(m, 60)
|
||||
d, h = divmod(h, 24)
|
||||
|
||||
if d > 0:
|
||||
msg = "{0}d {1}h"
|
||||
elif d == 0 and h > 0:
|
||||
msg = "{1}h {2}m"
|
||||
elif d == 0 and h == 0 and m > 0:
|
||||
msg = "{2}m {3}s"
|
||||
elif d == 0 and h == 0 and m == 0 and s > 0:
|
||||
msg = "{3}s"
|
||||
else:
|
||||
msg = ""
|
||||
return msg.format(d, h, m, s)
|
||||
|
||||
|
||||
def match_url(url):
|
||||
try:
|
||||
query_url = urlparse(url)
|
||||
return all([query_url.scheme, query_url.netloc, query_url.path])
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def match_yt_playlist(url):
|
||||
if re_yt_list_playlist.match(url):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
async def remove_react(message, react_emoji, react_user):
|
||||
with contextlib.suppress(discord.HTTPException):
|
||||
await message.remove_reaction(react_emoji, react_user)
|
||||
|
||||
|
||||
async def clear_react(bot: Red, message: discord.Message, emoji: dict = None):
|
||||
try:
|
||||
await message.clear_reactions()
|
||||
except discord.Forbidden:
|
||||
if not emoji:
|
||||
return
|
||||
with contextlib.suppress(discord.HTTPException):
|
||||
for key in emoji.values():
|
||||
await asyncio.sleep(0.2)
|
||||
await message.remove_reaction(key, bot.user)
|
||||
except discord.HTTPException:
|
||||
return
|
||||
|
||||
|
||||
async def get_description(track):
|
||||
if any(x in track.uri for x in [f"{os.sep}localtracks", f"localtracks{os.sep}"]):
|
||||
local_track = dataclasses.LocalPath(track.uri)
|
||||
if track.title != "Unknown title":
|
||||
return "**{} - {}**\n{}".format(
|
||||
track.author, track.title, local_track.to_string_hidden()
|
||||
)
|
||||
else:
|
||||
return local_track.to_string_hidden()
|
||||
else:
|
||||
return "**[{}]({})**".format(track.title, track.uri)
|
||||
|
||||
|
||||
def track_creator(player, position=None, other_track=None):
|
||||
if position == "np":
|
||||
queued_track = player.current
|
||||
elif position is None:
|
||||
queued_track = other_track
|
||||
else:
|
||||
queued_track = player.queue[position]
|
||||
track_keys = queued_track._info.keys()
|
||||
track_values = queued_track._info.values()
|
||||
track_id = queued_track.track_identifier
|
||||
track_info = {}
|
||||
for k, v in zip(track_keys, track_values):
|
||||
track_info[k] = v
|
||||
keys = ["track", "info"]
|
||||
values = [track_id, track_info]
|
||||
track_obj = {}
|
||||
for key, value in zip(keys, values):
|
||||
track_obj[key] = value
|
||||
return track_obj
|
||||
|
||||
|
||||
def time_convert(length):
|
||||
match = re.compile(_re_time_converter).match(length)
|
||||
if match is not None:
|
||||
hr = int(match.group(1)) if match.group(1) else 0
|
||||
mn = int(match.group(2)) if match.group(2) else 0
|
||||
sec = int(match.group(3)) if match.group(3) else 0
|
||||
pos = sec + (mn * 60) + (hr * 3600)
|
||||
return pos
|
||||
else:
|
||||
try:
|
||||
return int(length)
|
||||
except ValueError:
|
||||
return 0
|
||||
|
||||
|
||||
def url_check(url):
|
||||
valid_tld = [
|
||||
"youtube.com",
|
||||
"youtu.be",
|
||||
"soundcloud.com",
|
||||
"bandcamp.com",
|
||||
"vimeo.com",
|
||||
"beam.pro",
|
||||
"mixer.com",
|
||||
"twitch.tv",
|
||||
"spotify.com",
|
||||
"localtracks",
|
||||
]
|
||||
query_url = urlparse(url)
|
||||
url_domain = ".".join(query_url.netloc.split(".")[-2:])
|
||||
if not query_url.netloc:
|
||||
url_domain = ".".join(query_url.path.split("/")[0].split(".")[-2:])
|
||||
return True if url_domain in valid_tld else False
|
||||
|
||||
|
||||
def userlimit(channel):
|
||||
if channel.user_limit == 0 or channel.user_limit > len(channel.members) + 1:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class CacheLevel:
|
||||
__slots__ = ("value",)
|
||||
|
||||
def __init__(self, level=0):
|
||||
if not isinstance(level, int):
|
||||
raise TypeError(
|
||||
f"Expected int parameter, received {level.__class__.__name__} instead."
|
||||
)
|
||||
elif level < 0:
|
||||
level = 0
|
||||
elif level > 0b11111:
|
||||
level = 0b11111
|
||||
|
||||
self.value = level
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, CacheLevel) and self.value == other.value
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.value)
|
||||
|
||||
def __add__(self, other):
|
||||
return CacheLevel(self.value + other.value)
|
||||
|
||||
def __radd__(self, other):
|
||||
return CacheLevel(other.value + self.value)
|
||||
|
||||
def __sub__(self, other):
|
||||
return CacheLevel(self.value - other.value)
|
||||
|
||||
def __rsub__(self, other):
|
||||
return CacheLevel(other.value - self.value)
|
||||
|
||||
def __str__(self):
|
||||
return "{0:b}".format(self.value)
|
||||
|
||||
def __format__(self, format_spec):
|
||||
return "{r:{f}}".format(r=self.value, f=format_spec)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<CacheLevel value={self.value}>"
|
||||
|
||||
def is_subset(self, other):
|
||||
"""Returns ``True`` if self has the same or fewer caching levels as other."""
|
||||
return (self.value & other.value) == self.value
|
||||
|
||||
def is_superset(self, other):
|
||||
"""Returns ``True`` if self has the same or more caching levels as other."""
|
||||
return (self.value | other.value) == self.value
|
||||
|
||||
def is_strict_subset(self, other):
|
||||
"""Returns ``True`` if the caching level on other are a strict subset of those on self."""
|
||||
return self.is_subset(other) and self != other
|
||||
|
||||
def is_strict_superset(self, other):
|
||||
"""Returns ``True`` if the caching level on
|
||||
other are a strict superset of those on self."""
|
||||
return self.is_superset(other) and self != other
|
||||
|
||||
__le__ = is_subset
|
||||
__ge__ = is_superset
|
||||
__lt__ = is_strict_subset
|
||||
__gt__ = is_strict_superset
|
||||
|
||||
@classmethod
|
||||
def all(cls):
|
||||
"""A factory method that creates a :class:`CacheLevel` with max caching level."""
|
||||
return cls(0b11111)
|
||||
|
||||
@classmethod
|
||||
def none(cls):
|
||||
"""A factory method that creates a :class:`CacheLevel` with no caching."""
|
||||
return cls(0)
|
||||
|
||||
@classmethod
|
||||
def set_spotify(cls):
|
||||
"""A factory method that creates a :class:`CacheLevel` with Spotify caching level."""
|
||||
return cls(0b00011)
|
||||
|
||||
@classmethod
|
||||
def set_youtube(cls):
|
||||
"""A factory method that creates a :class:`CacheLevel` with YouTube caching level."""
|
||||
return cls(0b00100)
|
||||
|
||||
@classmethod
|
||||
def set_lavalink(cls):
|
||||
"""A factory method that creates a :class:`CacheLevel` with lavalink caching level."""
|
||||
return cls(0b11000)
|
||||
|
||||
def _bit(self, index):
|
||||
return bool((self.value >> index) & 1)
|
||||
|
||||
def _set(self, index, value):
|
||||
if value is True:
|
||||
self.value |= 1 << index
|
||||
elif value is False:
|
||||
self.value &= ~(1 << index)
|
||||
else:
|
||||
raise TypeError("Value to set for CacheLevel must be a bool.")
|
||||
|
||||
@property
|
||||
def lavalink(self):
|
||||
""":class:`bool`: Returns ``True`` if a user can deafen other users."""
|
||||
return self._bit(4)
|
||||
|
||||
@lavalink.setter
|
||||
def lavalink(self, value):
|
||||
self._set(4, value)
|
||||
|
||||
@property
|
||||
def youtube(self):
|
||||
""":class:`bool`: Returns ``True`` if a user can move users between other voice
|
||||
channels."""
|
||||
return self._bit(2)
|
||||
|
||||
@youtube.setter
|
||||
def youtube(self, value):
|
||||
self._set(2, value)
|
||||
|
||||
@property
|
||||
def spotify(self):
|
||||
""":class:`bool`: Returns ``True`` if a user can use voice activation in voice channels."""
|
||||
return self._bit(1)
|
||||
|
||||
@spotify.setter
|
||||
def spotify(self, value):
|
||||
self._set(1, value)
|
||||
|
||||
|
||||
class Notifier:
|
||||
def __init__(self, ctx: commands.Context, message: discord.Message, updates: dict, **kwargs):
|
||||
self.context = ctx
|
||||
self.message = message
|
||||
self.updates = updates
|
||||
self.color = None
|
||||
self.last_msg_time = 0
|
||||
self.cooldown = 5
|
||||
|
||||
async def notify_user(
|
||||
self,
|
||||
current: int = None,
|
||||
total: int = None,
|
||||
key: str = None,
|
||||
seconds_key: str = None,
|
||||
seconds: str = None,
|
||||
) -> NoReturn:
|
||||
"""
|
||||
This updates an existing message.
|
||||
Based on the message found in :variable:`Notifier.updates` as per the `key` param
|
||||
"""
|
||||
if self.last_msg_time + self.cooldown > time.time() and not current == total:
|
||||
return
|
||||
if self.color is None:
|
||||
self.color = await self.context.embed_colour()
|
||||
embed2 = discord.Embed(
|
||||
colour=self.color,
|
||||
title=self.updates.get(key).format(num=current, total=total, seconds=seconds),
|
||||
)
|
||||
if seconds and seconds_key:
|
||||
embed2.set_footer(text=self.updates.get(seconds_key).format(seconds=seconds))
|
||||
try:
|
||||
await self.message.edit(embed=embed2)
|
||||
self.last_msg_time = time.time()
|
||||
except discord.errors.NotFound:
|
||||
pass
|
||||
|
||||
async def update_text(self, text: str) -> NoReturn:
|
||||
embed2 = discord.Embed(colour=self.color, title=text)
|
||||
try:
|
||||
await self.message.edit(embed=embed2)
|
||||
except discord.errors.NotFound:
|
||||
pass
|
||||
|
||||
async def update_embed(self, embed: discord.Embed) -> NoReturn:
|
||||
try:
|
||||
await self.message.edit(embed=embed)
|
||||
self.last_msg_time = time.time()
|
||||
except discord.errors.NotFound:
|
||||
pass
|
||||
@@ -3,7 +3,12 @@ import inspect
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
|
||||
__all__ = ["ConversionFailure", "BotMissingPermissions", "UserFeedbackCheckFailure"]
|
||||
__all__ = [
|
||||
"ConversionFailure",
|
||||
"BotMissingPermissions",
|
||||
"UserFeedbackCheckFailure",
|
||||
"ArgParserFailure",
|
||||
]
|
||||
|
||||
|
||||
class ConversionFailure(commands.BadArgument):
|
||||
@@ -30,3 +35,16 @@ class UserFeedbackCheckFailure(commands.CheckFailure):
|
||||
def __init__(self, message=None, *args):
|
||||
self.message = message
|
||||
super().__init__(message, *args)
|
||||
|
||||
|
||||
class ArgParserFailure(UserFeedbackCheckFailure):
|
||||
"""Raised when parsing an argument fails."""
|
||||
|
||||
def __init__(
|
||||
self, cmd: str, user_input: str, custom_help: str = None, ctx_send_help: bool = False
|
||||
):
|
||||
self.cmd = cmd
|
||||
self.user_input = user_input
|
||||
self.send_cmd_help = ctx_send_help
|
||||
self.custom_help_msg = custom_help
|
||||
super().__init__()
|
||||
|
||||
@@ -175,6 +175,13 @@ def init_events(bot, cli_flags):
|
||||
|
||||
if isinstance(error, commands.MissingRequiredArgument):
|
||||
await ctx.send_help()
|
||||
elif isinstance(error, commands.ArgParserFailure):
|
||||
msg = f"`{error.user_input}` is not a valid value for `{error.cmd}`"
|
||||
if error.custom_help_msg:
|
||||
msg += f"\n{error.custom_help_msg}"
|
||||
await ctx.send(msg)
|
||||
if error.send_cmd_help:
|
||||
await ctx.send_help()
|
||||
elif isinstance(error, commands.ConversionFailure):
|
||||
if error.args:
|
||||
await ctx.send(error.args[0])
|
||||
|
||||
Reference in New Issue
Block a user