[Audio] Refactor internal Lavalink server management (#2495)

* Refactor internal Lavalink server management

Killing many birds with one stone here.
- Made server manager into class-based API with two public methods: `start()` and `shutdown()`. Must be re-instantiated each time it is restarted.
- Using V3 universal Lavalink.jar hosted on Cog-Creators/Lavalink-Jars repository.
- Uses output of `java -jar Lavalink.jar --version` to check if a new jar needs to be downloaded.
- `ServerManager.start()` won't return until server is ready, i.e. when "Started Launcher in X seconds" message is printed to STDOUT.
- `shlex.quote()` is used so spaces in path to Lavalink.jar don't cause issues.
- Enabling external Lavalink will cause internal server to be terminated.
- Disabling internal Lavalink will no longer reset settings in config - instead, hard-coded values will be used when connecting to an internal server.
- Internal server will now run both WS and REST servers on port 2333, meaning one less port will need to be taken up.
- Now using `asyncio.subprocess` module so waiting on and reading from subprocesses can be done asynchronously.

Signed-off-by: Toby Harradine <tobyharradine@gmail.com>

* Don't use shlex.quote on Windows

Signed-off-by: Toby <tobyharradine@gmail.com>

* Don't use shlex.quote at all

I misread a note in the python docs and assumed it was best to use it. Turns out the note only applies to `asyncio.create_subprocess_shell`.

Signed-off-by: Toby <tobyharradine@gmail.com>

* Missed the port on the rebase

* Ignore invalid architectures and inform users when commands are used.

* Style fix
This commit is contained in:
Toby Harradine 2019-04-30 11:31:28 +10:00 committed by Will
parent c79b5e6179
commit 476f441c9b
4 changed files with 287 additions and 202 deletions

View File

@ -1,31 +1,9 @@
from pathlib import Path from redbot.core import commands
import logging
from .audio import Audio from .audio import Audio
from .manager import start_lavalink_server, maybe_download_lavalink
from redbot.core import commands
from redbot.core.data_manager import cog_data_path
import redbot.core
log = logging.getLogger("red.audio")
LAVALINK_DOWNLOAD_URL = (
"https://github.com/Cog-Creators/Red-DiscordBot/releases/download/{}/Lavalink.jar"
).format(redbot.core.__version__)
LAVALINK_DOWNLOAD_DIR = cog_data_path(raw_name="Audio")
LAVALINK_JAR_FILE = LAVALINK_DOWNLOAD_DIR / "Lavalink.jar"
APP_YML_FILE = LAVALINK_DOWNLOAD_DIR / "application.yml"
BUNDLED_APP_YML_FILE = Path(__file__).parent / "data/application.yml"
async def setup(bot: commands.Bot): async def setup(bot: commands.Bot):
cog = Audio(bot) cog = Audio(bot)
if not await cog.config.use_external_lavalink():
await maybe_download_lavalink(bot.loop, cog)
await start_lavalink_server(bot.loop)
await cog.initialize() await cog.initialize()
bot.add_cog(cog) bot.add_cog(cog)

View File

@ -14,6 +14,7 @@ import os
import random import random
import re import re
import time import time
from typing import Optional
import redbot.core import redbot.core
from redbot.core import Config, commands, checks, bank from redbot.core import Config, commands, checks, bank
from redbot.core.data_manager import cog_data_path from redbot.core.data_manager import cog_data_path
@ -29,7 +30,7 @@ from redbot.core.utils.menus import (
) )
from redbot.core.utils.predicates import MessagePredicate, ReactionPredicate from redbot.core.utils.predicates import MessagePredicate, ReactionPredicate
from urllib.parse import urlparse from urllib.parse import urlparse
from .manager import shutdown_lavalink_server, start_lavalink_server, maybe_download_lavalink from .manager import ServerManager
_ = Translator("Audio", __file__) _ = Translator("Audio", __file__)
@ -43,41 +44,45 @@ log = logging.getLogger("red.audio")
class Audio(commands.Cog): class Audio(commands.Cog):
"""Play audio through voice channels.""" """Play audio through voice channels."""
_default_lavalink_settings = {
"host": "localhost",
"rest_port": 2333,
"ws_port": 2333,
"password": "youshallnotpass",
}
def __init__(self, bot): def __init__(self, bot):
super().__init__() super().__init__()
self.bot = bot self.bot = bot
self.config = Config.get_conf(self, 2711759130, force_registration=True) self.config = Config.get_conf(self, 2711759130, force_registration=True)
default_global = { default_global = dict(
"host": "localhost", status=False,
"rest_port": "2333", use_external_lavalink=False,
"ws_port": "2332", restrict=True,
"password": "youshallnotpass", current_version=redbot.core.VersionInfo.from_str("3.0.0a0").to_json(),
"status": False, localpath=str(cog_data_path(raw_name="Audio")),
"current_version": redbot.core.VersionInfo.from_str("3.0.0a0").to_json(), **self._default_lavalink_settings,
"use_external_lavalink": False, )
"restrict": True,
"localpath": str(cog_data_path(raw_name="Audio")),
}
default_guild = { default_guild = dict(
"disconnect": False, disconnect=False,
"dj_enabled": False, dj_enabled=False,
"dj_role": None, dj_role=None,
"emptydc_enabled": False, emptydc_enabled=False,
"emptydc_timer": 0, emptydc_timer=0,
"jukebox": False, jukebox=False,
"jukebox_price": 0, jukebox_price=0,
"maxlength": 0, maxlength=0,
"playlists": {}, playlists={},
"notify": False, notify=False,
"repeat": False, repeat=False,
"shuffle": False, shuffle=False,
"thumbnail": False, thumbnail=False,
"volume": 100, volume=100,
"vote_enabled": False, vote_enabled=False,
"vote_percent": 0, vote_percent=0,
} )
self.config.register_guild(**default_guild) self.config.register_guild(**default_guild)
self.config.register_global(**default_global) self.config.register_global(**default_global)
@ -86,9 +91,24 @@ class Audio(commands.Cog):
self._connect_task = None self._connect_task = None
self._disconnect_task = None self._disconnect_task = None
self._cleaned_up = False self._cleaned_up = False
self.spotify_token = None self.spotify_token = None
self.play_lock = {} self.play_lock = {}
self._manager: Optional[ServerManager] = None
async def cog_before_invoke(self, ctx):
if self.llsetup in [ctx.command, ctx.command.root_parent]:
pass
elif self._connect_task.cancelled:
await ctx.send(
"You have attempted to run Audio's Lavalink server on an unsupported"
" architecture. Only settings related commands will be available."
)
raise RuntimeError(
"Not running audio command due to invalid machine architecture for Lavalink."
)
async def initialize(self): async def initialize(self):
self._restart_connect() self._restart_connect()
self._disconnect_task = self.bot.loop.create_task(self.disconnect_timer()) self._disconnect_task = self.bot.loop.create_task(self.disconnect_timer())
@ -103,16 +123,33 @@ class Audio(commands.Cog):
async def attempt_connect(self, timeout: int = 30): async def attempt_connect(self, timeout: int = 30):
while True: # run until success while True: # run until success
external = await self.config.use_external_lavalink() external = await self.config.use_external_lavalink()
if not external: if external is False:
shutdown_lavalink_server() settings = self._default_lavalink_settings
await maybe_download_lavalink(self.bot.loop, self) host = settings["host"]
await start_lavalink_server(self.bot.loop) password = settings["password"]
rest_port = settings["rest_port"]
ws_port = settings["ws_port"]
if self._manager is not None:
await self._manager.shutdown()
self._manager = ServerManager()
try: try:
await self._manager.start()
except RuntimeError as exc:
log.exception(
"Exception whilst starting internal Lavalink server, retrying...",
exc_info=exc,
)
await asyncio.sleep(1)
continue
except asyncio.CancelledError:
log.exception("Invalid machine architecture, cannot run Lavalink.")
break
else:
host = await self.config.host() host = await self.config.host()
password = await self.config.password() password = await self.config.password()
rest_port = await self.config.rest_port() rest_port = await self.config.rest_port()
ws_port = await self.config.ws_port() ws_port = await self.config.ws_port()
try:
await lavalink.initialize( await lavalink.initialize(
bot=self.bot, bot=self.bot,
host=host, host=host,
@ -122,9 +159,10 @@ class Audio(commands.Cog):
timeout=timeout, timeout=timeout,
) )
return # break infinite loop return # break infinite loop
except Exception: except asyncio.TimeoutError:
if not external: log.error("Connecting to Lavalink server timed out, retrying...")
shutdown_lavalink_server() if external is False and self._manager is not None:
await self._manager.shutdown()
await asyncio.sleep(1) # prevent busylooping await asyncio.sleep(1) # prevent busylooping
async def event_handler(self, player, event_type, extra): async def event_handler(self, player, event_type, extra):
@ -3104,19 +3142,16 @@ class Audio(commands.Cog):
await self.config.use_external_lavalink.set(not external) await self.config.use_external_lavalink.set(not external)
if external: if external:
await self.config.host.set("localhost")
await self.config.password.set("youshallnotpass")
await self.config.rest_port.set(2333)
await self.config.ws_port.set(2332)
embed = discord.Embed( embed = discord.Embed(
colour=await ctx.embed_colour(), colour=await ctx.embed_colour(),
title=_("External lavalink server: {true_or_false}.").format( title=_("External lavalink server: {true_or_false}.").format(
true_or_false=not external true_or_false=not external
), ),
) )
embed.set_footer(text=_("Defaults reset."))
await ctx.send(embed=embed) await ctx.send(embed=embed)
else: else:
if self._manager is not None:
await self._manager.shutdown()
await self._embed_msg( await self._embed_msg(
ctx, ctx,
_("External lavalink server: {true_or_false}.").format(true_or_false=not external), _("External lavalink server: {true_or_false}.").format(true_or_false=not external),
@ -3229,6 +3264,8 @@ class Audio(commands.Cog):
async def _check_external(self): async def _check_external(self):
external = await self.config.use_external_lavalink() external = await self.config.use_external_lavalink()
if not external: if not external:
if self._manager is not None:
await self._manager.shutdown()
await self.config.use_external_lavalink.set(True) await self.config.use_external_lavalink.set(True)
return True return True
else: else:
@ -3597,7 +3634,8 @@ class Audio(commands.Cog):
lavalink.unregister_event_listener(self.event_handler) lavalink.unregister_event_listener(self.event_handler)
self.bot.loop.create_task(lavalink.close()) self.bot.loop.create_task(lavalink.close())
shutdown_lavalink_server() if self._manager is not None:
self.bot.loop.create_task(self._manager.shutdown())
self._cleaned_up = True self._cleaned_up = True
__del__ = cog_unload __del__ = cog_unload

View File

@ -1,11 +1,9 @@
server: server:
host: "localhost"
port: 2333 # REST server port: 2333 # REST server
lavalink: lavalink:
server: server:
password: "youshallnotpass" password: "youshallnotpass"
ws:
host: "localhost"
port: 2332
sources: sources:
youtube: true youtube: true
bandcamp: true bandcamp: true

View File

@ -1,72 +1,124 @@
import shlex import itertools
import pathlib
import platform
import shutil import shutil
import asyncio import asyncio
import asyncio.subprocess import asyncio.subprocess
import os
import logging import logging
import re import re
from subprocess import Popen, DEVNULL import tempfile
from typing import Optional, Tuple from typing import Optional, Tuple, ClassVar, List
from aiohttp import ClientSession import aiohttp
import redbot.core from redbot.core import data_manager
_JavaVersion = Tuple[int, int] JAR_VERSION = "3.2.0.3"
JAR_BUILD = 751
LAVALINK_DOWNLOAD_URL = (
f"https://github.com/Cog-Creators/Lavalink-Jars/releases/download/{JAR_VERSION}_{JAR_BUILD}/"
f"Lavalink.jar"
)
LAVALINK_DOWNLOAD_DIR = data_manager.cog_data_path(raw_name="Audio")
LAVALINK_JAR_FILE = LAVALINK_DOWNLOAD_DIR / "Lavalink.jar"
BUNDLED_APP_YML = pathlib.Path(__file__).parent / "data" / "application.yml"
LAVALINK_APP_YML = LAVALINK_DOWNLOAD_DIR / "application.yml"
READY_LINE_RE = re.compile(rb"Started Launcher in \S+ seconds")
BUILD_LINE_RE = re.compile(rb"Build:\s+(?P<build>\d+)")
log = logging.getLogger("red.audio.manager") log = logging.getLogger("red.audio.manager")
proc = None
shutdown = False
class ServerManager:
def has_java_error(pid): _java_available: ClassVar[Optional[bool]] = None
from . import LAVALINK_DOWNLOAD_DIR _java_version: ClassVar[Optional[Tuple[int, int]]] = None
_up_to_date: ClassVar[Optional[bool]] = None
poss_error_file = LAVALINK_DOWNLOAD_DIR / "hs_err_pid{}.log".format(pid) _blacklisted_archs = ["armv6l", "aarch32", "aarch64"]
return poss_error_file.exists()
def __init__(self) -> None:
self.ready = asyncio.Event()
async def monitor_lavalink_server(loop): self._proc: Optional[asyncio.subprocess.Process] = None
global shutdown self._monitor_task: Optional[asyncio.Task] = None
while shutdown is False: self._shutdown: bool = False
if proc.poll() is not None:
break
await asyncio.sleep(0.5)
if shutdown is False: async def start(self) -> None:
# Lavalink was shut down by something else arch_name = platform.machine()
log.info("Lavalink jar shutdown.") if arch_name in self._blacklisted_archs:
shutdown = True raise asyncio.CancelledError(
if not has_java_error(proc.pid): "You are attempting to run Lavalink audio on an unsupported machine architecture."
log.info("Restarting Lavalink jar.")
await start_lavalink_server(loop)
else:
log.error(
"Your Java is borked. Please find the hs_err_pid{}.log file"
" in the Audio data folder and report this issue.".format(proc.pid)
) )
if self._proc is not None:
if self._proc.returncode is None:
raise RuntimeError("Internal Lavalink server is already running")
else:
raise RuntimeError("Server manager has already been used - create another one")
async def has_java(loop) -> Tuple[bool, Optional[_JavaVersion]]: await self.maybe_download_jar()
# Copy the application.yml across.
# For people to customise their Lavalink server configuration they need to run it
# externally
shutil.copyfile(BUNDLED_APP_YML, LAVALINK_APP_YML)
args = await self._get_jar_args()
self._proc = await asyncio.subprocess.create_subprocess_exec(
*args,
cwd=str(LAVALINK_DOWNLOAD_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
log.info("Internal Lavalink server started. PID: %s", self._proc.pid)
try:
await asyncio.wait_for(self._wait_for_launcher(), timeout=120)
except asyncio.TimeoutError:
log.warning("Timeout occurred whilst waiting for internal Lavalink server to be ready")
self._monitor_task = asyncio.create_task(self._monitor())
@classmethod
async def _get_jar_args(cls) -> List[str]:
java_available, java_version = await cls._has_java()
if not java_available:
raise RuntimeError("You must install Java 1.8+ for Lavalink to run.")
if java_version == (1, 8):
extra_flags = ["-Dsun.zip.disableMemoryMapping=true"]
elif java_version >= (11, 0):
extra_flags = ["-Djdk.tls.client.protocols=TLSv1.2"]
else:
extra_flags = []
return ["java", *extra_flags, "-jar", str(LAVALINK_JAR_FILE)]
@classmethod
async def _has_java(cls) -> Tuple[bool, Optional[Tuple[int, int]]]:
if cls._java_available is not None:
# Return cached value if we've checked this before
return cls._java_available, cls._java_version
java_available = shutil.which("java") is not None java_available = shutil.which("java") is not None
if not java_available: if not java_available:
return False, None cls.java_available = False
cls.java_version = None
else:
cls._java_version = version = await cls._get_java_version()
cls._java_available = (2, 0) > version >= (1, 8) or version >= (8, 0)
return cls._java_available, cls._java_version
version = await get_java_version(loop) @staticmethod
return (2, 0) > version >= (1, 8) or version >= (8, 0), version async def _get_java_version() -> Tuple[int, int]:
async def get_java_version(loop) -> _JavaVersion:
""" """
This assumes we've already checked that java exists. This assumes we've already checked that java exists.
""" """
_proc: asyncio.subprocess.Process = await asyncio.create_subprocess_exec( _proc: asyncio.subprocess.Process = await asyncio.create_subprocess_exec(
"java", "java", "-version", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
"-version",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
loop=loop,
) )
# java -version outputs to stderr # java -version outputs to stderr
_, err = await _proc.communicate() _, err = await _proc.communicate()
@ -97,76 +149,95 @@ async def get_java_version(loop) -> _JavaVersion:
"issue tracker." "issue tracker."
) )
async def _wait_for_launcher(self) -> None:
log.debug("Waiting for Lavalink server to be ready")
for i in itertools.cycle(range(50)):
line = await self._proc.stdout.readline()
if READY_LINE_RE.search(line):
self.ready.set()
break
if self._proc.returncode is not None:
log.critical("Internal lavalink server exited early")
if i == 49:
# Sleep after 50 lines to prevent busylooping
await asyncio.sleep(0.1)
async def start_lavalink_server(loop): async def _monitor(self) -> None:
java_available, java_version = await has_java(loop) while self._proc.returncode is None:
if not java_available: await asyncio.sleep(0.5)
raise RuntimeError("You must install Java 1.8+ for Lavalink to run.")
if java_version == (1, 8): # This task hasn't been cancelled - Lavalink was shut down by something else
extra_flags = "-Dsun.zip.disableMemoryMapping=true" log.info("Internal Lavalink jar shutdown unexpectedly")
elif java_version >= (11, 0): if not self._has_java_error():
extra_flags = "-Djdk.tls.client.protocols=TLSv1.2" log.info("Restarting internal Lavalink server")
await self.start()
else: else:
extra_flags = "" log.critical(
"Your Java is borked. Please find the hs_err_pid{}.log file"
from . import LAVALINK_DOWNLOAD_DIR, LAVALINK_JAR_FILE " in the Audio data folder and report this issue.",
self._proc.pid,
start_cmd = "java {} -jar {}".format(extra_flags, LAVALINK_JAR_FILE.resolve())
global proc
if proc and proc.poll() is None:
return # already running
proc = Popen(
shlex.split(start_cmd, posix=os.name == "posix"),
cwd=str(LAVALINK_DOWNLOAD_DIR),
stdout=DEVNULL,
stderr=DEVNULL,
) )
log.info("Lavalink jar started. PID: {}".format(proc.pid)) def _has_java_error(self) -> bool:
global shutdown poss_error_file = LAVALINK_DOWNLOAD_DIR / "hs_err_pid{}.log".format(self._proc.pid)
shutdown = False return poss_error_file.exists()
loop.create_task(monitor_lavalink_server(loop)) async def shutdown(self) -> None:
if self._shutdown is True or self._proc is None:
# For convenience, calling this method more than once or calling it before starting it
# does nothing.
return
log.info("Shutting down internal Lavalink server")
if self._monitor_task is not None:
self._monitor_task.cancel()
self._proc.terminate()
await self._proc.wait()
self._shutdown = True
@staticmethod
async def _download_jar() -> None:
log.info("Downloading Lavalink.jar...")
async with aiohttp.ClientSession() as session:
async with session.get(LAVALINK_DOWNLOAD_URL) as response:
if response.status == 404:
raise RuntimeError(
f"Lavalink jar version {JAR_VERSION}_{JAR_BUILD} hasn't been published"
)
fd, path = tempfile.mkstemp()
file = open(fd, "wb")
try:
chunk = await response.content.read(1024)
while chunk:
file.write(chunk)
chunk = await response.content.read(1024)
file.flush()
finally:
file.close()
pathlib.Path(path).replace(LAVALINK_JAR_FILE)
def shutdown_lavalink_server(): @classmethod
global shutdown async def _is_up_to_date(cls):
shutdown = True if cls._up_to_date is True:
global proc # Return cached value if we've checked this before
if proc is not None: return True
log.info("Shutting down lavalink server.") args = await cls._get_jar_args()
proc.terminate() args.append("--version")
proc.wait() _proc = await asyncio.subprocess.create_subprocess_exec(
proc = None *args,
cwd=str(LAVALINK_DOWNLOAD_DIR),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
stdout = (await _proc.communicate())[0]
match = BUILD_LINE_RE.search(stdout)
if not match:
# Output is unexpected, suspect corrupted jarfile
return False
build = int(match["build"])
cls._up_to_date = build == JAR_BUILD
return cls._up_to_date
@classmethod
async def download_lavalink(session): async def maybe_download_jar(cls):
from . import LAVALINK_DOWNLOAD_URL, LAVALINK_JAR_FILE if not (LAVALINK_JAR_FILE.exists() and await cls._is_up_to_date()):
await cls._download_jar()
with LAVALINK_JAR_FILE.open(mode="wb") as f:
async with session.get(LAVALINK_DOWNLOAD_URL) as resp:
while True:
chunk = await resp.content.read(512)
if not chunk:
break
f.write(chunk)
async def maybe_download_lavalink(loop, cog):
from . import LAVALINK_DOWNLOAD_DIR, LAVALINK_JAR_FILE, BUNDLED_APP_YML_FILE, APP_YML_FILE
jar_exists = LAVALINK_JAR_FILE.exists()
current_build = redbot.VersionInfo.from_json(await cog.config.current_version())
if not jar_exists or current_build < redbot.core.version_info:
log.info("Downloading Lavalink.jar")
LAVALINK_DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
async with ClientSession(loop=loop) as session:
await download_lavalink(session)
await cog.config.current_version.set(redbot.core.version_info.to_json())
shutil.copyfile(str(BUNDLED_APP_YML_FILE), str(APP_YML_FILE))