mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-07 03:38:53 -05:00
[Image] Complete rewrite: no longer blocking
This commit is contained in:
parent
b5398aeacb
commit
3d9a157516
224
cogs/image.py
224
cogs/image.py
@ -1,111 +1,153 @@
|
||||
import discord
|
||||
from discord.ext import commands
|
||||
from random import randint
|
||||
from random import choice
|
||||
import aiohttp
|
||||
import random
|
||||
import functools
|
||||
import asyncio
|
||||
|
||||
try:
|
||||
from imgurpython import ImgurClient
|
||||
except:
|
||||
ImgurClient = False
|
||||
|
||||
CLIENT_ID = "1fd3ef04daf8cab"
|
||||
CLIENT_SECRET = "f963e574e8e3c17993c933af4f0522e1dc01e230"
|
||||
GIPHY_API_KEY = "dc6zaTOxFJmzC"
|
||||
|
||||
|
||||
class Image:
|
||||
"""Image related commands."""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
#Reserved for further ... stuff
|
||||
self.imgur = ImgurClient(CLIENT_ID, CLIENT_SECRET)
|
||||
|
||||
"""Commands section"""
|
||||
@commands.group(name="imgur", no_pm=True, pass_context=True)
|
||||
async def _imgur(self, ctx):
|
||||
"""Retrieves pictures from imgur"""
|
||||
if ctx.invoked_subcommand is None:
|
||||
await self.bot.send_cmd_help(ctx)
|
||||
|
||||
@commands.command(no_pm=True)
|
||||
async def imgur(self, *text):
|
||||
"""Retrieves a picture from imgur
|
||||
@_imgur.command(pass_context=True, name="random")
|
||||
async def imgur_random(self, ctx):
|
||||
"""Retrieves a random image from Imgur"""
|
||||
task = functools.partial(self.imgur.gallery_random, page=0)
|
||||
task = self.bot.loop.run_in_executor(None, task)
|
||||
|
||||
imgur search [keyword] - Retrieves first hit of search query.
|
||||
imgur [subreddit section] [top or new] - Retrieves top 3 hottest or latest pictures of today for given a subreddit section, e.g. 'funny'."""
|
||||
imgurclient = ImgurClient("1fd3ef04daf8cab", "f963e574e8e3c17993c933af4f0522e1dc01e230")
|
||||
if text == ():
|
||||
rand = randint(0, 59) #60 results per generated page
|
||||
items = imgurclient.gallery_random(page=0)
|
||||
await self.bot.say(items[rand].link)
|
||||
elif text[0] == "search":
|
||||
items = imgurclient.gallery_search(" ".join(text[1:len(text)]), advanced=None, sort='time', window='all', page=0)
|
||||
if len(items) < 1:
|
||||
try:
|
||||
results = await asyncio.wait_for(task, timeout=10)
|
||||
except asyncio.TimeoutError:
|
||||
await self.bot.say("Error: request timed out")
|
||||
else:
|
||||
item = choice(results)
|
||||
link = item.gifv if hasattr(item, "gifv") else item.link
|
||||
await self.bot.say(link)
|
||||
|
||||
@_imgur.command(pass_context=True, name="search")
|
||||
async def imgur_search(self, ctx, *, term: str):
|
||||
"""Searches Imgur for the specified term"""
|
||||
task = functools.partial(self.imgur.gallery_search, term,
|
||||
advanced=None, sort='time',
|
||||
window='all', page=0)
|
||||
task = self.bot.loop.run_in_executor(None, task)
|
||||
|
||||
try:
|
||||
result = await asyncio.wait_for(task, timeout=10)
|
||||
except asyncio.TimeoutError:
|
||||
await self.bot.say("Error: request timed out")
|
||||
else:
|
||||
if result:
|
||||
await self.bot.say(result[0].link)
|
||||
else:
|
||||
await self.bot.say("Your search terms gave no results.")
|
||||
else:
|
||||
await self.bot.say(items[0].link)
|
||||
elif text[0] != ():
|
||||
try:
|
||||
if text[1] == "top":
|
||||
imgSort = "top"
|
||||
elif text[1] == "new":
|
||||
imgSort = "time"
|
||||
else:
|
||||
await self.bot.say("Only top or new is a valid subcommand.")
|
||||
return
|
||||
items = imgurclient.subreddit_gallery(text[0], sort=imgSort, window='day', page=0)
|
||||
if (len(items) < 3):
|
||||
await self.bot.say("This subreddit section does not exist, try 'funny'")
|
||||
else:
|
||||
await self.bot.say("{} {} {}".format(items[0].link, items[1].link, items[2].link))
|
||||
except:
|
||||
await self.bot.say("Type help imgur for details.")
|
||||
|
||||
@commands.command(no_pm=True)
|
||||
async def gif(self, *text):
|
||||
"""Retrieves first search result from giphy
|
||||
|
||||
gif [keyword]"""
|
||||
if len(text) > 0:
|
||||
if len(text[0]) > 1 and len(text[0]) < 20:
|
||||
try:
|
||||
msg = "+".join(text)
|
||||
search = "http://api.giphy.com/v1/gifs/search?q=" + msg + "&api_key=dc6zaTOxFJmzC"
|
||||
async with aiohttp.get(search) as r:
|
||||
result = await r.json()
|
||||
if result["data"] != []:
|
||||
url = result["data"][0]["url"]
|
||||
await self.bot.say(url)
|
||||
else:
|
||||
await self.bot.say("Your search terms gave no results.")
|
||||
except:
|
||||
await self.bot.say("Error.")
|
||||
else:
|
||||
await self.bot.say("Invalid search.")
|
||||
@_imgur.command(pass_context=True, name="subreddit")
|
||||
async def imgur_subreddit(self, ctx, subreddit: str, sort_type: str="top", window: str="day"):
|
||||
"""Gets images from the specified subreddit section
|
||||
|
||||
Sort types: new, top
|
||||
Time windows: day, week, month, year, all"""
|
||||
sort_type = sort_type.lower()
|
||||
|
||||
if sort_type not in ("new", "top"):
|
||||
await self.bot.say("Only 'new' and 'top' are a valid sort type.")
|
||||
return
|
||||
elif window not in ("day", "week", "month", "year", "all"):
|
||||
await self.bot.send_cmd_help(ctx)
|
||||
return
|
||||
|
||||
if sort_type == "new":
|
||||
sort = "time"
|
||||
elif sort_type == "top":
|
||||
sort = "top"
|
||||
|
||||
links = []
|
||||
|
||||
task = functools.partial(self.imgur.subreddit_gallery, subreddit,
|
||||
sort=sort, window=window, page=0)
|
||||
task = self.bot.loop.run_in_executor(None, task)
|
||||
try:
|
||||
items = await asyncio.wait_for(task, timeout=10)
|
||||
except asyncio.TimeoutError:
|
||||
await self.bot.say("Error: request timed out")
|
||||
return
|
||||
|
||||
for item in items[:3]:
|
||||
link = item.gifv if hasattr(item, "gifv") else item.link
|
||||
links.append("{}\n{}".format(item.title, link))
|
||||
|
||||
if links:
|
||||
await self.bot.say("\n".join(links))
|
||||
else:
|
||||
await self.bot.say("gif [text]")
|
||||
await self.bot.say("No results found.")
|
||||
|
||||
@commands.command(no_pm=True)
|
||||
async def gifr(self, *text):
|
||||
"""Retrieves a random gif from a giphy search
|
||||
|
||||
gifr [keyword]"""
|
||||
random.seed()
|
||||
if len(text) > 0:
|
||||
if len(text[0]) > 1 and len(text[0]) < 20:
|
||||
try:
|
||||
msg = "+".join(text)
|
||||
search = "http://api.giphy.com/v1/gifs/random?&api_key=dc6zaTOxFJmzC&tag=" + msg
|
||||
async with aiohttp.get(search) as r:
|
||||
result = await r.json()
|
||||
if result["data"] != []:
|
||||
url = result["data"]["url"]
|
||||
await self.bot.say(url)
|
||||
else:
|
||||
await self.bot.say("Your search terms gave no results.")
|
||||
except:
|
||||
await self.bot.say("Error.")
|
||||
else:
|
||||
await self.bot.say("Invalid search.")
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def gif(self, ctx, *keywords):
|
||||
"""Retrieves first search result from giphy"""
|
||||
if keywords:
|
||||
keywords = "+".join(keywords)
|
||||
else:
|
||||
await self.bot.say("gifr [text]")
|
||||
await self.bot.send_cmd_help(ctx)
|
||||
return
|
||||
|
||||
url = ("http://api.giphy.com/v1/gifs/search?&api_key={}&q={}"
|
||||
"".format(GIPHY_API_KEY, keywords))
|
||||
|
||||
async with aiohttp.get(url) as r:
|
||||
result = await r.json()
|
||||
if r.status == 200:
|
||||
if result["data"]:
|
||||
await self.bot.say(result["data"][0]["url"])
|
||||
else:
|
||||
await self.bot.say("No results found.")
|
||||
else:
|
||||
await self.bot.say("Error contacting the API")
|
||||
|
||||
@commands.command(pass_context=True, no_pm=True)
|
||||
async def gifr(self, ctx, *keywords):
|
||||
"""Retrieves a random gif from a giphy search"""
|
||||
if keywords:
|
||||
keywords = "+".join(keywords)
|
||||
else:
|
||||
await self.bot.send_cmd_help(ctx)
|
||||
return
|
||||
|
||||
url = ("http://api.giphy.com/v1/gifs/random?&api_key={}&tag={}"
|
||||
"".format(GIPHY_API_KEY, keywords))
|
||||
|
||||
async with aiohttp.get(url) as r:
|
||||
result = await r.json()
|
||||
if r.status == 200:
|
||||
if result["data"]:
|
||||
await self.bot.say(result["data"]["url"])
|
||||
else:
|
||||
await self.bot.say("No results found.")
|
||||
else:
|
||||
await self.bot.say("Error contacting the API")
|
||||
|
||||
class ModuleNotFound(Exception):
|
||||
def __init__(self, m):
|
||||
self.message = m
|
||||
def __str__(self):
|
||||
return self.message
|
||||
|
||||
def setup(bot):
|
||||
global ImgurClient
|
||||
try:
|
||||
from imgurpython import ImgurClient
|
||||
except:
|
||||
raise ModuleNotFound("imgurpython is not installed. Do 'pip3 install imgurpython' to use this cog.")
|
||||
if ImgurClient is False:
|
||||
raise RuntimeError("You need the imgurpython module to use this.\n"
|
||||
"pip3 install imgurpython")
|
||||
|
||||
bot.add_cog(Image(bot))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user