Redjumpman 1fd5dffdc7 [V3/Misc] Spelling, Grammar, and doc string fixes. (#1747)
* Update streams.py

* Update filter.py

* Update permissions.py

* Update customcom.py

* Update image.py

* Update trivia.py

* Update warnings.py
2018-06-01 19:20:12 +10:00

670 lines
26 KiB
Python

import discord
from redbot.core import Config, checks, commands
from redbot.core.utils.chat_formatting import pagify
from redbot.core.bot import Red
from redbot.core.i18n import Translator, cog_i18n
from .streamtypes import (
TwitchStream,
HitboxStream,
MixerStream,
PicartoStream,
TwitchCommunity,
YoutubeStream,
)
from .errors import (
OfflineStream,
StreamNotFound,
APIError,
InvalidYoutubeCredentials,
CommunityNotFound,
OfflineCommunity,
StreamsError,
InvalidTwitchCredentials,
)
from . import streamtypes as StreamClasses
from collections import defaultdict
import asyncio
import re
CHECK_DELAY = 60
_ = Translator("Streams", __file__)
@cog_i18n(_)
class Streams:
global_defaults = {"tokens": {}, "streams": [], "communities": []}
guild_defaults = {"autodelete": False, "mention_everyone": False, "mention_here": False}
role_defaults = {"mention": False}
def __init__(self, bot: Red):
self.db = Config.get_conf(self, 26262626)
self.db.register_global(**self.global_defaults)
self.db.register_guild(**self.guild_defaults)
self.db.register_role(**self.role_defaults)
self.bot = bot
self.bot.loop.create_task(self._initialize_lists())
self.yt_cid_pattern = re.compile("^UC[-_A-Za-z0-9]{21}[AQgw]$")
def check_name_or_id(self, data: str):
matched = self.yt_cid_pattern.fullmatch(data)
if matched is None:
return True
return False
async def _initialize_lists(self):
self.streams = await self.load_streams()
self.communities = await self.load_communities()
self.task = self.bot.loop.create_task(self._stream_alerts())
@commands.command()
async def twitch(self, ctx: commands.Context, channel_name: str):
"""Checks if a Twitch channel is streaming"""
token = await self.db.tokens.get_raw(TwitchStream.__name__, default=None)
stream = TwitchStream(name=channel_name, token=token)
await self.check_online(ctx, stream)
@commands.command()
async def youtube(self, ctx: commands.Context, channel_id_or_name: str):
"""Checks if a Youtube channel is streaming"""
apikey = await self.db.tokens.get_raw(YoutubeStream.__name__, default=None)
is_name = self.check_name_or_id(channel_id_or_name)
if is_name:
stream = YoutubeStream(name=channel_id_or_name, token=apikey)
else:
stream = YoutubeStream(id=channel_id_or_name, token=apikey)
await self.check_online(ctx, stream)
@commands.command()
async def hitbox(self, ctx: commands.Context, channel_name: str):
"""Checks if a Hitbox channel is streaming"""
stream = HitboxStream(name=channel_name)
await self.check_online(ctx, stream)
@commands.command()
async def mixer(self, ctx: commands.Context, channel_name: str):
"""Checks if a Mixer channel is streaming"""
stream = MixerStream(name=channel_name)
await self.check_online(ctx, stream)
@commands.command()
async def picarto(self, ctx: commands.Context, channel_name: str):
"""Checks if a Picarto channel is streaming"""
stream = PicartoStream(name=channel_name)
await self.check_online(ctx, stream)
async def check_online(self, ctx: commands.Context, stream):
try:
embed = await stream.is_online()
except OfflineStream:
await ctx.send(_("The stream is offline."))
except StreamNotFound:
await ctx.send(_("The channel doesn't seem to exist."))
except InvalidTwitchCredentials:
await ctx.send(
_("The twitch token is either invalid or has not been set. See `{}`.").format(
"{}streamset twitchtoken".format(ctx.prefix)
)
)
except InvalidYoutubeCredentials:
await ctx.send(
_("The Youtube API key is either invalid or has not been set. See {}.").format(
"`{}streamset youtubekey`".format(ctx.prefix)
)
)
except APIError:
await ctx.send(
_("Something went wrong whilst trying to contact the stream service's API.")
)
else:
await ctx.send(embed=embed)
@commands.group()
@commands.guild_only()
@checks.mod()
async def streamalert(self, ctx: commands.Context):
if ctx.invoked_subcommand is None:
await ctx.send_help()
@streamalert.group(name="twitch")
async def _twitch(self, ctx: commands.Context):
"""Twitch stream alerts"""
if ctx.invoked_subcommand is None or ctx.invoked_subcommand == self._twitch:
await ctx.send_help()
@_twitch.command(name="channel")
async def twitch_alert_channel(self, ctx: commands.Context, channel_name: str):
"""Sets a Twitch stream alert notification in the channel"""
await self.stream_alert(ctx, TwitchStream, channel_name.lower())
@_twitch.command(name="community")
async def twitch_alert_community(self, ctx: commands.Context, community: str):
"""Sets a Twitch stream alert notification in the channel for the specified community."""
await self.community_alert(ctx, TwitchCommunity, community.lower())
@streamalert.command(name="youtube")
async def youtube_alert(self, ctx: commands.Context, channel_name_or_id: str):
"""Sets a Youtube stream alert notification in the channel"""
await self.stream_alert(ctx, YoutubeStream, channel_name_or_id)
@streamalert.command(name="hitbox")
async def hitbox_alert(self, ctx: commands.Context, channel_name: str):
"""Sets a Hitbox stream alert notification in the channel"""
await self.stream_alert(ctx, HitboxStream, channel_name)
@streamalert.command(name="mixer")
async def mixer_alert(self, ctx: commands.Context, channel_name: str):
"""Sets a Mixer stream alert notification in the channel"""
await self.stream_alert(ctx, MixerStream, channel_name)
@streamalert.command(name="picarto")
async def picarto_alert(self, ctx: commands.Context, channel_name: str):
"""Sets a Picarto stream alert notification in the channel"""
await self.stream_alert(ctx, PicartoStream, channel_name)
@streamalert.command(name="stop")
async def streamalert_stop(self, ctx: commands.Context, _all: bool = False):
"""Stops all stream notifications in the channel
Adding 'yes' will disable all notifications in the server"""
streams = self.streams.copy()
local_channel_ids = [c.id for c in ctx.guild.channels]
to_remove = []
for stream in streams:
for channel_id in stream.channels:
if channel_id == ctx.channel.id:
stream.channels.remove(channel_id)
elif _all and ctx.channel.id in local_channel_ids:
if channel_id in stream.channels:
stream.channels.remove(channel_id)
if not stream.channels:
to_remove.append(stream)
for stream in to_remove:
streams.remove(stream)
self.streams = streams
await self.save_streams()
msg = _("All {}'s stream alerts have been disabled." "").format(
"server" if _all else "channel"
)
await ctx.send(msg)
@streamalert.command(name="list")
async def streamalert_list(self, ctx: commands.Context):
streams_list = defaultdict(list)
guild_channels_ids = [c.id for c in ctx.guild.channels]
msg = _("Active stream alerts:\n\n")
for stream in self.streams:
for channel_id in stream.channels:
if channel_id in guild_channels_ids:
streams_list[channel_id].append(stream.name.lower())
if not streams_list:
await ctx.send(_("There are no active stream alerts in this server."))
return
for channel_id, streams in streams_list.items():
channel = ctx.guild.get_channel(channel_id)
msg += "** - #{}**\n{}\n".format(channel, ", ".join(streams))
for page in pagify(msg):
await ctx.send(page)
async def stream_alert(self, ctx: commands.Context, _class, channel_name):
stream = self.get_stream(_class, channel_name)
if not stream:
token = await self.db.tokens.get_raw(_class.__name__, default=None)
is_yt = _class.__name__ == "YoutubeStream"
if is_yt and not self.check_name_or_id(channel_name):
stream = _class(id=channel_name, token=token)
else:
stream = _class(name=channel_name, token=token)
try:
exists = await self.check_exists(stream)
except InvalidTwitchCredentials:
await ctx.send(
_("The twitch token is either invalid or has not been set. " "See {}.").format(
"`{}streamset twitchtoken`".format(ctx.prefix)
)
)
return
except InvalidYoutubeCredentials:
await ctx.send(
_(
"The Youtube API key is either invalid or has not been set. " "See {}."
).format("`{}streamset youtubekey`".format(ctx.prefix))
)
return
except APIError:
await ctx.send(
_("Something went wrong whilst trying to contact the stream service's API.")
)
return
else:
if not exists:
await ctx.send(_("That channel doesn't seem to exist."))
return
await self.add_or_remove(ctx, stream)
async def community_alert(self, ctx: commands.Context, _class, community_name):
community = self.get_community(_class, community_name)
if not community:
token = await self.db.tokens.get_raw(_class.__name__, default=None)
community = _class(name=community_name, token=token)
try:
await community.get_community_streams()
except InvalidTwitchCredentials:
await ctx.send(
_("The twitch token is either invalid or has not been set. See {}.").format(
"`{}streamset twitchtoken`".format(ctx.prefix)
)
)
return
except CommunityNotFound:
await ctx.send(_("That community doesn't seem to exist."))
return
except APIError:
await ctx.send(
_("Something went wrong whilst trying to contact the stream service's API.")
)
return
except OfflineCommunity:
pass
await self.add_or_remove_community(ctx, community)
@commands.group()
@checks.mod()
async def streamset(self, ctx: commands.Context):
if ctx.invoked_subcommand is None:
await ctx.send_help()
@streamset.command()
@checks.is_owner()
async def twitchtoken(self, ctx: commands.Context, token: str):
"""Set the Client ID for twitch.
To do this, follow these steps:
1. Go to this page: https://dev.twitch.tv/dashboard/apps.
2. Click *Register Your Application*
3. Enter a name, set the OAuth Redirect URI to `http://localhost`, and
select an Application Category of your choosing.
4. Click *Register*, and on the following page, copy the Client ID.
5. Paste the Client ID into this command. Done!
"""
await self.db.tokens.set_raw("TwitchStream", value=token)
await self.db.tokens.set_raw("TwitchCommunity", value=token)
await ctx.send(_("Twitch token set."))
@streamset.command()
@checks.is_owner()
async def youtubekey(self, ctx: commands.Context, key: str):
"""Sets the API key for Youtube.
To get one, do the following:
1. Create a project (see https://support.google.com/googleapi/answer/6251787 for details)
2. Enable the Youtube Data API v3 (see https://support.google.com/googleapi/answer/6158841 for instructions)
3. Set up your API key (see https://support.google.com/googleapi/answer/6158862 for instructions)
4. Copy your API key and paste it into this command. Done!
"""
await self.db.tokens.set_raw("YoutubeStream", value=key)
await ctx.send(_("Youtube key set."))
@streamset.group()
@commands.guild_only()
async def mention(self, ctx: commands.Context):
"""Sets mentions for stream alerts."""
if ctx.invoked_subcommand is None or ctx.invoked_subcommand == self.mention:
await ctx.send_help()
@mention.command(aliases=["everyone"])
@commands.guild_only()
async def all(self, ctx: commands.Context):
"""Toggles everyone mention"""
guild = ctx.guild
current_setting = await self.db.guild(guild).mention_everyone()
if current_setting:
await self.db.guild(guild).mention_everyone.set(False)
await ctx.send(
_("{} will no longer be mentioned for a stream alert.").format("@\u200beveryone")
)
else:
await self.db.guild(guild).mention_everyone.set(True)
await ctx.send(
_(
"When a stream configured for stream alerts "
"comes online, {} will be mentioned."
).format("@\u200beveryone")
)
@mention.command(aliases=["here"])
@commands.guild_only()
async def online(self, ctx: commands.Context):
"""Toggles here mention"""
guild = ctx.guild
current_setting = await self.db.guild(guild).mention_here()
if current_setting:
await self.db.guild(guild).mention_here.set(False)
await ctx.send(
_("{} will no longer be mentioned for a stream alert.").format("@\u200bhere")
)
else:
await self.db.guild(guild).mention_here.set(True)
await ctx.send(
_(
"When a stream configured for stream alerts "
"comes online, {} will be mentioned."
).format("@\u200bhere")
)
@mention.command()
@commands.guild_only()
async def role(self, ctx: commands.Context, *, role: discord.Role):
"""Toggles role mention"""
current_setting = await self.db.role(role).mention()
if not role.mentionable:
await ctx.send("That role is not mentionable!")
return
if current_setting:
await self.db.role(role).mention.set(False)
await ctx.send(
_("{} will no longer be mentioned for a stream alert.").format(
"@\u200b{}".format(role.name)
)
)
else:
await self.db.role(role).mention.set(True)
await ctx.send(
_(
"When a stream configured for stream alerts "
"comes online, {} will be mentioned."
""
).format("@\u200b{}".format(role.name))
)
@streamset.command()
@commands.guild_only()
async def autodelete(self, ctx: commands.Context, on_off: bool):
"""Toggles automatic deletion of notifications for streams that go offline"""
await self.db.guild(ctx.guild).autodelete.set(on_off)
if on_off:
await ctx.send("The notifications will be deleted once streams go offline.")
else:
await ctx.send("Notifications will never be deleted.")
async def add_or_remove(self, ctx: commands.Context, stream):
if ctx.channel.id not in stream.channels:
stream.channels.append(ctx.channel.id)
if stream not in self.streams:
self.streams.append(stream)
await ctx.send(
_("I'll send a notification in this channel when {} is online.").format(
stream.name
)
)
else:
stream.channels.remove(ctx.channel.id)
if not stream.channels:
self.streams.remove(stream)
await ctx.send(
_("I won't send notifications about {} in this channel anymore.").format(
stream.name
)
)
await self.save_streams()
async def add_or_remove_community(self, ctx: commands.Context, community):
if ctx.channel.id not in community.channels:
community.channels.append(ctx.channel.id)
if community not in self.communities:
self.communities.append(community)
await ctx.send(
_(
"I'll send a notification in this channel when a "
"channel is streaming to the {} community."
""
).format(community.name)
)
else:
community.channels.remove(ctx.channel.id)
if not community.channels:
self.communities.remove(community)
await ctx.send(
_(
"I won't send notifications about channels streaming "
"to the {} community in this channel anymore."
""
).format(community.name)
)
await self.save_communities()
def get_stream(self, _class, name):
for stream in self.streams:
# if isinstance(stream, _class) and stream.name == name:
# return stream
# Reloading this cog causes an issue with this check ^
# isinstance will always return False
# As a workaround, we'll compare the class' name instead.
# Good enough.
if _class.__name__ == "YoutubeStream" and stream.type == _class.__name__:
# Because name could be a username or a channel id
if self.check_name_or_id(name) and stream.name.lower() == name.lower():
return stream
elif not self.check_name_or_id(name) and stream.id == name:
return stream
if stream.type == _class.__name__ and stream.name.lower() == name.lower():
return stream
def get_community(self, _class, name):
for community in self.communities:
if community.type == _class.__name__ and community.name.lower() == name.lower():
return community
async def check_exists(self, stream):
try:
await stream.is_online()
except OfflineStream:
pass
except StreamNotFound:
return False
except StreamsError:
raise
return True
async def _stream_alerts(self):
while True:
try:
await self.check_streams()
except asyncio.CancelledError:
pass
try:
await self.check_communities()
except asyncio.CancelledError:
pass
await asyncio.sleep(CHECK_DELAY)
async def check_streams(self):
for stream in self.streams:
try:
embed = await stream.is_online()
except OfflineStream:
for message in stream._messages_cache:
try:
autodelete = await self.db.guild(message.guild).autodelete()
if autodelete:
await message.delete()
except:
pass
stream._messages_cache.clear()
await self.save_streams()
except:
pass
else:
if stream._messages_cache:
continue
for channel_id in stream.channels:
channel = self.bot.get_channel(channel_id)
mention_str = await self._get_mention_str(channel.guild)
if mention_str:
content = "{}, {} is online!".format(mention_str, stream.name)
else:
content = "{} is online!".format(stream.name)
try:
m = await channel.send(content, embed=embed)
stream._messages_cache.append(m)
await self.save_streams()
except:
pass
async def _get_mention_str(self, guild: discord.Guild):
settings = self.db.guild(guild)
mentions = []
if await settings.mention_everyone():
mentions.append("@everyone")
if await settings.mention_here():
mentions.append("@here")
for role in guild.roles:
if await self.db.role(role).mention():
mentions.append(role.mention)
return " ".join(mentions)
async def check_communities(self):
for community in self.communities:
try:
stream_list = await community.get_community_streams()
except CommunityNotFound:
print(_("Community {} not found!").format(community.name))
continue
except OfflineCommunity:
for message in community._messages_cache:
try:
autodelete = await self.db.guild(message.guild).autodelete()
if autodelete:
await message.delete()
except:
pass
community._messages_cache.clear()
await self.save_communities()
except:
pass
else:
for channel in community.channels:
chn = self.bot.get_channel(channel)
streams = await self.filter_streams(stream_list, chn)
emb = await community.make_embed(streams)
chn_msg = [m for m in community._messages_cache if m.channel == chn]
if not chn_msg:
mentions = await self._get_mention_str(chn.guild)
if mentions:
msg = await chn.send(mentions, embed=emb)
else:
msg = await chn.send(embed=emb)
community._messages_cache.append(msg)
await self.save_communities()
else:
chn_msg = sorted(chn_msg, key=lambda x: x.created_at, reverse=True)[0]
community._messages_cache.remove(chn_msg)
await chn_msg.edit(embed=emb)
community._messages_cache.append(chn_msg)
await self.save_communities()
async def filter_streams(self, streams: list, channel: discord.TextChannel) -> list:
filtered = []
for stream in streams:
tw_id = str(stream["channel"]["_id"])
for alert in self.streams:
if isinstance(alert, TwitchStream) and alert.id == tw_id:
if channel.id in alert.channels:
break
else:
filtered.append(stream)
return filtered
async def load_streams(self):
streams = []
for raw_stream in await self.db.streams():
_class = getattr(StreamClasses, raw_stream["type"], None)
if not _class:
continue
raw_msg_cache = raw_stream["messages"]
raw_stream["_messages_cache"] = []
for raw_msg in raw_msg_cache:
chn = self.bot.get_channel(raw_msg["channel"])
msg = await chn.get_message(raw_msg["message"])
raw_stream["_messages_cache"].append(msg)
token = await self.db.tokens.get_raw(_class.__name__)
streams.append(_class(token=token, **raw_stream))
# issue 1191 extended resolution: Remove this after suitable period
# Fast dedupe below
seen = set()
seen_add = seen.add
return [x for x in streams if not (x.name.lower() in seen or seen_add(x.name.lower()))]
# return streams
async def load_communities(self):
communities = []
for raw_community in await self.db.communities():
_class = getattr(StreamClasses, raw_community["type"], None)
if not _class:
continue
raw_msg_cache = raw_community["messages"]
raw_community["_messages_cache"] = []
for raw_msg in raw_msg_cache:
chn = self.bot.get_channel(raw_msg["channel"])
msg = await chn.get_message(raw_msg["message"])
raw_community["_messages_cache"].append(msg)
token = await self.db.tokens.get_raw(_class.__name__, default=None)
communities.append(_class(token=token, **raw_community))
# issue 1191 extended resolution: Remove this after suitable period
# Fast dedupe below
seen = set()
seen_add = seen.add
return [x for x in communities if not (x.name.lower() in seen or seen_add(x.name.lower()))]
# return communities
async def save_streams(self):
raw_streams = []
for stream in self.streams:
raw_streams.append(stream.export())
await self.db.streams.set(raw_streams)
async def save_communities(self):
raw_communities = []
for community in self.communities:
raw_communities.append(community.export())
await self.db.communities.set(raw_communities)
def __unload(self):
self.task.cancel()