Red-DiscordBot/tests/core/test_utils.py
Caleb Johnson 1329fa1b09 [CogManager, Utils] Handle missing cogs correctly, add some helpful algorithms (#1989)
* Handle missing cogs correctly, add some helpful algorithms

For cog loading, only show "cog not found" if the module in question was the one
that failed to import. ImportErrors within cogs will show an error as they should.

- deduplicator, benchmarked to be the fastest
- bounded gather and bounded async as_completed
- tests for all additions

* Requested changes + wrap as_completed instead

So I went source diving and realized as_completed works the way I want it to,
and I don't need to reinvent the wheel for cancelling tasks that remain
if the generator is `break`ed out of. So there's that.
2018-08-21 11:26:04 +10:00

194 lines
6.0 KiB
Python

import asyncio
import pytest
import random
import textwrap
import warnings
from redbot.core.utils import (
chat_formatting,
bounded_gather,
bounded_gather_iter,
deduplicate_iterables,
)
def test_bordered_symmetrical():
    """Compare ``chat_formatting.bordered`` output for two three-row columns."""
    # NOTE(review): the text rows in this literal are narrower than its
    # border lines, so the spacing looks collapsed -- confirm the exact
    # padding against the real ``bordered`` output before trusting it.
    expected = textwrap.dedent(
        """\
┌──────────────┐ ┌─────────────┐
│one │ │four │
│two │ │five │
│three │ │six │
└──────────────┘ └─────────────┘"""
    )
    col1, col2 = ["one", "two", "three"], ["four", "five", "six"]
    assert chat_formatting.bordered(col1, col2) == expected
def test_bordered_asymmetrical():
    """Compare ``chat_formatting.bordered`` output when the second column is longer.

    The first column has three rows and the second has four.
    """
    # NOTE(review): the text rows in this literal are narrower than its
    # border lines, so the spacing looks collapsed -- confirm the exact
    # padding against the real ``bordered`` output before trusting it.
    expected = textwrap.dedent(
        """\
┌──────────────┐ ┌──────────────┐
│one │ │four │
│two │ │five │
│three │ │six │
└──────────────┘ │seven │
└──────────────┘"""
    )
    col1, col2 = ["one", "two", "three"], ["four", "five", "six", "seven"]
    assert chat_formatting.bordered(col1, col2) == expected
def test_bordered_asymmetrical_2():
    """Compare ``chat_formatting.bordered`` output when the first column is longer.

    The first column has four rows and the second has two.
    """
    # NOTE(review): the text rows in this literal are narrower than its
    # border lines, so the spacing looks collapsed -- confirm the exact
    # padding (including trailing blanks) against the real ``bordered``
    # output before trusting it.
    expected = textwrap.dedent(
        """\
┌──────────────┐ ┌─────────────┐
│one │ │five │
│two │ │six │
│three │ └─────────────┘
│four │
└──────────────┘ """
    )
    col1, col2 = ["one", "two", "three", "four"], ["five", "six"]
    assert chat_formatting.bordered(col1, col2) == expected
def test_bordered_ascii():
    """Compare ``chat_formatting.bordered`` output with ``ascii_border=True``."""
    # NOTE(review): the text rows in this literal are narrower than its
    # border lines, so the spacing looks collapsed -- confirm the exact
    # padding against the real ``bordered`` output before trusting it.
    expected = textwrap.dedent(
        """\
---------------- ---------------
|one | |four |
|two | |five |
|three | |six |
---------------- ---------------"""
    )
    col1, col2 = ["one", "two", "three"], ["four", "five", "six"]
    assert chat_formatting.bordered(col1, col2, ascii_border=True) == expected
def test_deduplicate_iterables():
    """Three overlapping lists must merge into ``[1, 2, 3, 4, 5]``."""
    first, second, third = [1, 2, 1], [3, 1, 2, 4], [5, 1, 2]
    merged = deduplicate_iterables(first, second, third)
    assert merged == [1, 2, 3, 4, 5]
@pytest.mark.asyncio
async def test_bounded_gather():
    """Exercise ``bounded_gather`` with a random mix of passing and failing tasks.

    The coroutines are gathered with ``return_exceptions=True``. The test then
    asserts that every non-exception result sits at its own index, that the
    number of ``RuntimeError`` results equals the number of failing tasks, and
    that the recorded peak of simultaneously running tasks is within ``limit``.
    """
    counters = [0, 0]  # [currently running, peak running]

    async def wait_task(index, delay, status, fail=False):
        # Bump the in-flight counter around the sleep and record the peak.
        status[0] += 1
        await asyncio.sleep(delay)
        status[1] = max(status)
        status[0] -= 1
        if not fail:
            return index
        raise RuntimeError

    limit = random.randint(2, 8)
    ok_count = random.randint(4 * limit, 5 * limit)
    bad_count = random.randint(limit, ok_count)

    # Succeeding coroutines come first, failing ones are appended after them.
    coros = []
    for n in range(ok_count):
        coros.append(wait_task(n, random.random() / 1000, counters))
    for n in range(bad_count):
        coros.append(wait_task(n, random.random() / 1000, counters, fail=True))

    seen_failures = 0
    outcomes = await bounded_gather(*coros, limit=limit, return_exceptions=True)

    for position, outcome in enumerate(outcomes):
        if isinstance(outcome, RuntimeError):
            seen_failures += 1
            continue
        assert outcome == position  # verify original order
        assert 0 <= outcome < ok_count

    assert 0 < counters[1] <= limit
    assert bad_count == seen_failures
@pytest.mark.asyncio
async def test_bounded_gather_iter():
    """Exercise ``bounded_gather_iter`` with a shuffled mix of passing and failing tasks.

    Each item yielded by the iterator is awaited. A ``RuntimeError`` is counted
    as a failure, and any other result must be a valid task index. Afterwards
    the failure count and the recorded peak of simultaneously running tasks
    are checked.
    """
    counters = [0, 0]  # [currently running, peak running]

    async def wait_task(index, delay, status, fail=False):
        # Bump the in-flight counter around the sleep and record the peak.
        status[0] += 1
        await asyncio.sleep(delay)
        status[1] = max(status)
        status[0] -= 1
        if not fail:
            return index
        raise RuntimeError

    limit = random.randint(2, 8)
    ok_count = random.randint(4 * limit, 16 * limit)
    bad_count = random.randint(limit, ok_count)

    coros = []
    for n in range(ok_count):
        coros.append(wait_task(n, random.random() / 1000, counters))
    for n in range(bad_count):
        coros.append(wait_task(n, random.random() / 1000, counters, fail=True))
    random.shuffle(coros)

    seen_failures = 0
    for pending in bounded_gather_iter(*coros, limit=limit):
        try:
            value = await pending
        except RuntimeError:
            seen_failures += 1
        else:
            assert 0 <= value < ok_count

    assert 0 < counters[1] <= limit
    assert bad_count == seen_failures
@pytest.mark.skip(reason="spams logs with pending task warnings")
@pytest.mark.asyncio
async def test_bounded_gather_iter_cancel():
    """Break out of ``bounded_gather_iter`` early and check how many tasks ran.

    Iteration stops once a randomly chosen number of successful results has
    been consumed. The test then asserts bounds on the peak concurrency, on
    the number of tasks that ran to successful completion, and on the number
    of failures observed.
    """
    counters = [0, 0, 0]  # [currently running, peak running, completed OK]

    async def wait_task(index, delay, status, fail=False):
        # Only the first two slots take part in the peak computation.
        status[0] += 1
        await asyncio.sleep(delay)
        status[1] = max(status[:2])
        status[0] -= 1
        if fail:
            raise RuntimeError
        status[2] += 1
        return index

    limit = random.randint(2, 8)
    ok_count = random.randint(4 * limit, 16 * limit)
    quit_on = random.randint(0, ok_count)
    bad_count = random.randint(limit, ok_count)

    coros = []
    for n in range(ok_count):
        coros.append(wait_task(n, random.random() / 1000, counters))
    for n in range(bad_count):
        coros.append(wait_task(n, random.random() / 1000, counters, fail=True))
    random.shuffle(coros)

    seen_failures = 0
    consumed = 0  # successful results handled so far
    for pending in bounded_gather_iter(*coros, limit=limit):
        try:
            value = await pending
        except RuntimeError:
            seen_failures += 1
        else:
            if consumed == quit_on:
                break
            assert 0 <= value < ok_count
            consumed += 1

    assert 0 < counters[1] <= limit
    assert quit_on <= counters[2] <= quit_on + limit
    assert seen_failures <= bad_count