import asyncio from typing import AsyncGenerator from quart import Quart, render_template, websocket, g import logging logger = logging.getLogger(__name__) app = Quart(__name__) connections = set() webui = None @app.route("/") async def index(): return await render_template("index.html") @app.route("/api") async def json(): return {"hello": "world"} @app.route("/bots") async def bots(): return await render_template("bots.html", bots=webui.bots) @app.route("/bot/") async def bot(bot_id): return await render_template("bot.html", bot=webui.bots[bot_id], bot_id=bot_id) @app.route("/bot//room/") async def room(bot_id, room_id): return await render_template("room.html", room=webui.bots[bot_id].rooms[room_id], room_id=room_id) async def _send() -> None: connection = asyncio.Queue() connections.add(connection) try: while True: message = await connection.get() await websocket.send(message) finally: connections.remove(connection) async def _receive() -> None: while True: message = await websocket.receive() for connection in connections: await connection.put(message) @app.websocket("/ws") async def ws() -> None: producer = asyncio.create_task(_send()) consumer = asyncio.create_task(_receive()) await asyncio.gather(producer, consumer) #await websocket.send_json({"hello": "world"}) class WebUI(object): """The Web interface.""" def __init__(self): self.shutdown_event = asyncio.Event() self.task = None self.bots = [] app.config["PROPAGATE_EXCEPTIONS"] = True global webui webui = self def run_task(self): return app.run_task(port=5000, debug=True, shutdown_trigger=self.shutdown_event.wait) async def stop(self): self.shutdown_event.set() self.task.cancel() # or close? async def connect(self, bots): self.bots = bots