import asyncio from typing import AsyncGenerator from quart import Quart, render_template, websocket import logging logger = logging.getLogger(__name__) app = Quart(__name__) connections = set() @app.route("/") async def index(): return await render_template("index.html") @app.route("/api") async def json(): return {"hello": "world"} async def _send() -> None: connection = asyncio.Queue() connections.add(connection) try: while True: message = await connection.get() await websocket.send(message) finally: connections.remove(connection) async def _receive() -> None: while True: message = await websocket.receive() for connection in connections: await connection.put(message) @app.websocket("/ws") async def ws() -> None: producer = asyncio.create_task(_send()) consumer = asyncio.create_task(_receive()) await asyncio.gather(producer, consumer) #await websocket.send_json({"hello": "world"}) class WebUI(object): """The Web interface.""" def __init__(self): self.shutdown_event = asyncio.Event() self.task = None app.config["PROPAGATE_EXCEPTIONS"] = True pass def run_task(self): return app.run_task(port=5000, debug=True, shutdown_trigger=self.shutdown_event.wait) async def stop(self): self.shutdown_event.set() self.task.cancel() # or close?