mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 11:18:54 -05:00
* create cog disbale base * Because defaults... * lol * announcer needs to respect this * defaultdict mishap * Allow None as guild - Mostly for interop with with ctx.guild * a whitespace issue * Apparently, I broke this too * Apply suggestions from code review Co-authored-by: jack1142 <6032823+jack1142@users.noreply.github.com> * This can probably be more optimized later, but since this is a cached value, it's not a large issue * Report tunnel closing * mod too * whitespace issue * Fix Artifact of prior method naming * these 3 places should have the check if i understood it correctly * Announce the closed tunnels * tunnel oversight * Make the player stop at next track * added where draper said to put it * Apply suggestions from code review Co-authored-by: jack1142 <6032823+jack1142@users.noreply.github.com> Co-authored-by: jack1142 <6032823+jack1142@users.noreply.github.com> Co-authored-by: Drapersniper <27962761+drapersniper@users.noreply.github.com>
75 lines
2.2 KiB
Python
75 lines
2.2 KiB
Python
import asyncio
|
|
from typing import Optional
|
|
|
|
import discord
|
|
from redbot.core import commands
|
|
from redbot.core.i18n import Translator
|
|
from redbot.core.utils import AsyncIter
|
|
from redbot.core.utils.chat_formatting import humanize_list, inline
|
|
|
|
# Translation hook from redbot.core.i18n; `_()` wraps the owner-facing
# failure messages built in Announcer.announcer().
_ = Translator("Announcer", __file__)
|
|
|
|
|
|
class Announcer:
    """Send one message to the announcement channel of every guild the bot is in.

    The work runs as a background task on the bot's event loop. ``active``
    tracks the lifecycle: ``None`` before :meth:`start`, ``True`` while the
    announcement is running, ``False`` once it has finished or been cancelled.
    """

    def __init__(self, ctx: commands.Context, message: str, config=None):
        """
        :param ctx: Invocation context; its ``bot`` supplies the event loop,
            the guild list and the owner notification channel
        :param message: Text sent to each guild's announcement channel
        :param config: Used to determine channel overrides
        """
        self.ctx = ctx
        self.message = message
        self.config = config

        self.active = None
        # Strong reference to the running task. The event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected mid-run.
        self._task = None

    def start(self):
        """
        Starts an announcement.

        Does nothing if this announcer has already been started or cancelled.

        :return:
        """
        if self.active is None:
            self.active = True
            self._task = self.ctx.bot.loop.create_task(self.announcer())

    def cancel(self):
        """
        Cancels a running announcement.

        Only clears the ``active`` flag; :meth:`announcer` checks it before
        handling the next guild, so a send already in progress still completes.

        :return:
        """
        self.active = False

    async def _get_announce_channel(self, guild: discord.Guild) -> Optional[discord.TextChannel]:
        """
        Look up the channel that announcements should be sent to in ``guild``.

        :param guild: Guild to resolve the announcement channel for
        :return: The channel, or ``None`` when the Admin cog is disabled in the
            guild or ``guild.get_channel`` finds nothing for the stored ID
        """
        if await self.ctx.bot.cog_disabled_in_guild_raw("Admin", guild.id):
            return None
        channel_id = await self.config.guild(guild).announce_channel()
        return guild.get_channel(channel_id)

    async def announcer(self):
        """
        Send the message to every guild, then report failures to the bot owners.

        Guilds without a resolvable announcement channel are skipped silently.
        Guilds where the bot lacks send permission, or where the send itself
        fails, are collected and reported in a single message to the owners.
        If :meth:`cancel` is called the loop stops before the next guild and no
        report is sent.

        ``active`` is always reset to ``False`` on exit, including when an
        unexpected error or task cancellation interrupts the loop.
        """
        failed = []
        try:
            async for g in AsyncIter(self.ctx.bot.guilds, delay=0.5):
                if not self.active:
                    return

                channel = await self._get_announce_channel(g)
                if not channel:
                    continue

                if not channel.permissions_for(g.me).send_messages:
                    failed.append(str(g.id))
                    continue

                try:
                    await channel.send(self.message)
                except discord.HTTPException:
                    # Covers discord.Forbidden (a subclass) as well as other
                    # HTTP failures, so one bad guild cannot abort the
                    # announcement for all the remaining ones.
                    failed.append(str(g.id))

            if failed:
                msg = (
                    _("I could not announce to the following server: ")
                    if len(failed) == 1
                    else _("I could not announce to the following servers: ")
                )
                msg += humanize_list(tuple(map(inline, failed)))
                await self.ctx.bot.send_to_owners(msg)
        finally:
            # Never leave the announcer stuck in the "running" state.
            self.active = False
|