diff --git a/.gitignore b/.gitignore index bb9b7b7d1..7691685ce 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Trivia list repo injection +redbot/trivia/ + *.json *.exe *.dll diff --git a/redbot/cogs/trivia/__init__.py b/redbot/cogs/trivia/__init__.py new file mode 100644 index 000000000..3b65601d1 --- /dev/null +++ b/redbot/cogs/trivia/__init__.py @@ -0,0 +1,10 @@ +"""Package for Trivia cog.""" +from .trivia import * +from .session import * +from .log import * + + +def setup(bot): + """Load Trivia.""" + cog = Trivia() + bot.add_cog(cog) diff --git a/redbot/cogs/trivia/log.py b/redbot/cogs/trivia/log.py new file mode 100644 index 000000000..8b5be288c --- /dev/null +++ b/redbot/cogs/trivia/log.py @@ -0,0 +1,7 @@ +"""Log for Trivia cog.""" + +import logging + +__all__ = ["LOG"] + +LOG = logging.getLogger("red.trivia") diff --git a/redbot/cogs/trivia/session.py b/redbot/cogs/trivia/session.py new file mode 100644 index 000000000..13523096b --- /dev/null +++ b/redbot/cogs/trivia/session.py @@ -0,0 +1,307 @@ +"""Module to manage trivia sessions.""" +import asyncio +import time +import random +from collections import Counter +import discord +from redbot.core.bank import deposit_credits +from redbot.core.utils.chat_formatting import box +from .log import LOG + +__all__ = ["TriviaSession"] + +_REVEAL_MESSAGES = ("I know this one! {}!", "Easy: {}.", + "Oh really? It's {} of course.") +_FAIL_MESSAGES = ("To the next one I guess...", "Moving on...", + "I'm sure you'll know the answer of the next one.", + "\N{PENSIVE FACE} Next one.") + + +class TriviaSession(): + """Class to run a session of trivia with the user. + + To run the trivia session immediately, use `TriviaSession.start` instead of + instantiating directly. + + Attributes + ---------- + ctx : `commands.Context` + Context object from which this session will be run. + This object assumes the session was started in `ctx.channel` + by `ctx.author`. + question_list : `dict` + A list of tuples mapping questions (`str`) to answers (`list` of + `str`). + settings : `dict` + Settings for the trivia session, with values for the following: + - ``max_score`` (`int`) + - ``delay`` (`float`) + - ``timeout`` (`float`) + - ``reveal_answer`` (`bool`) + - ``bot_plays`` (`bool`) + - ``allow_override`` (`bool`) + - ``payout_multiplier`` (`float`) + scores : `collections.Counter` + A counter with the players as keys, and their scores as values. The + players are of type `discord.Member`. + count : `int` + The number of questions which have been asked. + + """ + + def __init__(self, + ctx, + question_list: dict, + settings: dict): + self.ctx = ctx + list_ = list(question_list.items()) + random.shuffle(list_) + self.question_list = list_ + self.settings = settings + self.scores = Counter() + self.count = 0 + self._last_response = time.time() + self._task = None + + @classmethod + def start(cls, ctx, question_list, settings): + """Create and start a trivia session. + + This allows the session to manage the running and cancellation of its + own tasks. + + Parameters + ---------- + ctx : `commands.Context` + Same as `TriviaSession.ctx` + question_list : `dict` + Same as `TriviaSession.question_list` + settings : `dict` + Same as `TriviaSession.settings` + + Returns + ------- + TriviaSession + The new trivia session being run. + + """ + session = cls(ctx, question_list, settings) + loop = ctx.bot.loop + session._task = loop.create_task(session.run()) + return session + + async def run(self): + """Run the trivia session. + + In order for the trivia session to be stopped correctly, this should + only be called internally by `TriviaSession.start`. + """ + max_score = self.settings["max_score"] + delay = self.settings["delay"] + timeout = self.settings["timeout"] + for question, answers in self._iter_questions(): + self.count += 1 + msg = "**Question number {}!**\n\n{}".format(self.count, question) + await self.ctx.send(msg) + continue_ = await self.wait_for_answer(answers, delay, timeout) + if continue_ is False: + break + if any(score >= max_score for score in self.scores.values()): + await self.end_game() + break + async with self.ctx.typing(): + await asyncio.sleep(3) + else: + await self.ctx.send("There are no more questions!") + await self.end_game() + + def _iter_questions(self): + """Iterate over questions and answers for this session. + + Yields + ------ + `tuple` + A tuple containing the question (`str`) and the answers (`tuple` of + `str`). + + """ + for question, answers in self.question_list: + answers = _parse_answers(answers) + yield question, answers + + async def wait_for_answer(self, + answers, + delay: float, + timeout: float): + """Wait for a correct answer, and then respond. + + Scores are also updated in this method. + + Returns False if waiting was cancelled; this is usually due to the + session being forcibly stopped. + + Parameters + ---------- + answers : `iterable` of `str` + A list of valid answers to the current question. + delay : float + How long users have to respond (in seconds). + timeout : float + How long before the session ends due to no responses (in seconds). + + Returns + ------- + bool + :code:`True` if the session wasn't interrupted. + + """ + try: + message = await self.ctx.bot.wait_for( + "message", check=self.check_answer(answers), timeout=delay) + except asyncio.TimeoutError: + if time.time() - self._last_response >= timeout: + await self.ctx.send("Guys...? Well, I guess I'll stop then.") + self.stop() + return False + if self.settings["reveal_answer"]: + reply = random.choice(_REVEAL_MESSAGES).format(answers[0]) + else: + reply = random.choice(_FAIL_MESSAGES) + if self.settings["bot_plays"]: + reply += " **+1** for me!" + self.scores[self.ctx.guild.me] += 1 + await self.ctx.send(reply) + else: + self.scores[message.author] += 1 + reply = "You got it {}! **+1** to you!".format( + message.author.display_name) + await self.ctx.send(reply) + return True + + def check_answer(self, answers): + """Get a predicate to check for correct answers. + + The returned predicate takes a message as its only parameter, + and returns ``True`` if the message contains any of the + given answers. + + Parameters + ---------- + answers : `iterable` of `str` + The answers which the predicate must check for. + + Returns + ------- + function + The message predicate. + + """ + answers = tuple(s.lower() for s in answers) + def _pred(message: discord.Message): + early_exit = (message.channel != self.ctx.channel + or message.author == self.ctx.guild.me) + if early_exit: + return False + + self._last_response = time.time() + guess = message.content.lower() + for answer in answers: + if " " in answer and answer in guess: + # Exact matching, issue #331 + return True + elif any(word == answer for word in guess.split(" ")): + return True + return False + + return _pred + + async def end_game(self): + """End the trivia session and display scrores.""" + if self.scores: + await self.send_table() + multiplier = self.settings["payout_multiplier"] + if multiplier > 0: + await self.pay_winner(multiplier) + self.stop() + + async def send_table(self): + """Send a table of scores to the session's channel.""" + table = "+ Results: \n\n" + for user, score in self.scores.most_common(): + table += "+ {}\t{}\n".format(user, score) + await self.ctx.send(box(table, lang="diff")) + + def stop(self): + """Stop the trivia session, without showing scores.""" + self.ctx.bot.dispatch("trivia_end", self) + + def force_stop(self): + """Cancel whichever tasks this session is running.""" + self._task.cancel() + channel = self.ctx.channel + LOG.debug("Force stopping trivia session; #%s in %s", channel, + channel.guild.id) + + async def pay_winner(self, multiplier: float): + """Pay the winner of this trivia session. + + The winner is only payed if there are at least 3 human contestants. + + Parameters + ---------- + multiplier : float + The coefficient of the winner's score, used to determine the amount + paid. + + """ + (winner, score) = next((tup for tup in self.scores.most_common(1)), + (None, None)) + me_ = self.ctx.guild.me + if winner is not None and winner != me_ and score > 0: + contestants = list(self.scores.keys()) + if me_ in contestants: + contestants.remove(me_) + if len(contestants) >= 3: + amount = int(multiplier * score) + if amount > 0: + LOG.debug("Paying trivia winner: %d credits --> %s", + amount, str(winner)) + await deposit_credits(winner, int(multiplier * score)) + await self.ctx.send( + "Congratulations, {0}, you have received {1} credits" + " for coming first.".format(winner.display_name, + amount)) + + +def _parse_answers(answers): + """Parse the raw answers to readable strings. + + The reason this exists is because of YAML's ambiguous syntax. For example, + if the answer to a question in YAML is ``yes``, YAML will load it as the + boolean value ``True``, which is not necessarily the desired answer. This + function aims to undo that for bools, and possibly for numbers in the + future too. + + Parameters + ---------- + answers : `iterable` of `str` + The raw answers loaded from YAML. + + Returns + ------- + `tuple` of `str` + The answers in readable/ guessable strings. + + """ + ret = [] + for answer in answers: + if isinstance(answer, bool): + if answer is True: + ret.append("True", "Yes") + else: + ret.append("False", "No") + else: + ret.append(str(answer)) + # Uniquify list + seen = set() + return tuple(x for x in ret if not (x in seen or seen.add(x))) diff --git a/redbot/cogs/trivia/trivia.py b/redbot/cogs/trivia/trivia.py new file mode 100644 index 000000000..68b9aea78 --- /dev/null +++ b/redbot/cogs/trivia/trivia.py @@ -0,0 +1,486 @@ +"""Module for Trivia cog.""" +from collections import Counter +import yaml +import discord +from discord.ext import commands +import redbot.trivia +from redbot.core import Config, checks +from redbot.core.data_manager import cog_data_path +from redbot.core.utils.chat_formatting import box, pagify +from redbot.cogs.bank import check_global_setting_admin +from .log import LOG +from .session import TriviaSession + +__all__ = ["Trivia", "UNIQUE_ID"] + +UNIQUE_ID = 0xb3c0e453 + + +class InvalidListError(Exception): + """A Trivia list file is in invalid format.""" + pass + + +class Trivia: + """Play trivia with friends!""" + + def __init__(self): + self.trivia_sessions = [] + self.conf = Config.get_conf( + self, identifier=UNIQUE_ID, force_registration=True) + + self.conf.register_guild( + max_score=10, + timeout=120.0, + delay=15.0, + bot_plays=False, + reveal_answer=True, + payout_multiplier=0.0, + allow_override=True) + + self.conf.register_member( + wins=0, games=0, total_score=0) + + @commands.group() + @commands.guild_only() + @checks.mod_or_permissions(administrator=True) + async def triviaset(self, ctx: commands.Context): + """Manage trivia settings.""" + if ctx.invoked_subcommand is None: + await ctx.send_help() + settings = self.conf.guild(ctx.guild) + settings_dict = await settings.all() + msg = box( + "**Current settings**\n" + "Bot gains points: {bot_plays}\n" + "Answer time limit: {delay} seconds\n" + "Lack of response timeout: {timeout} seconds\n" + "Points to win: {max_score}\n" + "Reveal answer on timeout: {reveal_answer}\n" + "Payout multiplier: {payout_multiplier}\n" + "Allow lists to override settings: {allow_override}" + "".format(**settings_dict), + lang="py") + await ctx.send(msg) + + @triviaset.command(name="maxscore") + async def triviaset_max_score(self, ctx: commands.Context, score: int): + """Set the total points required to win.""" + if score < 0: + await ctx.send("Score must be greater than 0.") + return + settings = self.conf.guild(ctx.guild) + await settings.max_score.set(score) + await ctx.send("Done. Points required to win set to {}.".format(score)) + + @triviaset.command(name="timelimit") + async def triviaset_timelimit(self, ctx: commands.Context, seconds: float): + """Set the maximum seconds permitted to answer a question.""" + if seconds < 4.0: + await ctx.send("Must be at least 4 seconds.") + return + settings = self.conf.guild(ctx.guild) + await settings.delay.set(seconds) + await ctx.send("Done. Maximum seconds to answer set to {}." + "".format(seconds)) + + @triviaset.command(name="stopafter") + async def triviaset_stopafter(self, ctx: commands.Context, seconds: float): + """Set how long until trivia stops due to no response.""" + settings = self.conf.guild(ctx.guild) + if seconds < await settings.delay(): + await ctx.send("Must be larger than the answer time limit.") + return + await settings.timeout.set(seconds) + await ctx.send("Done. Trivia sessions will now time out after {}" + " seconds of no responses.".format(seconds)) + + @triviaset.command(name="override") + async def triviaset_allowoverride(self, + ctx: commands.Context, + enabled: bool): + """Allow/disallow trivia lists to override settings.""" + settings = self.conf.guild(ctx.guild) + await settings.allow_override.set(enabled) + enabled = "now" if enabled else "no longer" + await ctx.send("Done. Trivia lists can {} override the trivia settings" + " for this server.".format(enabled)) + + @triviaset.command(name="botplays") + async def trivaset_bot_plays(self, + ctx: commands.Context, + true_or_false: bool): + """Set whether or not the bot gains points. + + If enabled, the bot will gain a point if no one guesses correctly. + """ + settings = self.conf.guild(ctx.guild) + await settings.bot_plays.set(true_or_false) + await ctx.send("Done. " + ( + "I'll gain a point if users don't answer in time." if true_or_false + else "Alright, I won't embarass you at trivia anymore.")) + + @triviaset.command(name="revealanswer") + async def trivaset_reveal_answer(self, + ctx: commands.Context, + true_or_false: bool): + """Set whether or not the answer is revealed. + + If enabled, the bot will reveal the answer if no one guesses correctly + in time. + """ + settings = self.conf.guild(ctx.guild) + await settings.reveal_answer.set(true_or_false) + await ctx.send("Done. " + ( + "I'll reveal the answer if no one knows it." if true_or_false else + "I won't reveal the answer to the questions anymore.")) + + @triviaset.command(name="payout") + @check_global_setting_admin() + async def triviaset_payout_multiplier(self, + ctx: commands.Context, + multiplier: float): + """Set the payout multiplier. + + This can be any positive decimal number. If a user wins trivia when at + least 3 members are playing, they will receive credits. Set to 0 to + disable. + + The number of credits is determined by multiplying their total score by + this multiplier. + """ + settings = self.conf.guild(ctx.guild) + if multiplier < 0: + await ctx.send("Multiplier must be at least 0.") + return + await settings.payout_multiplier.set(multiplier) + if not multiplier: + await ctx.send("Done. I will no longer reward the winner with a" + " payout.") + return + await ctx.send("Done. Payout multiplier set to {}.".format(multiplier)) + + @commands.group(invoke_without_command=True) + @commands.guild_only() + async def trivia(self, ctx: commands.Context, *categories: str): + """Start trivia session on the specified category. + + You may list multiple categories, in which case the trivia will involve + questions from all of them. + """ + if not categories: + await ctx.send_help() + return + categories = [c.lower() for c in categories] + session = self._get_trivia_session(ctx.channel) + if session is not None: + await ctx.send( + "There is already an ongoing trivia session in this channel.") + return + trivia_dict = {} + for category in reversed(categories): + # We reverse the categories so that the first list's config takes + # priority over the others. + try: + dict_ = self.get_trivia_list(category) + except FileNotFoundError: + await ctx.send("Invalid category `{0}`. See `{1}trivia list`" + " for a list of trivia categories." + "".format(category, ctx.prefix)) + except InvalidListError: + await ctx.send("There was an error parsing the trivia list for" + " the `{}` category. It may be formatted" + " incorrectly.".format(category)) + else: + trivia_dict.update(dict_) + continue + return + if not trivia_dict: + await ctx.send("The trivia list was parsed successfully, however" + " it appears to be empty!") + return + settings = await self.conf.guild(ctx.guild).all() + config = trivia_dict.pop("CONFIG", None) + if config and settings["allow_override"]: + settings.update(config) + session = TriviaSession.start(ctx, trivia_dict, settings) + self.trivia_sessions.append(session) + LOG.debug("New trivia session; #%s in %d", ctx.channel, ctx.guild.id) + + @trivia.command(name="stop") + async def trivia_stop(self, ctx: commands.Context): + """Stop an ongoing trivia session.""" + session = self._get_trivia_session(ctx.channel) + if session is None: + await ctx.send( + "There is no ongoing trivia session in this channel.") + return + author = ctx.author + auth_checks = (await ctx.bot.is_owner(author), await + ctx.bot.is_mod(author), await ctx.bot.is_admin(author), + author == ctx.guild.owner, author == session.ctx.author) + if any(auth_checks): + await session.end_game() + session.force_stop() + await ctx.send("Trivia stopped.") + else: + await ctx.send("You are not allowed to do that.") + + @trivia.command(name="list") + async def trivia_list(self, ctx: commands.Context): + """List available trivia categories.""" + lists = set(p.stem for p in self._all_lists()) + + msg = box("**Available trivia lists**\n\n{}" + "".format(", ".join(sorted(lists)))) + if len(msg) > 1000: + await ctx.author.send(msg) + return + await ctx.send(msg) + + @trivia.group(name="leaderboard", aliases=["lboard"]) + async def trivia_leaderboard(self, ctx: commands.Context): + """Leaderboard for trivia. + + Defaults to the top 10 of this server, sorted by total wins. Use + subcommands for a more customised leaderboard. + """ + if ctx.invoked_subcommand == self.trivia_leaderboard: + cmd = self.trivia_leaderboard_server + if isinstance(ctx.channel, discord.abc.PrivateChannel): + cmd = self.trivia_leaderboard_global + await ctx.invoke(cmd, "wins", 10) + + @trivia_leaderboard.command(name="server") + @commands.guild_only() + async def trivia_leaderboard_server(self, + ctx: commands.Context, + sort_by: str="wins", + top: int=10): + """Leaderboard for this server. + + can be any of the following fields: + - wins : total wins + - avg : average score + - total : total correct answers + + is the number of ranks to show on the leaderboard. + """ + key = self._get_sort_key(sort_by) + if key is None: + await ctx.send("Unknown field `{}`, see `{}help trivia " + "leaderboard server` for valid fields to sort by." + "".format(sort_by, ctx.prefix)) + return + guild = ctx.guild + data = await self.conf.all_members(guild) + data = {guild.get_member(u): d for u, d in data.items()} + data.pop(None, None) # remove any members which aren't in the guild + await self.send_leaderboard(ctx, data, key, top) + + @trivia_leaderboard.command(name="global") + async def trivia_leaderboard_global(self, + ctx: commands.Context, + sort_by: str="wins", + top: int=10): + """Global trivia leaderboard. + + can be any of the following fields: + - wins : total wins + - avg : average score + - total : total correct answers from all sessions + - games : total games played + + is the number of ranks to show on the leaderboard. + """ + key = self._get_sort_key(sort_by) + if key is None: + await ctx.send("Unknown field `{}`, see `{}help trivia " + "leaderboard global` for valid fields to sort by." + "".format(sort_by, ctx.prefix)) + return + data = await self.conf.all_members() + collated_data = {} + for guild_id, guild_data in data.items(): + guild = ctx.bot.get_guild(guild_id) + if guild is None: + continue + for member_id, member_data in guild_data.items(): + member = guild.get_member(member_id) + if member is None: + continue + collated_member_data = collated_data.get(member, Counter()) + for v_key, value in member_data.items(): + collated_member_data[v_key] += value + collated_data[member] = collated_member_data + await self.send_leaderboard(ctx, collated_data, key, top) + + def _get_sort_key(self, key: str): + key = key.lower() + if key in ("wins", "average_score", "total_score", "games"): + return key + elif key in ("avg", "average"): + return "average_score" + elif key in ("total", "score", "answers", "correct"): + return "total_score" + + async def send_leaderboard(self, + ctx: commands.Context, + data: dict, + key: str, + top: int): + """Send the leaderboard from the given data. + + Parameters + ---------- + ctx : commands.Context + The context to send the leaderboard to. + data : dict + The data for the leaderboard. This must map `discord.Member` -> + `dict`. + key : str + The field to sort the data by. Can be ``wins``, ``total_score``, + ``games`` or ``average_score``. + top : int + The number of members to display on the leaderboard. + + Returns + ------- + `list` of `discord.Message` + The sent leaderboard messages. + + """ + if not data: + await ctx.send("There are no scores on record!") + return + leaderboard = self._get_leaderboard(data, key, top) + ret = [] + for page in pagify(leaderboard): + ret.append(await ctx.send(box(page, lang="py"))) + return ret + + def _get_leaderboard(self, data: dict, key: str, top: int): + # Mix in average score + for member, stats in data.items(): + if stats["games"] != 0: + stats["average_score"] = stats["total_score"] / stats["games"] + else: + stats["average_score"] = 0.0 + # Sort by reverse order of priority + priority = ["average_score", "total_score", "wins", "games"] + try: + priority.remove(key) + except ValueError: + raise ValueError("{} is not a valid key".format(key)) + # Put key last in reverse priority + priority.append(key) + items = data.items() + for key in priority: + items = sorted(items, key=lambda t: t[1][key], reverse=True) + max_name_len = max(map(lambda m: len(str(m)), data.keys())) + # Headers + headers = ("Rank", "Member{}".format(" " * (max_name_len - 6)), "Wins", + "Games Played", "Total Score", "Average Score") + lines = [" | ".join(headers)] + # Header underlines + lines.append(" | ".join(("-" * len(h) for h in headers))) + for rank, tup in enumerate(items, 1): + member, m_data = tup + # Align fields to header width + fields = tuple(map(str, (rank, + member, + m_data["wins"], + m_data["games"], + m_data["total_score"], + round(m_data["average_score"], 2)))) + padding = [ + " " * (len(h) - len(f)) for h, f in zip(headers, fields) + ] + fields = tuple(f + padding[i] for i, f in enumerate(fields)) + lines.append(" | ".join(fields).format(member=member, **m_data)) + if rank == top: + break + return "\n".join(lines) + + async def on_trivia_end(self, session: TriviaSession): + """Event for a trivia session ending. + + This method removes the session from this cog's sessions, and + cancels any tasks which it was running. + + Parameters + ---------- + session : TriviaSession + The session which has just ended. + + """ + channel = session.ctx.channel + LOG.debug("Ending trivia session; #%s in %s", channel, + channel.guild.id) + if session in self.trivia_sessions: + self.trivia_sessions.remove(session) + if session.scores: + await self.update_leaderboard(session) + + async def update_leaderboard(self, session): + """Update the leaderboard with the given scores. + + Parameters + ---------- + session : TriviaSession + The trivia session to update scores from. + + """ + max_score = session.settings["max_score"] + for member, score in session.scores.items(): + if member.id == session.ctx.bot.user.id: + continue + stats = await self.conf.member(member).all() + if score == max_score: + stats["wins"] += 1 + stats["total_score"] += score + stats["games"] += 1 + await self.conf.member(member).set(stats) + + def get_trivia_list(self, category: str) -> dict: + """Get the trivia list corresponding to the given category. + + Parameters + ---------- + category : str + The desired category. Case sensitive. + + Returns + ------- + `dict` + A dict mapping questions (`str`) to answers (`list` of `str`). + + """ + try: + path = next(p for p in self._all_lists() if p.stem == category) + except StopIteration: + raise FileNotFoundError("Could not find the `{}` category" + "".format(category)) + + with path.open() as file: + try: + dict_ = yaml.load(file) + except yaml.error.YAMLError as exc: + raise InvalidListError("YAML parsing failed") from exc + else: + return dict_ + + def _get_trivia_session(self, + channel: discord.TextChannel) -> TriviaSession: + return next((session for session in self.trivia_sessions + if session.ctx.channel == channel), None) + + def _all_lists(self): + personal_lists = tuple(p.resolve() + for p in cog_data_path(self).glob("*.yaml")) + + return personal_lists + tuple(redbot.trivia.lists()) + + def __unload(self): + for session in self.trivia_sessions: + session.force_stop() diff --git a/requirements.txt b/requirements.txt index e19a53bb5..065bd25cd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,6 @@ appdirs youtube_dl raven colorama -aiohttp-json-rpc \ No newline at end of file +aiohttp-json-rpc +pyyaml +Red-Trivia