mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
[Trivia] [WIP] V3 Rewrite (#915)
* Rewritten basic settings * Ported to V3 * Will saves the dicts * TriviaSession uses Config * wait_for is the future * Out with recursion and redundancy * Remove typing before first question * Added payout multiplier setting * Payout setting uses appropriate check * Implement actual paying of winner * Removed trivia lists from repository * Added payout confirmation message Also rearranged the order of methods in TriviaSession to better represent the sequential order of running trivia * Contestant count fixed Also fixed error when lists folder is missing * Support for multiple categories per session * Extra error handling * Update from rebase * Use of task cancel * Nicer docstrings * Oops * Better imports * YAML lists * Using the trivia list repo (YAY) * Cast all answers to string since YAML is dumb * session.stop() is not an async function * Remove redundant attribute from session * Sessions manage their own tasks * Add `redbot/trivia` to .gitignore * Add PyYAML to requirements.txt * Parse answers properly before checking * Add Red-Trivia to reqs * Better task management and some optimisations Sessions now just use a flat dict for settings instead of config. This means the settings can't be changed after the session is started, but it removes the need for config to be read for every question. * Allow lists to override settings * Fix config logic * Iteration fixes, config override fixes, task management fixes * Don't hide dict abuse * Stats tracking * Leaderboard implemented * [leaderboard] ignore self and reverse list * [leaderboard] Handle empty score counter * Only count win when max score is reached
This commit is contained in:
parent
0959aebd10
commit
897adbf5ac
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,3 +1,6 @@
|
||||
# Trivia list repo injection
|
||||
redbot/trivia/
|
||||
|
||||
*.json
|
||||
*.exe
|
||||
*.dll
|
||||
|
||||
10
redbot/cogs/trivia/__init__.py
Normal file
10
redbot/cogs/trivia/__init__.py
Normal file
@ -0,0 +1,10 @@
|
||||
"""Package for Trivia cog."""
|
||||
from .trivia import *
|
||||
from .session import *
|
||||
from .log import *
|
||||
|
||||
|
||||
def setup(bot):
|
||||
"""Load Trivia."""
|
||||
cog = Trivia()
|
||||
bot.add_cog(cog)
|
||||
7
redbot/cogs/trivia/log.py
Normal file
7
redbot/cogs/trivia/log.py
Normal file
@ -0,0 +1,7 @@
|
||||
"""Log for Trivia cog."""
|
||||
|
||||
import logging
|
||||
|
||||
__all__ = ["LOG"]
|
||||
|
||||
LOG = logging.getLogger("red.trivia")
|
||||
307
redbot/cogs/trivia/session.py
Normal file
307
redbot/cogs/trivia/session.py
Normal file
@ -0,0 +1,307 @@
|
||||
"""Module to manage trivia sessions."""
|
||||
import asyncio
|
||||
import time
|
||||
import random
|
||||
from collections import Counter
|
||||
import discord
|
||||
from redbot.core.bank import deposit_credits
|
||||
from redbot.core.utils.chat_formatting import box
|
||||
from .log import LOG
|
||||
|
||||
__all__ = ["TriviaSession"]
|
||||
|
||||
_REVEAL_MESSAGES = ("I know this one! {}!", "Easy: {}.",
|
||||
"Oh really? It's {} of course.")
|
||||
_FAIL_MESSAGES = ("To the next one I guess...", "Moving on...",
|
||||
"I'm sure you'll know the answer of the next one.",
|
||||
"\N{PENSIVE FACE} Next one.")
|
||||
|
||||
|
||||
class TriviaSession():
|
||||
"""Class to run a session of trivia with the user.
|
||||
|
||||
To run the trivia session immediately, use `TriviaSession.start` instead of
|
||||
instantiating directly.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
ctx : `commands.Context`
|
||||
Context object from which this session will be run.
|
||||
This object assumes the session was started in `ctx.channel`
|
||||
by `ctx.author`.
|
||||
question_list : `dict`
|
||||
A list of tuples mapping questions (`str`) to answers (`list` of
|
||||
`str`).
|
||||
settings : `dict`
|
||||
Settings for the trivia session, with values for the following:
|
||||
- ``max_score`` (`int`)
|
||||
- ``delay`` (`float`)
|
||||
- ``timeout`` (`float`)
|
||||
- ``reveal_answer`` (`bool`)
|
||||
- ``bot_plays`` (`bool`)
|
||||
- ``allow_override`` (`bool`)
|
||||
- ``payout_multiplier`` (`float`)
|
||||
scores : `collections.Counter`
|
||||
A counter with the players as keys, and their scores as values. The
|
||||
players are of type `discord.Member`.
|
||||
count : `int`
|
||||
The number of questions which have been asked.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
ctx,
|
||||
question_list: dict,
|
||||
settings: dict):
|
||||
self.ctx = ctx
|
||||
list_ = list(question_list.items())
|
||||
random.shuffle(list_)
|
||||
self.question_list = list_
|
||||
self.settings = settings
|
||||
self.scores = Counter()
|
||||
self.count = 0
|
||||
self._last_response = time.time()
|
||||
self._task = None
|
||||
|
||||
@classmethod
|
||||
def start(cls, ctx, question_list, settings):
|
||||
"""Create and start a trivia session.
|
||||
|
||||
This allows the session to manage the running and cancellation of its
|
||||
own tasks.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : `commands.Context`
|
||||
Same as `TriviaSession.ctx`
|
||||
question_list : `dict`
|
||||
Same as `TriviaSession.question_list`
|
||||
settings : `dict`
|
||||
Same as `TriviaSession.settings`
|
||||
|
||||
Returns
|
||||
-------
|
||||
TriviaSession
|
||||
The new trivia session being run.
|
||||
|
||||
"""
|
||||
session = cls(ctx, question_list, settings)
|
||||
loop = ctx.bot.loop
|
||||
session._task = loop.create_task(session.run())
|
||||
return session
|
||||
|
||||
async def run(self):
|
||||
"""Run the trivia session.
|
||||
|
||||
In order for the trivia session to be stopped correctly, this should
|
||||
only be called internally by `TriviaSession.start`.
|
||||
"""
|
||||
max_score = self.settings["max_score"]
|
||||
delay = self.settings["delay"]
|
||||
timeout = self.settings["timeout"]
|
||||
for question, answers in self._iter_questions():
|
||||
self.count += 1
|
||||
msg = "**Question number {}!**\n\n{}".format(self.count, question)
|
||||
await self.ctx.send(msg)
|
||||
continue_ = await self.wait_for_answer(answers, delay, timeout)
|
||||
if continue_ is False:
|
||||
break
|
||||
if any(score >= max_score for score in self.scores.values()):
|
||||
await self.end_game()
|
||||
break
|
||||
async with self.ctx.typing():
|
||||
await asyncio.sleep(3)
|
||||
else:
|
||||
await self.ctx.send("There are no more questions!")
|
||||
await self.end_game()
|
||||
|
||||
def _iter_questions(self):
|
||||
"""Iterate over questions and answers for this session.
|
||||
|
||||
Yields
|
||||
------
|
||||
`tuple`
|
||||
A tuple containing the question (`str`) and the answers (`tuple` of
|
||||
`str`).
|
||||
|
||||
"""
|
||||
for question, answers in self.question_list:
|
||||
answers = _parse_answers(answers)
|
||||
yield question, answers
|
||||
|
||||
async def wait_for_answer(self,
|
||||
answers,
|
||||
delay: float,
|
||||
timeout: float):
|
||||
"""Wait for a correct answer, and then respond.
|
||||
|
||||
Scores are also updated in this method.
|
||||
|
||||
Returns False if waiting was cancelled; this is usually due to the
|
||||
session being forcibly stopped.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
answers : `iterable` of `str`
|
||||
A list of valid answers to the current question.
|
||||
delay : float
|
||||
How long users have to respond (in seconds).
|
||||
timeout : float
|
||||
How long before the session ends due to no responses (in seconds).
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
:code:`True` if the session wasn't interrupted.
|
||||
|
||||
"""
|
||||
try:
|
||||
message = await self.ctx.bot.wait_for(
|
||||
"message", check=self.check_answer(answers), timeout=delay)
|
||||
except asyncio.TimeoutError:
|
||||
if time.time() - self._last_response >= timeout:
|
||||
await self.ctx.send("Guys...? Well, I guess I'll stop then.")
|
||||
self.stop()
|
||||
return False
|
||||
if self.settings["reveal_answer"]:
|
||||
reply = random.choice(_REVEAL_MESSAGES).format(answers[0])
|
||||
else:
|
||||
reply = random.choice(_FAIL_MESSAGES)
|
||||
if self.settings["bot_plays"]:
|
||||
reply += " **+1** for me!"
|
||||
self.scores[self.ctx.guild.me] += 1
|
||||
await self.ctx.send(reply)
|
||||
else:
|
||||
self.scores[message.author] += 1
|
||||
reply = "You got it {}! **+1** to you!".format(
|
||||
message.author.display_name)
|
||||
await self.ctx.send(reply)
|
||||
return True
|
||||
|
||||
def check_answer(self, answers):
|
||||
"""Get a predicate to check for correct answers.
|
||||
|
||||
The returned predicate takes a message as its only parameter,
|
||||
and returns ``True`` if the message contains any of the
|
||||
given answers.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
answers : `iterable` of `str`
|
||||
The answers which the predicate must check for.
|
||||
|
||||
Returns
|
||||
-------
|
||||
function
|
||||
The message predicate.
|
||||
|
||||
"""
|
||||
answers = tuple(s.lower() for s in answers)
|
||||
def _pred(message: discord.Message):
|
||||
early_exit = (message.channel != self.ctx.channel
|
||||
or message.author == self.ctx.guild.me)
|
||||
if early_exit:
|
||||
return False
|
||||
|
||||
self._last_response = time.time()
|
||||
guess = message.content.lower()
|
||||
for answer in answers:
|
||||
if " " in answer and answer in guess:
|
||||
# Exact matching, issue #331
|
||||
return True
|
||||
elif any(word == answer for word in guess.split(" ")):
|
||||
return True
|
||||
return False
|
||||
|
||||
return _pred
|
||||
|
||||
async def end_game(self):
|
||||
"""End the trivia session and display scrores."""
|
||||
if self.scores:
|
||||
await self.send_table()
|
||||
multiplier = self.settings["payout_multiplier"]
|
||||
if multiplier > 0:
|
||||
await self.pay_winner(multiplier)
|
||||
self.stop()
|
||||
|
||||
async def send_table(self):
|
||||
"""Send a table of scores to the session's channel."""
|
||||
table = "+ Results: \n\n"
|
||||
for user, score in self.scores.most_common():
|
||||
table += "+ {}\t{}\n".format(user, score)
|
||||
await self.ctx.send(box(table, lang="diff"))
|
||||
|
||||
def stop(self):
|
||||
"""Stop the trivia session, without showing scores."""
|
||||
self.ctx.bot.dispatch("trivia_end", self)
|
||||
|
||||
def force_stop(self):
|
||||
"""Cancel whichever tasks this session is running."""
|
||||
self._task.cancel()
|
||||
channel = self.ctx.channel
|
||||
LOG.debug("Force stopping trivia session; #%s in %s", channel,
|
||||
channel.guild.id)
|
||||
|
||||
async def pay_winner(self, multiplier: float):
|
||||
"""Pay the winner of this trivia session.
|
||||
|
||||
The winner is only payed if there are at least 3 human contestants.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
multiplier : float
|
||||
The coefficient of the winner's score, used to determine the amount
|
||||
paid.
|
||||
|
||||
"""
|
||||
(winner, score) = next((tup for tup in self.scores.most_common(1)),
|
||||
(None, None))
|
||||
me_ = self.ctx.guild.me
|
||||
if winner is not None and winner != me_ and score > 0:
|
||||
contestants = list(self.scores.keys())
|
||||
if me_ in contestants:
|
||||
contestants.remove(me_)
|
||||
if len(contestants) >= 3:
|
||||
amount = int(multiplier * score)
|
||||
if amount > 0:
|
||||
LOG.debug("Paying trivia winner: %d credits --> %s",
|
||||
amount, str(winner))
|
||||
await deposit_credits(winner, int(multiplier * score))
|
||||
await self.ctx.send(
|
||||
"Congratulations, {0}, you have received {1} credits"
|
||||
" for coming first.".format(winner.display_name,
|
||||
amount))
|
||||
|
||||
|
||||
def _parse_answers(answers):
|
||||
"""Parse the raw answers to readable strings.
|
||||
|
||||
The reason this exists is because of YAML's ambiguous syntax. For example,
|
||||
if the answer to a question in YAML is ``yes``, YAML will load it as the
|
||||
boolean value ``True``, which is not necessarily the desired answer. This
|
||||
function aims to undo that for bools, and possibly for numbers in the
|
||||
future too.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
answers : `iterable` of `str`
|
||||
The raw answers loaded from YAML.
|
||||
|
||||
Returns
|
||||
-------
|
||||
`tuple` of `str`
|
||||
The answers in readable/ guessable strings.
|
||||
|
||||
"""
|
||||
ret = []
|
||||
for answer in answers:
|
||||
if isinstance(answer, bool):
|
||||
if answer is True:
|
||||
ret.append("True", "Yes")
|
||||
else:
|
||||
ret.append("False", "No")
|
||||
else:
|
||||
ret.append(str(answer))
|
||||
# Uniquify list
|
||||
seen = set()
|
||||
return tuple(x for x in ret if not (x in seen or seen.add(x)))
|
||||
486
redbot/cogs/trivia/trivia.py
Normal file
486
redbot/cogs/trivia/trivia.py
Normal file
@ -0,0 +1,486 @@
|
||||
"""Module for Trivia cog."""
|
||||
from collections import Counter
|
||||
import yaml
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
import redbot.trivia
|
||||
from redbot.core import Config, checks
|
||||
from redbot.core.data_manager import cog_data_path
|
||||
from redbot.core.utils.chat_formatting import box, pagify
|
||||
from redbot.cogs.bank import check_global_setting_admin
|
||||
from .log import LOG
|
||||
from .session import TriviaSession
|
||||
|
||||
__all__ = ["Trivia", "UNIQUE_ID"]
|
||||
|
||||
UNIQUE_ID = 0xb3c0e453
|
||||
|
||||
|
||||
class InvalidListError(Exception):
|
||||
"""A Trivia list file is in invalid format."""
|
||||
pass
|
||||
|
||||
|
||||
class Trivia:
|
||||
"""Play trivia with friends!"""
|
||||
|
||||
def __init__(self):
|
||||
self.trivia_sessions = []
|
||||
self.conf = Config.get_conf(
|
||||
self, identifier=UNIQUE_ID, force_registration=True)
|
||||
|
||||
self.conf.register_guild(
|
||||
max_score=10,
|
||||
timeout=120.0,
|
||||
delay=15.0,
|
||||
bot_plays=False,
|
||||
reveal_answer=True,
|
||||
payout_multiplier=0.0,
|
||||
allow_override=True)
|
||||
|
||||
self.conf.register_member(
|
||||
wins=0, games=0, total_score=0)
|
||||
|
||||
@commands.group()
|
||||
@commands.guild_only()
|
||||
@checks.mod_or_permissions(administrator=True)
|
||||
async def triviaset(self, ctx: commands.Context):
|
||||
"""Manage trivia settings."""
|
||||
if ctx.invoked_subcommand is None:
|
||||
await ctx.send_help()
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
settings_dict = await settings.all()
|
||||
msg = box(
|
||||
"**Current settings**\n"
|
||||
"Bot gains points: {bot_plays}\n"
|
||||
"Answer time limit: {delay} seconds\n"
|
||||
"Lack of response timeout: {timeout} seconds\n"
|
||||
"Points to win: {max_score}\n"
|
||||
"Reveal answer on timeout: {reveal_answer}\n"
|
||||
"Payout multiplier: {payout_multiplier}\n"
|
||||
"Allow lists to override settings: {allow_override}"
|
||||
"".format(**settings_dict),
|
||||
lang="py")
|
||||
await ctx.send(msg)
|
||||
|
||||
@triviaset.command(name="maxscore")
|
||||
async def triviaset_max_score(self, ctx: commands.Context, score: int):
|
||||
"""Set the total points required to win."""
|
||||
if score < 0:
|
||||
await ctx.send("Score must be greater than 0.")
|
||||
return
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
await settings.max_score.set(score)
|
||||
await ctx.send("Done. Points required to win set to {}.".format(score))
|
||||
|
||||
@triviaset.command(name="timelimit")
|
||||
async def triviaset_timelimit(self, ctx: commands.Context, seconds: float):
|
||||
"""Set the maximum seconds permitted to answer a question."""
|
||||
if seconds < 4.0:
|
||||
await ctx.send("Must be at least 4 seconds.")
|
||||
return
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
await settings.delay.set(seconds)
|
||||
await ctx.send("Done. Maximum seconds to answer set to {}."
|
||||
"".format(seconds))
|
||||
|
||||
@triviaset.command(name="stopafter")
|
||||
async def triviaset_stopafter(self, ctx: commands.Context, seconds: float):
|
||||
"""Set how long until trivia stops due to no response."""
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
if seconds < await settings.delay():
|
||||
await ctx.send("Must be larger than the answer time limit.")
|
||||
return
|
||||
await settings.timeout.set(seconds)
|
||||
await ctx.send("Done. Trivia sessions will now time out after {}"
|
||||
" seconds of no responses.".format(seconds))
|
||||
|
||||
@triviaset.command(name="override")
|
||||
async def triviaset_allowoverride(self,
|
||||
ctx: commands.Context,
|
||||
enabled: bool):
|
||||
"""Allow/disallow trivia lists to override settings."""
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
await settings.allow_override.set(enabled)
|
||||
enabled = "now" if enabled else "no longer"
|
||||
await ctx.send("Done. Trivia lists can {} override the trivia settings"
|
||||
" for this server.".format(enabled))
|
||||
|
||||
@triviaset.command(name="botplays")
|
||||
async def trivaset_bot_plays(self,
|
||||
ctx: commands.Context,
|
||||
true_or_false: bool):
|
||||
"""Set whether or not the bot gains points.
|
||||
|
||||
If enabled, the bot will gain a point if no one guesses correctly.
|
||||
"""
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
await settings.bot_plays.set(true_or_false)
|
||||
await ctx.send("Done. " + (
|
||||
"I'll gain a point if users don't answer in time." if true_or_false
|
||||
else "Alright, I won't embarass you at trivia anymore."))
|
||||
|
||||
@triviaset.command(name="revealanswer")
|
||||
async def trivaset_reveal_answer(self,
|
||||
ctx: commands.Context,
|
||||
true_or_false: bool):
|
||||
"""Set whether or not the answer is revealed.
|
||||
|
||||
If enabled, the bot will reveal the answer if no one guesses correctly
|
||||
in time.
|
||||
"""
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
await settings.reveal_answer.set(true_or_false)
|
||||
await ctx.send("Done. " + (
|
||||
"I'll reveal the answer if no one knows it." if true_or_false else
|
||||
"I won't reveal the answer to the questions anymore."))
|
||||
|
||||
@triviaset.command(name="payout")
|
||||
@check_global_setting_admin()
|
||||
async def triviaset_payout_multiplier(self,
|
||||
ctx: commands.Context,
|
||||
multiplier: float):
|
||||
"""Set the payout multiplier.
|
||||
|
||||
This can be any positive decimal number. If a user wins trivia when at
|
||||
least 3 members are playing, they will receive credits. Set to 0 to
|
||||
disable.
|
||||
|
||||
The number of credits is determined by multiplying their total score by
|
||||
this multiplier.
|
||||
"""
|
||||
settings = self.conf.guild(ctx.guild)
|
||||
if multiplier < 0:
|
||||
await ctx.send("Multiplier must be at least 0.")
|
||||
return
|
||||
await settings.payout_multiplier.set(multiplier)
|
||||
if not multiplier:
|
||||
await ctx.send("Done. I will no longer reward the winner with a"
|
||||
" payout.")
|
||||
return
|
||||
await ctx.send("Done. Payout multiplier set to {}.".format(multiplier))
|
||||
|
||||
@commands.group(invoke_without_command=True)
|
||||
@commands.guild_only()
|
||||
async def trivia(self, ctx: commands.Context, *categories: str):
|
||||
"""Start trivia session on the specified category.
|
||||
|
||||
You may list multiple categories, in which case the trivia will involve
|
||||
questions from all of them.
|
||||
"""
|
||||
if not categories:
|
||||
await ctx.send_help()
|
||||
return
|
||||
categories = [c.lower() for c in categories]
|
||||
session = self._get_trivia_session(ctx.channel)
|
||||
if session is not None:
|
||||
await ctx.send(
|
||||
"There is already an ongoing trivia session in this channel.")
|
||||
return
|
||||
trivia_dict = {}
|
||||
for category in reversed(categories):
|
||||
# We reverse the categories so that the first list's config takes
|
||||
# priority over the others.
|
||||
try:
|
||||
dict_ = self.get_trivia_list(category)
|
||||
except FileNotFoundError:
|
||||
await ctx.send("Invalid category `{0}`. See `{1}trivia list`"
|
||||
" for a list of trivia categories."
|
||||
"".format(category, ctx.prefix))
|
||||
except InvalidListError:
|
||||
await ctx.send("There was an error parsing the trivia list for"
|
||||
" the `{}` category. It may be formatted"
|
||||
" incorrectly.".format(category))
|
||||
else:
|
||||
trivia_dict.update(dict_)
|
||||
continue
|
||||
return
|
||||
if not trivia_dict:
|
||||
await ctx.send("The trivia list was parsed successfully, however"
|
||||
" it appears to be empty!")
|
||||
return
|
||||
settings = await self.conf.guild(ctx.guild).all()
|
||||
config = trivia_dict.pop("CONFIG", None)
|
||||
if config and settings["allow_override"]:
|
||||
settings.update(config)
|
||||
session = TriviaSession.start(ctx, trivia_dict, settings)
|
||||
self.trivia_sessions.append(session)
|
||||
LOG.debug("New trivia session; #%s in %d", ctx.channel, ctx.guild.id)
|
||||
|
||||
@trivia.command(name="stop")
|
||||
async def trivia_stop(self, ctx: commands.Context):
|
||||
"""Stop an ongoing trivia session."""
|
||||
session = self._get_trivia_session(ctx.channel)
|
||||
if session is None:
|
||||
await ctx.send(
|
||||
"There is no ongoing trivia session in this channel.")
|
||||
return
|
||||
author = ctx.author
|
||||
auth_checks = (await ctx.bot.is_owner(author), await
|
||||
ctx.bot.is_mod(author), await ctx.bot.is_admin(author),
|
||||
author == ctx.guild.owner, author == session.ctx.author)
|
||||
if any(auth_checks):
|
||||
await session.end_game()
|
||||
session.force_stop()
|
||||
await ctx.send("Trivia stopped.")
|
||||
else:
|
||||
await ctx.send("You are not allowed to do that.")
|
||||
|
||||
@trivia.command(name="list")
|
||||
async def trivia_list(self, ctx: commands.Context):
|
||||
"""List available trivia categories."""
|
||||
lists = set(p.stem for p in self._all_lists())
|
||||
|
||||
msg = box("**Available trivia lists**\n\n{}"
|
||||
"".format(", ".join(sorted(lists))))
|
||||
if len(msg) > 1000:
|
||||
await ctx.author.send(msg)
|
||||
return
|
||||
await ctx.send(msg)
|
||||
|
||||
@trivia.group(name="leaderboard", aliases=["lboard"])
|
||||
async def trivia_leaderboard(self, ctx: commands.Context):
|
||||
"""Leaderboard for trivia.
|
||||
|
||||
Defaults to the top 10 of this server, sorted by total wins. Use
|
||||
subcommands for a more customised leaderboard.
|
||||
"""
|
||||
if ctx.invoked_subcommand == self.trivia_leaderboard:
|
||||
cmd = self.trivia_leaderboard_server
|
||||
if isinstance(ctx.channel, discord.abc.PrivateChannel):
|
||||
cmd = self.trivia_leaderboard_global
|
||||
await ctx.invoke(cmd, "wins", 10)
|
||||
|
||||
@trivia_leaderboard.command(name="server")
|
||||
@commands.guild_only()
|
||||
async def trivia_leaderboard_server(self,
|
||||
ctx: commands.Context,
|
||||
sort_by: str="wins",
|
||||
top: int=10):
|
||||
"""Leaderboard for this server.
|
||||
|
||||
<sort_by> can be any of the following fields:
|
||||
- wins : total wins
|
||||
- avg : average score
|
||||
- total : total correct answers
|
||||
|
||||
<top> is the number of ranks to show on the leaderboard.
|
||||
"""
|
||||
key = self._get_sort_key(sort_by)
|
||||
if key is None:
|
||||
await ctx.send("Unknown field `{}`, see `{}help trivia "
|
||||
"leaderboard server` for valid fields to sort by."
|
||||
"".format(sort_by, ctx.prefix))
|
||||
return
|
||||
guild = ctx.guild
|
||||
data = await self.conf.all_members(guild)
|
||||
data = {guild.get_member(u): d for u, d in data.items()}
|
||||
data.pop(None, None) # remove any members which aren't in the guild
|
||||
await self.send_leaderboard(ctx, data, key, top)
|
||||
|
||||
@trivia_leaderboard.command(name="global")
|
||||
async def trivia_leaderboard_global(self,
|
||||
ctx: commands.Context,
|
||||
sort_by: str="wins",
|
||||
top: int=10):
|
||||
"""Global trivia leaderboard.
|
||||
|
||||
<sort_by> can be any of the following fields:
|
||||
- wins : total wins
|
||||
- avg : average score
|
||||
- total : total correct answers from all sessions
|
||||
- games : total games played
|
||||
|
||||
<top> is the number of ranks to show on the leaderboard.
|
||||
"""
|
||||
key = self._get_sort_key(sort_by)
|
||||
if key is None:
|
||||
await ctx.send("Unknown field `{}`, see `{}help trivia "
|
||||
"leaderboard global` for valid fields to sort by."
|
||||
"".format(sort_by, ctx.prefix))
|
||||
return
|
||||
data = await self.conf.all_members()
|
||||
collated_data = {}
|
||||
for guild_id, guild_data in data.items():
|
||||
guild = ctx.bot.get_guild(guild_id)
|
||||
if guild is None:
|
||||
continue
|
||||
for member_id, member_data in guild_data.items():
|
||||
member = guild.get_member(member_id)
|
||||
if member is None:
|
||||
continue
|
||||
collated_member_data = collated_data.get(member, Counter())
|
||||
for v_key, value in member_data.items():
|
||||
collated_member_data[v_key] += value
|
||||
collated_data[member] = collated_member_data
|
||||
await self.send_leaderboard(ctx, collated_data, key, top)
|
||||
|
||||
def _get_sort_key(self, key: str):
|
||||
key = key.lower()
|
||||
if key in ("wins", "average_score", "total_score", "games"):
|
||||
return key
|
||||
elif key in ("avg", "average"):
|
||||
return "average_score"
|
||||
elif key in ("total", "score", "answers", "correct"):
|
||||
return "total_score"
|
||||
|
||||
async def send_leaderboard(self,
|
||||
ctx: commands.Context,
|
||||
data: dict,
|
||||
key: str,
|
||||
top: int):
|
||||
"""Send the leaderboard from the given data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ctx : commands.Context
|
||||
The context to send the leaderboard to.
|
||||
data : dict
|
||||
The data for the leaderboard. This must map `discord.Member` ->
|
||||
`dict`.
|
||||
key : str
|
||||
The field to sort the data by. Can be ``wins``, ``total_score``,
|
||||
``games`` or ``average_score``.
|
||||
top : int
|
||||
The number of members to display on the leaderboard.
|
||||
|
||||
Returns
|
||||
-------
|
||||
`list` of `discord.Message`
|
||||
The sent leaderboard messages.
|
||||
|
||||
"""
|
||||
if not data:
|
||||
await ctx.send("There are no scores on record!")
|
||||
return
|
||||
leaderboard = self._get_leaderboard(data, key, top)
|
||||
ret = []
|
||||
for page in pagify(leaderboard):
|
||||
ret.append(await ctx.send(box(page, lang="py")))
|
||||
return ret
|
||||
|
||||
def _get_leaderboard(self, data: dict, key: str, top: int):
|
||||
# Mix in average score
|
||||
for member, stats in data.items():
|
||||
if stats["games"] != 0:
|
||||
stats["average_score"] = stats["total_score"] / stats["games"]
|
||||
else:
|
||||
stats["average_score"] = 0.0
|
||||
# Sort by reverse order of priority
|
||||
priority = ["average_score", "total_score", "wins", "games"]
|
||||
try:
|
||||
priority.remove(key)
|
||||
except ValueError:
|
||||
raise ValueError("{} is not a valid key".format(key))
|
||||
# Put key last in reverse priority
|
||||
priority.append(key)
|
||||
items = data.items()
|
||||
for key in priority:
|
||||
items = sorted(items, key=lambda t: t[1][key], reverse=True)
|
||||
max_name_len = max(map(lambda m: len(str(m)), data.keys()))
|
||||
# Headers
|
||||
headers = ("Rank", "Member{}".format(" " * (max_name_len - 6)), "Wins",
|
||||
"Games Played", "Total Score", "Average Score")
|
||||
lines = [" | ".join(headers)]
|
||||
# Header underlines
|
||||
lines.append(" | ".join(("-" * len(h) for h in headers)))
|
||||
for rank, tup in enumerate(items, 1):
|
||||
member, m_data = tup
|
||||
# Align fields to header width
|
||||
fields = tuple(map(str, (rank,
|
||||
member,
|
||||
m_data["wins"],
|
||||
m_data["games"],
|
||||
m_data["total_score"],
|
||||
round(m_data["average_score"], 2))))
|
||||
padding = [
|
||||
" " * (len(h) - len(f)) for h, f in zip(headers, fields)
|
||||
]
|
||||
fields = tuple(f + padding[i] for i, f in enumerate(fields))
|
||||
lines.append(" | ".join(fields).format(member=member, **m_data))
|
||||
if rank == top:
|
||||
break
|
||||
return "\n".join(lines)
|
||||
|
||||
async def on_trivia_end(self, session: TriviaSession):
|
||||
"""Event for a trivia session ending.
|
||||
|
||||
This method removes the session from this cog's sessions, and
|
||||
cancels any tasks which it was running.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
session : TriviaSession
|
||||
The session which has just ended.
|
||||
|
||||
"""
|
||||
channel = session.ctx.channel
|
||||
LOG.debug("Ending trivia session; #%s in %s", channel,
|
||||
channel.guild.id)
|
||||
if session in self.trivia_sessions:
|
||||
self.trivia_sessions.remove(session)
|
||||
if session.scores:
|
||||
await self.update_leaderboard(session)
|
||||
|
||||
async def update_leaderboard(self, session):
|
||||
"""Update the leaderboard with the given scores.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
session : TriviaSession
|
||||
The trivia session to update scores from.
|
||||
|
||||
"""
|
||||
max_score = session.settings["max_score"]
|
||||
for member, score in session.scores.items():
|
||||
if member.id == session.ctx.bot.user.id:
|
||||
continue
|
||||
stats = await self.conf.member(member).all()
|
||||
if score == max_score:
|
||||
stats["wins"] += 1
|
||||
stats["total_score"] += score
|
||||
stats["games"] += 1
|
||||
await self.conf.member(member).set(stats)
|
||||
|
||||
def get_trivia_list(self, category: str) -> dict:
|
||||
"""Get the trivia list corresponding to the given category.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
category : str
|
||||
The desired category. Case sensitive.
|
||||
|
||||
Returns
|
||||
-------
|
||||
`dict`
|
||||
A dict mapping questions (`str`) to answers (`list` of `str`).
|
||||
|
||||
"""
|
||||
try:
|
||||
path = next(p for p in self._all_lists() if p.stem == category)
|
||||
except StopIteration:
|
||||
raise FileNotFoundError("Could not find the `{}` category"
|
||||
"".format(category))
|
||||
|
||||
with path.open() as file:
|
||||
try:
|
||||
dict_ = yaml.load(file)
|
||||
except yaml.error.YAMLError as exc:
|
||||
raise InvalidListError("YAML parsing failed") from exc
|
||||
else:
|
||||
return dict_
|
||||
|
||||
def _get_trivia_session(self,
|
||||
channel: discord.TextChannel) -> TriviaSession:
|
||||
return next((session for session in self.trivia_sessions
|
||||
if session.ctx.channel == channel), None)
|
||||
|
||||
def _all_lists(self):
|
||||
personal_lists = tuple(p.resolve()
|
||||
for p in cog_data_path(self).glob("*.yaml"))
|
||||
|
||||
return personal_lists + tuple(redbot.trivia.lists())
|
||||
|
||||
def __unload(self):
|
||||
for session in self.trivia_sessions:
|
||||
session.force_stop()
|
||||
@ -3,4 +3,6 @@ appdirs
|
||||
youtube_dl
|
||||
raven
|
||||
colorama
|
||||
aiohttp-json-rpc
|
||||
aiohttp-json-rpc
|
||||
pyyaml
|
||||
Red-Trivia
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user