mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-12-22 01:02:34 -05:00
First commit
This commit is contained in:
614
cogs/audio.py
Normal file
614
cogs/audio.py
Normal file
@@ -0,0 +1,614 @@
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
import asyncio
|
||||
import threading
|
||||
import youtube_dl
|
||||
import os
|
||||
from random import choice as rndchoice
|
||||
from random import shuffle
|
||||
from .utils.dataIO import fileIO
|
||||
from .utils import checks
|
||||
import glob
|
||||
import re
|
||||
import aiohttp
|
||||
from bs4 import BeautifulSoup
|
||||
import __main__
|
||||
import json
|
||||
|
||||
if not discord.opus.is_loaded():
|
||||
discord.opus.load_opus('libopus-0.dll')
|
||||
|
||||
main_path = os.path.dirname(os.path.realpath(__main__.__file__))
|
||||
|
||||
settings = {"VOLUME" : 0.5, "MAX_LENGTH" : 3700, "QUEUE_MODE" : True}
|
||||
|
||||
youtube_dl_options = {
|
||||
'format': 'bestaudio/best',
|
||||
'extractaudio': True,
|
||||
'audioformat': "mp3",
|
||||
'outtmpl': '%(id)s',
|
||||
'noplaylist': True,
|
||||
'nocheckcertificate': True,
|
||||
'ignoreerrors': True,
|
||||
'quiet': True,
|
||||
'no_warnings': True,
|
||||
'outtmpl': "data/audio/cache/%(id)s"}
|
||||
|
||||
class Audio:
|
||||
"""Music streaming."""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.music_player = EmptyPlayer()
|
||||
self.queue_mode = False
|
||||
self.queue = []
|
||||
self.playlist = []
|
||||
self.current = -1 #current track index in self.playlist
|
||||
self.downloader = {"DONE" : False, "TITLE" : False, "ID" : False, "URL" : False, "DURATION" : False, "DOWNLOADING" : False}
|
||||
self.quit_manager = False
|
||||
self.skip_votes = []
|
||||
|
||||
self.sing = ["https://www.youtube.com/watch?v=zGTkAVsrfg8", "https://www.youtube.com/watch?v=cGMWL8cOeAU",
|
||||
"https://www.youtube.com/watch?v=vFrjMq4aL-g", "https://www.youtube.com/watch?v=WROI5WYBU_A",
|
||||
"https://www.youtube.com/watch?v=41tIUr_ex3g", "https://www.youtube.com/watch?v=f9O2Rjn1azc"]
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def play(self, ctx, link : str):
|
||||
"""Plays link
|
||||
"""
|
||||
if self.downloader["DOWNLOADING"]:
|
||||
await self.bot.say("I'm already downloading a track.")
|
||||
return
|
||||
msg = ctx.message
|
||||
if await self.check_voice(msg.author, msg):
|
||||
if self.is_playlist_valid([link]): # reusing a function
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
self.queue = []
|
||||
self.playlist = []
|
||||
self.current = -1
|
||||
await self.play_video(link)
|
||||
else:
|
||||
self.playlist = []
|
||||
self.current = -1
|
||||
if not queue: await self.bot.say("The link has been put into queue.")
|
||||
self.queue.append(link)
|
||||
else:
|
||||
await self.bot.say("That link is not allowed.")
|
||||
|
||||
@commands.command(aliases=["title"])
|
||||
async def song(self):
|
||||
"""Shows song title
|
||||
"""
|
||||
if self.downloader["TITLE"]:
|
||||
await self.bot.say(self.downloader["TITLE"])
|
||||
else:
|
||||
await self.bot.say("No title available.")
|
||||
|
||||
@commands.command(name="playlist", pass_context=True, no_pm=True)
|
||||
async def _playlist(self, ctx, name : str): #some checks here
|
||||
"""Plays saved playlist
|
||||
"""
|
||||
await self.start_playlist(ctx, name, random=False)
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def mix(self, ctx, name : str): #some checks here
|
||||
"""Plays saved playlist (shuffled)
|
||||
"""
|
||||
await self.start_playlist(ctx, name, random=True)
|
||||
|
||||
async def start_playlist(self, ctx, name, random=None):
|
||||
if self.downloader["DOWNLOADING"]:
|
||||
await self.bot.say("I'm already downloading a track.")
|
||||
return
|
||||
msg = ctx.message
|
||||
name += ".txt"
|
||||
if await self.check_voice(msg.author, msg):
|
||||
if os.path.isfile("data/audio/playlists/" + name):
|
||||
self.queue = []
|
||||
self.current = -1
|
||||
self.playlist = fileIO("data/audio/playlists/" + name, "load")["playlist"]
|
||||
if random: shuffle(self.playlist)
|
||||
self.music_player.stop()
|
||||
|
||||
@commands.command(pass_context=True, aliases=["next"], no_pm=True)
|
||||
async def skip(self, ctx):
|
||||
"""Skips song
|
||||
"""
|
||||
msg = ctx.message
|
||||
if self.music_player.is_playing():
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
self.music_player.stop()
|
||||
else:
|
||||
await self.vote_skip(msg)
|
||||
|
||||
async def vote_skip(self, msg):
|
||||
v_channel = msg.server.me.voice_channel
|
||||
if msg.author.voice_channel.id == v_channel.id:
|
||||
if msg.author.id in self.skip_votes:
|
||||
await self.bot.say("You already voted.")
|
||||
return
|
||||
self.skip_votes.append(msg.author.id)
|
||||
if msg.server.me.id not in self.skip_votes: self.skip_votes.append(msg.server.me.id)
|
||||
current_users = []
|
||||
for m in v_channel.voice_members:
|
||||
current_users.append(m.id)
|
||||
|
||||
clean_skip_votes = [] #Removes votes of people no longer in the channel
|
||||
for m_id in self.skip_votes:
|
||||
if m_id in current_users:
|
||||
clean_skip_votes.append(m_id)
|
||||
self.skip_votes = clean_skip_votes
|
||||
|
||||
votes_needed = int((len(current_users)-1) / 2)
|
||||
|
||||
if len(self.skip_votes)-1 >= votes_needed:
|
||||
self.music_player.stop()
|
||||
self.skip_votes = []
|
||||
return
|
||||
await self.bot.say("You voted to skip. Votes: [{0}/{1}]".format(str(len(self.skip_votes)-1), str(votes_needed)))
|
||||
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def local(self, ctx, name : str):
|
||||
"""Plays a local playlist"""
|
||||
if self.downloader["DOWNLOADING"]:
|
||||
await self.bot.say("I'm already downloading a track.")
|
||||
return
|
||||
msg = ctx.message
|
||||
localplaylists = self.get_local_playlists()
|
||||
if localplaylists and ("data/audio/localtracks/" not in name and "\\" not in name):
|
||||
if name in localplaylists:
|
||||
files = []
|
||||
if glob.glob("data/audio/localtracks/" + name + "/*.mp3"):
|
||||
files.extend(glob.glob("data/audio/localtracks/" + name + "/*.mp3"))
|
||||
if glob.glob("data/audio/localtracks/" + name + "/*.flac"):
|
||||
files.extend(glob.glob("data/audio/localtracks/" + name + "/*.flac"))
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
if await self.check_voice(msg.author, ctx.message):
|
||||
self.queue = []
|
||||
self.current = -1
|
||||
self.playlist = files
|
||||
self.music_player.stop()
|
||||
else:
|
||||
await self.bot.say("I'm in queue mode. Controls are disabled if you're in a room with multiple people.")
|
||||
else:
|
||||
await self.bot.say("There is no local playlist with that name.")
|
||||
else:
|
||||
await self.bot.say(message.channel, "There are no valid playlists in the localtracks folder.")
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def loop(self, ctx):
|
||||
"""Loops single song
|
||||
"""
|
||||
msg = ctx.message
|
||||
if self.music_player.is_playing():
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
if self.playlist:
|
||||
self.playlist = self.playlist[[self.current]]
|
||||
elif self.queue:
|
||||
self.playlist = self.playlist[[self.queue[0]]]
|
||||
await self.bot.say("I will play this song on repeat.")
|
||||
else:
|
||||
await self.bot.say("I'm in queue mode. Controls are disabled if you're in a room with multiple people.")
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def shuffle(self, ctx):
|
||||
"""Shuffle playlist
|
||||
"""
|
||||
msg = ctx.message
|
||||
if self.music_player.is_playing():
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
if self.playlist:
|
||||
shuffle(self.playlist)
|
||||
await self.bot.say("The order of this playlist has been mixed")
|
||||
else:
|
||||
await self.bot.say("I'm in queue mode. Controls are disabled if you're in a room with multiple people.")
|
||||
|
||||
@commands.command(pass_context=True, aliases=["previous"], no_pm=True) #TODO, PLAYLISTS
|
||||
async def prev(self, ctx):
|
||||
"""Previous song
|
||||
"""
|
||||
msg = ctx.message
|
||||
if self.music_player.is_playing() and self.playlist:
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
self.current -= 2
|
||||
if self.current == -1:
|
||||
self.current = len(self.playlist) -3
|
||||
elif self.current == -2:
|
||||
self.current = len(self.playlist) -2
|
||||
self.music_player.stop()
|
||||
|
||||
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def stop(self, ctx):
|
||||
"""Stops audio activity
|
||||
"""
|
||||
msg = ctx.message
|
||||
if self.music_player.is_playing():
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
await self.close_audio()
|
||||
else:
|
||||
await self.bot.say("You can't stop music when there are other people in the channel! Vote to skip instead.")
|
||||
else:
|
||||
await self.close_audio()
|
||||
|
||||
async def close_audio(self):
|
||||
self.queue = []
|
||||
self.playlist = []
|
||||
self.current = -1
|
||||
self.music_player.stop()
|
||||
await asyncio.sleep(1)
|
||||
await self.bot.voice.disconnect()
|
||||
|
||||
@commands.command(name="queue", pass_context=True, no_pm=True) #check that author is in the same channel as the bot
|
||||
async def _queue(self, ctx, link : str):
|
||||
"""Add link to queue
|
||||
"""
|
||||
if await self.check_voice(ctx.message.author, ctx.message):
|
||||
if not self.playlist:
|
||||
self.queue.append(link)
|
||||
await self.bot.say(self.queue)
|
||||
else:
|
||||
await self.bot.say("I'm already playing a playlist.")
|
||||
|
||||
async def is_alone_or_admin(self, author): #Direct control. fix everything
|
||||
if not settings["QUEUE_MODE"]:
|
||||
return True
|
||||
elif discord.utils.get(author.roles, name=checks.settings["ADMIN_ROLE"]) is not None:
|
||||
return True
|
||||
elif discord.utils.get(author.roles, name=checks.settings["MOD_ROLE"]) is not None:
|
||||
return True
|
||||
elif len(author.voice_channel.voice_members) == 2:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@commands.command(name="sing", pass_context=True, no_pm=True)
|
||||
async def _sing(self, ctx):
|
||||
"""Makes Red sing"""
|
||||
if self.downloader["DOWNLOADING"]:
|
||||
await self.bot.say("I'm already downloading a track.")
|
||||
return
|
||||
msg = ctx.message
|
||||
if await self.check_voice(msg.author, msg):
|
||||
if not self.music_player.is_playing():
|
||||
self.queue = []
|
||||
await self.play_video(rndchoice(self.sing))
|
||||
else:
|
||||
if await self.is_alone_or_admin(msg.author):
|
||||
self.queue = []
|
||||
await self.play_video(rndchoice(self.sing))
|
||||
else:
|
||||
await self.bot.say("I'm already playing music for someone else at the moment.")
|
||||
|
||||
@commands.group(name="list", pass_context=True)
|
||||
async def _list(self, ctx):
|
||||
"""Lists playlists"""
|
||||
if ctx.invoked_subcommand is None:
|
||||
await self.bot.say("Type help list for info.")
|
||||
|
||||
@_list.command(name="playlist", pass_context=True)
|
||||
async def list_playlist(self, ctx):
|
||||
msg = "Available playlists: \n\n```"
|
||||
files = os.listdir("data/audio/playlists/")
|
||||
if files:
|
||||
for i, f in enumerate(files):
|
||||
if f.endswith(".txt"):
|
||||
if i % 4 == 0 and i != 0:
|
||||
msg = msg + f.replace(".txt", "") + "\n"
|
||||
else:
|
||||
msg = msg + f.replace(".txt", "") + "\t"
|
||||
msg += "```"
|
||||
await self.bot.send_message(ctx.message.author, msg)
|
||||
else:
|
||||
await self.bot.say("There are no playlists.")
|
||||
|
||||
@_list.command(name="local", pass_context=True)
|
||||
async def list_local(self, ctx):
|
||||
msg = "Available local playlists: \n\n```"
|
||||
dirs = self.get_local_playlists()
|
||||
if dirs:
|
||||
for i, d in enumerate(dirs):
|
||||
if i % 4 == 0 and i != 0:
|
||||
msg = msg + d + "\n"
|
||||
else:
|
||||
msg = msg + d + "\t"
|
||||
msg += "```"
|
||||
await self.bot.send_message(ctx.message.author, msg)
|
||||
else:
|
||||
await self.bot.say("There are no local playlists.")
|
||||
|
||||
@commands.group(pass_context=True)
|
||||
@checks.mod_or_permissions()
|
||||
async def audioset(self, ctx):
|
||||
"""Changes audio module settings"""
|
||||
if ctx.invoked_subcommand is None:
|
||||
msg = "```"
|
||||
for k, v in settings.items():
|
||||
msg += str(k) + ": " + str(v) + "\n"
|
||||
msg += "\nType help audioset to see the list of commands.```"
|
||||
await self.bot.say(msg)
|
||||
|
||||
@audioset.command(name="queue")
|
||||
async def queueset(self, status : str):
|
||||
"""Enables/disables queue"""
|
||||
global settings
|
||||
status = status.lower()
|
||||
if status == "on" or status == "true":
|
||||
settings["QUEUE_MODE"] = True
|
||||
await self.bot.say("Queue mode is now on.")
|
||||
elif status == "off" or status == "false":
|
||||
settings["QUEUE_MODE"] = False
|
||||
await self.bot.say("Queue mode is now off.")
|
||||
else:
|
||||
await self.bot.say("Queue status can be either on or off.")
|
||||
return
|
||||
self.save_settings()
|
||||
|
||||
@audioset.command()
|
||||
async def maxlength(self, length : int):
|
||||
"""Maximum track length for requested links"""
|
||||
global settings
|
||||
settings["MAX_LENGTH"] = length
|
||||
await self.bot.say("Maximum length is now " + str(length) + " seconds.")
|
||||
self.save_settings()
|
||||
|
||||
@audioset.command()
|
||||
async def volume(self, level : float):
|
||||
"""Sets the volume (0-1)"""
|
||||
global settings
|
||||
if level >= 0 and level <= 1:
|
||||
settings["VOLUME"] = level
|
||||
await self.bot.say("Volume is now set at " + str(level) + ". It will take effect after the current track.")
|
||||
self.save_settings()
|
||||
else:
|
||||
await self.bot.say("Volume must be between 0 and 1. Example: 0.40")
|
||||
|
||||
async def play_video(self, link):
|
||||
self.downloader = {"DONE" : False, "TITLE" : False, "ID" : False, "URL": False, "DURATION" : False, "DOWNLOADING" : False}
|
||||
if "https://" in link or "http://" in link:
|
||||
path = "data/audio/cache/"
|
||||
t = threading.Thread(target=self.get_video, args=(link,self,))
|
||||
t.start()
|
||||
else: #local
|
||||
path = ""
|
||||
self.downloader = {"DONE" : True, "TITLE" : link, "ID" : link, "URL": False, "DURATION" : False, "DOWNLOADING" : False}
|
||||
while not self.downloader["DONE"]:
|
||||
await asyncio.sleep(1)
|
||||
if self.downloader["ID"]:
|
||||
try:
|
||||
self.music_player.stop()
|
||||
self.music_player = self.bot.voice.create_ffmpeg_player(path + self.downloader["ID"], options='''-filter:a "volume={}"'''.format(settings["VOLUME"]))
|
||||
self.music_player.start()
|
||||
if path != "": await self.bot.change_status(discord.Game(name=self.downloader["TITLE"]))
|
||||
except discord.errors.ClientException:
|
||||
print("Error: I can't play music without ffmpeg. Install it.")
|
||||
self.downloader = {"DONE" : False, "TITLE" : False, "ID" : False, "URL": False, "DURATION" : False, "DOWNLOADING" : False}
|
||||
self.queue = []
|
||||
self.playlist = []
|
||||
except Exception as e:
|
||||
print(e)
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
async def check_voice(self, author, message):
|
||||
if self.bot.is_voice_connected():
|
||||
v_channel = message.server.me.voice_channel
|
||||
if author.voice_channel == v_channel:
|
||||
return True
|
||||
elif len(v_channel.voice_members) == 1:
|
||||
if author.is_voice_connected():
|
||||
if author.voice_channel.permissions_for(message.server.me).connect:
|
||||
await self.bot.join_voice_channel(author.voice_channel)
|
||||
return True
|
||||
else:
|
||||
await self.bot.say("I need permissions to join that voice channel.")
|
||||
return False
|
||||
else:
|
||||
await self.bot.say("You need to be in a voice channel.")
|
||||
return False
|
||||
else:
|
||||
if not self.playlist and not self.queue:
|
||||
return True
|
||||
else:
|
||||
await self.bot.say("I'm already playing music for other people.")
|
||||
return False
|
||||
elif author.voice_channel:
|
||||
if author.voice_channel.permissions_for(message.server.me).connect:
|
||||
await self.bot.join_voice_channel(author.voice_channel)
|
||||
return True
|
||||
else:
|
||||
await self.bot.say("I need permissions to join that voice channel.")
|
||||
return False
|
||||
else:
|
||||
await self.bot.say("You need to be in a voice channel.")
|
||||
return False
|
||||
|
||||
async def queue_manager(self):
|
||||
while not self.quit_manager:
|
||||
if self.queue and not self.music_player.is_playing():
|
||||
new_link = self.queue[0]
|
||||
self.queue.pop(0)
|
||||
self.skip_votes = []
|
||||
await self.play_video(new_link)
|
||||
elif self.playlist and not self.music_player.is_playing():
|
||||
if not self.current == len(self.playlist)-1:
|
||||
self.current += 1
|
||||
else:
|
||||
self.current = 0
|
||||
new_link = self.playlist[self.current]
|
||||
self.skip_votes = []
|
||||
await self.play_video(new_link)
|
||||
await asyncio.sleep(1)
|
||||
|
||||
def get_video(self, url, audio):
|
||||
try:
|
||||
self.downloader["DOWNLOADING"] = True
|
||||
yt = youtube_dl.YoutubeDL(youtube_dl_options)
|
||||
v = yt.extract_info(url, download=False)
|
||||
if v["duration"] > settings["MAX_LENGTH"]: raise MaximumLength("Track exceeded maximum length. See help audioset maxlength")
|
||||
if not os.path.isfile("data/audio/cache/" + v["id"]):
|
||||
v = yt.extract_info(url, download=True)
|
||||
audio.downloader = {"DONE" : True, "TITLE" : v["title"], "ID" : v["id"], "URL" : url, "DURATION" : v["duration"], "DOWNLOADING" : False} #Errors out here if invalid link
|
||||
except Exception as e:
|
||||
print(e) # TODO
|
||||
audio.downloader = {"DONE" : True, "TITLE" : False, "ID" : False, "URL" : False, "DOWNLOADING" : False}
|
||||
|
||||
async def incoming_messages(self, msg): # Workaround, need to fix
|
||||
if msg.author.id != self.bot.user.id:
|
||||
cmds = ("unload cogs.audio", "reload cogs.audio")
|
||||
|
||||
if msg.content in cmds:
|
||||
self.quit_manager = True
|
||||
if msg.channel.is_private and msg.attachments != []:
|
||||
await self.transfer_playlist(msg)
|
||||
if not msg.channel.is_private:
|
||||
if not self.playlist and not self.queue and not self.music_player.is_playing() and msg.server.me.game != None:
|
||||
await self.bot.change_status(None)
|
||||
|
||||
def get_local_playlists(self):
|
||||
dirs = []
|
||||
files = os.listdir("data/audio/localtracks/")
|
||||
for f in files:
|
||||
if os.path.isdir("data/audio/localtracks/" + f) and " " not in f:
|
||||
if glob.glob("data/audio/localtracks/" + f + "/*.mp3") != []:
|
||||
dirs.append(f)
|
||||
elif glob.glob("data/audio/localtracks/" + f + "/*.flac") != []:
|
||||
dirs.append(f)
|
||||
if dirs != []:
|
||||
return dirs
|
||||
else:
|
||||
return False
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def addplaylist(self, ctx, name : str, link : str): #CHANGE COMMAND NAME
|
||||
"""Adds tracks from youtube playlist link"""
|
||||
if self.is_playlist_name_valid(name) and len(name) < 25 and self.is_playlist_link_valid(link):
|
||||
if fileIO("playlists/" + name + ".txt", "check"):
|
||||
await self.bot.say("`A playlist with that name already exists.`")
|
||||
return False
|
||||
links = await self.parse_yt_playlist(link)
|
||||
if links:
|
||||
data = { "author" : ctx.message.author.id,
|
||||
"playlist": links,
|
||||
"link" : link}
|
||||
fileIO("data/audio/playlists/" + name + ".txt", "save", data)
|
||||
await self.bot.say("Playlist added. Name: {}".format(name))
|
||||
else:
|
||||
await self.bot.say("Something went wrong. Either the link was incorrect or I was unable to retrieve the page.")
|
||||
else:
|
||||
await self.bot.say("Something is wrong with the playlist's link or its filename. Remember, the name must be with only numbers, letters and underscores. Link must be this format: https://www.youtube.com/playlist?list=PLe8jmEHFkvsaDOOWcREvkgFoj6MD0pXXX")
|
||||
|
||||
async def transfer_playlist(self, message):
|
||||
msg = message.attachments[0]
|
||||
if msg["filename"].endswith(".txt"):
|
||||
if not fileIO("data/audio/playlists/" + msg["filename"], "check"): #returns false if file already exists
|
||||
r = await aiohttp.get(msg["url"])
|
||||
r = await r.text()
|
||||
data = r.replace("\r", "")
|
||||
data = data.split()
|
||||
if self.is_playlist_valid(data) and self.is_playlist_name_valid(msg["filename"].replace(".txt", "")):
|
||||
data = { "author" : message.author.id,
|
||||
"playlist": data,
|
||||
"link" : False}
|
||||
fileIO("data/audio/playlists/" + msg["filename"], "save", data)
|
||||
await self.bot.send_message(message.channel, "Playlist added. Name: {}".format(msg["filename"].replace(".txt", "")))
|
||||
else:
|
||||
await self.bot.send_message(message.channel, "Something is wrong with the playlist or its filename.") # Add formatting info
|
||||
else:
|
||||
await self.bot.send_message(message.channel, "A playlist with that name already exists. Change the filename and resubmit it.")
|
||||
|
||||
def is_playlist_valid(self, data):
|
||||
data = [y for y in data if y != ""] # removes all empty elements
|
||||
data = [y for y in data if y != "\n"]
|
||||
pattern = "|".join(fileIO("data/audio/accepted_links.json", "load"))
|
||||
for link in data:
|
||||
rr = re.search(pattern, link, re.I | re.U)
|
||||
if rr == None:
|
||||
return False
|
||||
return True
|
||||
|
||||
def is_playlist_link_valid(self, link):
|
||||
pattern = "^https:\/\/www.youtube.com\/playlist\?list=(.[^:/]*)"
|
||||
rr = re.search(pattern, link, re.I | re.U)
|
||||
if not rr == None:
|
||||
return rr.group(1)
|
||||
else:
|
||||
return False
|
||||
|
||||
def is_playlist_name_valid(self, name):
|
||||
for l in name:
|
||||
if l.isdigit() or l.isalpha() or l == "_":
|
||||
pass
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
async def parse_yt_playlist(self, url):
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
|
||||
try:
|
||||
page = await aiohttp.post(url, headers=headers)
|
||||
page = await page.text()
|
||||
soup = BeautifulSoup(page, 'html.parser')
|
||||
tags = soup.find_all("tr", class_="pl-video yt-uix-tile ")
|
||||
links = []
|
||||
|
||||
for tag in tags:
|
||||
links.append("https://www.youtube.com/watch?v=" + tag['data-video-id'])
|
||||
if links != []:
|
||||
return links
|
||||
else:
|
||||
return False
|
||||
except:
|
||||
return False
|
||||
|
||||
def save_settings(self):
|
||||
with open(main_path + "/data/audio/settings.json", "w") as f:
|
||||
f.write(json.dumps(settings))
|
||||
|
||||
class EmptyPlayer(): #dummy player
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
pass
|
||||
|
||||
def is_playing(self):
|
||||
return False
|
||||
|
||||
class MaximumLength(Exception):
|
||||
def __init__(self, m):
|
||||
self.message = m
|
||||
def __str__(self):
|
||||
return self.message
|
||||
|
||||
def check_folders():
|
||||
folders = ("data/audio", "data/audio/cache", "data/audio/playlists")
|
||||
for folder in folders:
|
||||
if not os.path.exists(folder):
|
||||
print("Creating " + folder + " folder...")
|
||||
os.makedirs(folder)
|
||||
|
||||
def check_files(n):
|
||||
if not os.path.isfile(main_path + "/data/audio/settings.json"):
|
||||
print("Creating default audio settings.json...")
|
||||
n.save_settings()
|
||||
|
||||
allowed = ["^(https:\/\/www\\.youtube\\.com\/watch\\?v=...........*)", "^(https:\/\/youtu.be\/...........*)",
|
||||
"^(https:\/\/youtube\\.com\/watch\\?v=...........*)", "^(https:\/\/soundcloud\\.com\/.*)"]
|
||||
|
||||
if not os.path.isfile(main_path + "/data/audio/accepted_links.json"):
|
||||
print("Creating accepted_links.json...")
|
||||
with open(main_path + "/data/audio/accepted_links.json", "w") as f:
|
||||
f.write(json.dumps(allowed))
|
||||
|
||||
def setup(bot):
|
||||
loop = asyncio.get_event_loop()
|
||||
n = Audio(bot)
|
||||
check_folders()
|
||||
check_files(n)
|
||||
loop.create_task(n.queue_manager())
|
||||
bot.add_listener(n.incoming_messages, "on_message")
|
||||
bot.add_cog(n)
|
||||
112
cogs/customcom.py
Normal file
112
cogs/customcom.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from .utils.dataIO import fileIO
|
||||
from .utils import checks
|
||||
import os
|
||||
|
||||
class CustomCommands:
|
||||
"""Custom commands."""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.c_commands = fileIO("data/customcom/commands.json", "load")
|
||||
|
||||
@checks.mod_or_permissions()
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def addcom(self, ctx, command : str, *text):
|
||||
"""Adds a custom command
|
||||
|
||||
Example:
|
||||
!addcom yourcommand Text you want
|
||||
"""
|
||||
if text == ():
|
||||
await self.bot.say("addcom [command] [text/url]")
|
||||
return
|
||||
server = ctx.message.server
|
||||
channel = ctx.message.channel
|
||||
text = " ".join(text)
|
||||
if not server.id in self.c_commands:
|
||||
self.c_commands[server.id] = {}
|
||||
cmdlist = self.c_commands[server.id]
|
||||
if command not in cmdlist:
|
||||
cmdlist[command] = text
|
||||
self.c_commands[server.id] = cmdlist
|
||||
fileIO("data/customcom/commands.json", "save", self.c_commands)
|
||||
await self.bot.say("`Custom command successfully added.`")
|
||||
else:
|
||||
await self.bot.say("`This command already exists. Use editcom to edit it.`")
|
||||
|
||||
@checks.mod_or_permissions()
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def editcom(self, ctx, command : str, *text):
|
||||
"""Edits a custom command
|
||||
|
||||
Example:
|
||||
!editcom yourcommand Text you want
|
||||
"""
|
||||
if text == ():
|
||||
await self.bot.say("editcom [command] [text/url]")
|
||||
return
|
||||
server = ctx.message.server
|
||||
channel = ctx.message.channel
|
||||
text = " ".join(text)
|
||||
if server.id in self.c_commands:
|
||||
cmdlist = self.c_commands[server.id]
|
||||
if command in cmdlist:
|
||||
cmdlist[command] = text
|
||||
self.c_commands[server.id] = cmdlist
|
||||
fileIO("data/customcom/commands.json", "save", self.c_commands)
|
||||
await self.bot.say("`Custom command successfully edited.`")
|
||||
else:
|
||||
await self.bot.say("`That command doesn't exist. Use addcom [command] [text]`")
|
||||
else:
|
||||
await self.bot.say("`There are no custom commands in this server. Use addcom [command] [text]`")
|
||||
|
||||
@checks.mod_or_permissions()
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def delcom(self, ctx, command : str):
|
||||
"""Deletes a custom command
|
||||
|
||||
Example:
|
||||
!delcom yourcommand"""
|
||||
server = ctx.message.server
|
||||
channel = ctx.message.channel
|
||||
if server.id in self.c_commands:
|
||||
cmdlist = self.c_commands[server.id]
|
||||
if command in cmdlist:
|
||||
cmdlist.pop(command, None)
|
||||
self.c_commands[server.id] = cmdlist
|
||||
fileIO("data/customcom/commands.json", "save", self.c_commands)
|
||||
await self.bot.send_message(channel, "`Custom command successfully deleted.`")
|
||||
else:
|
||||
await self.bot.say("`That command doesn't exist.`")
|
||||
else:
|
||||
await self.bot.send_message(channel, "`There are no custom commands in this server. Use addcom [command] [text]`")
|
||||
|
||||
async def checkCC(self, message):
|
||||
if message.author.id == self.bot.user.id or len(message.content) < 2 or message.channel.is_private:
|
||||
return
|
||||
msg = message.content
|
||||
server = message.server
|
||||
if msg[0] in self.bot.command_prefix and server.id in self.c_commands.keys():
|
||||
cmdlist = self.c_commands[server.id]
|
||||
if msg[1:] in cmdlist:
|
||||
await self.bot.send_message(message.channel, cmdlist[msg[1:]])
|
||||
|
||||
def check_folders():
|
||||
if not os.path.exists("data/customcom"):
|
||||
print("Creating data/customcom folder...")
|
||||
os.makedirs("data/customcom")
|
||||
|
||||
def check_files():
|
||||
f = "data/customcom/commands.json"
|
||||
if not fileIO(f, "check"):
|
||||
print("Creating empty commands.json...")
|
||||
fileIO(f, "save", {})
|
||||
|
||||
def setup(bot):
|
||||
check_folders()
|
||||
check_files()
|
||||
n = CustomCommands(bot)
|
||||
bot.add_listener(n.checkCC, "on_message")
|
||||
bot.add_cog(n)
|
||||
277
cogs/general.py
Normal file
277
cogs/general.py
Normal file
@@ -0,0 +1,277 @@
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from random import randint
|
||||
from random import choice as randchoice
|
||||
import datetime
|
||||
import time
|
||||
import aiohttp
|
||||
import asyncio
|
||||
|
||||
settings = {"POLL_DURATION" : 60}
|
||||
|
||||
class General:
|
||||
"""General commands."""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.stopwatches = {}
|
||||
self.ball = ["As I see it, yes", "It is certain", "It is decidedly so", "Most likely", "Outlook good",
|
||||
"Signs point to yes", "Without a doubt", "Yes", "Yes – definitely", "You may rely on it", "Reply hazy, try again",
|
||||
"Ask again later", "Better not tell you now", "Cannot predict now", "Concentrate and ask again",
|
||||
"Don't count on it", "My reply is no", "My sources say no", "Outlook not so good", "Very doubtful"]
|
||||
self.poll_sessions = []
|
||||
|
||||
@commands.command()
|
||||
async def choose(self, *choices):
|
||||
"""Chooses between multiple choices.
|
||||
|
||||
To denote multiple choices, you should use double quotes.
|
||||
"""
|
||||
if len(choices) < 2:
|
||||
await self.bot.say('Not enough choices to pick from.')
|
||||
else:
|
||||
await self.bot.say(choice(choices))
|
||||
|
||||
@commands.command()
|
||||
async def roll(self, number : int = 100):
|
||||
"""Rolls random number (between 1 and user choice)
|
||||
|
||||
Defaults to 100.
|
||||
"""
|
||||
if number > 1:
|
||||
return await self.bot.say(":game_die: " + str(randint(1, number)) + " :game_die:")
|
||||
else:
|
||||
return await self.bot.say("Maybe higher than 1? ;P")
|
||||
|
||||
@commands.command(pass_context=True)
|
||||
async def flip(self, ctx, user : discord.Member=None):
|
||||
"""Flips a coin... or a user.
|
||||
|
||||
Defaults to coin.
|
||||
"""
|
||||
if user != None:
|
||||
msg = ""
|
||||
if user.id == self.bot.user.id:
|
||||
user = ctx.message.author
|
||||
msg = "Nice try. You think this is funny? How about *this* instead:\n\n"
|
||||
char = "abcdefghijklmnopqrstuvwxyz"
|
||||
tran = "ɐqɔpǝɟƃɥᴉɾʞlɯuodbɹsʇnʌʍxʎz"
|
||||
table = str.maketrans(char, tran)
|
||||
name = user.name.translate(table)
|
||||
char = char.upper()
|
||||
tran = "∀qƆpƎℲפHIſʞ˥WNOԀQᴚS┴∩ΛMX⅄Z"
|
||||
table = str.maketrans(char, tran)
|
||||
name = name.translate(table)
|
||||
return await self.bot.say(msg + "(╯°□°)╯︵ " + name[::-1])
|
||||
else:
|
||||
return await self.bot.say("*flips a coin and... " + randchoice(["HEADS!*", "TAILS!*"]))
|
||||
|
||||
@commands.command(pass_context=True)
|
||||
async def rps(self, ctx, choice : str):
|
||||
"""Play rock paper scissors"""
|
||||
author = ctx.message.author
|
||||
rpsbot = {"rock" : ":moyai:",
|
||||
"paper": ":page_facing_up:",
|
||||
"scissors":":scissors:"}
|
||||
choice = choice.lower()
|
||||
if choice in rpsbot.keys():
|
||||
botchoice = randchoice(list(rpsbot.keys()))
|
||||
msgs = {
|
||||
"win": " You win {}!".format(author.mention),
|
||||
"square": " We're square {}!".format(author.mention),
|
||||
"lose": " You lose {}!".format(author.mention)
|
||||
}
|
||||
if choice == botchoice:
|
||||
await self.bot.say(rpsbot[botchoice] + msgs["square"])
|
||||
elif choice == "rock" and botchoice == "paper":
|
||||
await self.bot.say(rpsbot[botchoice] + msgs["lose"])
|
||||
elif choice == "rock" and botchoice == "scissors":
|
||||
await self.bot.say(rpsbot[botchoice] + msgs["win"])
|
||||
elif choice == "paper" and botchoice == "rock":
|
||||
await self.bot.say(rpsbot[botchoice] + msgs["win"])
|
||||
elif choice == "paper" and botchoice == "scissors":
|
||||
await self.bot.say(rpsbot[botchoice] + msgs["lose"])
|
||||
elif choice == "scissors" and botchoice == "rock":
|
||||
await self.bot.say(rpsbot[botchoice] + msgs["lose"])
|
||||
elif choice == "scissors" and botchoice == "paper":
|
||||
await self.bot.say(rpsbot[botchoice] + msgs["win"])
|
||||
else:
|
||||
await self.bot.say("Choose rock, paper or scissors.")
|
||||
|
||||
@commands.command(name="8", aliases=["8ball"])
|
||||
async def _8ball(self, question : str):
|
||||
"""Ask 8 ball a question
|
||||
|
||||
Question must end with a question mark.
|
||||
"""
|
||||
if question.endswith("?") and question != "?":
|
||||
return await self.bot.say("```" + randchoice(self.ball) + "```")
|
||||
else:
|
||||
return await self.bot.say("That doesn't look like a question.")
|
||||
|
||||
@commands.command(aliases=["sw"], pass_context=True)
|
||||
async def stopwatch(self, ctx):
|
||||
"""Starts/stops stopwatch"""
|
||||
author = ctx.message.author
|
||||
if not author.id in self.stopwatches:
|
||||
self.stopwatches[author.id] = int(time.perf_counter())
|
||||
await self.bot.say(author.mention + " Stopwatch started!")
|
||||
else:
|
||||
tmp = abs(self.stopwatches[author.id] - int(time.perf_counter()))
|
||||
tmp = str(datetime.timedelta(seconds=tmp))
|
||||
await self.bot.say(author.mention + " Stopwatch stopped! Time: **" + str(tmp) + "**")
|
||||
self.stopwatches.pop(author.id, None)
|
||||
|
||||
@commands.command()
|
||||
async def lmgtfy(self, *text):
|
||||
"""Creates a lmgtfy link"""
|
||||
if text == ():
|
||||
await self.bot.say("lmgtfy [search terms]")
|
||||
return
|
||||
text = "+".join(text)
|
||||
await self.bot.say("http://lmgtfy.com/?q=" + text)
|
||||
|
||||
@commands.command(no_pm=True, hidden=True)
|
||||
async def hug(self, member : discord.Member = None):
|
||||
"""Because everyone likes hugs"""
|
||||
await self.bot.say("(っ´▽`)っ" + " *" + member.name + "*")
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def info(self, ctx, member : discord.Member = None):
|
||||
"""Shows member's information"""
|
||||
author = ctx.message.author
|
||||
if not member:
|
||||
member = author
|
||||
roles = []
|
||||
for m in member.roles:
|
||||
if m.name != "@everyone":
|
||||
roles.append('"' + m.name + '"') #.replace("@", "@\u200b")
|
||||
if not roles: roles = ["None"]
|
||||
data = "```\n"
|
||||
data += "Name: " + member.name + "\n"
|
||||
data += "ID: " + member.id + "\n"
|
||||
data += "Joined: " + str(member.joined_at) + "\n"
|
||||
data += "Roles: " + " ".join(roles) + "\n"
|
||||
data += "Avatar: " + member.avatar_url + "\n"
|
||||
data += "```"
|
||||
await self.bot.say(data)
|
||||
|
||||
@commands.command()
|
||||
async def urban(self, *, search_terms : str):
|
||||
"""Urban Dictionary search"""
|
||||
search_terms = search_terms.split(" ")
|
||||
search_terms = "+".join(search_terms)
|
||||
search = "http://api.urbandictionary.com/v0/define?term=" + search_terms
|
||||
try:
|
||||
async with aiohttp.get(search) as r:
|
||||
result = await r.json()
|
||||
if result["list"] != []:
|
||||
definition = result['list'][0]['definition']
|
||||
example = result['list'][0]['example']
|
||||
await self.bot.say("**Definition:** " + definition + "\n\n" + "**Example:** " + example )
|
||||
else:
|
||||
await self.bot.say("Your search terms gave no results.")
|
||||
except:
|
||||
await self.bot.say("Error.")
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def poll(self, ctx, *text):
|
||||
"""Starts/stops a poll
|
||||
|
||||
Usage example:
|
||||
poll Is this a poll?;Yes;No;Maybe"""
|
||||
message = ctx.message
|
||||
print(text)
|
||||
if len(text) == 1:
|
||||
if text[0].lower() == "stop":
|
||||
await self.endpoll(message)
|
||||
return
|
||||
if not self.getPollByChannel(message):
|
||||
p = NewPoll(message, self)
|
||||
if p.valid:
|
||||
self.poll_sessions.append(p)
|
||||
await p.start()
|
||||
else:
|
||||
await self.bot.say("`poll question;option1;option2 (...)`")
|
||||
else:
|
||||
await self.bot.say("`A poll is already ongoing in this channel.`")
|
||||
|
||||
async def endpoll(self, message):
|
||||
if self.getPollByChannel(message):
|
||||
p = self.getPollByChannel(message)
|
||||
if p.author == message.author.id: # or isMemberAdmin(message)
|
||||
await self.getPollByChannel(message).endPoll()
|
||||
else:
|
||||
await self.bot.say("`Only admins and the author can stop the poll.`")
|
||||
else:
|
||||
await self.bot.say("`There's no poll ongoing in this channel.`")
|
||||
|
||||
def getPollByChannel(self, message):
|
||||
for poll in self.poll_sessions:
|
||||
if poll.channel == message.channel:
|
||||
return poll
|
||||
return False
|
||||
|
||||
async def check_poll_votes(self, message):
|
||||
if message.author.id != self.bot.user.id:
|
||||
if self.getPollByChannel(message):
|
||||
self.getPollByChannel(message).checkAnswer(message)
|
||||
|
||||
|
||||
class NewPoll():
|
||||
def __init__(self, message, main):
|
||||
self.channel = message.channel
|
||||
self.author = message.author.id
|
||||
self.client = main.bot
|
||||
self.poll_sessions = main.poll_sessions
|
||||
msg = message.content[6:]
|
||||
msg = msg.split(";")
|
||||
if len(msg) < 2: # Needs at least one question and 2 choices
|
||||
self.valid = False
|
||||
return None
|
||||
else:
|
||||
self.valid = True
|
||||
self.already_voted = []
|
||||
self.question = msg[0]
|
||||
msg.remove(self.question)
|
||||
self.answers = {}
|
||||
i = 1
|
||||
for answer in msg: # {id : {answer, votes}}
|
||||
self.answers[i] = {"ANSWER" : answer, "VOTES" : 0}
|
||||
i += 1
|
||||
|
||||
async def start(self):
|
||||
msg = "**POLL STARTED!**\n\n{}\n\n".format(self.question)
|
||||
for id, data in self.answers.items():
|
||||
msg += "{}. *{}*\n".format(id, data["ANSWER"])
|
||||
msg += "\nType the number to vote!"
|
||||
await self.client.send_message(self.channel, msg)
|
||||
await asyncio.sleep(settings["POLL_DURATION"])
|
||||
if self.valid:
|
||||
await self.endPoll()
|
||||
|
||||
async def endPoll(self):
|
||||
self.valid = False
|
||||
msg = "**POLL ENDED!**\n\n{}\n\n".format(self.question)
|
||||
for data in self.answers.values():
|
||||
msg += "*{}* - {} votes\n".format(data["ANSWER"], str(data["VOTES"]))
|
||||
await self.client.send_message(self.channel, msg)
|
||||
self.poll_sessions.remove(self)
|
||||
|
||||
def checkAnswer(self, message):
|
||||
try:
|
||||
i = int(message.content)
|
||||
if i in self.answers.keys():
|
||||
if message.author.id not in self.already_voted:
|
||||
data = self.answers[i]
|
||||
data["VOTES"] += 1
|
||||
self.answers[i] = data
|
||||
self.already_voted.append(message.author.id)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
|
||||
def setup(bot):
|
||||
n = General(bot)
|
||||
bot.add_listener(n.check_poll_votes, "on_message")
|
||||
bot.add_cog(n)
|
||||
199
cogs/mod.py
Normal file
199
cogs/mod.py
Normal file
@@ -0,0 +1,199 @@
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from .utils import checks
|
||||
from .utils.dataIO import fileIO
|
||||
import __main__
|
||||
import os
|
||||
|
||||
main_path = os.path.dirname(os.path.realpath(__main__.__file__))
|
||||
|
||||
|
||||
class Mod:
|
||||
"""Moderation tools."""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.whitelist_list = fileIO(main_path + "/data/mod/whitelist.json", "load")
|
||||
self.blacklist_list = fileIO(main_path + "/data/mod/blacklist.json", "load")
|
||||
|
||||
@commands.command(no_pm=True)
|
||||
@checks.admin_or_permissions(kick_members=True)
|
||||
async def kick(self, user : discord.Member):
|
||||
"""Kicks user."""
|
||||
try:
|
||||
await self.bot.kick(user)
|
||||
await self.bot.say("Done. That felt good.")
|
||||
except discord.errors.Forbidden:
|
||||
await self.bot.say("I'm not allowed to do that.")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
@commands.command(no_pm=True)
|
||||
@checks.admin_or_permissions(ban_members=True)
|
||||
async def ban(self, user : discord.Member, purge_msg : int=0):
|
||||
"""Bans user and deletes last X days worth of messages.
|
||||
|
||||
Minimum 0 days, maximum 7. Defaults to 0."""
|
||||
if purge_msg < 0 or purge_msg > 7:
|
||||
await self.bot.say("Invalid days. Must be between 0 and 7.")
|
||||
return
|
||||
try:
|
||||
await self.bot.ban(user, days)
|
||||
await self.bot.say("Done. It was about time.")
|
||||
except discord.errors.Forbidden:
|
||||
await self.bot.say("I'm not allowed to do that.")
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
@commands.group(pass_context=True, no_pm=True)
|
||||
@checks.mod_or_permissions(manage_messages=True)
|
||||
async def cleanup(self, ctx):
|
||||
"""Deletes messages.
|
||||
|
||||
cleanup messages [number]
|
||||
cleanup user [name/mention] [number]
|
||||
cleanup text \"Text here\" [number]"""
|
||||
if ctx.invoked_subcommand is None:
|
||||
await self.bot.say("Type help cleanup for info.")
|
||||
|
||||
@cleanup.command(pass_context=True, no_pm=True)
|
||||
async def text(self, ctx, text : str, number : int):
|
||||
"""Deletes last X messages matching the specified text.
|
||||
|
||||
Example:
|
||||
cleanup text \"test\" 5
|
||||
|
||||
Remember to use double quotes."""
|
||||
message = ctx.message
|
||||
cmdmsg = message
|
||||
if number > 0 and number < 10000:
|
||||
while True:
|
||||
new = False
|
||||
async for x in self.bot.logs_from(message.channel, limit=100, before=message):
|
||||
if number == 0:
|
||||
await self.bot.delete_message(cmdmsg)
|
||||
return
|
||||
if text in x.content:
|
||||
await self.bot.delete_message(x)
|
||||
number -= 1
|
||||
new = True
|
||||
message = x
|
||||
if not new or number == 0:
|
||||
await self.bot.delete_message(cmdmsg)
|
||||
break
|
||||
|
||||
@cleanup.command(pass_context=True, no_pm=True)
|
||||
async def user(self, ctx, name : discord.Member, number : int):
|
||||
"""Deletes last X messages from specified user.
|
||||
|
||||
Examples:
|
||||
cleanup @\u200bTwentysix 2
|
||||
cleanup Red 6"""
|
||||
message = ctx.message
|
||||
cmdmsg = message
|
||||
if number > 0 and number < 10000:
|
||||
while True:
|
||||
new = False
|
||||
async for x in self.bot.logs_from(message.channel, limit=100, before=message):
|
||||
if number == 0:
|
||||
await self.bot.delete_message(cmdmsg)
|
||||
return
|
||||
if x.author.id == name.id:
|
||||
await self.bot.delete_message(x)
|
||||
number -= 1
|
||||
new = True
|
||||
message = x
|
||||
if not new or number == 0:
|
||||
await self.bot.delete_message(cmdmsg)
|
||||
break
|
||||
|
||||
@cleanup.command(pass_context=True, no_pm=True)
|
||||
async def messages(self, ctx, number : int):
|
||||
"""Deletes last X messages.
|
||||
|
||||
Example:
|
||||
cleanup 26"""
|
||||
channel = ctx.message.channel
|
||||
if number > 0 and number < 10000:
|
||||
async for x in self.bot.logs_from(channel, limit=number+1):
|
||||
await self.bot.delete_message(x)
|
||||
|
||||
@commands.group(pass_context=True)
|
||||
@checks.admin_or_permissions(ban_members=True)
|
||||
async def blacklist(self, ctx):
|
||||
"""Bans user from using the bot"""
|
||||
if ctx.invoked_subcommand is None:
|
||||
await self.bot.say("Type help blacklist for info.")
|
||||
|
||||
@blacklist.command(name="add")
|
||||
async def _blacklist_add(self, user : discord.Member):
|
||||
"""Adds user to bot's blacklist"""
|
||||
if user.id not in self.blacklist_list:
|
||||
self.blacklist_list.append(user.id)
|
||||
fileIO(main_path + "/data/mod/blacklist.json", "save", self.blacklist_list)
|
||||
await self.bot.say("User has been added to blacklist.")
|
||||
else:
|
||||
await self.bot.say("User is already blacklisted.")
|
||||
|
||||
@blacklist.command(name="remove")
|
||||
async def _blacklist_remove(self, user : discord.Member):
|
||||
"""Removes user to bot's blacklist"""
|
||||
if user.id in self.blacklist_list:
|
||||
self.blacklist_list.remove(user.id)
|
||||
fileIO(main_path + "/data/mod/blacklist.json", "save", self.blacklist_list)
|
||||
await self.bot.say("User has been removed from blacklist.")
|
||||
else:
|
||||
await self.bot.say("User is not in blacklist.")
|
||||
|
||||
|
||||
@commands.group(pass_context=True)
|
||||
@checks.admin_or_permissions(ban_members=True)
|
||||
async def whitelist(self, ctx):
|
||||
"""Users who will be able to use the bot"""
|
||||
if ctx.invoked_subcommand is None:
|
||||
await self.bot.say("Type help whitelist for info.")
|
||||
|
||||
@whitelist.command(name="add")
|
||||
async def _whitelist_add(self, user : discord.Member):
|
||||
"""Adds user to bot's whitelist"""
|
||||
if user.id not in self.whitelist_list:
|
||||
if not self.whitelist_list:
|
||||
msg = "\nAll users not in whitelist will be ignored (owner, admins and mods excluded)"
|
||||
else:
|
||||
msg = ""
|
||||
self.whitelist_list.append(user.id)
|
||||
fileIO(main_path + "/data/mod/whitelist.json", "save", self.whitelist_list)
|
||||
await self.bot.say("User has been added to whitelist." + msg)
|
||||
else:
|
||||
await self.bot.say("User is already whitelisted.")
|
||||
|
||||
@whitelist.command(name="remove")
|
||||
async def _whitelist_remove(self, user : discord.Member):
|
||||
"""Removes user to bot's whitelist"""
|
||||
if user.id in self.whitelist_list:
|
||||
self.whitelist_list.remove(user.id)
|
||||
fileIO(main_path + "/data/mod/whitelist.json", "save", self.whitelist_list)
|
||||
await self.bot.say("User has been removed from whitelist.")
|
||||
else:
|
||||
await self.bot.say("User is not in whitelist.")
|
||||
|
||||
def check_folders():
|
||||
folders = ("data", "data/mod/")
|
||||
for folder in folders:
|
||||
if not os.path.exists(folder):
|
||||
print("Creating " + folder + " folder...")
|
||||
os.makedirs(folder)
|
||||
|
||||
def check_files():
|
||||
if not os.path.isfile(main_path + "/data/mod/blacklist.json"):
|
||||
print("Creating empty blacklist.json...")
|
||||
fileIO(main_path + "/data/mod/blacklist.json", "save", [])
|
||||
|
||||
if not os.path.isfile(main_path + "/data/mod/whitelist.json"):
|
||||
print("Creating empty whitelist.json...")
|
||||
fileIO(main_path + "/data/mod/whitelist.json", "save", [])
|
||||
|
||||
def setup(bot):
|
||||
check_folders()
|
||||
check_files()
|
||||
bot.add_cog(Mod(bot))
|
||||
214
cogs/trivia.py
Normal file
214
cogs/trivia.py
Normal file
@@ -0,0 +1,214 @@
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from random import randint
|
||||
from random import choice as randchoice
|
||||
import datetime
|
||||
import time
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
settings = {"TRIVIA_MAX_SCORE" : 10, "TRIVIA_TIMEOUT" : 120, "TRIVIA_DELAY" : 15, "TRIVIA_BOT_PLAYS" : False}
|
||||
|
||||
class Trivia:
|
||||
"""General commands."""
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.trivia_sessions = []
|
||||
|
||||
@commands.command(pass_context=True)
|
||||
async def trivia(self, ctx, list_name : str=None):
|
||||
"""Start a trivia session with the specified list
|
||||
|
||||
Stop parameter will end the current session
|
||||
"""
|
||||
message = ctx.message
|
||||
if list_name == None:
|
||||
await self.triviaList(ctx.message.author)
|
||||
elif list_name.lower() == "stop":
|
||||
if await getTriviabyChannel(message.channel):
|
||||
s = await getTriviabyChannel(message.channel)
|
||||
await s.endGame()
|
||||
await self.bot.say("`Trivia stopped.`")
|
||||
else:
|
||||
await self.bot.say("`There's no trivia session ongoing in this channel.`")
|
||||
elif not await getTriviabyChannel(message.channel):
|
||||
t = TriviaSession(message)
|
||||
self.trivia_sessions.append(t)
|
||||
await t.loadQuestions(message.content)
|
||||
else:
|
||||
await self.bot.say("`A trivia session is already ongoing in this channel.`")
|
||||
|
||||
async def triviaList(self, author):
|
||||
msg = "**Available trivia lists:** \n\n```"
|
||||
lists = os.listdir("data/trivia/")
|
||||
if lists:
|
||||
clean_list = []
|
||||
for txt in lists:
|
||||
if txt.endswith(".txt") and " " not in txt:
|
||||
txt = txt.replace(".txt", "")
|
||||
clean_list.append(txt)
|
||||
if clean_list:
|
||||
for i, d in enumerate(clean_list):
|
||||
if i % 4 == 0 and i != 0:
|
||||
msg = msg + d + "\n"
|
||||
else:
|
||||
msg = msg + d + "\t"
|
||||
msg += "```"
|
||||
await self.bot.send_message(author, msg)
|
||||
else:
|
||||
await self.bot.say("There are no trivia lists available.")
|
||||
else:
|
||||
await self.bot.say("There are no trivia lists available.")
|
||||
|
||||
class TriviaSession():
|
||||
def __init__(self, message):
|
||||
self.gaveAnswer = ["I know this one! {}!", "Easy: {}.", "Oh really? It's {} of course."]
|
||||
self.currentQ = None # {"QUESTION" : "String", "ANSWERS" : []}
|
||||
self.questionList = ""
|
||||
self.channel = message.channel
|
||||
self.scoreList = {}
|
||||
self.status = None
|
||||
self.timer = None
|
||||
self.count = 0
|
||||
|
||||
async def loadQuestions(self, msg):
|
||||
msg = msg.split(" ")
|
||||
if len(msg) == 2:
|
||||
_, qlist = msg
|
||||
if qlist == "random":
|
||||
chosenList = randchoice(glob.glob("data/trivia/*.txt"))
|
||||
self.questionList = self.loadList(chosenList)
|
||||
self.status = "new question"
|
||||
self.timeout = time.perf_counter()
|
||||
if self.questionList: await self.newQuestion()
|
||||
else:
|
||||
if os.path.isfile("data/trivia/" + qlist + ".txt"):
|
||||
self.questionList = self.loadList("trivia/" + qlist + ".txt")
|
||||
self.status = "new question"
|
||||
self.timeout = time.perf_counter()
|
||||
if self.questionList: await self.newQuestion()
|
||||
else:
|
||||
await triviaManager.bot.say("`There is no list with that name.`")
|
||||
await self.stopTrivia()
|
||||
else:
|
||||
await triviaManager.bot.say("`trivia [list name]`")
|
||||
|
||||
async def stopTrivia(self):
|
||||
self.status = "stop"
|
||||
triviaManager.trivia_sessions.remove(self)
|
||||
|
||||
async def endGame(self):
|
||||
self.status = "stop"
|
||||
if self.scoreList:
|
||||
await self.sendTable()
|
||||
triviaManager.trivia_sessions.remove(self)
|
||||
|
||||
def loadList(self, qlist):
|
||||
with open(qlist, "r", encoding="utf-8") as f:
|
||||
qlist = f.readlines()
|
||||
parsedList = []
|
||||
for line in qlist:
|
||||
if "`" in line and len(line) > 4:
|
||||
line = line.replace("\n", "")
|
||||
line = line.split("`")
|
||||
question = line[0]
|
||||
answers = []
|
||||
for l in line[1:]:
|
||||
answers.append(l.lower())
|
||||
if len(line) >= 2:
|
||||
line = {"QUESTION" : question, "ANSWERS": answers} #string, list
|
||||
parsedList.append(line)
|
||||
if parsedList != []:
|
||||
return parsedList
|
||||
else:
|
||||
self.stopTrivia()
|
||||
return None
|
||||
|
||||
async def newQuestion(self):
|
||||
for score in self.scoreList.values():
|
||||
if score == settings["TRIVIA_MAX_SCORE"]:
|
||||
await self.endGame()
|
||||
return True
|
||||
if self.questionList == []:
|
||||
await self.endGame()
|
||||
return True
|
||||
self.currentQ = randchoice(self.questionList)
|
||||
self.questionList.remove(self.currentQ)
|
||||
self.status = "waiting for answer"
|
||||
self.count += 1
|
||||
self.timer = int(time.perf_counter())
|
||||
await triviaManager.bot.say("**Question number {}!**\n\n{}".format(str(self.count), self.currentQ["QUESTION"]))
|
||||
while self.status != "correct answer" and abs(self.timer - int(time.perf_counter())) <= settings["TRIVIA_DELAY"]:
|
||||
if abs(self.timeout - int(time.perf_counter())) >= settings["TRIVIA_TIMEOUT"]:
|
||||
await triviaManager.bot.say("Guys...? Well, I guess I'll stop then.")
|
||||
await self.stopTrivia()
|
||||
return True
|
||||
await asyncio.sleep(1) #Waiting for an answer or for the time limit
|
||||
if self.status == "correct answer":
|
||||
self.status = "new question"
|
||||
await asyncio.sleep(3)
|
||||
if not self.status == "stop":
|
||||
await self.newQuestion()
|
||||
elif self.status == "stop":
|
||||
return True
|
||||
else:
|
||||
msg = randchoice(self.gaveAnswer).format(self.currentQ["ANSWERS"][0])
|
||||
if settings["TRIVIA_BOT_PLAYS"]:
|
||||
msg += " **+1** for me!"
|
||||
self.addPoint(self.bot.user.name)
|
||||
self.currentQ["ANSWERS"] = []
|
||||
await triviaManager.bot.say(msg)
|
||||
await triviaManager.bot.send_typing(self.channel)
|
||||
await asyncio.sleep(3)
|
||||
if not self.status == "stop":
|
||||
await self.newQuestion()
|
||||
|
||||
async def sendTable(self):
|
||||
self.scoreList = sorted(self.scoreList.items(), reverse=True, key=lambda x: x[1]) # orders score from lower to higher
|
||||
t = "```Scores: \n\n"
|
||||
for score in self.scoreList:
|
||||
t += score[0] # name
|
||||
t += "\t"
|
||||
t += str(score[1]) # score
|
||||
t += "\n"
|
||||
t += "```"
|
||||
await triviaManager.bot.say(t)
|
||||
|
||||
async def checkAnswer(self, message):
|
||||
self.timeout = time.perf_counter()
|
||||
for answer in self.currentQ["ANSWERS"]:
|
||||
if answer in message.content.lower():
|
||||
self.currentQ["ANSWERS"] = []
|
||||
self.status = "correct answer"
|
||||
self.addPoint(message.author.name)
|
||||
await triviaManager.bot.send_message(message.channel, "You got it {}! **+1** to you!".format(message.author.name))
|
||||
await triviaManager.bot.send_typing(self.channel)
|
||||
return True
|
||||
|
||||
def addPoint(self, user):
|
||||
if user in self.scoreList:
|
||||
self.scoreList[user] += 1
|
||||
else:
|
||||
self.scoreList[user] = 1
|
||||
|
||||
def getTriviaQuestion(self):
|
||||
q = randchoice(list(trivia_questions.keys()))
|
||||
return q, trivia_questions[q] # question, answer
|
||||
|
||||
async def getTriviabyChannel(channel):
|
||||
for t in triviaManager.trivia_sessions:
|
||||
if t.channel == channel:
|
||||
return t
|
||||
return False
|
||||
|
||||
async def checkMessages(message):
|
||||
if message.author.id != triviaManager.bot.user.id:
|
||||
if await getTriviabyChannel(message.channel):
|
||||
trvsession = await getTriviabyChannel(message.channel)
|
||||
await trvsession.checkAnswer(message)
|
||||
|
||||
def setup(bot):
|
||||
global triviaManager
|
||||
bot.add_listener(checkMessages, "on_message")
|
||||
triviaManager = Trivia(bot)
|
||||
bot.add_cog(triviaManager)
|
||||
0
cogs/utils/__init__.py
Normal file
0
cogs/utils/__init__.py
Normal file
69
cogs/utils/checks.py
Normal file
69
cogs/utils/checks.py
Normal file
@@ -0,0 +1,69 @@
|
||||
from discord.ext import commands
|
||||
import discord.utils
|
||||
import os.path
|
||||
import json
|
||||
import __main__
|
||||
|
||||
#
|
||||
# This is a modified version of checks.py, originally made by Rapptz
|
||||
#
|
||||
# https://github.com/Rapptz
|
||||
# https://github.com/Rapptz/RoboDanny/tree/async
|
||||
#
|
||||
|
||||
main_path = os.path.dirname(os.path.realpath(__main__.__file__))
|
||||
|
||||
try:
|
||||
with open(main_path + "/data/red/settings.json", "r") as f:
|
||||
settings = json.loads(f.read())
|
||||
except:
|
||||
settings = {"OWNER" : False, "ADMIN_ROLE" : False, "MOD_ROLE" : False}
|
||||
|
||||
def is_owner_check(ctx):
|
||||
return ctx.message.author.id == owner
|
||||
|
||||
def is_owner():
|
||||
return commands.check(is_owner_check)
|
||||
|
||||
# The permission system of the bot is based on a "just works" basis
|
||||
# You have permissions and the bot has permissions. If you meet the permissions
|
||||
# required to execute the command (and the bot does as well) then it goes through
|
||||
# and you can execute the command.
|
||||
# If these checks fail, then there are two fallbacks.
|
||||
# A role with the name of Bot Mod and a role with the name of Bot Admin.
|
||||
# Having these roles provides you access to certain commands without actually having
|
||||
# the permissions required for them.
|
||||
# Of course, the owner will always be able to execute commands.
|
||||
|
||||
def check_permissions(ctx, perms):
|
||||
if is_owner_check(ctx):
|
||||
return True
|
||||
|
||||
ch = ctx.message.channel
|
||||
author = ctx.message.author
|
||||
resolved = ch.permissions_for(author)
|
||||
return all(getattr(resolved, name, None) == value for name, value in perms.items())
|
||||
|
||||
def role_or_permissions(ctx, check, **perms):
|
||||
if check_permissions(ctx, perms):
|
||||
return True
|
||||
|
||||
ch = ctx.message.channel
|
||||
author = ctx.message.author
|
||||
if ch.is_private:
|
||||
return False # can't have roles in PMs
|
||||
|
||||
role = discord.utils.find(check, author.roles)
|
||||
return role is not None
|
||||
|
||||
def mod_or_permissions(**perms):
|
||||
def predicate(ctx):
|
||||
return role_or_permissions(ctx, lambda r: r.name in (settings["MOD_ROLE"], settings["ADMIN_ROLE"]), **perms)
|
||||
|
||||
return commands.check(predicate)
|
||||
|
||||
def admin_or_permissions(**perms):
|
||||
def predicate(ctx):
|
||||
return role_or_permissions(ctx, lambda r: r.name == settings["ADMIN_ROLE"], **perms)
|
||||
|
||||
return commands.check(predicate)
|
||||
28
cogs/utils/dataIO.py
Normal file
28
cogs/utils/dataIO.py
Normal file
@@ -0,0 +1,28 @@
|
||||
import json
|
||||
|
||||
def fileIO(filename, IO, data=None):
|
||||
if IO == "save" and data != None:
|
||||
with open(filename, encoding='utf-8', mode="w") as f:
|
||||
f.write(json.dumps(data))
|
||||
elif IO == "load" and data == None:
|
||||
with open(filename, encoding='utf-8', mode="r") as f:
|
||||
return json.loads(f.read())
|
||||
elif IO == "check" and data == None:
|
||||
try:
|
||||
with open(filename, encoding='utf-8', mode="r") as f:
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
else:
|
||||
raise("Invalid fileIO call")
|
||||
|
||||
def get_value(filename, key):
|
||||
with open(filename, encoding='utf-8', mode="r") as f:
|
||||
data = json.loads(f.read())
|
||||
return data[key]
|
||||
|
||||
def set_value(filename, key, value):
|
||||
data = fileIO(filename, "load")
|
||||
data[key] = value
|
||||
fileIO(filename, "save", data)
|
||||
return True
|
||||
Reference in New Issue
Block a user