[Streams] Significantly reduce number the quota usage for YouTube Data api (#3237)

* chore

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* Pre-tests

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* *sigh*

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* Streams + black formatting

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* *sigh* lets not spam the logs shall we

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* lets be extra sure

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* chore

Signed-off-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>

* [Streams] Fix Twitch token for streamalert (#2)

* [Streams] Fix Twitch token for streamalert

* [Streams] Fix Twitch token for streamalert

Co-authored-by: PredaaA <46051820+PredaaA@users.noreply.github.com>
This commit is contained in:
Draper 2020-02-15 05:06:03 +00:00 committed by GitHub
parent a763726c89
commit 04b5a5f9ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 37 deletions

View File

@ -0,0 +1 @@
``[p]remove`` command now accepts an URL or Index, if an URL is used it will remove all tracks in the queue with that URL.

View File

@ -0,0 +1 @@
Added ``[p]streamset timer`` command, which can be used to control how often the cog checks for livestreams.

View File

@ -0,0 +1 @@
Changed the YouTube streams logic to use an RSS instead of the search endpoint, significantly reducing quota usage.

View File

@ -3,5 +3,4 @@ from .streams import Streams
async def setup(bot): async def setup(bot):
cog = Streams(bot) cog = Streams(bot)
await cog.initialize()
bot.add_cog(cog) bot.add_cog(cog)

View File

@ -25,10 +25,7 @@ from . import streamtypes as _streamtypes
from collections import defaultdict from collections import defaultdict
import asyncio import asyncio
import re import re
from typing import Optional, List, Tuple from typing import Optional, List, Tuple, Union
CHECK_DELAY = 60
_ = Translator("Streams", __file__) _ = Translator("Streams", __file__)
@ -36,7 +33,7 @@ _ = Translator("Streams", __file__)
@cog_i18n(_) @cog_i18n(_)
class Streams(commands.Cog): class Streams(commands.Cog):
global_defaults = {"tokens": {}, "streams": []} global_defaults = {"refresh_timer": 300, "tokens": {}, "streams": []}
guild_defaults = { guild_defaults = {
"autodelete": False, "autodelete": False,
@ -51,12 +48,10 @@ class Streams(commands.Cog):
def __init__(self, bot: Red): def __init__(self, bot: Red):
super().__init__() super().__init__()
self.db = Config.get_conf(self, 26262626) self.db: Config = Config.get_conf(self, 26262626)
self.db.register_global(**self.global_defaults) self.db.register_global(**self.global_defaults)
self.db.register_guild(**self.guild_defaults) self.db.register_guild(**self.guild_defaults)
self.db.register_role(**self.role_defaults) self.db.register_role(**self.role_defaults)
self.bot: Red = bot self.bot: Red = bot
@ -66,7 +61,10 @@ class Streams(commands.Cog):
self.yt_cid_pattern = re.compile("^UC[-_A-Za-z0-9]{21}[AQgw]$") self.yt_cid_pattern = re.compile("^UC[-_A-Za-z0-9]{21}[AQgw]$")
def check_name_or_id(self, data: str): self._ready_event: asyncio.Event = asyncio.Event()
self._init_task: asyncio.Task = self.bot.loop.create_task(self.initialize())
def check_name_or_id(self, data: str) -> bool:
matched = self.yt_cid_pattern.fullmatch(data) matched = self.yt_cid_pattern.fullmatch(data)
if matched is None: if matched is None:
return True return True
@ -74,12 +72,17 @@ class Streams(commands.Cog):
async def initialize(self) -> None: async def initialize(self) -> None:
"""Should be called straight after cog instantiation.""" """Should be called straight after cog instantiation."""
await self.bot.wait_until_ready()
await self.move_api_keys() await self.move_api_keys()
self.streams = await self.load_streams() self.streams = await self.load_streams()
self.task = self.bot.loop.create_task(self._stream_alerts()) self.task = self.bot.loop.create_task(self._stream_alerts())
self._ready_event.set()
async def move_api_keys(self): async def cog_before_invoke(self, ctx: commands.Context):
await self._ready_event.wait()
async def move_api_keys(self) -> None:
"""Move the API keys from cog stored config to core bot config if they exist.""" """Move the API keys from cog stored config to core bot config if they exist."""
tokens = await self.db.tokens() tokens = await self.db.tokens()
youtube = await self.bot.get_shared_api_tokens("youtube") youtube = await self.bot.get_shared_api_tokens("youtube")
@ -100,8 +103,11 @@ class Streams(commands.Cog):
await self.check_online(ctx, stream) await self.check_online(ctx, stream)
@commands.command() @commands.command()
@commands.cooldown(1, 30, commands.BucketType.guild)
async def youtubestream(self, ctx: commands.Context, channel_id_or_name: str): async def youtubestream(self, ctx: commands.Context, channel_id_or_name: str):
"""Check if a YouTube channel is live.""" """Check if a YouTube channel is live."""
# TODO: Write up a custom check to look up cooldown set by botowner
# This check is here to avoid people spamming this command and eating up quota
apikey = await self.bot.get_shared_api_tokens("youtube") apikey = await self.bot.get_shared_api_tokens("youtube")
is_name = self.check_name_or_id(channel_id_or_name) is_name = self.check_name_or_id(channel_id_or_name)
if is_name: if is_name:
@ -128,7 +134,11 @@ class Streams(commands.Cog):
stream = PicartoStream(name=channel_name) stream = PicartoStream(name=channel_name)
await self.check_online(ctx, stream) await self.check_online(ctx, stream)
async def check_online(self, ctx: commands.Context, stream): async def check_online(
self,
ctx: commands.Context,
stream: Union[PicartoStream, MixerStream, HitboxStream, YoutubeStream, TwitchStream],
):
try: try:
info = await stream.is_online() info = await stream.is_online()
except OfflineStream: except OfflineStream:
@ -318,6 +328,18 @@ class Streams(commands.Cog):
"""Set tokens for accessing streams.""" """Set tokens for accessing streams."""
pass pass
@streamset.command(name="timer")
@checks.is_owner()
async def _streamset_refresh_timer(self, ctx: commands.Context, refresh_time: int):
"""Set stream check refresh time."""
if refresh_time < 60:
return await ctx.send(_("You cannot set the refresh timer to less than 60 seconds"))
await self.db.refresh_timer.set(refresh_time)
await ctx.send(
_("Refresh timer set to {refresh_time} seconds".format(refresh_time=refresh_time))
)
@streamset.command() @streamset.command()
@checks.is_owner() @checks.is_owner()
async def twitchtoken(self, ctx: commands.Context): async def twitchtoken(self, ctx: commands.Context):
@ -546,7 +568,7 @@ class Streams(commands.Cog):
await self.check_streams() await self.check_streams()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
await asyncio.sleep(CHECK_DELAY) await asyncio.sleep(await self.db.refresh_timer())
async def check_streams(self): async def check_streams(self):
for stream in self.streams: for stream in self.streams:
@ -656,6 +678,9 @@ class Streams(commands.Cog):
raw_stream["_messages_cache"].append(msg) raw_stream["_messages_cache"].append(msg)
token = await self.bot.get_shared_api_tokens(_class.token_name) token = await self.bot.get_shared_api_tokens(_class.token_name)
if token: if token:
if _class.__name__ == "TwitchStream":
raw_stream["token"] = token.get("client_id")
else:
raw_stream["token"] = token raw_stream["token"] = token
streams.append(_class(**raw_stream)) streams.append(_class(**raw_stream))

View File

@ -1,3 +1,13 @@
import json
import logging
import xml.etree.ElementTree as ET
from random import choice
from string import ascii_letters
from typing import ClassVar, Optional, List
import aiohttp
import discord
from .errors import ( from .errors import (
StreamNotFound, StreamNotFound,
APIError, APIError,
@ -6,12 +16,6 @@ from .errors import (
InvalidTwitchCredentials, InvalidTwitchCredentials,
) )
from redbot.core.i18n import Translator from redbot.core.i18n import Translator
from random import choice, sample
from string import ascii_letters
from typing import ClassVar, Optional
import discord
import aiohttp
import json
TWITCH_BASE_URL = "https://api.twitch.tv" TWITCH_BASE_URL = "https://api.twitch.tv"
TWITCH_ID_ENDPOINT = TWITCH_BASE_URL + "/kraken/users?login=" TWITCH_ID_ENDPOINT = TWITCH_BASE_URL + "/kraken/users?login="
@ -22,13 +26,24 @@ YOUTUBE_BASE_URL = "https://www.googleapis.com/youtube/v3"
YOUTUBE_CHANNELS_ENDPOINT = YOUTUBE_BASE_URL + "/channels" YOUTUBE_CHANNELS_ENDPOINT = YOUTUBE_BASE_URL + "/channels"
YOUTUBE_SEARCH_ENDPOINT = YOUTUBE_BASE_URL + "/search" YOUTUBE_SEARCH_ENDPOINT = YOUTUBE_BASE_URL + "/search"
YOUTUBE_VIDEOS_ENDPOINT = YOUTUBE_BASE_URL + "/videos" YOUTUBE_VIDEOS_ENDPOINT = YOUTUBE_BASE_URL + "/videos"
YOUTUBE_CHANNEL_RSS = "https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}"
_ = Translator("Streams", __file__) _ = Translator("Streams", __file__)
log = logging.getLogger("redbot.cogs.Streams")
def rnd(url): def rnd(url):
"""Appends a random parameter to the url to avoid Discord's caching""" """Appends a random parameter to the url to avoid Discord's caching"""
return url + "?rnd=" + "".join([choice(ascii_letters) for i in range(6)]) return url + "?rnd=" + "".join([choice(ascii_letters) for _loop_counter in range(6)])
def get_video_ids_from_feed(feed):
root = ET.fromstring(feed)
rss_video_ids = []
for child in root.iter("{http://www.w3.org/2005/Atom}entry"):
for i in child.iter("{http://www.youtube.com/xml/schemas/2015}videoId"):
yield i.text
class Stream: class Stream:
@ -69,6 +84,9 @@ class YoutubeStream(Stream):
def __init__(self, **kwargs): def __init__(self, **kwargs):
self.id = kwargs.pop("id", None) self.id = kwargs.pop("id", None)
self._token = kwargs.pop("token", None) self._token = kwargs.pop("token", None)
self.not_livestreams: List[str] = []
self.livestreams: List[str] = []
super().__init__(**kwargs) super().__init__(**kwargs)
async def is_online(self): async def is_online(self):
@ -80,26 +98,55 @@ class YoutubeStream(Stream):
elif not self.name: elif not self.name:
self.name = await self.fetch_name() self.name = await self.fetch_name()
url = YOUTUBE_SEARCH_ENDPOINT async with aiohttp.ClientSession() as session:
async with session.get(YOUTUBE_CHANNEL_RSS.format(channel_id=self.id)) as r:
rssdata = await r.text()
if self.not_livestreams:
self.not_livestreams = list(dict.fromkeys(self.not_livestreams))
if self.livestreams:
self.livestreams = list(dict.fromkeys(self.livestreams))
for video_id in get_video_ids_from_feed(rssdata):
if video_id in self.not_livestreams:
log.debug(f"video_id in not_livestreams: {video_id}")
continue
log.debug(f"video_id not in not_livestreams: {video_id}")
params = { params = {
"key": self._token["api_key"], "key": self._token["api_key"],
"part": "snippet", "id": video_id,
"channelId": self.id, "part": "id,liveStreamingDetails",
"type": "video",
"eventType": "live",
} }
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as r: async with session.get(YOUTUBE_VIDEOS_ENDPOINT, params=params) as r:
data = await r.json() data = await r.json()
if "items" in data and len(data["items"]) == 0: stream_data = data.get("items", [{}])[0].get("liveStreamingDetails", {})
raise OfflineStream() log.debug(f"stream_data for {video_id}: {stream_data}")
elif "items" in data: if (
vid_id = data["items"][0]["id"]["videoId"] stream_data
params = {"key": self._token["api_key"], "id": vid_id, "part": "snippet"} and stream_data != "None"
and stream_data.get("actualEndTime", None) is None
and stream_data.get("concurrentViewers", None) is not None
):
if video_id not in self.livestreams:
self.livestreams.append(data["items"][0]["id"])
else:
self.not_livestreams.append(data["items"][0]["id"])
if video_id in self.livestreams:
self.livestreams.remove(video_id)
log.debug(f"livestreams for {self.name}: {self.livestreams}")
log.debug(f"not_livestreams for {self.name}: {self.not_livestreams}")
# This is technically redundant since we have the
# info from the RSS ... but incase you dont wanna deal with fully rewritting the
# code for this part, as this is only a 2 quota query.
if self.livestreams:
params = {"key": self._token["api_key"], "id": self.livestreams[-1], "part": "snippet"}
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(YOUTUBE_VIDEOS_ENDPOINT, params=params) as r: async with session.get(YOUTUBE_VIDEOS_ENDPOINT, params=params) as r:
data = await r.json() data = await r.json()
return self.make_embed(data) return self.make_embed(data)
raise OfflineStream()
def make_embed(self, data): def make_embed(self, data):
vid_data = data["items"][0] vid_data = data["items"][0]
@ -162,7 +209,7 @@ class TwitchStream(Stream):
self.id = await self.fetch_id() self.id = await self.fetch_id()
url = TWITCH_STREAMS_ENDPOINT + self.id url = TWITCH_STREAMS_ENDPOINT + self.id
header = {"Client-ID": str(self._token), "Accept": "application/vnd.twitchtv.v5+json"} header = {"Client-ID": str(self._token)}
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url, headers=header) as r: async with session.get(url, headers=header) as r: