websocket

示例代码

index.py

import aiohttp
from aiohttp.web import Request, Application, WebSocketResponse

from lessweb import Bridge, service, get_mapping


@service
class WhoCache:
    def __init__(self):
        self.who = '-'

    def get(self):
        return self.who

    def set(self, who: str):
        self.who = who


@get_mapping('/ws')
async def websocket_dealer(request: Request, who_cache: WhoCache):
    print('Websocket connection starting')
    ws = WebSocketResponse()
    await ws.prepare(request)
    print('Websocket connection ready')

    async for msg in ws:
        print(msg)
        if msg.type == aiohttp.WSMsgType.TEXT:
            print(msg.data)
            if msg.data == 'close':
                await ws.close()
            else:
                who = who_cache.get()
                await ws.send_str(f'{msg.data}@{who}')

    print('Websocket connection closed')
    return ws


@get_mapping('/set')
async def edit_who(who_cache: WhoCache, *, who: str):
    who_cache.set(who)
    return {'success': True}


if __name__ == '__main__':
    bridge = Bridge()
    bridge.app['who_cache'] = WhoCache()
    bridge.add_route(websocket_dealer)
    bridge.add_route(edit_who)
    bridge.run_app()

测试步骤

  1. 安装一款叫Dark WebSocket Terminal的Chrome应用,用来发送WebSocket请求。
  2. 运行上面的index.py,启动监听8080端口的服务。
  3. 在Dark WebSocket Terminal输入/connect ws://localhost:8080/ws,会显示连接成功。
  4. 在Dark WebSocket Terminal发送a,会收到a@-
  5. 在终端执行curl "http://localhost:8080/set?who=John",会返回{"success": true}
  6. 在Dark WebSocket Terminal发送b,会收到b@John
  7. 在Dark WebSocket Terminal发送/disconnect,会显示连接断开。

参考