[Streams] Add kick.com streams support (#6547)

Co-authored-by: palmtree5 <3577255+palmtree5@users.noreply.github.com>
This commit is contained in:
Predä
2026-05-17 07:18:40 +02:00
committed by GitHub
parent de11f52c41
commit 5659bad5d8
3 changed files with 282 additions and 16 deletions
+4
View File
@@ -27,6 +27,10 @@ class InvalidYoutubeCredentials(StreamsError):
pass
class InvalidKickCredentials(StreamsError):
pass
class YoutubeQuotaExceeded(StreamsError):
pass
+159 -8
View File
@@ -1,3 +1,4 @@
from operator import is_
import discord
from redbot.core.utils.chat_formatting import humanize_list
from redbot.core.bot import Red
@@ -7,6 +8,7 @@ from redbot.core.utils._internal_utils import send_to_owners_with_prefix_replace
from redbot.core.utils.chat_formatting import escape, inline, pagify
from .streamtypes import (
KickStream,
PicartoStream,
Stream,
TwitchStream,
@@ -14,6 +16,7 @@ from .streamtypes import (
)
from .errors import (
APIError,
InvalidKickCredentials,
InvalidTwitchCredentials,
InvalidYoutubeCredentials,
OfflineStream,
@@ -51,6 +54,7 @@ class Streams(commands.Cog):
"tokens": {},
"streams": [],
"notified_owner_missing_twitch_secret": False,
"notified_owner_missing_kick_secret": False,
}
guild_defaults = {
@@ -70,6 +74,7 @@ class Streams(commands.Cog):
super().__init__()
self.config: Config = Config.get_conf(self, 26262626)
self.ttv_bearer_cache: dict = {}
self.kick_bearer_cache: dict = {}
self.config.register_global(**self.global_defaults)
self.config.register_guild(**self.guild_defaults)
self.config.register_role(**self.role_defaults)
@@ -105,6 +110,8 @@ class Streams(commands.Cog):
async def on_red_api_tokens_update(self, service_name, api_tokens):
if service_name == "twitch":
await self.get_twitch_bearer_token(api_tokens)
elif service_name == "kick":
await self.get_kick_bearer_token(api_tokens)
async def move_api_keys(self) -> None:
"""Move the API keys from cog stored config to core bot config if they exist."""
@@ -126,7 +133,7 @@ class Streams(commands.Cog):
"1. Go to this page: {link}.\n"
'2. Click "Manage" on your application.\n'
'3. Click on "New secret".\n'
"5. Copy your client ID and your client secret into:\n"
"4. Copy your client ID and your client secret into:\n"
"{command}"
"\n\n"
"Note: These tokens are sensitive and should only be used in a private channel "
@@ -142,6 +149,28 @@ class Streams(commands.Cog):
await send_to_owners_with_prefix_replaced(self.bot, message)
await self.config.notified_owner_missing_twitch_secret.set(True)
async def _notify_owner_about_missing_kick_secret(self) -> None:
message = _(
"You need a client secret key if you want to use the Kick API on this cog.\n"
"Follow these steps:\n"
"1. Go to this page: {link}.\n"
'2. Click "Manage" on your application.\n'
"3. Copy your client ID and your client secret into:\n"
"{command}"
"\n\n"
"Note: These tokens are sensitive and should only be used in a private channel "
"or in DM with the bot."
).format(
link="https://kick.com/settings/developer",
command=inline(
"[p]set api kick client_id {} client_secret {}".format(
_("<your_client_id_here>"), _("<your_client_secret_here>")
)
),
)
await send_to_owners_with_prefix_replaced(self.bot, message)
await self.config.notified_owner_missing_kick_secret.set(True)
async def get_twitch_bearer_token(self, api_tokens: Optional[Dict] = None) -> None:
tokens = (
await self.bot.get_shared_api_tokens("twitch") if api_tokens is None else api_tokens
@@ -198,10 +227,65 @@ class Streams(commands.Cog):
self.ttv_bearer_cache["expires_at"] = datetime.now().timestamp() + data.get("expires_in")
async def maybe_renew_twitch_bearer_token(self) -> None:
if self.ttv_bearer_cache:
if self.ttv_bearer_cache["expires_at"] - datetime.now().timestamp() <= 60:
if (
self.ttv_bearer_cache
and self.ttv_bearer_cache["expires_at"] - datetime.now().timestamp() <= 60
):
await self.get_twitch_bearer_token()
async def get_kick_bearer_token(self, api_tokens: Optional[Dict] = None) -> None:
tokens = await self.bot.get_shared_api_tokens("kick") if api_tokens is None else api_tokens
if tokens.get("client_id"):
notified_owner_missing_kick_secret = (
await self.config.notified_owner_missing_kick_secret()
)
try:
tokens["client_secret"]
if notified_owner_missing_kick_secret is True:
await self.config.notified_owner_missing_kick_secret.set(False)
except KeyError:
if notified_owner_missing_kick_secret is False:
asyncio.create_task(self._notify_owner_about_missing_kick_secret())
async with aiohttp.ClientSession() as session:
async with session.post(
"https://id.kick.com/oauth/token",
params={
"client_id": tokens.get("client_id", ""),
"client_secret": tokens.get("client_secret", ""),
"grant_type": "client_credentials",
},
) as req:
try:
data = await req.json()
except aiohttp.ContentTypeError:
data = {}
if req.status == 200:
pass
elif req.status == 401 and data.get("error") == "invalid_client":
log.error("Kick API request failed authentication: set Client ID is invalid.")
elif "error" in data:
log.error(
"Kick OAuth2 API request failed with status code %s and error message: %s",
req.status,
data["error"],
)
else:
log.error("Kick OAuth2 API request failed with status code %s", req.status)
if req.status != 200:
return
self.kick_bearer_cache = data
self.kick_bearer_cache["expires_at"] = datetime.now().timestamp() + data.get("expires_in")
async def maybe_renew_kick_token(self) -> None:
if (
self.kick_bearer_cache
and self.kick_bearer_cache["expires_at"] - datetime.now().timestamp() <= 60
):
await self.get_kick_bearer_token()
@commands.guild_only()
@commands.command()
async def twitchstream(self, ctx: commands.Context, channel_name: str):
@@ -242,10 +326,19 @@ class Streams(commands.Cog):
stream = PicartoStream(_bot=self.bot, name=channel_name)
await self.check_online(ctx, stream)
@commands.guild_only()
@commands.command()
async def kickstream(self, ctx: commands.Context, channel_name: str):
"""Check if a Kick channel is live."""
await self.maybe_renew_kick_token()
token = self.kick_bearer_cache.get("access_token")
stream = _streamtypes.KickStream(_bot=self.bot, name=channel_name, token=token)
await self.check_online(ctx, stream)
async def check_online(
self,
ctx: commands.Context,
stream: Union[PicartoStream, YoutubeStream, TwitchStream],
stream: Union[PicartoStream, YoutubeStream, TwitchStream, KickStream],
):
try:
info = await stream.is_online()
@@ -265,6 +358,12 @@ class Streams(commands.Cog):
"The YouTube API key is either invalid or has not been set. See {command}."
).format(command=inline(f"{ctx.clean_prefix}streamset youtubekey"))
)
except InvalidKickCredentials:
await ctx.send(
_("The Kick API key is either invalid or has not been set. See {command}.").format(
command=inline(f"{ctx.clean_prefix}streamset kicktoken")
)
)
except YoutubeQuotaExceeded:
await ctx.send(
_(
@@ -363,6 +462,18 @@ class Streams(commands.Cog):
"""Toggle alerts in this channel for a Picarto stream."""
await self.stream_alert(ctx, PicartoStream, channel_name, discord_channel)
@streamalert.command(name="kick")
async def kick_alert(
self,
ctx: commands.Context,
channel_name: str,
discord_channel: Union[
discord.TextChannel, discord.VoiceChannel, discord.StageChannel
] = commands.CurrentChannel,
):
"""Toggle alerts in this channel for a Kick stream."""
await self.stream_alert(ctx, KickStream, channel_name, discord_channel)
@streamalert.command(name="stop", usage="[disable_all=No]")
async def streamalert_stop(self, ctx: commands.Context, _all: bool = False):
"""Disable all stream alerts in this channel or server.
@@ -435,6 +546,7 @@ class Streams(commands.Cog):
token = await self.bot.get_shared_api_tokens(_class.token_name)
is_yt = _class.__name__ == "YoutubeStream"
is_twitch = _class.__name__ == "TwitchStream"
is_kick = _class.__name__ == "KickStream"
if is_yt and not self.check_name_or_id(channel_name):
stream = _class(_bot=self.bot, id=channel_name, token=token, config=self.config)
elif is_twitch:
@@ -445,6 +557,10 @@ class Streams(commands.Cog):
token=token.get("client_id"),
bearer=self.ttv_bearer_cache.get("access_token", None),
)
elif is_kick:
await self.maybe_renew_kick_token()
token = self.kick_bearer_cache.get("access_token")
stream = _class(_bot=self.bot, name=channel_name, token=token)
else:
if is_yt:
stream = _class(
@@ -464,8 +580,7 @@ class Streams(commands.Cog):
except InvalidYoutubeCredentials:
await ctx.send(
_(
"The YouTube API key is either invalid or has not been set. See "
"{command}."
"The YouTube API key is either invalid or has not been set. See {command}."
).format(command=inline(f"{ctx.clean_prefix}streamset youtubekey"))
)
return
@@ -476,6 +591,13 @@ class Streams(commands.Cog):
" Try again later or contact the owner if this continues."
)
)
except InvalidKickCredentials:
await ctx.send(
_(
"The Kick API key is either invalid or has not been set. See {command}."
).format(command=inline(f"{ctx.clean_prefix}streamset kicktoken"))
)
return
except APIError as e:
log.error(
"Something went wrong whilst trying to contact the stream service's API.\n"
@@ -537,6 +659,30 @@ class Streams(commands.Cog):
await ctx.maybe_send_embed(message)
@streamset.command()
@commands.is_owner()
async def kicktoken(self, ctx: commands.Context):
"""Explain how to set the Kick token."""
message = _(
"To get one, do the following:\n"
"1. Go to this page: {link}.\n"
"2. Click on *Create new*.\n"
"3. Fill the name and description, for *Redirection URL* add *http://localhost*.\n"
"4. Click on *Create Application*.\n"
"5. Copy your client ID and your client secret into:\n"
"{command}"
"\n\n"
"Note: These tokens are sensitive and should only be used in a private channel\n"
"or in DM with the bot.\n"
).format(
link="https://kick.com/settings/developer",
command="`{}set api kick client_id {} client_secret {}`".format(
ctx.clean_prefix, _("<your_client_id_here>"), _("<your_client_secret_here>")
),
)
await ctx.maybe_send_embed(message)
@streamset.command()
@commands.is_owner()
async def youtubekey(self, ctx: commands.Context):
@@ -830,8 +976,7 @@ class Streams(commands.Cog):
for stream in self.streams:
try:
try:
is_rerun = False
is_schedule = False
is_rerun, is_schedule = False, False
if stream.__class__.__name__ == "TwitchStream":
await self.maybe_renew_twitch_bearer_token()
embed, is_rerun = await stream.is_online()
@@ -839,6 +984,10 @@ class Streams(commands.Cog):
elif stream.__class__.__name__ == "YoutubeStream":
embed, is_schedule = await stream.is_online()
elif stream.__class__.__name__ == "KickStream":
await self.maybe_renew_kick_token()
embed = await stream.is_online()
else:
embed = await stream.is_online()
except StreamNotFound:
@@ -1016,6 +1165,8 @@ class Streams(commands.Cog):
if _class.__name__ == "TwitchStream":
raw_stream["token"] = token.get("client_id")
raw_stream["bearer"] = self.ttv_bearer_cache.get("access_token", None)
elif _class.__name__ == "KickStream":
raw_stream["token"] = self.kick_bearer_cache.get("access_token", None)
else:
if _class.__name__ == "YoutubeStream":
raw_stream["config"] = self.config
+118 -7
View File
@@ -3,26 +3,28 @@ import contextlib
import json
import logging
import time
from dateutil.parser import parse as parse_time
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from random import choice
from string import ascii_letters
from datetime import datetime, timedelta, timezone
import xml.etree.ElementTree as ET
from typing import ClassVar, Optional, List, Tuple
from typing import ClassVar, List, Optional, Tuple
import aiohttp
import discord
from dateutil.parser import parse as parse_time
from redbot.core.i18n import Translator
from redbot.core.utils.chat_formatting import humanize_number
from .errors import (
APIError,
OfflineStream,
InvalidKickCredentials,
InvalidTwitchCredentials,
InvalidYoutubeCredentials,
OfflineStream,
StreamNotFound,
YoutubeQuotaExceeded,
)
from redbot.core.i18n import Translator
from redbot.core.utils.chat_formatting import humanize_number, humanize_timedelta
TWITCH_BASE_URL = "https://api.twitch.tv"
TWITCH_ID_ENDPOINT = TWITCH_BASE_URL + "/helix/users"
@@ -35,6 +37,10 @@ YOUTUBE_SEARCH_ENDPOINT = YOUTUBE_BASE_URL + "/search"
YOUTUBE_VIDEOS_ENDPOINT = YOUTUBE_BASE_URL + "/videos"
YOUTUBE_CHANNEL_RSS = "https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}"
KICK_BASE_URL = "https://api.kick.com/public/v1/"
KICK_USERS_ENDPOINT = KICK_BASE_URL + "users"
KICK_CHANNELS_ENDPOINT = KICK_BASE_URL + "channels"
_ = Translator("Streams", __file__)
log = logging.getLogger("red.core.cogs.Streams")
@@ -509,3 +515,108 @@ class PicartoStream(Stream):
embed.set_footer(text=_("{adult}Category: {category} | Tags: {tags}").format(**data))
return embed
class KickStream(Stream):
token_name = "kick"
platform_name = "Kick"
def __init__(self, **kwargs):
self.id = kwargs.pop("id", None)
self._display_name = None
self._token = kwargs.pop("token", None)
super().__init__(**kwargs)
@property
def display_name(self) -> Optional[str]:
return self._display_name or self.name
@display_name.setter
def display_name(self, value: str) -> None:
self._display_name = value
async def get_data(self, url: str, params: dict = {}) -> Tuple[Optional[int], dict]:
if self._token is None:
raise InvalidKickCredentials()
headers = {"Authorization": f"Bearer {self._token}"}
async with aiohttp.ClientSession() as session:
try:
async with session.get(url, headers=headers, params=params, timeout=60) as resp:
if resp.status != 200:
return resp.status, {}
data = await resp.json(encoding="utf-8")
return resp.status, data["data"][0] if data["data"] else []
except (aiohttp.ClientConnectionError, asyncio.TimeoutError) as exc:
log.warning("Connection error occurred when fetching Kick stream", exc_info=exc)
return None, {}
async def is_online(self):
channel_code, channel_data = await self.get_data(
KICK_CHANNELS_ENDPOINT, {"slug": self.name}
)
if not channel_data:
raise StreamNotFound()
if channel_code == 200:
if channel_data["stream"]["is_live"] is False:
raise OfflineStream()
self.id = channel_data["broadcaster_user_id"]
user_profile_data = await self._fetch_user_profile()
final_data = dict.fromkeys(
("game_name", "followers", "name", "slug", "profile_picture", "view_count")
)
if user_profile_data is not None:
final_data["user_name"] = self.display_name = user_profile_data["name"]
final_data["profile_picture"] = user_profile_data["profile_picture"]
stream_data = channel_data["stream"]
final_data["game_name"] = channel_data["category"]["name"]
final_data["title"] = channel_data["stream_title"]
final_data["thumbnail_url"] = stream_data["thumbnail"]
final_data["view_count"] = stream_data["viewer_count"]
final_data["slug"] = channel_data["slug"]
return self.make_embed(final_data)
elif channel_code == 401:
raise InvalidKickCredentials()
elif channel_code == 400:
raise StreamNotFound()
else:
raise APIError(channel_code, stream_data)
async def _fetch_user_profile(self):
code, data = await self.get_data(KICK_USERS_ENDPOINT, {"id": self.id})
if code == 200:
if not data:
raise StreamNotFound()
return data
elif code == 400:
raise StreamNotFound()
elif code == 401:
raise InvalidKickCredentials()
else:
raise APIError(code, data)
def make_embed(self, data):
url = f"https://www.kick.com/{data['slug']}" if data["slug"] is not None else None
logo = (
data["profile_picture"] or "https://www.google.com/s2/favicons?domain=kick.com&sz=256"
)
status = data["title"] or _("Untitled broadcast")
embed = discord.Embed(title=status, url=url, color=0x00E701)
embed.set_author(name=data["user_name"])
embed.add_field(name=_("Total views"), value=humanize_number(data["view_count"]))
embed.set_thumbnail(url=logo)
if data["thumbnail_url"]:
embed.set_image(url=rnd(data["thumbnail_url"]))
if data["game_name"]:
embed.set_footer(text=_("Playing: ") + data["game_name"])
return embed
def __repr__(self):
return "<{0.__class__.__name__}: {0.name} (ID: {0.id})>".format(self)