[Streams] Handle Twitch ratelimits (#4883)

* [Streams] Handle Twitch ratelimits.

* Also handle possible API or client errors.

* Add Jack's changes request.

* Update redbot/cogs/streams/streamtypes.py

* Update redbot/cogs/streams/streamtypes.py

* Update redbot/cogs/streams/streamtypes.py

* Update redbot/cogs/streams/streamtypes.py

Co-authored-by: jack1142 <6032823+jack1142@users.noreply.github.com>
This commit is contained in:
PredaaA 2021-04-05 16:23:04 +02:00 committed by GitHub
parent 6e764c7e04
commit 5ddc6d2ea8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,6 +1,8 @@
import asyncio
import contextlib import contextlib
import json import json
import logging import logging
import time
from dateutil.parser import parse as parse_time from dateutil.parser import parse as parse_time
from random import choice from random import choice
from string import ascii_letters from string import ascii_letters
@ -282,22 +284,67 @@ class TwitchStream(Stream):
self.id = kwargs.pop("id", None) self.id = kwargs.pop("id", None)
self._client_id = kwargs.pop("token", None) self._client_id = kwargs.pop("token", None)
self._bearer = kwargs.pop("bearer", None) self._bearer = kwargs.pop("bearer", None)
self._rate_limit_resets: set = set()
self._rate_limit_remaining: int = 0
super().__init__(**kwargs) super().__init__(**kwargs)
async def wait_for_rate_limit_reset(self) -> None:
"""Check rate limits in response header and ensure we're following them.
From python-twitch-client and adaptated to asyncio from Trusty-cogs:
https://github.com/tsifrer/python-twitch-client/blob/master/twitch/helix/base.py
https://github.com/TrustyJAID/Trusty-cogs/blob/master/twitch/twitch_api.py
"""
current_time = int(time.time())
self._rate_limit_resets = {x for x in self._rate_limit_resets if x > current_time}
if self._rate_limit_remaining == 0:
if self._rate_limit_resets:
reset_time = next(iter(self._rate_limit_resets))
# Calculate wait time and add 0.1s to the wait time to allow Twitch to reset
# their counter
wait_time = reset_time - current_time + 0.1
await asyncio.sleep(wait_time)
async def get_data(self, url: str, params: dict = {}) -> Tuple[Optional[int], dict]:
header = {"Client-ID": str(self._client_id)}
if self._bearer is not None:
header["Authorization"] = f"Bearer {self._bearer}"
await self.wait_for_rate_limit_reset()
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, headers=header, params=params, timeout=60) as resp:
remaining = resp.headers.get("Ratelimit-Remaining")
if remaining:
self._rate_limit_remaining = int(remaining)
reset = resp.headers.get("Ratelimit-Reset")
if reset:
self._rate_limit_resets.add(int(reset))
if resp.status == 429:
log.info(
"Ratelimited. Trying again at %s.", datetime.fromtimestamp(int(reset))
)
resp.release()
return await self.get_data(url)
if resp.status != 200:
return resp.status, {}
return resp.status, await resp.json(encoding="utf-8")
except (aiohttp.ClientConnectionError, asyncio.TimeoutError) as exc:
log.warning("Connection error occurred when fetching Twitch stream", exc_info=exc)
return None, {}
async def is_online(self): async def is_online(self):
if not self.id: if not self.id:
self.id = await self.fetch_id() self.id = await self.fetch_id()
url = TWITCH_STREAMS_ENDPOINT url = TWITCH_STREAMS_ENDPOINT
header = {"Client-ID": str(self._client_id)}
if self._bearer is not None:
header = {**header, "Authorization": f"Bearer {self._bearer}"}
params = {"user_id": self.id} params = {"user_id": self.id}
async with aiohttp.ClientSession() as session: code, data = await self.get_data(url, params)
async with session.get(url, headers=header, params=params) as r: if code == 200:
data = await r.json(encoding="utf-8")
if r.status == 200:
if not data["data"]: if not data["data"]:
raise OfflineStream() raise OfflineStream()
self.name = data["data"][0]["user_name"] self.name = data["data"][0]["user_name"]
@ -310,31 +357,22 @@ class TwitchStream(Stream):
game_id = data["game_id"] game_id = data["game_id"]
if game_id: if game_id:
params = {"id": game_id} __, game_data = await self.get_data(
async with aiohttp.ClientSession() as session: "https://api.twitch.tv/helix/games", {"id": game_id}
async with session.get( )
"https://api.twitch.tv/helix/games", headers=header, params=params
) as r:
game_data = await r.json(encoding="utf-8")
if game_data: if game_data:
game_data = game_data["data"][0] game_data = game_data["data"][0]
data["game_name"] = game_data["name"] data["game_name"] = game_data["name"]
params = {"to_id": self.id} __, user_data = await self.get_data(
async with aiohttp.ClientSession() as session: "https://api.twitch.tv/helix/users/follows", {"to_id": self.id}
async with session.get( )
"https://api.twitch.tv/helix/users/follows", headers=header, params=params
) as r:
user_data = await r.json(encoding="utf-8")
if user_data: if user_data:
followers = user_data["total"] followers = user_data["total"]
data["followers"] = followers data["followers"] = followers
params = {"id": self.id} __, user_profile_data = await self.get_data(
async with aiohttp.ClientSession() as session: "https://api.twitch.tv/helix/users", {"id": self.id}
async with session.get( )
"https://api.twitch.tv/helix/users", headers=header, params=params
) as r:
user_profile_data = await r.json(encoding="utf-8")
if user_profile_data: if user_profile_data:
profile_image_url = user_profile_data["data"][0]["profile_image_url"] profile_image_url = user_profile_data["data"][0]["profile_image_url"]
data["profile_image_url"] = profile_image_url data["profile_image_url"] = profile_image_url
@ -343,9 +381,9 @@ class TwitchStream(Stream):
is_rerun = False is_rerun = False
return self.make_embed(data), is_rerun return self.make_embed(data), is_rerun
elif r.status == 400: elif code == 400:
raise InvalidTwitchCredentials() raise InvalidTwitchCredentials()
elif r.status == 404: elif code == 404:
raise StreamNotFound() raise StreamNotFound()
else: else:
raise APIError(data) raise APIError(data)
@ -357,17 +395,15 @@ class TwitchStream(Stream):
url = TWITCH_ID_ENDPOINT url = TWITCH_ID_ENDPOINT
params = {"login": self.name} params = {"login": self.name}
async with aiohttp.ClientSession() as session: status, data = await self.get_data(url, params)
async with session.get(url, headers=header, params=params) as r:
data = await r.json()
if r.status == 200: if status == 200:
if not data["data"]: if not data["data"]:
raise StreamNotFound() raise StreamNotFound()
return data["data"][0]["id"] return data["data"][0]["id"]
elif r.status == 400: elif status == 400:
raise StreamNotFound() raise StreamNotFound()
elif r.status == 401: elif status == 401:
raise InvalidTwitchCredentials() raise InvalidTwitchCredentials()
else: else:
raise APIError(data) raise APIError(data)