Friday, September 23, 2016

Python3 Asyncio PubSub Plaything


#!/usr/bin/env python
import io
import asyncio
import websockets
import logging
import collections

logger = logging.getLogger('websockets.server')
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler())
events = collections.defaultdict(lambda: set())
#-----------------------------------------------------------------------
async def handle_outgoing_queue(websocket):
    while websocket.open:
        msg = await websocket.outbox.get()
        await websocket.send(msg)
#-----------------------------------------------------------------------
async def pubsub(websocket, path):
    websocket.prefix = path.encode()
    websocket.outbox = asyncio.Queue()
    websocket.subscriptions = set()
    sender_task = asyncio.ensure_future(handle_outgoing_queue(websocket))
    while True:
        msg = await websocket.recv()
        if msg is None: break
        if isinstance(msg, str): msg = msg.encode()
        stream = io.BytesIO(msg)
        await handle_message(websocket, stream)
    sender_task.cancel()
    for name in websocket.subscriptions:
        try:
            events[name].remove(websocket)
        except KeyError:
            pass
#-----------------------------------------------------------------------
async def handle_message(websocket, stream):
    cmd = stream.readline().strip()
    name = websocket.prefix + stream.readline().strip()
    print(cmd, name);
    if cmd == b"SUB":
        events[name].add(websocket)
        websocket.subscriptions.add(name)
    elif cmd == b"UNS":
        subscribers = events[name]
        try:
            websocket.subscriptions.remove(name)
        except KeyError:
            pass
        try:
            subscribers.remove(websocket)
        except KeyError:
            pass
    elif cmd == b"PUB":
        stream.seek(0)
        msg = stream.read()
        for subscriber in events[name]:
            await subscriber.outbox.put(msg)
#-----------------------------------------------------------------------
start_server = websockets.serve(pubsub, '0.0.0.0', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Popular Posts