[V3 Streams] Improve command responses and help, rename modules (#1194)

* Command response improvements for streams

Fix #1182

Fix #1183

Various other command response improvements

* Rename main to streams, streams to streamtypes

* Raise errors for communities
This commit is contained in:
Tobotimus 2017-12-18 14:56:17 +11:00 committed by palmtree5
parent 103f79eca5
commit 76ae62cb84
4 changed files with 825 additions and 774 deletions

View File

@ -1,4 +1,4 @@
from .main import Streams from .streams import Streams
def setup(bot): def setup(bot):

View File

@ -1,492 +0,0 @@
import discord
from discord.ext import commands
from redbot.core import Config, checks, RedContext
from redbot.core.utils.chat_formatting import pagify, box
from redbot.core.bot import Red
from .streams import TwitchStream, HitboxStream, MixerStream, PicartoStream, TwitchCommunity
from .errors import OfflineStream, StreamNotFound, APIError, InvalidCredentials, CommunityNotFound, OfflineCommunity
from . import streams as StreamClasses
from collections import defaultdict
import asyncio
CHECK_DELAY = 60
class Streams:
global_defaults = {
"tokens": {},
"streams": [],
"communities": []
}
guild_defaults = {
"autodelete": False,
"mention_everyone": False,
"mention_here": False
}
role_defaults = {
"mention": False
}
def __init__(self, bot: Red):
self.db = Config.get_conf(self, 26262626)
self.db.register_global(**self.global_defaults)
self.db.register_guild(**self.guild_defaults)
self.db.register_role(**self.role_defaults)
self.bot = bot
self.bot.loop.create_task(self._initialize_lists())
async def _initialize_lists(self):
self.streams = await self.load_streams()
self.communities = await self.load_communities()
self.task = self.bot.loop.create_task(self._stream_alerts())
@commands.command()
async def twitch(self, ctx, channel_name: str):
"""Checks if a Twitch channel is streaming"""
token = await self.db.tokens.get_attr(TwitchStream.__name__)
stream = TwitchStream(name=channel_name,
token=token)
await self.check_online(ctx, stream)
@commands.command()
async def hitbox(self, ctx, channel_name: str):
"""Checks if a Hitbox channel is streaming"""
stream = HitboxStream(name=channel_name)
await self.check_online(ctx, stream)
@commands.command()
async def mixer(self, ctx, channel_name: str):
"""Checks if a Mixer channel is streaming"""
stream = MixerStream(name=channel_name)
await self.check_online(ctx, stream)
@commands.command()
async def picarto(self, ctx, channel_name: str):
"""Checks if a Picarto channel is streaming"""
stream = PicartoStream(name=channel_name)
await self.check_online(ctx, stream)
async def check_online(self, ctx, stream):
try:
embed = await stream.is_online()
except OfflineStream:
await ctx.send("The stream is offline.")
except StreamNotFound:
await ctx.send("The channel doesn't seem to exist.")
except InvalidCredentials:
await ctx.send("Invalid twitch token.")
except APIError:
await ctx.send("Error contacting the API.")
else:
await ctx.send(embed=embed)
@commands.group()
@commands.guild_only()
@checks.mod()
async def streamalert(self, ctx):
if ctx.invoked_subcommand is None:
await ctx.send_help()
@streamalert.group(name="twitch")
async def _twitch(self, ctx):
"""Twitch stream alerts"""
if isinstance(ctx.invoked_subcommand, commands.Group):
await ctx.send_help()
@_twitch.command(name="channel")
async def twitch_alert_channel(self, ctx: RedContext, channel_name: str):
"""Sets a Twitch stream alert notification in the channel"""
await self.stream_alert(ctx, TwitchStream, channel_name)
@_twitch.command(name="community")
async def twitch_alert_community(self, ctx: RedContext, community: str):
"""Sets a Twitch stream alert notification in the channel
for the specified community."""
await self.community_alert(ctx, TwitchCommunity, community)
@streamalert.command(name="hitbox")
async def hitbox_alert(self, ctx, channel_name: str):
"""Sets a Hitbox stream alert notification in the channel"""
await self.stream_alert(ctx, HitboxStream, channel_name)
@streamalert.command(name="mixer")
async def mixer_alert(self, ctx, channel_name: str):
"""Sets a Mixer stream alert notification in the channel"""
await self.stream_alert(ctx, MixerStream, channel_name)
@streamalert.command(name="picarto")
async def picarto_alert(self, ctx, channel_name: str):
"""Sets a Picarto stream alert notification in the channel"""
await self.stream_alert(ctx, PicartoStream, channel_name)
@streamalert.command(name="stop")
async def streamalert_stop(self, ctx, _all: bool=False):
"""Stops all stream notifications in the channel
Adding 'yes' will disable all notifications in the server"""
streams = self.streams.copy()
local_channel_ids = [c.id for c in ctx.guild.channels]
to_remove = []
for stream in streams:
for channel_id in stream.channels:
if channel_id == ctx.channel.id:
stream.channels.remove(channel_id)
elif _all and ctx.channel.id in local_channel_ids:
if channel_id in stream.channels:
stream.channels.remove(channel_id)
if not stream.channels:
to_remove.append(stream)
for stream in to_remove:
streams.remove(stream)
self.streams = streams
await self.save_streams()
msg = "All {}'s stream alerts have been disabled." \
"".format("server" if _all else "channel")
await ctx.send(msg)
@streamalert.command(name="list")
async def streamalert_list(self, ctx):
streams_list = defaultdict(list)
guild_channels_ids = [c.id for c in ctx.guild.channels]
msg = "Active stream alerts:\n\n"
for stream in self.streams:
for channel_id in stream.channels:
if channel_id in guild_channels_ids:
streams_list[channel_id].append(stream.name)
if not streams_list:
await ctx.send("There are no active stream alerts in this server.")
return
for channel_id, streams in streams_list.items():
channel = ctx.guild.get_channel(channel_id)
msg += "** - #{}**\n{}\n".format(channel, ", ".join(streams))
for page in pagify(msg):
await ctx.send(page)
async def stream_alert(self, ctx, _class, channel_name):
stream = self.get_stream(_class, channel_name)
if not stream:
token = await self.db.tokens.get_attr(_class.__name__)
stream = _class(name=channel_name,
token=token)
if not await self.check_exists(stream):
await ctx.send("That channel doesn't seem to exist.")
return
await self.add_or_remove(ctx, stream)
async def community_alert(self, ctx, _class, community_name):
community = self.get_community(_class, community_name)
if not community:
token = await self.db.tokens.get_attr(_class.__name__)
community = _class(name=community_name, token=token)
try:
await community.get_community_streams()
except CommunityNotFound:
await ctx.send("That community doesn't seem to exist")
return
await self.add_or_remove_community(ctx, community)
@commands.group()
@checks.mod()
async def streamset(self, ctx):
if ctx.invoked_subcommand is None:
await ctx.send_help()
@streamset.command()
@checks.is_owner()
async def twitchtoken(self, ctx, token: str):
tokens = await self.db.tokens()
tokens["TwitchStream"] = token
tokens["TwitchCommunity"] = token
await self.db.tokens.set(tokens)
await ctx.send("Twitch token set.")
@streamset.group()
@commands.guild_only()
async def mention(self, ctx):
"""Sets mentions for stream alerts
Types: everyone, here, role, none"""
if ctx.invoked_subcommand is None:
await ctx.send_help()
@mention.command()
@commands.guild_only()
async def all(self, ctx):
"""Toggles everyone mention"""
guild = ctx.guild
current_setting = await self.db.guild(guild).mention_everyone()
if current_setting:
await self.db.guild(guild).mention_everyone.set(False)
await ctx.send("@\u200beveryone will no longer be mentioned "
"for a stream alert.")
else:
await self.db.guild(guild).mention_everyone.set(True)
await ctx.send("When a stream configured for stream alerts "
"comes online, @\u200beveryone will be mentioned")
@mention.command()
@commands.guild_only()
async def online(self, ctx):
"""Toggles here mention"""
guild = ctx.guild
current_setting = await self.db.guild(guild).mention_here()
if current_setting:
await self.db.guild(guild).mention_here.set(False)
await ctx.send("@\u200bhere will no longer be mentioned "
"for a stream alert.")
else:
await self.db.guild(guild).mention_here.set(True)
await ctx.send("When a stream configured for stream alerts "
"comes online, @\u200bhere will be mentioned")
@mention.command()
@commands.guild_only()
async def role(self, ctx, role: discord.Role):
"""Toggles role mention"""
current_setting = await self.db.role(role).mention()
if not role.mentionable:
await ctx.send("That role is not mentionable!")
return
if current_setting:
await self.db.role(role).mention.set(False)
await ctx.send("@\u200b{} will no longer be mentioned "
"for a stream alert".format(role.name))
else:
await self.db.role(role).mention.set(True)
await ctx.send("When a stream configured for stream alerts "
"comes online, @\u200b{} will be mentioned"
"".format(role.name))
@streamset.command()
@commands.guild_only()
async def autodelete(self, ctx, on_off: bool):
"""Toggles automatic deletion of notifications for streams that go offline"""
await self.db.guild(ctx.guild).autodelete.set(on_off)
if on_off:
await ctx.send("The notifications will be deleted once "
"streams go offline.")
else:
await ctx.send("Notifications will never be deleted.")
async def add_or_remove(self, ctx, stream):
if ctx.channel.id not in stream.channels:
stream.channels.append(ctx.channel.id)
if stream not in self.streams:
self.streams.append(stream)
await ctx.send("I'll send a notification in this channel when {} "
"is online.".format(stream.name))
else:
stream.channels.remove(ctx.channel.id)
if not stream.channels:
self.streams.remove(stream)
await ctx.send("I won't send notifications about {} in this "
"channel anymore.".format(stream.name))
await self.save_streams()
async def add_or_remove_community(self, ctx, community):
if ctx.channel.id not in community.channels:
community.channels.append(ctx.channel.id)
if community not in self.communities:
self.communities.append(community)
await ctx.send("I'll send a notification in this channel when a "
"channel is streaming to the {} community"
"".format(community.name))
else:
community.channels.remove(ctx.channel.id)
if not community.channels:
self.communities.remove(community)
await ctx.send("I won't send notifications about channels streaming "
"to the {} community in this channel anymore"
"".format(community.name))
await self.save_communities()
def get_stream(self, _class, name):
for stream in self.streams:
# if isinstance(stream, _class) and stream.name == name:
# return stream
# Reloading this cog causes an issue with this check ^
# isinstance will always return False
# As a workaround, we'll compare the class' name instead.
# Good enough.
if stream.type == _class.__name__ and stream.name == name:
return stream
def get_community(self, _class, name):
for community in self.communities:
if community.type == _class.__name__ and community.name == name:
return community
async def check_exists(self, stream):
try:
await stream.is_online()
except OfflineStream:
pass
except:
return False
return True
async def _stream_alerts(self):
while True:
try:
await self.check_streams()
except asyncio.CancelledError:
pass
try:
await self.check_communities()
except asyncio.CancelledError:
pass
await asyncio.sleep(CHECK_DELAY)
async def check_streams(self):
for stream in self.streams:
try:
embed = await stream.is_online()
except OfflineStream:
for message in stream._messages_cache:
try:
autodelete = self.db.guild(message.guild).autodelete()
if autodelete:
await message.delete()
except:
pass
stream._messages_cache.clear()
except:
pass
else:
if stream._messages_cache:
continue
for channel_id in stream.channels:
channel = self.bot.get_channel(channel_id)
mention_everyone = await self.db.guild(channel.guild).mention_everyone()
mention_here = await self.db.guild(channel.guild).mention_here()
mention_roles = []
for r in channel.guild.roles:
to_append = {
"role": r,
"enabled": await self.db.role(r).mention()
}
mention_roles.append(to_append)
mention = None
if mention_everyone or mention_here or any(mention_roles):
mention = True
if mention:
mention_str = ""
if mention_everyone:
mention_str += "@everyone "
if mention_here:
mention_str += "@here "
if any(mention_roles):
mention_str += " ".join(
[
r["role"].mention for r in mention_roles
if r["role"].mentionable and r["enabled"]
]
)
mention_str = mention_str.strip()
try:
m = await channel.send(
"{}, {} is online!".format(
mention_str, stream.name
), embed=embed
)
stream._messages_cache.append(m)
except:
pass
else:
try:
m = await channel.send("%s is online!" % stream.name,
embed=embed)
stream._messages_cache.append(m)
except:
pass
async def check_communities(self):
for community in self.communities:
try:
streams = community.get_community_streams()
except CommunityNotFound:
print("Community {} not found!".format(community.name))
continue
except OfflineCommunity:
pass
else:
token = self.db.tokens().get(TwitchStream.__name__)
for channel in community.channels:
chn = self.bot.get_channel(channel)
await chn.send("Online streams for {}".format(community.name))
for stream in streams:
stream_obj = TwitchStream(
token=token, name=stream["channel"]["name"],
id=stream["_id"]
)
try:
emb = await stream_obj.is_online()
except:
pass
else:
for channel in community.channels:
chn = self.bot.get_channel(channel)
await chn.send(embed=emb)
async def load_streams(self):
streams = []
for raw_stream in await self.db.streams():
_class = getattr(StreamClasses, raw_stream["type"], None)
if not _class:
continue
token = await self.db.tokens.get_attr(_class.__name__)
streams.append(_class(token=token, **raw_stream))
return streams
async def load_communities(self):
communities = []
for raw_community in await self.db.communities():
_class = getattr(StreamClasses, raw_community["type"], None)
if not _class:
continue
token = await self.db.tokens.get_attr(_class.__name__)
communities.append(_class(token=token, **raw_community))
return communities
async def save_streams(self):
raw_streams = []
for stream in self.streams:
raw_streams.append(stream.export())
await self.db.streams.set(raw_streams)
async def save_communities(self):
raw_communities = []
for community in self.communities:
raw_communities.append(community.export())
await self.db.communities.set(raw_communities)
def __unload(self):
self.task.cancel()

View File

@ -1,312 +1,531 @@
from .errors import StreamNotFound, APIError, InvalidCredentials, OfflineStream, CommunityNotFound, OfflineCommunity
from random import choice
from string import ascii_letters
import discord import discord
import aiohttp from discord.ext import commands
import json from redbot.core import Config, checks, RedContext
from redbot.core.utils.chat_formatting import pagify, box
from redbot.core.bot import Red
from .streamtypes import TwitchStream, HitboxStream, MixerStream, PicartoStream, TwitchCommunity
from .errors import (OfflineStream, StreamNotFound, APIError, InvalidCredentials,
CommunityNotFound, OfflineCommunity, StreamsError)
from . import streamtypes as StreamClasses
from collections import defaultdict
import asyncio
TWITCH_BASE_URL = "https://api.twitch.tv" CHECK_DELAY = 60
TWITCH_ID_ENDPOINT = TWITCH_BASE_URL + "/kraken/users?login="
TWITCH_STREAMS_ENDPOINT = TWITCH_BASE_URL + "/kraken/streams/"
TWITCH_COMMUNITIES_ENDPOINT = TWITCH_BASE_URL + "/kraken/communities"
def rnd(url): class Streams:
"""Appends a random parameter to the url to avoid Discord's caching"""
return url + "?rnd=" + "".join([choice(ascii_letters) for i in range(6)])
global_defaults = {
"tokens": {},
"streams": [],
"communities": []
}
class TwitchCommunity: guild_defaults = {
def __init__(self, **kwargs): "autodelete": False,
self.name = kwargs.pop("name") "mention_everyone": False,
self.id = kwargs.pop("id", None) "mention_here": False
self.channels = kwargs.pop("channels", []) }
self._token = kwargs.pop("token", None)
self.type = self.__class__.__name__
async def get_community_id(self): role_defaults = {
session = aiohttp.ClientSession() "mention": False
headers = { }
"Accept": "application/vnd.twitchtv.v5+json",
"Client-ID": str(self._token)
}
params = {
"name": self.name
}
async with session.get(TWITCH_COMMUNITIES_ENDPOINT, headers=headers, params=params) as r:
data = await r.json()
await session.close()
if "status" in data and data["status"] == 404:
raise CommunityNotFound()
return data["_id"]
async def get_community_streams(self): def __init__(self, bot: Red):
if not self.id: self.db = Config.get_conf(self, 26262626)
self.db.register_global(**self.global_defaults)
self.db.register_guild(**self.guild_defaults)
self.db.register_role(**self.role_defaults)
self.bot = bot
self.bot.loop.create_task(self._initialize_lists())
async def _initialize_lists(self):
self.streams = await self.load_streams()
self.communities = await self.load_communities()
self.task = self.bot.loop.create_task(self._stream_alerts())
@commands.command()
async def twitch(self, ctx, channel_name: str):
"""Checks if a Twitch channel is streaming"""
token = await self.db.tokens.get_attr(TwitchStream.__name__)
stream = TwitchStream(name=channel_name,
token=token)
await self.check_online(ctx, stream)
@commands.command()
async def hitbox(self, ctx, channel_name: str):
"""Checks if a Hitbox channel is streaming"""
stream = HitboxStream(name=channel_name)
await self.check_online(ctx, stream)
@commands.command()
async def mixer(self, ctx, channel_name: str):
"""Checks if a Mixer channel is streaming"""
stream = MixerStream(name=channel_name)
await self.check_online(ctx, stream)
@commands.command()
async def picarto(self, ctx, channel_name: str):
"""Checks if a Picarto channel is streaming"""
stream = PicartoStream(name=channel_name)
await self.check_online(ctx, stream)
async def check_online(self, ctx, stream):
try:
embed = await stream.is_online()
except OfflineStream:
await ctx.send("The stream is offline.")
except StreamNotFound:
await ctx.send("The channel doesn't seem to exist.")
except InvalidCredentials:
await ctx.send("The twitch token is either invalid or has not been set. "
"See `{}streamset twitchtoken`.".format(ctx.prefix))
except APIError:
await ctx.send("Something went wrong whilst trying to contact the "
"stream service's API.")
else:
await ctx.send(embed=embed)
@commands.group()
@commands.guild_only()
@checks.mod()
async def streamalert(self, ctx):
if ctx.invoked_subcommand is None:
await ctx.send_help()
@streamalert.group(name="twitch")
async def _twitch(self, ctx):
"""Twitch stream alerts"""
if ctx.invoked_subcommand is None or ctx.invoked_subcommand == self._twitch:
await ctx.send_help()
@_twitch.command(name="channel")
async def twitch_alert_channel(self, ctx: RedContext, channel_name: str):
"""Sets a Twitch stream alert notification in the channel"""
await self.stream_alert(ctx, TwitchStream, channel_name)
@_twitch.command(name="community")
async def twitch_alert_community(self, ctx: RedContext, community: str):
"""Sets a Twitch stream alert notification in the channel
for the specified community."""
await self.community_alert(ctx, TwitchCommunity, community)
@streamalert.command(name="hitbox")
async def hitbox_alert(self, ctx, channel_name: str):
"""Sets a Hitbox stream alert notification in the channel"""
await self.stream_alert(ctx, HitboxStream, channel_name)
@streamalert.command(name="mixer")
async def mixer_alert(self, ctx, channel_name: str):
"""Sets a Mixer stream alert notification in the channel"""
await self.stream_alert(ctx, MixerStream, channel_name)
@streamalert.command(name="picarto")
async def picarto_alert(self, ctx, channel_name: str):
"""Sets a Picarto stream alert notification in the channel"""
await self.stream_alert(ctx, PicartoStream, channel_name)
@streamalert.command(name="stop")
async def streamalert_stop(self, ctx, _all: bool=False):
"""Stops all stream notifications in the channel
Adding 'yes' will disable all notifications in the server"""
streams = self.streams.copy()
local_channel_ids = [c.id for c in ctx.guild.channels]
to_remove = []
for stream in streams:
for channel_id in stream.channels:
if channel_id == ctx.channel.id:
stream.channels.remove(channel_id)
elif _all and ctx.channel.id in local_channel_ids:
if channel_id in stream.channels:
stream.channels.remove(channel_id)
if not stream.channels:
to_remove.append(stream)
for stream in to_remove:
streams.remove(stream)
self.streams = streams
await self.save_streams()
msg = "All {}'s stream alerts have been disabled." \
"".format("server" if _all else "channel")
await ctx.send(msg)
@streamalert.command(name="list")
async def streamalert_list(self, ctx):
streams_list = defaultdict(list)
guild_channels_ids = [c.id for c in ctx.guild.channels]
msg = "Active stream alerts:\n\n"
for stream in self.streams:
for channel_id in stream.channels:
if channel_id in guild_channels_ids:
streams_list[channel_id].append(stream.name)
if not streams_list:
await ctx.send("There are no active stream alerts in this server.")
return
for channel_id, streams in streams_list.items():
channel = ctx.guild.get_channel(channel_id)
msg += "** - #{}**\n{}\n".format(channel, ", ".join(streams))
for page in pagify(msg):
await ctx.send(page)
async def stream_alert(self, ctx, _class, channel_name):
stream = self.get_stream(_class, channel_name)
if not stream:
token = await self.db.tokens.get_attr(_class.__name__)
stream = _class(name=channel_name,
token=token)
try: try:
self.id = await self.get_community_id() exists = await self.check_exists(stream)
except InvalidCredentials:
await ctx.send("The twitch token is either invalid or has not been set. "
"See `{}streamset twitchtoken`.".format(ctx.prefix))
return
except APIError:
await ctx.send("Something went wrong whilst trying to contact the "
"stream service's API.")
return
else:
if not exists:
await ctx.send("That channel doesn't seem to exist.")
return
await self.add_or_remove(ctx, stream)
async def community_alert(self, ctx, _class, community_name):
community = self.get_community(_class, community_name)
if not community:
token = await self.db.tokens.get_attr(_class.__name__)
community = _class(name=community_name, token=token)
try:
await community.get_community_streams()
except InvalidCredentials:
await ctx.send(
"The twitch token is either invalid or has not been set. "
"See `{}streamset twitchtoken`.".format(ctx.prefix))
return
except CommunityNotFound: except CommunityNotFound:
raise await ctx.send("That community doesn't seem to exist.")
session = aiohttp.ClientSession() return
headers = { except APIError:
"Accept": "application/vnd.twitchtv.v5+json", await ctx.send(
"Client-ID": str(self._token) "Something went wrong whilst trying to contact the "
} "stream service's API.")
params = { return
"community_id": self.id except OfflineCommunity:
} pass
url = TWITCH_BASE_URL + "/kraken/streams"
async with session.get(url, headers=headers, params=params) as r: await self.add_or_remove_community(ctx, community)
data = await r.json()
if data["_total"] == 0: @commands.group()
raise OfflineCommunity() @checks.mod()
async def streamset(self, ctx):
if ctx.invoked_subcommand is None:
await ctx.send_help()
@streamset.command()
@checks.is_owner()
async def twitchtoken(self, ctx, token: str):
"""Set the Client ID for twitch.
To do this, follow these steps:
1. Go to this page: https://dev.twitch.tv/dashboard/apps.
2. Click *Register Your Application*
3. Enter a name, set the OAuth Redirect URI to `http://localhost`, and
select an Application Category of your choosing.
4. Click *Register*, and on the following page, copy the Client ID.
5. Paste the Client ID into this command. Done!
"""
tokens = await self.db.tokens()
tokens["TwitchStream"] = token
tokens["TwitchCommunity"] = token
await self.db.tokens.set(tokens)
await ctx.send("Twitch token set.")
@streamset.group()
@commands.guild_only()
async def mention(self, ctx):
"""Sets mentions for stream alerts."""
if ctx.invoked_subcommand is None or ctx.invoked_subcommand == self.mention:
await ctx.send_help()
@mention.command(aliases=["everyone"])
@commands.guild_only()
async def all(self, ctx):
"""Toggles everyone mention"""
guild = ctx.guild
current_setting = await self.db.guild(guild).mention_everyone()
if current_setting:
await self.db.guild(guild).mention_everyone.set(False)
await ctx.send("@\u200beveryone will no longer be mentioned "
"for a stream alert.")
else: else:
return data["streams"] await self.db.guild(guild).mention_everyone.set(True)
await ctx.send("When a stream configured for stream alerts "
"comes online, @\u200beveryone will be mentioned")
def export(self): @mention.command(aliases=["here"])
data = {} @commands.guild_only()
for k, v in self.__dict__.items(): async def online(self, ctx):
if not k.startswith("_"): """Toggles here mention"""
data[k] = v guild = ctx.guild
return data current_setting = await self.db.guild(guild).mention_here()
if current_setting:
def __repr__(self): await self.db.guild(guild).mention_here.set(False)
return "<{0.__class__.__name__}: {0.name}>".format(self) await ctx.send("@\u200bhere will no longer be mentioned "
"for a stream alert.")
class Stream:
def __init__(self, **kwargs):
self.name = kwargs.pop("name")
self.channels = kwargs.pop("channels", [])
#self.already_online = kwargs.pop("already_online", False)
self._messages_cache = []
self.type = self.__class__.__name__
async def is_online(self):
raise NotImplementedError()
def make_embed(self):
raise NotImplementedError()
def export(self):
data = {}
for k, v in self.__dict__.items():
if not k.startswith("_"):
data[k] = v
return data
def __repr__(self):
return "<{0.__class__.__name__}: {0.name}>".format(self)
class TwitchStream(Stream):
def __init__(self, **kwargs):
self.id = kwargs.pop("id", None)
self._token = kwargs.pop("token", None)
super().__init__(**kwargs)
async def is_online(self):
if not self.id:
self.id = await self.fetch_id()
session = aiohttp.ClientSession()
url = TWITCH_STREAMS_ENDPOINT + self.id
header = {
'Client-ID': str(self._token),
'Accept': 'application/vnd.twitchtv.v5+json'
}
async with session.get(url, headers=header) as r:
data = await r.json(encoding='utf-8')
await session.close()
if r.status == 200:
if data["stream"] is None:
#self.already_online = False
raise OfflineStream()
#self.already_online = True
# In case of rename
self.name = data["stream"]["channel"]["name"]
return self.make_embed(data)
elif r.status == 400:
raise InvalidCredentials()
elif r.status == 404:
raise StreamNotFound()
else: else:
raise APIError() await self.db.guild(guild).mention_here.set(True)
await ctx.send("When a stream configured for stream alerts "
"comes online, @\u200bhere will be mentioned")
async def fetch_id(self): @mention.command()
header = { @commands.guild_only()
'Client-ID': str(self._token), async def role(self, ctx, *, role: discord.Role):
'Accept': 'application/vnd.twitchtv.v5+json' """Toggles role mention"""
} current_setting = await self.db.role(role).mention()
url = TWITCH_ID_ENDPOINT + self.name if not role.mentionable:
session = aiohttp.ClientSession() await ctx.send("That role is not mentionable!")
return
async with session.get(url, headers=header) as r: if current_setting:
data = await r.json() await self.db.role(role).mention.set(False)
await session.close() await ctx.send("@\u200b{} will no longer be mentioned "
"for a stream alert".format(role.name))
if r.status == 200:
if not data["users"]:
raise StreamNotFound()
return data["users"][0]["_id"]
elif r.status == 400:
raise InvalidCredentials()
else: else:
raise APIError() await self.db.role(role).mention.set(True)
await ctx.send("When a stream configured for stream alerts "
"comes online, @\u200b{} will be mentioned"
"".format(role.name))
def make_embed(self, data): @streamset.command()
channel = data["stream"]["channel"] @commands.guild_only()
url = channel["url"] async def autodelete(self, ctx, on_off: bool):
logo = channel["logo"] """Toggles automatic deletion of notifications for streams that go offline"""
if logo is None: await self.db.guild(ctx.guild).autodelete.set(on_off)
logo = ("https://static-cdn.jtvnw.net/" if on_off:
"jtv_user_pictures/xarth/404_user_70x70.png") await ctx.send("The notifications will be deleted once "
status = channel["status"] "streams go offline.")
if not status: else:
status = "Untitled broadcast" await ctx.send("Notifications will never be deleted.")
embed = discord.Embed(title=status, url=url)
embed.set_author(name=channel["display_name"])
embed.add_field(name="Followers", value=channel["followers"])
embed.add_field(name="Total views", value=channel["views"])
embed.set_thumbnail(url=logo)
if data["stream"]["preview"]["medium"]:
embed.set_image(url=rnd(data["stream"]["preview"]["medium"]))
if channel["game"]:
embed.set_footer(text="Playing: " + channel["game"])
embed.color = 0x6441A4
return embed async def add_or_remove(self, ctx, stream):
if ctx.channel.id not in stream.channels:
stream.channels.append(ctx.channel.id)
if stream not in self.streams:
self.streams.append(stream)
await ctx.send("I'll send a notification in this channel when {} "
"is online.".format(stream.name))
else:
stream.channels.remove(ctx.channel.id)
if not stream.channels:
self.streams.remove(stream)
await ctx.send("I won't send notifications about {} in this "
"channel anymore.".format(stream.name))
def __repr__(self): await self.save_streams()
return "<{0.__class__.__name__}: {0.name} (ID: {0.id})>".format(self)
async def add_or_remove_community(self, ctx, community):
if ctx.channel.id not in community.channels:
community.channels.append(ctx.channel.id)
if community not in self.communities:
self.communities.append(community)
await ctx.send("I'll send a notification in this channel when a "
"channel is streaming to the {} community"
"".format(community.name))
else:
community.channels.remove(ctx.channel.id)
if not community.channels:
self.communities.remove(community)
await ctx.send("I won't send notifications about channels streaming "
"to the {} community in this channel anymore"
"".format(community.name))
await self.save_communities()
class HitboxStream(Stream): def get_stream(self, _class, name):
async def is_online(self): for stream in self.streams:
session = aiohttp.ClientSession() # if isinstance(stream, _class) and stream.name == name:
url = "https://api.hitbox.tv/media/live/" + self.name # return stream
# Reloading this cog causes an issue with this check ^
# isinstance will always return False
# As a workaround, we'll compare the class' name instead.
# Good enough.
if stream.type == _class.__name__ and stream.name == name:
return stream
async with session.get(url) as r: def get_community(self, _class, name):
#data = await r.json(encoding='utf-8') for community in self.communities:
data = await r.text() if community.type == _class.__name__ and community.name == name:
await session.close() return community
data = json.loads(data, strict=False)
if "livestream" not in data:
raise StreamNotFound()
elif data["livestream"][0]["media_is_live"] == "0":
#self.already_online = False
raise OfflineStream()
elif data["livestream"][0]["media_is_live"] == "1":
#self.already_online = True
return self.make_embed(data)
raise APIError() async def check_exists(self, stream):
try:
await stream.is_online()
except OfflineStream:
pass
except StreamNotFound:
return False
except StreamsError:
raise
return True
def make_embed(self, data): async def _stream_alerts(self):
base_url = "https://edge.sf.hitbox.tv" while True:
livestream = data["livestream"][0] try:
channel = livestream["channel"] await self.check_streams()
url = channel["channel_link"] except asyncio.CancelledError:
embed = discord.Embed(title=livestream["media_status"], url=url) pass
embed.set_author(name=livestream["media_name"]) try:
embed.add_field(name="Followers", value=channel["followers"]) await self.check_communities()
embed.set_thumbnail(url=base_url + channel["user_logo"]) except asyncio.CancelledError:
if livestream["media_thumbnail"]: pass
embed.set_image(url=rnd(base_url + livestream["media_thumbnail"])) await asyncio.sleep(CHECK_DELAY)
embed.set_footer(text="Playing: " + livestream["category_name"])
embed.color = 0x98CB00
return embed async def check_streams(self):
for stream in self.streams:
try:
class MixerStream(Stream): embed = await stream.is_online()
async def is_online(self): except OfflineStream:
url = "https://mixer.com/api/v1/channels/" + self.name for message in stream._messages_cache:
try:
session = aiohttp.ClientSession() autodelete = self.db.guild(message.guild).autodelete()
async with session.get(url) as r: if autodelete:
#data = await r.json(encoding='utf-8') await message.delete()
data = await r.text(encoding='utf-8') except:
await session.close() pass
if r.status == 200: stream._messages_cache.clear()
data = json.loads(data, strict=False) except:
if data["online"] is True: pass
#self.already_online = True
return self.make_embed(data)
else: else:
#self.already_online = False if stream._messages_cache:
raise OfflineStream() continue
elif r.status == 404: for channel_id in stream.channels:
raise StreamNotFound() channel = self.bot.get_channel(channel_id)
else: mention_everyone = await self.db.guild(channel.guild).mention_everyone()
raise APIError() mention_here = await self.db.guild(channel.guild).mention_here()
mention_roles = []
for r in channel.guild.roles:
to_append = {
"role": r,
"enabled": await self.db.role(r).mention()
}
mention_roles.append(to_append)
mention = None
if mention_everyone or mention_here or any(mention_roles):
mention = True
if mention:
mention_str = ""
if mention_everyone:
mention_str += "@everyone "
if mention_here:
mention_str += "@here "
if any(mention_roles):
mention_str += " ".join(
[
r["role"].mention for r in mention_roles
if r["role"].mentionable and r["enabled"]
]
)
mention_str = mention_str.strip()
try:
m = await channel.send(
"{}, {} is online!".format(
mention_str, stream.name
), embed=embed
)
stream._messages_cache.append(m)
except:
pass
else:
try:
m = await channel.send("%s is online!" % stream.name,
embed=embed)
stream._messages_cache.append(m)
except:
pass
def make_embed(self, data): async def check_communities(self):
default_avatar = ("https://mixer.com/_latest/assets/images/main/" for community in self.communities:
"avatars/default.jpg") try:
user = data["user"] streams = community.get_community_streams()
url = "https://mixer.com/" + data["token"] except CommunityNotFound:
embed = discord.Embed(title=data["name"], url=url) print("Community {} not found!".format(community.name))
embed.set_author(name=user["username"]) continue
embed.add_field(name="Followers", value=data["numFollowers"]) except OfflineCommunity:
embed.add_field(name="Total views", value=data["viewersTotal"]) pass
if user["avatarUrl"]:
embed.set_thumbnail(url=user["avatarUrl"])
else:
embed.set_thumbnail(url=default_avatar)
if data["thumbnail"]:
embed.set_image(url=rnd(data["thumbnail"]["url"]))
embed.color = 0x4C90F3
if data["type"] is not None:
embed.set_footer(text="Playing: " + data["type"]["name"])
return embed
class PicartoStream(Stream):
async def is_online(self):
url = "https://api.picarto.tv/v1/channel/name/" + self.name
session = aiohttp.ClientSession()
async with session.get(url) as r:
data = await r.text(encoding='utf-8')
await session.close()
if r.status == 200:
data = json.loads(data)
if data["online"] is True:
#self.already_online = True
return self.make_embed(data)
else: else:
#self.already_online = False token = self.db.tokens().get(TwitchStream.__name__)
raise OfflineStream() for channel in community.channels:
elif r.status == 404: chn = self.bot.get_channel(channel)
raise StreamNotFound() await chn.send("Online streams for {}".format(community.name))
else: for stream in streams:
raise APIError() stream_obj = TwitchStream(
token=token, name=stream["channel"]["name"],
id=stream["_id"]
)
try:
emb = await stream_obj.is_online()
except:
pass
else:
for channel in community.channels:
chn = self.bot.get_channel(channel)
await chn.send(embed=emb)
def make_embed(self, data): async def load_streams(self):
avatar = rnd("https://picarto.tv/user_data/usrimg/{}/dsdefault.jpg" streams = []
"".format(data["name"].lower()))
url = "https://picarto.tv/" + data["name"]
thumbnail = data["thumbnails"]["web"]
embed = discord.Embed(title=data["title"], url=url)
embed.set_author(name=data["name"])
embed.set_image(url=rnd(thumbnail))
embed.add_field(name="Followers", value=data["followers"])
embed.add_field(name="Total views", value=data["viewers_total"])
embed.set_thumbnail(url=avatar)
embed.color = 0x132332
data["tags"] = ", ".join(data["tags"])
if not data["tags"]: for raw_stream in await self.db.streams():
data["tags"] = "None" _class = getattr(StreamClasses, raw_stream["type"], None)
if not _class:
continue
if data["adult"]: token = await self.db.tokens.get_attr(_class.__name__)
data["adult"] = "NSFW | " streams.append(_class(token=token, **raw_stream))
else:
data["adult"] = ""
embed.color = 0x4C90F3 return streams
embed.set_footer(text="{adult}Category: {category} | Tags: {tags}"
"".format(**data)) async def load_communities(self):
return embed communities = []
for raw_community in await self.db.communities():
_class = getattr(StreamClasses, raw_community["type"], None)
if not _class:
continue
token = await self.db.tokens.get_attr(_class.__name__)
communities.append(_class(token=token, **raw_community))
return communities
async def save_streams(self):
raw_streams = []
for stream in self.streams:
raw_streams.append(stream.export())
await self.db.streams.set(raw_streams)
async def save_communities(self):
raw_communities = []
for community in self.communities:
raw_communities.append(community.export())
await self.db.communities.set(raw_communities)
def __unload(self):
self.task.cancel()

View File

@ -0,0 +1,324 @@
from .errors import StreamNotFound, APIError, InvalidCredentials, OfflineStream, CommunityNotFound, OfflineCommunity
from random import choice
from string import ascii_letters
import discord
import aiohttp
import json
TWITCH_BASE_URL = "https://api.twitch.tv"
TWITCH_ID_ENDPOINT = TWITCH_BASE_URL + "/kraken/users?login="
TWITCH_STREAMS_ENDPOINT = TWITCH_BASE_URL + "/kraken/streams/"
TWITCH_COMMUNITIES_ENDPOINT = TWITCH_BASE_URL + "/kraken/communities"
def rnd(url):
"""Appends a random parameter to the url to avoid Discord's caching"""
return url + "?rnd=" + "".join([choice(ascii_letters) for i in range(6)])
class TwitchCommunity:
def __init__(self, **kwargs):
self.name = kwargs.pop("name")
self.id = kwargs.pop("id", None)
self.channels = kwargs.pop("channels", [])
self._token = kwargs.pop("token", None)
self.type = self.__class__.__name__
async def get_community_id(self):
session = aiohttp.ClientSession()
headers = {
"Accept": "application/vnd.twitchtv.v5+json",
"Client-ID": str(self._token)
}
params = {
"name": self.name
}
async with session.get(TWITCH_COMMUNITIES_ENDPOINT, headers=headers, params=params) as r:
data = await r.json()
await session.close()
if r.status == 200:
return data["_id"]
elif r.status == 400:
raise InvalidCredentials()
elif r.status == 404:
raise CommunityNotFound()
else:
raise APIError()
async def get_community_streams(self):
if not self.id:
try:
self.id = await self.get_community_id()
except CommunityNotFound:
raise
session = aiohttp.ClientSession()
headers = {
"Accept": "application/vnd.twitchtv.v5+json",
"Client-ID": str(self._token)
}
params = {
"community_id": self.id
}
url = TWITCH_BASE_URL + "/kraken/streams"
async with session.get(url, headers=headers, params=params) as r:
data = await r.json()
if r.status == 200:
if data["_total"] == 0:
raise OfflineCommunity()
else:
return data["streams"]
elif r.status == 400:
raise InvalidCredentials()
elif r.status == 404:
raise CommunityNotFound()
else:
raise APIError()
def export(self):
data = {}
for k, v in self.__dict__.items():
if not k.startswith("_"):
data[k] = v
return data
def __repr__(self):
return "<{0.__class__.__name__}: {0.name}>".format(self)
class Stream:
def __init__(self, **kwargs):
self.name = kwargs.pop("name")
self.channels = kwargs.pop("channels", [])
#self.already_online = kwargs.pop("already_online", False)
self._messages_cache = []
self.type = self.__class__.__name__
async def is_online(self):
raise NotImplementedError()
def make_embed(self):
raise NotImplementedError()
def export(self):
data = {}
for k, v in self.__dict__.items():
if not k.startswith("_"):
data[k] = v
return data
def __repr__(self):
return "<{0.__class__.__name__}: {0.name}>".format(self)
class TwitchStream(Stream):
def __init__(self, **kwargs):
self.id = kwargs.pop("id", None)
self._token = kwargs.pop("token", None)
super().__init__(**kwargs)
async def is_online(self):
if not self.id:
self.id = await self.fetch_id()
session = aiohttp.ClientSession()
url = TWITCH_STREAMS_ENDPOINT + self.id
header = {
'Client-ID': str(self._token),
'Accept': 'application/vnd.twitchtv.v5+json'
}
async with session.get(url, headers=header) as r:
data = await r.json(encoding='utf-8')
await session.close()
if r.status == 200:
if data["stream"] is None:
#self.already_online = False
raise OfflineStream()
#self.already_online = True
# In case of rename
self.name = data["stream"]["channel"]["name"]
return self.make_embed(data)
elif r.status == 400:
raise InvalidCredentials()
elif r.status == 404:
raise StreamNotFound()
else:
raise APIError()
async def fetch_id(self):
header = {
'Client-ID': str(self._token),
'Accept': 'application/vnd.twitchtv.v5+json'
}
url = TWITCH_ID_ENDPOINT + self.name
session = aiohttp.ClientSession()
async with session.get(url, headers=header) as r:
data = await r.json()
await session.close()
if r.status == 200:
if not data["users"]:
raise StreamNotFound()
return data["users"][0]["_id"]
elif r.status == 400:
raise InvalidCredentials()
else:
raise APIError()
def make_embed(self, data):
channel = data["stream"]["channel"]
url = channel["url"]
logo = channel["logo"]
if logo is None:
logo = ("https://static-cdn.jtvnw.net/"
"jtv_user_pictures/xarth/404_user_70x70.png")
status = channel["status"]
if not status:
status = "Untitled broadcast"
embed = discord.Embed(title=status, url=url)
embed.set_author(name=channel["display_name"])
embed.add_field(name="Followers", value=channel["followers"])
embed.add_field(name="Total views", value=channel["views"])
embed.set_thumbnail(url=logo)
if data["stream"]["preview"]["medium"]:
embed.set_image(url=rnd(data["stream"]["preview"]["medium"]))
if channel["game"]:
embed.set_footer(text="Playing: " + channel["game"])
embed.color = 0x6441A4
return embed
def __repr__(self):
return "<{0.__class__.__name__}: {0.name} (ID: {0.id})>".format(self)
class HitboxStream(Stream):
async def is_online(self):
session = aiohttp.ClientSession()
url = "https://api.hitbox.tv/media/live/" + self.name
async with session.get(url) as r:
#data = await r.json(encoding='utf-8')
data = await r.text()
await session.close()
data = json.loads(data, strict=False)
if "livestream" not in data:
raise StreamNotFound()
elif data["livestream"][0]["media_is_live"] == "0":
#self.already_online = False
raise OfflineStream()
elif data["livestream"][0]["media_is_live"] == "1":
#self.already_online = True
return self.make_embed(data)
raise APIError()
def make_embed(self, data):
base_url = "https://edge.sf.hitbox.tv"
livestream = data["livestream"][0]
channel = livestream["channel"]
url = channel["channel_link"]
embed = discord.Embed(title=livestream["media_status"], url=url)
embed.set_author(name=livestream["media_name"])
embed.add_field(name="Followers", value=channel["followers"])
embed.set_thumbnail(url=base_url + channel["user_logo"])
if livestream["media_thumbnail"]:
embed.set_image(url=rnd(base_url + livestream["media_thumbnail"]))
embed.set_footer(text="Playing: " + livestream["category_name"])
embed.color = 0x98CB00
return embed
class MixerStream(Stream):
async def is_online(self):
url = "https://mixer.com/api/v1/channels/" + self.name
session = aiohttp.ClientSession()
async with session.get(url) as r:
#data = await r.json(encoding='utf-8')
data = await r.text(encoding='utf-8')
await session.close()
if r.status == 200:
data = json.loads(data, strict=False)
if data["online"] is True:
#self.already_online = True
return self.make_embed(data)
else:
#self.already_online = False
raise OfflineStream()
elif r.status == 404:
raise StreamNotFound()
else:
raise APIError()
def make_embed(self, data):
default_avatar = ("https://mixer.com/_latest/assets/images/main/"
"avatars/default.jpg")
user = data["user"]
url = "https://mixer.com/" + data["token"]
embed = discord.Embed(title=data["name"], url=url)
embed.set_author(name=user["username"])
embed.add_field(name="Followers", value=data["numFollowers"])
embed.add_field(name="Total views", value=data["viewersTotal"])
if user["avatarUrl"]:
embed.set_thumbnail(url=user["avatarUrl"])
else:
embed.set_thumbnail(url=default_avatar)
if data["thumbnail"]:
embed.set_image(url=rnd(data["thumbnail"]["url"]))
embed.color = 0x4C90F3
if data["type"] is not None:
embed.set_footer(text="Playing: " + data["type"]["name"])
return embed
class PicartoStream(Stream):
async def is_online(self):
url = "https://api.picarto.tv/v1/channel/name/" + self.name
session = aiohttp.ClientSession()
async with session.get(url) as r:
data = await r.text(encoding='utf-8')
await session.close()
if r.status == 200:
data = json.loads(data)
if data["online"] is True:
#self.already_online = True
return self.make_embed(data)
else:
#self.already_online = False
raise OfflineStream()
elif r.status == 404:
raise StreamNotFound()
else:
raise APIError()
def make_embed(self, data):
avatar = rnd("https://picarto.tv/user_data/usrimg/{}/dsdefault.jpg"
"".format(data["name"].lower()))
url = "https://picarto.tv/" + data["name"]
thumbnail = data["thumbnails"]["web"]
embed = discord.Embed(title=data["title"], url=url)
embed.set_author(name=data["name"])
embed.set_image(url=rnd(thumbnail))
embed.add_field(name="Followers", value=data["followers"])
embed.add_field(name="Total views", value=data["viewers_total"])
embed.set_thumbnail(url=avatar)
embed.color = 0x132332
data["tags"] = ", ".join(data["tags"])
if not data["tags"]:
data["tags"] = "None"
if data["adult"]:
data["adult"] = "NSFW | "
else:
data["adult"] = ""
embed.color = 0x4C90F3
embed.set_footer(text="{adult}Category: {category} | Tags: {tags}"
"".format(**data))
return embed