[V3 RPC] Initial RPC library switch (#1634)

* Initial RPC library switch

* Use weak refs to the methods so cog unload works

* Add docs

* Black fixes

* Add jsonrpcserver to Pipfile.lock
This commit is contained in:
Will
2018-05-22 20:29:26 -04:00
committed by palmtree5
parent abfee70eb3
commit 73a427f6aa
8 changed files with 179 additions and 99 deletions

View File

@@ -1,85 +1,135 @@
import asyncio
import weakref
from aiohttp.web import Application
from aiohttp_json_rpc import JsonRpc
from aiohttp import web
import jsonrpcserver.aio
import inspect
import logging
from .utils import TYPE_CHECKING, NewType
if TYPE_CHECKING:
from .bot import Red
__all__ = ["methods", "RPC", "Methods"]
log = logging.getLogger("red.rpc")
JsonSerializable = NewType("JsonSerializable", dict)
_rpc = JsonRpc(logger=log)
_rpc_server = None # type: asyncio.AbstractServer
async def initialize(bot: "Red"):
global _rpc_server
app = Application(loop=bot.loop)
app.router.add_route("*", "/rpc", _rpc)
handler = app.make_handler()
_rpc_server = await bot.loop.create_server(handler, "127.0.0.1", 6133)
log.debug("Created RPC _rpc_server listener.")
def add_topic(topic_name: str):
class Methods(jsonrpcserver.aio.AsyncMethods):
"""
Adds a topic for clients to listen to.
Container class for all registered RPC methods, please use the existing `methods`
attribute rather than creating a new instance of this class.
Parameters
----------
topic_name
.. warning::
**NEVER** create a new instance of this class!
"""
_rpc.add_topics(topic_name)
def __init__(self):
super().__init__()
self._items = weakref.WeakValueDictionary()
def add(self, method, name: str = None):
"""
Registers a method to the internal RPC server making it available for
RPC users to call.
.. important::
Any method added here must take ONLY JSON serializable parameters and
MUST return a JSON serializable object.
Parameters
----------
method : function
A reference to the function to register.
name : str
Name of the function as seen by the RPC clients.
"""
if not inspect.iscoroutinefunction(method):
raise TypeError("Method must be a coroutine.")
if name is None:
name = method.__qualname__
self._items[str(name)] = method
def remove(self, *, name: str = None, method=None):
"""
Unregisters an RPC method. Either a name or reference to the method must
be provided and name will take priority.
Parameters
----------
name : str
method : function
"""
if name and name in self._items:
del self._items[name]
elif method and method in self._items.values():
to_remove = []
for name, val in self._items.items():
if method == val:
to_remove.append(name)
for name in to_remove:
del self._items[name]
def all_methods(self):
"""
Lists all available method names.
Returns
-------
list of str
"""
return self._items.keys()
def notify(topic_name: str, data: JsonSerializable):
methods = Methods()
class BaseRPCMethodMixin:
def __init__(self):
methods.add(self.all_methods, name="all_methods")
async def all_methods(self):
return list(methods.all_methods())
class RPC(BaseRPCMethodMixin):
"""
Publishes a notification for the given topic name to all listening clients.
data MUST be json serializable.
note::
This method will fail silently.
Parameters
----------
topic_name
data
RPC server manager.
"""
_rpc.notify(topic_name, data)
def __init__(self, bot):
self.app = web.Application(loop=bot.loop)
self.app.router.add_post("/rpc", self.handle)
def add_method(prefix, method):
"""
Makes a method available to RPC clients. The name given to clients will be as
follows::
self.app_handler = self.app.make_handler()
"{}__{}".format(prefix, method.__name__)
self.server = None
note::
super().__init__()
This method will fail silently.
async def initialize(self):
"""
Finalizes the initialization of the RPC server and allows it to begin
accepting queries.
"""
self.server = await self.app.loop.create_server(self.app_handler, "127.0.0.1", 6133)
log.debug("Created RPC server listener.")
Parameters
----------
prefix
method
MUST BE A COROUTINE OR OBJECT.
"""
_rpc.add_methods(("", method), prefix=prefix)
def close(self):
"""
Closes the RPC server.
"""
self.server.close()
def clean_up():
if _rpc_server is not None:
_rpc_server.close()
async def handle(self, request):
request = await request.text()
response = await methods.dispatch(request)
if response.is_notification:
return web.Response()
else:
return web.json_response(response, status=response.http_status)