import asyncio import contextlib import datetime import json import logging import random import time from collections import namedtuple from pathlib import Path from typing import TYPE_CHECKING, Callable, List, MutableMapping, Optional, Tuple, Union, cast import aiohttp import discord import lavalink from lavalink.rest_api import LoadResult, LoadType from redbot.core import Config, commands from redbot.core.bot import Red from redbot.core.commands import Cog, Context from redbot.core.i18n import Translator from redbot.core.utils import AsyncIter from redbot.core.utils.dbtools import APSWConnectionWrapper from ..audio_dataclasses import Query from ..audio_logging import IS_DEBUG, debug_exc_log from ..errors import DatabaseError, SpotifyFetchError, TrackEnqueueError, YouTubeApiError from ..utils import CacheLevel, Notifier from .api_utils import LavalinkCacheFetchForGlobalResult from .global_db import GlobalCacheWrapper from .local_db import LocalCacheWrapper from .persist_queue_wrapper import QueueInterface from .playlist_interface import get_playlist from .playlist_wrapper import PlaylistWrapper from .spotify import SpotifyWrapper from .youtube import YouTubeWrapper if TYPE_CHECKING: from .. import Audio _ = Translator("Audio", Path(__file__)) log = logging.getLogger("red.cogs.Audio.api.AudioAPIInterface") _TOP_100_US = "https://www.youtube.com/playlist?list=PL4fGSI1pDJn5rWitrRWFKdm-ulaFiIyoK" # TODO: Get random from global Cache class AudioAPIInterface: """Handles music queries. Always tries the Local cache first, then Global cache before making API calls. """ def __init__( self, bot: Red, config: Config, session: aiohttp.ClientSession, conn: APSWConnectionWrapper, cog: Union["Audio", Cog], ): self.bot = bot self.config = config self.conn = conn self.cog = cog self.spotify_api: SpotifyWrapper = SpotifyWrapper(self.bot, self.config, session, self.cog) self.youtube_api: YouTubeWrapper = YouTubeWrapper(self.bot, self.config, session, self.cog) self.local_cache_api = LocalCacheWrapper(self.bot, self.config, self.conn, self.cog) self.global_cache_api = GlobalCacheWrapper(self.bot, self.config, session, self.cog) self.persistent_queue_api = QueueInterface(self.bot, self.config, self.conn, self.cog) self._session: aiohttp.ClientSession = session self._tasks: MutableMapping = {} self._lock: asyncio.Lock = asyncio.Lock() async def initialize(self) -> None: """Initialises the Local Cache connection.""" await self.local_cache_api.lavalink.init() await self.persistent_queue_api.init() def close(self) -> None: """Closes the Local Cache connection.""" self.local_cache_api.lavalink.close() async def get_random_track_from_db(self, tries=0) -> Optional[MutableMapping]: """Get a random track from the local database and return it.""" track: Optional[MutableMapping] = {} try: query_data = {} date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=7) date_timestamp = int(date.timestamp()) query_data["day"] = date_timestamp max_age = await self.config.cache_age() maxage = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.timedelta( days=max_age ) maxage_int = int(time.mktime(maxage.timetuple())) query_data["maxage"] = maxage_int track = await self.local_cache_api.lavalink.fetch_random(query_data) if track is not None: if track.get("loadType") == "V2_COMPACT": track["loadType"] = "V2_COMPAT" results = LoadResult(track) track = random.choice(list(results.tracks)) except Exception as exc: debug_exc_log(log, exc, "Failed to fetch a random track from database") track = {} if not track: return None return track async def route_tasks( self, action_type: str = None, data: Union[List[MutableMapping], MutableMapping] = None, ) -> None: """Separate the tasks and run them in the appropriate functions.""" if not data: return if action_type == "insert" and isinstance(data, list): for table, d in data: if table == "lavalink": await self.local_cache_api.lavalink.insert(d) elif table == "youtube": await self.local_cache_api.youtube.insert(d) elif table == "spotify": await self.local_cache_api.spotify.insert(d) elif action_type == "update" and isinstance(data, dict): for table, d in data: if table == "lavalink": await self.local_cache_api.lavalink.update(data) elif table == "youtube": await self.local_cache_api.youtube.update(data) elif table == "spotify": await self.local_cache_api.spotify.update(data) elif action_type == "global" and isinstance(data, list): await asyncio.gather(*[self.global_cache_api.update_global(**d) for d in data]) async def run_tasks(self, ctx: Optional[commands.Context] = None, message_id=None) -> None: """Run tasks for a specific context.""" if message_id is not None: lock_id = message_id elif ctx is not None: lock_id = ctx.message.id else: return lock_author = ctx.author if ctx else None async with self._lock: if lock_id in self._tasks: if IS_DEBUG: log.debug(f"Running database writes for {lock_id} ({lock_author})") try: tasks = self._tasks[lock_id] tasks = [self.route_tasks(a, tasks[a]) for a in tasks] await asyncio.gather(*tasks, return_exceptions=False) del self._tasks[lock_id] except Exception as exc: debug_exc_log( log, exc, f"Failed database writes for {lock_id} ({lock_author})" ) else: if IS_DEBUG: log.debug(f"Completed database writes for {lock_id} ({lock_author})") async def run_all_pending_tasks(self) -> None: """Run all pending tasks left in the cache, called on cog_unload.""" async with self._lock: if IS_DEBUG: log.debug("Running pending writes to database") try: tasks: MutableMapping = {"update": [], "insert": [], "global": []} async for k, task in AsyncIter(self._tasks.items()): async for t, args in AsyncIter(task.items()): tasks[t].append(args) self._tasks = {} coro_tasks = [self.route_tasks(a, tasks[a]) for a in tasks] await asyncio.gather(*coro_tasks, return_exceptions=False) except Exception as exc: debug_exc_log(log, exc, "Failed database writes") else: if IS_DEBUG: log.debug("Completed pending writes to database have finished") def append_task(self, ctx: commands.Context, event: str, task: Tuple, _id: int = None) -> None: """Add a task to the cache to be run later.""" lock_id = _id or ctx.message.id if lock_id not in self._tasks: self._tasks[lock_id] = {"update": [], "insert": [], "global": []} self._tasks[lock_id][event].append(task) async def fetch_spotify_query( self, ctx: commands.Context, query_type: str, uri: str, notifier: Optional[Notifier], skip_youtube: bool = False, current_cache_level: CacheLevel = CacheLevel.none(), ) -> List[str]: """Return youtube URLS for the spotify URL provided.""" youtube_urls = [] tracks = await self.fetch_from_spotify_api( query_type, uri, params=None, notifier=notifier, ctx=ctx ) total_tracks = len(tracks) database_entries = [] track_count = 0 time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) youtube_cache = CacheLevel.set_youtube().is_subset(current_cache_level) youtube_api_error = None global_api = self.cog.global_api_user.get("can_read") async for track in AsyncIter(tracks): if isinstance(track, str): break elif isinstance(track, dict) and track.get("error", {}).get("message") == "invalid id": continue ( song_url, track_info, uri, artist_name, track_name, _id, _type, ) = await self.spotify_api.get_spotify_track_info(track, ctx) database_entries.append( { "id": _id, "type": _type, "uri": uri, "track_name": track_name, "artist_name": artist_name, "song_url": song_url, "track_info": track_info, "last_updated": time_now, "last_fetched": time_now, } ) if skip_youtube is False: val = None if youtube_cache: try: (val, last_update) = await self.local_cache_api.youtube.fetch_one( {"track": track_info} ) except Exception as exc: debug_exc_log(log, exc, f"Failed to fetch {track_info} from YouTube table") if val is None: try: val = await self.fetch_youtube_query( ctx, track_info, current_cache_level=current_cache_level ) except YouTubeApiError as err: val = None youtube_api_error = err.message if youtube_cache and val: task = ("update", ("youtube", {"track": track_info})) self.append_task(ctx, *task) if val: youtube_urls.append(val) else: youtube_urls.append(track_info) track_count += 1 if notifier is not None and ((track_count % 2 == 0) or (track_count == total_tracks)): await notifier.notify_user(current=track_count, total=total_tracks, key="youtube") if notifier is not None and (youtube_api_error and not global_api): error_embed = discord.Embed( colour=await ctx.embed_colour(), title=_("Failing to get tracks, skipping remaining."), ) await notifier.update_embed(error_embed) break elif notifier is not None and (youtube_api_error and global_api): continue if CacheLevel.set_spotify().is_subset(current_cache_level): task = ("insert", ("spotify", database_entries)) self.append_task(ctx, *task) return youtube_urls async def fetch_from_spotify_api( self, query_type: str, uri: str, recursive: Union[str, bool] = False, params: MutableMapping = None, notifier: Optional[Notifier] = None, ctx: Context = None, ) -> Union[List[MutableMapping], List[str]]: """Gets track info from spotify API.""" if recursive is False: (call, params) = self.spotify_api.spotify_format_call(query_type, uri) results = await self.spotify_api.make_get_call(call, params) else: if isinstance(recursive, str): results = await self.spotify_api.make_get_call(recursive, params) else: results = {} try: if results["error"]["status"] == 401 and not recursive: raise SpotifyFetchError( _( "The Spotify API key or client secret has not been set properly. " "\nUse `{prefix}audioset spotifyapi` for instructions." ) ) elif recursive: return {"next": None} except KeyError: pass if recursive: return results tracks = [] track_count = 0 total_tracks = results.get("tracks", results).get("total", 1) while True: new_tracks: List = [] if query_type == "track": new_tracks = results tracks.append(new_tracks) elif query_type == "album": tracks_raw = results.get("tracks", results).get("items", []) if tracks_raw: new_tracks = tracks_raw tracks.extend(new_tracks) else: tracks_raw = results.get("tracks", results).get("items", []) if tracks_raw: new_tracks = [k["track"] for k in tracks_raw if k.get("track")] tracks.extend(new_tracks) track_count += len(new_tracks) if notifier: await notifier.notify_user(current=track_count, total=total_tracks, key="spotify") try: if results.get("next") is not None: results = await self.fetch_from_spotify_api( query_type, uri, results["next"], params, notifier=notifier ) continue else: break except KeyError: raise SpotifyFetchError( _("This doesn't seem to be a valid Spotify playlist/album URL or code.") ) return tracks async def spotify_query( self, ctx: commands.Context, query_type: str, uri: str, skip_youtube: bool = False, notifier: Optional[Notifier] = None, ) -> List[str]: """Queries the Database then falls back to Spotify and YouTube APIs. Parameters ---------- ctx: commands.Context The context this method is being called under. query_type : str Type of query to perform (Pl uri: str Spotify URL ID. skip_youtube:bool Whether or not to skip YouTube API Calls. notifier: Notifier A Notifier object to handle the user UI notifications while tracks are loaded. Returns ------- List[str] List of Youtube URLs. """ current_cache_level = CacheLevel(await self.config.cache_level()) cache_enabled = CacheLevel.set_spotify().is_subset(current_cache_level) if query_type == "track" and cache_enabled: try: (val, last_update) = await self.local_cache_api.spotify.fetch_one( {"uri": f"spotify:track:{uri}"} ) except Exception as exc: debug_exc_log( log, exc, f"Failed to fetch 'spotify:track:{uri}' from Spotify table" ) val = None else: val = None youtube_urls = [] if val is None: urls = await self.fetch_spotify_query( ctx, query_type, uri, notifier, skip_youtube, current_cache_level=current_cache_level, ) youtube_urls.extend(urls) else: if query_type == "track" and cache_enabled: task = ("update", ("spotify", {"uri": f"spotify:track:{uri}"})) self.append_task(ctx, *task) youtube_urls.append(val) return youtube_urls async def spotify_enqueue( self, ctx: commands.Context, query_type: str, uri: str, enqueue: bool, player: lavalink.Player, lock: Callable, notifier: Optional[Notifier] = None, forced: bool = False, query_global: bool = True, ) -> List[lavalink.Track]: """Queries the Database then falls back to Spotify and YouTube APIs then Enqueued matched tracks. Parameters ---------- ctx: commands.Context The context this method is being called under. query_type : str Type of query to perform (Pl uri: str Spotify URL ID. enqueue:bool Whether or not to enqueue the tracks player: lavalink.Player The current Player. notifier: Notifier A Notifier object to handle the user UI notifications while tracks are loaded. lock: Callable A callable handling the Track enqueue lock while spotify tracks are being added. query_global: bool Whether or not to query the global API. forced: bool Ignore Cache and make a fetch from API. Returns ------- List[str] List of Youtube URLs. """ await self.global_cache_api._get_api_key() globaldb_toggle = self.cog.global_api_user.get("can_read") global_entry = globaldb_toggle and query_global track_list: List = [] has_not_allowed = False youtube_api_error = None skip_youtube_api = False try: current_cache_level = CacheLevel(await self.config.cache_level()) guild_data = await self.config.guild(ctx.guild).all() enqueued_tracks = 0 consecutive_fails = 0 queue_dur = await self.cog.queue_duration(ctx) queue_total_duration = self.cog.format_time(queue_dur) before_queue_length = len(player.queue) tracks_from_spotify = await self.fetch_from_spotify_api( query_type, uri, params=None, notifier=notifier ) total_tracks = len(tracks_from_spotify) if total_tracks < 1 and notifier is not None: lock(ctx, False) embed3 = discord.Embed( colour=await ctx.embed_colour(), title=_("This doesn't seem to be a supported Spotify URL or code."), ) await notifier.update_embed(embed3) return track_list database_entries = [] time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) youtube_cache = CacheLevel.set_youtube().is_subset(current_cache_level) spotify_cache = CacheLevel.set_spotify().is_subset(current_cache_level) async for track_count, track in AsyncIter(tracks_from_spotify).enumerate(start=1): ( song_url, track_info, uri, artist_name, track_name, _id, _type, ) = await self.spotify_api.get_spotify_track_info(track, ctx) database_entries.append( { "id": _id, "type": _type, "uri": uri, "track_name": track_name, "artist_name": artist_name, "song_url": song_url, "track_info": track_info, "last_updated": time_now, "last_fetched": time_now, } ) val = None llresponse = None if youtube_cache: try: (val, last_updated) = await self.local_cache_api.youtube.fetch_one( {"track": track_info} ) except Exception as exc: debug_exc_log(log, exc, f"Failed to fetch {track_info} from YouTube table") should_query_global = globaldb_toggle and query_global and val is None if should_query_global: llresponse = await self.global_cache_api.get_spotify(track_name, artist_name) if llresponse: if llresponse.get("loadType") == "V2_COMPACT": llresponse["loadType"] = "V2_COMPAT" llresponse = LoadResult(llresponse) val = llresponse or None if val is None and not skip_youtube_api: try: val = await self.fetch_youtube_query( ctx, track_info, current_cache_level=current_cache_level ) except YouTubeApiError as err: val = None youtube_api_error = err.message skip_youtube_api = True if not youtube_api_error: if youtube_cache and val and llresponse is None: task = ("update", ("youtube", {"track": track_info})) self.append_task(ctx, *task) if isinstance(llresponse, LoadResult): track_object = llresponse.tracks elif val: result = None if should_query_global: llresponse = await self.global_cache_api.get_call(val) if llresponse: if llresponse.get("loadType") == "V2_COMPACT": llresponse["loadType"] = "V2_COMPAT" llresponse = LoadResult(llresponse) result = llresponse or None if not result: try: (result, called_api) = await self.fetch_track( ctx, player, Query.process_input(val, self.cog.local_folder_current_path), forced=forced, should_query_global=not should_query_global, ) except (RuntimeError, aiohttp.ServerDisconnectedError): lock(ctx, False) error_embed = discord.Embed( colour=await ctx.embed_colour(), title=_( "The connection was reset while loading the playlist." ), ) if notifier is not None: await notifier.update_embed(error_embed) break except asyncio.TimeoutError: lock(ctx, False) error_embed = discord.Embed( colour=await ctx.embed_colour(), title=_("Player timeout, skipping remaining tracks."), ) if notifier is not None: await notifier.update_embed(error_embed) break track_object = result.tracks else: track_object = [] else: track_object = [] if (track_count % 2 == 0) or (track_count == total_tracks): key = "lavalink" seconds = "???" second_key = None if notifier is not None: await notifier.notify_user( current=track_count, total=total_tracks, key=key, seconds_key=second_key, seconds=seconds, ) if (youtube_api_error and not global_entry) or consecutive_fails >= ( 20 if global_entry else 10 ): error_embed = discord.Embed( colour=await ctx.embed_colour(), title=_("Failing to get tracks, skipping remaining."), ) if notifier is not None: await notifier.update_embed(error_embed) if youtube_api_error: lock(ctx, False) raise SpotifyFetchError(message=youtube_api_error) break if not track_object: consecutive_fails += 1 continue consecutive_fails = 0 single_track = track_object[0] query = Query.process_input(single_track, self.cog.local_folder_current_path) if not await self.cog.is_query_allowed( self.config, ctx, f"{single_track.title} {single_track.author} {single_track.uri} {query}", query_obj=query, ): has_not_allowed = True if IS_DEBUG: log.debug(f"Query is not allowed in {ctx.guild} ({ctx.guild.id})") continue track_list.append(single_track) if enqueue: if len(player.queue) >= 10000: continue if guild_data["maxlength"] > 0: if self.cog.is_track_length_allowed(single_track, guild_data["maxlength"]): enqueued_tracks += 1 single_track.extras.update( { "enqueue_time": int(time.time()), "vc": player.channel.id, "requester": ctx.author.id, } ) player.add(ctx.author, single_track) self.bot.dispatch( "red_audio_track_enqueue", player.channel.guild, single_track, ctx.author, ) else: enqueued_tracks += 1 single_track.extras.update( { "enqueue_time": int(time.time()), "vc": player.channel.id, "requester": ctx.author.id, } ) player.add(ctx.author, single_track) self.bot.dispatch( "red_audio_track_enqueue", player.channel.guild, single_track, ctx.author, ) if not player.current: await player.play() if enqueue and tracks_from_spotify: if total_tracks > enqueued_tracks: maxlength_msg = _(" {bad_tracks} tracks cannot be queued.").format( bad_tracks=(total_tracks - enqueued_tracks) ) else: maxlength_msg = "" embed = discord.Embed( colour=await ctx.embed_colour(), title=_("Playlist Enqueued"), description=_("Added {num} tracks to the queue.{maxlength_msg}").format( num=enqueued_tracks, maxlength_msg=maxlength_msg ), ) if not guild_data["shuffle"] and queue_dur > 0: embed.set_footer( text=_( "{time} until start of playlist" " playback: starts at #{position} in queue" ).format(time=queue_total_duration, position=before_queue_length + 1) ) if notifier is not None: await notifier.update_embed(embed) lock(ctx, False) if not track_list and not has_not_allowed: raise SpotifyFetchError( message=_( "Nothing found.\nThe YouTube API key may be invalid " "or you may be rate limited on YouTube's search service.\n" "Check the YouTube API key again and follow the instructions " "at `{prefix}audioset youtubeapi`." ) ) player.maybe_shuffle() if spotify_cache: task = ("insert", ("spotify", database_entries)) self.append_task(ctx, *task) except Exception as exc: lock(ctx, False) raise exc finally: lock(ctx, False) return track_list async def fetch_youtube_query( self, ctx: commands.Context, track_info: str, current_cache_level: CacheLevel = CacheLevel.none(), ) -> Optional[str]: """Call the Youtube API and returns the youtube URL that the query matched.""" track_url = await self.youtube_api.get_call(track_info) if CacheLevel.set_youtube().is_subset(current_cache_level) and track_url: time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) task = ( "insert", ( "youtube", [ { "track_info": track_info, "track_url": track_url, "last_updated": time_now, "last_fetched": time_now, } ], ), ) self.append_task(ctx, *task) return track_url async def fetch_from_youtube_api( self, ctx: commands.Context, track_info: str ) -> Optional[str]: """Gets an YouTube URL from for the query.""" current_cache_level = CacheLevel(await self.config.cache_level()) cache_enabled = CacheLevel.set_youtube().is_subset(current_cache_level) val = None if cache_enabled: try: (val, update) = await self.local_cache_api.youtube.fetch_one({"track": track_info}) except Exception as exc: debug_exc_log(log, exc, f"Failed to fetch {track_info} from YouTube table") if val is None: try: youtube_url = await self.fetch_youtube_query( ctx, track_info, current_cache_level=current_cache_level ) except YouTubeApiError as err: youtube_url = None else: if cache_enabled: task = ("update", ("youtube", {"track": track_info})) self.append_task(ctx, *task) youtube_url = val return youtube_url async def fetch_track( self, ctx: commands.Context, player: lavalink.Player, query: Query, forced: bool = False, lazy: bool = False, should_query_global: bool = True, ) -> Tuple[LoadResult, bool]: """A replacement for :code:`lavalink.Player.load_tracks`. This will try to get a valid cached entry first if not found or if in valid it will then call the lavalink API. Parameters ---------- ctx: commands.Context The context this method is being called under. player : lavalink.Player The player who's requesting the query. query: audio_dataclasses.Query The Query object for the query in question. forced:bool Whether or not to skip cache and call API first. lazy:bool If set to True, it will not call the api if a track is not found. should_query_global:bool If the method should query the global database. Returns ------- Tuple[lavalink.LoadResult, bool] Tuple with the Load result and whether or not the API was called. """ current_cache_level = CacheLevel(await self.config.cache_level()) cache_enabled = CacheLevel.set_lavalink().is_subset(current_cache_level) val = None query = Query.process_input(query, self.cog.local_folder_current_path) query_string = str(query) globaldb_toggle = self.cog.global_api_user.get("can_read") valid_global_entry = False results = None called_api = False prefer_lyrics = await self.cog.get_lyrics_status(ctx) if prefer_lyrics and query.is_youtube and query.is_search: query_string = f"{query} - lyrics" if cache_enabled and not forced and not query.is_local: try: (val, last_updated) = await self.local_cache_api.lavalink.fetch_one( {"query": query_string} ) except Exception as exc: debug_exc_log(log, exc, f"Failed to fetch '{query_string}' from Lavalink table") if val and isinstance(val, dict): if IS_DEBUG: log.debug(f"Updating Local Database with {query_string}") task = ("update", ("lavalink", {"query": query_string})) self.append_task(ctx, *task) else: val = None if val and not forced and isinstance(val, dict): valid_global_entry = False called_api = False else: val = None if ( globaldb_toggle and not val and should_query_global and not forced and not query.is_local and not query.is_spotify ): valid_global_entry = False with contextlib.suppress(Exception): global_entry = await self.global_cache_api.get_call(query=query) if global_entry.get("loadType") == "V2_COMPACT": global_entry["loadType"] = "V2_COMPAT" results = LoadResult(global_entry) if results.load_type in [ LoadType.PLAYLIST_LOADED, LoadType.TRACK_LOADED, LoadType.SEARCH_RESULT, LoadType.V2_COMPAT, ]: valid_global_entry = True if valid_global_entry: if IS_DEBUG: log.debug(f"Querying Global DB api for {query}") results, called_api = results, False if valid_global_entry: pass elif lazy is True: called_api = False elif val and not forced and isinstance(val, dict): data = val data["query"] = query_string if data.get("loadType") == "V2_COMPACT": data["loadType"] = "V2_COMPAT" results = LoadResult(data) called_api = False if results.has_error: # If cached value has an invalid entry make a new call so that it gets updated results, called_api = await self.fetch_track(ctx, player, query, forced=True) valid_global_entry = False else: if IS_DEBUG: log.debug(f"Querying Lavalink api for {query_string}") called_api = True try: results = await player.load_tracks(query_string) except KeyError: results = None except RuntimeError: raise TrackEnqueueError if results is None: results = LoadResult({"loadType": "LOAD_FAILED", "playlistInfo": {}, "tracks": []}) valid_global_entry = False update_global = ( globaldb_toggle and not valid_global_entry and self.global_cache_api.has_api_key ) with contextlib.suppress(Exception): if ( update_global and not query.is_local and not results.has_error and len(results.tracks) >= 1 ): global_task = ("global", dict(llresponse=results, query=query)) self.append_task(ctx, *global_task) if ( cache_enabled and results.load_type and not results.has_error and not query.is_local and results.tracks ): try: time_now = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) data = json.dumps(results._raw) if all(k in data for k in ["loadType", "playlistInfo", "isSeekable", "isStream"]): task = ( "insert", ( "lavalink", [ { "query": query_string, "data": data, "last_updated": time_now, "last_fetched": time_now, } ], ), ) self.append_task(ctx, *task) except Exception as exc: debug_exc_log( log, exc, f"Failed to enqueue write task for '{query_string}' to Lavalink table", ) return results, called_api async def autoplay(self, player: lavalink.Player, playlist_api: PlaylistWrapper): """Enqueue a random track.""" autoplaylist = await self.config.guild(player.channel.guild).autoplaylist() current_cache_level = CacheLevel(await self.config.cache_level()) cache_enabled = CacheLevel.set_lavalink().is_subset(current_cache_level) notify_channel_id = player.fetch("channel") playlist = None tracks = None if autoplaylist["enabled"]: try: playlist = await get_playlist( autoplaylist["id"], autoplaylist["scope"], self.bot, playlist_api, player.channel.guild, player.channel.guild.me, ) tracks = playlist.tracks_obj except Exception as exc: debug_exc_log(log, exc, "Failed to fetch playlist for autoplay") if not tracks or not getattr(playlist, "tracks", None): if cache_enabled: track = await self.get_random_track_from_db() tracks = [] if not track else [track] if not tracks: ctx = namedtuple("Context", "message guild cog") (results, called_api) = await self.fetch_track( cast( commands.Context, ctx(player.channel.guild, player.channel.guild, self.cog) ), player, Query.process_input(_TOP_100_US, self.cog.local_folder_current_path), ) tracks = list(results.tracks) if tracks: multiple = len(tracks) > 1 valid = not multiple tries = len(tracks) track = tracks[0] while valid is False and multiple: tries -= 1 if tries <= 0: raise DatabaseError("No valid entry found") track = random.choice(tracks) query = Query.process_input(track, self.cog.local_folder_current_path) await asyncio.sleep(0.001) if (not query.valid) or ( query.is_local and query.local_track_path is not None and not query.local_track_path.exists() ): continue notify_channel = self.bot.get_channel(notify_channel_id) if not await self.cog.is_query_allowed( self.config, notify_channel, f"{track.title} {track.author} {track.uri} {query}", query_obj=query, ): if IS_DEBUG: log.debug( "Query is not allowed in " f"{player.channel.guild} ({player.channel.guild.id})" ) continue valid = True track.extras.update( { "autoplay": True, "enqueue_time": int(time.time()), "vc": player.channel.id, "requester": player.channel.guild.me.id, } ) player.add(player.channel.guild.me, track) self.bot.dispatch( "red_audio_track_auto_play", player.channel.guild, track, player.channel.guild.me, player, ) if notify_channel_id: await self.config.guild_from_id( guild_id=player.channel.guild.id ).currently_auto_playing_in.set([notify_channel_id, player.channel.id]) else: await self.config.guild_from_id( guild_id=player.channel.guild.id ).currently_auto_playing_in.set([]) if not player.current: await player.play() async def fetch_all_contribute(self) -> List[LavalinkCacheFetchForGlobalResult]: return await self.local_cache_api.lavalink.fetch_all_for_global()