diff --git a/redbot/cogs/streams/streamtypes.py b/redbot/cogs/streams/streamtypes.py index 0847b00ba..3e5ae817a 100644 --- a/redbot/cogs/streams/streamtypes.py +++ b/redbot/cogs/streams/streamtypes.py @@ -1,6 +1,8 @@ +import asyncio import contextlib import json import logging +import time from dateutil.parser import parse as parse_time from random import choice from string import ascii_letters @@ -282,22 +284,67 @@ class TwitchStream(Stream): self.id = kwargs.pop("id", None) self._client_id = kwargs.pop("token", None) self._bearer = kwargs.pop("bearer", None) + self._rate_limit_resets: set = set() + self._rate_limit_remaining: int = 0 super().__init__(**kwargs) + async def wait_for_rate_limit_reset(self) -> None: + """Check rate limits in response header and ensure we're following them. + + From python-twitch-client and adaptated to asyncio from Trusty-cogs: + https://github.com/tsifrer/python-twitch-client/blob/master/twitch/helix/base.py + https://github.com/TrustyJAID/Trusty-cogs/blob/master/twitch/twitch_api.py + """ + current_time = int(time.time()) + self._rate_limit_resets = {x for x in self._rate_limit_resets if x > current_time} + if self._rate_limit_remaining == 0: + + if self._rate_limit_resets: + reset_time = next(iter(self._rate_limit_resets)) + # Calculate wait time and add 0.1s to the wait time to allow Twitch to reset + # their counter + wait_time = reset_time - current_time + 0.1 + await asyncio.sleep(wait_time) + + async def get_data(self, url: str, params: dict = {}) -> Tuple[Optional[int], dict]: + header = {"Client-ID": str(self._client_id)} + if self._bearer is not None: + header["Authorization"] = f"Bearer {self._bearer}" + await self.wait_for_rate_limit_reset() + async with aiohttp.ClientSession() as session: + try: + async with session.get(url, headers=header, params=params, timeout=60) as resp: + remaining = resp.headers.get("Ratelimit-Remaining") + if remaining: + self._rate_limit_remaining = int(remaining) + reset = resp.headers.get("Ratelimit-Reset") + if reset: + self._rate_limit_resets.add(int(reset)) + + if resp.status == 429: + log.info( + "Ratelimited. Trying again at %s.", datetime.fromtimestamp(int(reset)) + ) + resp.release() + return await self.get_data(url) + + if resp.status != 200: + return resp.status, {} + + return resp.status, await resp.json(encoding="utf-8") + except (aiohttp.ClientConnectionError, asyncio.TimeoutError) as exc: + log.warning("Connection error occurred when fetching Twitch stream", exc_info=exc) + return None, {} + async def is_online(self): if not self.id: self.id = await self.fetch_id() url = TWITCH_STREAMS_ENDPOINT - header = {"Client-ID": str(self._client_id)} - if self._bearer is not None: - header = {**header, "Authorization": f"Bearer {self._bearer}"} params = {"user_id": self.id} - async with aiohttp.ClientSession() as session: - async with session.get(url, headers=header, params=params) as r: - data = await r.json(encoding="utf-8") - if r.status == 200: + code, data = await self.get_data(url, params) + if code == 200: if not data["data"]: raise OfflineStream() self.name = data["data"][0]["user_name"] @@ -310,31 +357,22 @@ class TwitchStream(Stream): game_id = data["game_id"] if game_id: - params = {"id": game_id} - async with aiohttp.ClientSession() as session: - async with session.get( - "https://api.twitch.tv/helix/games", headers=header, params=params - ) as r: - game_data = await r.json(encoding="utf-8") + __, game_data = await self.get_data( + "https://api.twitch.tv/helix/games", {"id": game_id} + ) if game_data: game_data = game_data["data"][0] data["game_name"] = game_data["name"] - params = {"to_id": self.id} - async with aiohttp.ClientSession() as session: - async with session.get( - "https://api.twitch.tv/helix/users/follows", headers=header, params=params - ) as r: - user_data = await r.json(encoding="utf-8") + __, user_data = await self.get_data( + "https://api.twitch.tv/helix/users/follows", {"to_id": self.id} + ) if user_data: followers = user_data["total"] data["followers"] = followers - params = {"id": self.id} - async with aiohttp.ClientSession() as session: - async with session.get( - "https://api.twitch.tv/helix/users", headers=header, params=params - ) as r: - user_profile_data = await r.json(encoding="utf-8") + __, user_profile_data = await self.get_data( + "https://api.twitch.tv/helix/users", {"id": self.id} + ) if user_profile_data: profile_image_url = user_profile_data["data"][0]["profile_image_url"] data["profile_image_url"] = profile_image_url @@ -343,9 +381,9 @@ class TwitchStream(Stream): is_rerun = False return self.make_embed(data), is_rerun - elif r.status == 400: + elif code == 400: raise InvalidTwitchCredentials() - elif r.status == 404: + elif code == 404: raise StreamNotFound() else: raise APIError(data) @@ -357,17 +395,15 @@ class TwitchStream(Stream): url = TWITCH_ID_ENDPOINT params = {"login": self.name} - async with aiohttp.ClientSession() as session: - async with session.get(url, headers=header, params=params) as r: - data = await r.json() + status, data = await self.get_data(url, params) - if r.status == 200: + if status == 200: if not data["data"]: raise StreamNotFound() return data["data"][0]["id"] - elif r.status == 400: + elif status == 400: raise StreamNotFound() - elif r.status == 401: + elif status == 401: raise InvalidTwitchCredentials() else: raise APIError(data)