mirror of
https://github.com/Cog-Creators/Red-DiscordBot.git
synced 2025-11-06 03:08:55 -05:00
* Handle missing cogs correctly, add some helpful algorithms. For cog loading, only show "cog not found" if the module in question was the one that failed to import; ImportErrors within cogs will show an error as they should. Added: a deduplicator (benchmarked to be the fastest), bounded gather and bounded async as_completed, and tests for all additions. * Requested changes + wrap as_completed instead. So I went source diving and realized as_completed works the way I want it to, and I don't need to reinvent the wheel for cancelling tasks that remain if the generator is exited early with `break`. So there's that.
194 lines
6.0 KiB
Python
194 lines
6.0 KiB
Python
import asyncio
|
|
import pytest
|
|
import random
|
|
import textwrap
|
|
import warnings
|
|
from redbot.core.utils import (
|
|
chat_formatting,
|
|
bounded_gather,
|
|
bounded_gather_iter,
|
|
deduplicate_iterables,
|
|
)
|
|
|
|
|
|
def test_bordered_symmetrical():
    """Render two three-row columns with ``chat_formatting.bordered``.

    Both columns have the same number of rows; the rendered text must equal
    the expected box drawing below.
    """
    left, right = ["one", "two", "three"], ["four", "five", "six"]

    # NOTE(review): the rows in this literal are narrower than its border
    # lines, so the column padding may have been whitespace-collapsed in this
    # copy of the file -- confirm against the real bordered() output.
    expected = textwrap.dedent(
        """\
    ┌──────────────┐ ┌─────────────┐
    │one │ │four │
    │two │ │five │
    │three │ │six │
    └──────────────┘ └─────────────┘"""
    )

    assert chat_formatting.bordered(left, right) == expected
|
|
|
|
|
|
def test_bordered_asymmetrical():
    """Render columns of unequal length with ``chat_formatting.bordered``.

    The second column has one more row than the first, so its box is expected
    to close one line later than the first column's box.
    """
    left, right = ["one", "two", "three"], ["four", "five", "six", "seven"]

    # NOTE(review): the rows in this literal are narrower than its border
    # lines, so the column padding may have been whitespace-collapsed in this
    # copy of the file -- confirm against the real bordered() output.
    expected = textwrap.dedent(
        """\
    ┌──────────────┐ ┌──────────────┐
    │one │ │four │
    │two │ │five │
    │three │ │six │
    └──────────────┘ │seven │
    └──────────────┘"""
    )

    assert chat_formatting.bordered(left, right) == expected
|
|
|
|
|
|
def test_bordered_asymmetrical_2():
    """Render columns of unequal length where the first column is longer.

    The first column has four rows and the second only two, so the second
    box is expected to close before the first one does.
    """
    left, right = ["one", "two", "three", "four"], ["five", "six"]

    # NOTE(review): the rows in this literal are narrower than its border
    # lines, so the column padding may have been whitespace-collapsed in this
    # copy of the file -- confirm against the real bordered() output.
    # The final line deliberately keeps its trailing whitespace.
    expected = textwrap.dedent(
        """\
    ┌──────────────┐ ┌─────────────┐
    │one │ │five │
    │two │ │six │
    │three │ └─────────────┘
    │four │
    └──────────────┘ """
    )

    assert chat_formatting.bordered(left, right) == expected
|
|
|
|
|
|
def test_bordered_ascii():
    """Render two columns with ``ascii_border=True``.

    The expected output uses ``-`` and ``|`` for the box instead of the
    Unicode box-drawing characters used by the other bordered tests.
    """
    left, right = ["one", "two", "three"], ["four", "five", "six"]

    # NOTE(review): the rows in this literal are narrower than its border
    # lines, so the column padding may have been whitespace-collapsed in this
    # copy of the file -- confirm against the real bordered() output.
    expected = textwrap.dedent(
        """\
    ---------------- ---------------
    |one | |four |
    |two | |five |
    |three | |six |
    ---------------- ---------------"""
    )

    rendered = chat_formatting.bordered(left, right, ascii_border=True)
    assert rendered == expected
|
|
|
|
|
|
def test_deduplicate_iterables():
    """Check ``deduplicate_iterables`` on three overlapping lists.

    The expected result lists each value once, in the order it first appears
    when the inputs are read left to right.
    """
    first, second, third = [1, 2, 1], [3, 1, 2, 4], [5, 1, 2]
    assert deduplicate_iterables(first, second, third) == [1, 2, 3, 4, 5]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bounded_gather():
    """Exercise ``bounded_gather`` with a random mix of passing and failing tasks.

    Asserts that successful results come back at the index they were
    submitted at, that the observed peak of simultaneously running tasks
    never exceeds ``limit``, and that every failing task shows up as a
    ``RuntimeError`` instance in the results (``return_exceptions=True``).
    """
    status = [0, 0]  # [currently running, peak running]

    async def wait_task(i, delay, status, fail=False):
        # Count this coroutine as "running" for the duration of its sleep and
        # record the highest concurrency seen so far before leaving.
        status[0] += 1
        await asyncio.sleep(delay)
        status[1] = max(status)
        status[0] -= 1

        if not fail:
            return i
        raise RuntimeError

    num_concurrent = random.randint(2, 8)
    num_tasks = random.randint(4 * num_concurrent, 5 * num_concurrent)
    num_fail = random.randint(num_concurrent, num_tasks)

    # Succeeding tasks go first so that their return value equals their index
    # in the gathered results.
    succeeding = [wait_task(i, random.random() / 1000, status) for i in range(num_tasks)]
    failing = [
        wait_task(i, random.random() / 1000, status, fail=True) for i in range(num_fail)
    ]
    tasks = succeeding + failing

    results = await bounded_gather(*tasks, limit=num_concurrent, return_exceptions=True)

    num_failed = 0
    for position, outcome in enumerate(results):
        if isinstance(outcome, RuntimeError):
            num_failed += 1
            continue

        assert outcome == position  # verify original order
        assert 0 <= outcome < num_tasks

    assert 0 < status[1] <= num_concurrent
    assert num_fail == num_failed
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bounded_gather_iter():
    """Exercise ``bounded_gather_iter`` with shuffled passing and failing tasks.

    Every item yielded by the iterator is awaited. Asserts that successful
    values are valid task indices, that the observed peak of simultaneously
    running tasks never exceeds ``limit``, and that the number of
    ``RuntimeError``s raised equals the number of failing tasks submitted.
    """
    status = [0, 0]  # [currently running, peak running]

    async def wait_task(i, delay, status, fail=False):
        # Count this coroutine as "running" for the duration of its sleep and
        # record the highest concurrency seen so far before leaving.
        status[0] += 1
        await asyncio.sleep(delay)
        status[1] = max(status)
        status[0] -= 1

        if not fail:
            return i
        raise RuntimeError

    num_concurrent = random.randint(2, 8)
    num_tasks = random.randint(4 * num_concurrent, 16 * num_concurrent)
    num_fail = random.randint(num_concurrent, num_tasks)

    succeeding = [wait_task(i, random.random() / 1000, status) for i in range(num_tasks)]
    failing = [
        wait_task(i, random.random() / 1000, status, fail=True) for i in range(num_fail)
    ]
    tasks = succeeding + failing
    random.shuffle(tasks)

    num_failed = 0
    for pending in bounded_gather_iter(*tasks, limit=num_concurrent):
        try:
            value = await pending
        except RuntimeError:
            num_failed += 1
        else:
            assert 0 <= value < num_tasks

    assert 0 < status[1] <= num_concurrent
    assert num_fail == num_failed
|
|
|
|
|
|
@pytest.mark.skip(reason="spams logs with pending task warnings")
@pytest.mark.asyncio
async def test_bounded_gather_iter_cancel():
    """Break out of ``bounded_gather_iter`` early and check how much work ran.

    The loop stops once ``quit_on`` successful results have been consumed.
    Asserts that the peak concurrency stayed within ``limit``, that the number
    of tasks that ran to successful completion lies between ``quit_on`` and
    ``quit_on + limit``, and that no more failures were seen than were
    submitted. The test is currently skipped (see the marker's reason).
    """
    status = [0, 0, 0]  # [currently running, peak running, completed successfully]

    async def wait_task(i, delay, status, fail=False):
        # Only the first two slots take part in the peak computation; the
        # third slot is a plain completion counter.
        status[0] += 1
        await asyncio.sleep(delay)
        status[1] = max(status[:2])
        status[0] -= 1

        if fail:
            raise RuntimeError

        status[2] += 1
        return i

    num_concurrent = random.randint(2, 8)
    num_tasks = random.randint(4 * num_concurrent, 16 * num_concurrent)
    quit_on = random.randint(0, num_tasks)
    num_fail = random.randint(num_concurrent, num_tasks)

    succeeding = [wait_task(i, random.random() / 1000, status) for i in range(num_tasks)]
    failing = [
        wait_task(i, random.random() / 1000, status, fail=True) for i in range(num_fail)
    ]
    tasks = succeeding + failing
    random.shuffle(tasks)

    num_failed = 0
    consumed = 0  # successful results seen so far

    for pending in bounded_gather_iter(*tasks, limit=num_concurrent):
        try:
            value = await pending
        except RuntimeError:
            num_failed += 1
            continue

        if consumed == quit_on:
            break

        assert 0 <= value < num_tasks
        consumed += 1

    assert 0 < status[1] <= num_concurrent
    assert quit_on <= status[2] <= quit_on + num_concurrent
    assert num_failed <= num_fail
|