WebSocket
示例代码
index.py
import aiohttp
from aiohttp.web import Request, Application, WebSocketResponse
from lessweb import Bridge, service, get_mapping
@service
class WhoCache:
    """Hold a single ``who`` string in memory.

    The value starts out as ``'-'`` and is overwritten by :meth:`set`.
    """

    def __init__(self):
        # Placeholder value used until ``set`` stores something else.
        self.who = '-'

    def get(self):
        """Return the value currently stored in ``self.who``."""
        current = self.who
        return current

    def set(self, who: str):
        """Overwrite the stored value with ``who``."""
        self.who = who
@get_mapping('/ws')
async def websocket_dealer(request: Request, who_cache: WhoCache):
    """Handle ``/ws``: answer each text message with ``<message>@<who>``.

    Every received message is printed. A text message equal to ``'close'``
    closes the socket; any other text message is sent back with the value
    from ``who_cache.get()`` appended after an ``@``. Non-text messages are
    printed and otherwise ignored.

    Args:
        request: The incoming request used to prepare the WebSocket response.
        who_cache: Cache providing the current ``who`` value.

    Returns:
        The ``WebSocketResponse`` after the message loop has finished.
    """
    print('Websocket connection starting')
    response = WebSocketResponse()
    await response.prepare(request)
    print('Websocket connection ready')

    async for msg in response:
        print(msg)
        if msg.type != aiohttp.WSMsgType.TEXT:
            # Only text messages get a reply.
            continue

        print(msg.data)
        if msg.data == 'close':
            await response.close()
            continue

        # Look the value up per message so a later ``set`` call is reflected
        # in subsequent replies on an already-open connection.
        who = who_cache.get()
        await response.send_str(f'{msg.data}@{who}')

    print('Websocket connection closed')
    return response
@get_mapping('/set')
async def edit_who(who_cache: WhoCache, *, who: str):
    """Handle ``/set``: store ``who`` in the cache and report success.

    Args:
        who_cache: Cache whose stored value is replaced.
        who: New value to store (keyword-only).
            NOTE(review): presumably filled in by lessweb from the request,
            e.g. a query parameter — confirm against the framework docs.

    Returns:
        The dict ``{'success': True}``.
    """
    who_cache.set(who)
    return dict(success=True)
if __name__ == '__main__':
    bridge = Bridge()
    # Store a WhoCache instance on the application under the 'who_cache' key.
    bridge.app['who_cache'] = WhoCache()
    # Register the WebSocket handler first, then the setter endpoint.
    for handler in (websocket_dealer, edit_who):
        bridge.add_route(handler)
    bridge.run_app()
测试步骤
- 安装一款名为 Dark WebSocket Terminal 的 Chrome 应用，用来发送 WebSocket 请求。
- 运行上面的 index.py，启动监听 8080 端口的服务。
- 在 Dark WebSocket Terminal 中输入 `/connect ws://localhost:8080/ws`，会显示连接成功。
- 在 Dark WebSocket Terminal 中发送 `a`，会收到 `a@-`。
- 在终端执行 `curl "http://localhost:8080/set?who=John"`，会返回 `{"success": true}`。
- 在 Dark WebSocket Terminal 中发送 `b`，会收到 `b@John`。
- 在 Dark WebSocket Terminal 中发送 `/disconnect`，会显示连接断开。