diff --git a/Dockerfile b/Dockerfile index 0164e8e..3e88ce1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.8-alpine +FROM python:3.11-alpine WORKDIR /code # ENV FLASK_APP app.py RUN apk add --no-cache gcc musl-dev linux-headers make python3-dev openssl-dev libffi-dev git diff --git a/README.md b/README.md index 7b5aea2..a68ad18 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,48 @@ # redis-streams-fastapi-chat A simple demo of Redis Streams backed Chat app using Websockets, Python Asyncio and FastAPI/Starlette. -Requires Python version >= 3.6 and Redis +Requires Python version >= 3.11 and Redis 7+ # Overview This project has been created to help understand some related concepts. Python standard library asyncio, websockets (which are often cited as a classic use case for async python code), also Redis Streams. It is very much inteded to be an intentionally simple starting point rather than a usable product as is. # Installation +## Local Development + ```shell $ pip install -r requirements.txt ``` +Make sure you have Redis running locally: +```shell +$ redis-server +``` + # Usage +## Local Development + ```shell $ python chat.py ``` -# Docker compose -If you don't have redis installed you can use the docker-compose.yml file to set up a -working environment. +Then open http://localhost:9080 in your browser. + +## Docker Compose + +The easiest way to run the application with all dependencies: + +```shell +$ docker-compose up +``` + +This will start both the chat application and Redis in containers. The app will be available at http://localhost:9080 + +## Environment Variables + +The following environment variables can be configured: + +- `REDIS_HOST` - Redis server hostname (default: `localhost`, set to `redis` in Docker) +- `REDIS_PORT` - Redis server port (default: `6379`) diff --git a/chat.py b/chat.py index 52ca252..7aa211d 100644 --- a/chat.py +++ b/chat.py @@ -1,10 +1,11 @@ import os import asyncio -import aioredis +from redis import asyncio as aioredis import uvloop import socket import uuid import contextvars +from contextlib import asynccontextmanager from fastapi import FastAPI, Depends, Request from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates @@ -12,10 +13,10 @@ from starlette.websockets import WebSocket, WebSocketDisconnect from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK -from aioredis.errors import ConnectionClosedError as ServerConnectionClosedError +from redis.exceptions import ConnectionError as ServerConnectionClosedError -REDIS_HOST = 'localhost' -REDIS_PORT = 6379 +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') +REDIS_PORT = int(os.getenv('REDIS_PORT', '6379')) XREAD_TIMEOUT = 0 XREAD_COUNT = 100 NUM_PREVIOUS = 30 @@ -43,7 +44,32 @@ async def dispatch(self, request, call_next): asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) -app = FastAPI() + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + try: + redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}" + pool = await aioredis.from_url( + redis_url, + encoding='utf-8', + decode_responses=True, + max_connections=20 + ) + cvar_redis.set(pool) + print("Connected to Redis on ", REDIS_HOST, REDIS_PORT) + except ConnectionRefusedError as e: + print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT) + + yield + + # Shutdown + redis = cvar_redis.get() + if redis: + await redis.aclose() + print("closed connection Redis on ", REDIS_HOST, REDIS_PORT) + +app = FastAPI(lifespan=lifespan) app.add_middleware(CustomHeaderMiddleware) templates = Jinja2Templates(directory="templates") @@ -73,8 +99,12 @@ def get_local_ip(): async def get_redis_pool(): try: - pool = await aioredis.create_redis_pool( - (REDIS_HOST, REDIS_PORT), encoding='utf-8') + redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}" + pool = await aioredis.from_url( + redis_url, + encoding='utf-8', + decode_responses=True + ) return pool except ConnectionRefusedError as e: print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT) @@ -97,21 +127,20 @@ async def ws_send_moderator(websocket: WebSocket, chat_info: dict): """ pool = await get_redis_pool() streams = chat_info['room'].split(',') - latest_ids = ['$' for i in streams] + latest_ids = {stream: '$' for stream in streams} ws_connected = True print(streams, latest_ids) while pool and ws_connected: try: events = await pool.xread( - streams=streams, + streams=latest_ids, count=XREAD_COUNT, - timeout=XREAD_TIMEOUT, - latest_ids=latest_ids + block=5000 # Block for 5 seconds waiting for new messages ) - for _, e_id, e in events: - e['e_id'] = e_id - await websocket.send_json(e) - #latest_ids = [e_id] + for stream, messages in events: + for e_id, e in messages: + e['e_id'] = e_id + await websocket.send_json(e) except ConnectionClosedError: ws_connected = False @@ -130,7 +159,8 @@ async def ws_send(websocket: WebSocket, chat_info: dict): :type chat_info: """ pool = await get_redis_pool() - latest_ids = ['$'] + stream_key = cvar_tenant.get() + ":stream" + latest_ids = {stream_key: '$'} ws_connected = True first_run = True while pool and ws_connected: @@ -138,10 +168,10 @@ async def ws_send(websocket: WebSocket, chat_info: dict): if first_run: # fetch some previous chat history events = await pool.xrevrange( - stream=cvar_tenant.get() + ":stream", + name=stream_key, count=NUM_PREVIOUS, - start='+', - stop='-' + min='-', + max='+' ) first_run = False events.reverse() @@ -150,15 +180,15 @@ async def ws_send(websocket: WebSocket, chat_info: dict): await websocket.send_json(e) else: events = await pool.xread( - streams=[cvar_tenant.get() + ":stream"], + streams=latest_ids, count=XREAD_COUNT, - timeout=XREAD_TIMEOUT, - latest_ids=latest_ids + block=5000 # Block for 5 seconds waiting for new messages ) - for _, e_id, e in events: - e['e_id'] = e_id - await websocket.send_json(e) - latest_ids = [e_id] + for stream, messages in events: + for e_id, e in messages: + e['e_id'] = e_id + await websocket.send_json(e) + latest_ids = {stream_key: e_id} #print('################contextvar ', cvar_tenant.get()) except ConnectionClosedError: ws_connected = False @@ -169,7 +199,7 @@ async def ws_send(websocket: WebSocket, chat_info: dict): except ServerConnectionClosedError: print('redis server connection closed') return - pool.close() + await pool.aclose() async def ws_recieve(websocket: WebSocket, chat_info: dict): @@ -205,10 +235,12 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict): 'type': 'comment', 'room': chat_info['room'] } - await pool.xadd(stream=cvar_tenant.get() + ":stream", - fields=fields, - message_id=b'*', - max_len=STREAM_MAX_LEN) + await pool.xadd( + name=cvar_tenant.get() + ":stream", + fields=fields, + id='*', + maxlen=STREAM_MAX_LEN + ) #print('################contextvar ', cvar_tenant.get()) except WebSocketDisconnect: await remove_room_user(chat_info, pool) @@ -223,7 +255,7 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict): print('redis server connection closed') return - pool.close() + await pool.aclose() async def add_room_user(chat_info: dict, pool): @@ -259,10 +291,12 @@ async def announce(pool, chat_info: dict, action: str): } #print(fields) - await pool.xadd(stream=cvar_tenant.get() + ":stream", - fields=fields, - message_id=b'*', - max_len=STREAM_MAX_LEN) + await pool.xadd( + name=cvar_tenant.get() + ":stream", + fields=fields, + id='*', + maxlen=STREAM_MAX_LEN + ) async def chat_info_vars(username: str = None, room: str = None): @@ -355,30 +389,10 @@ async def verify_user_for_room(chat_info): # whitelist rooms if not chat_info['room'] in ALLOWED_ROOMS: verified = False - pool.close() + await pool.aclose() return verified -@app.on_event("startup") -async def handle_startup(): - try: - pool = await aioredis.create_redis_pool( - (REDIS_HOST, REDIS_PORT), encoding='utf-8', maxsize=20) - cvar_redis.set(pool) - print("Connected to Redis on ", REDIS_HOST, REDIS_PORT) - except ConnectionRefusedError as e: - print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT) - return - - -@app.on_event("shutdown") -async def handle_shutdown(): - redis = cvar_redis.get() - redis.close() - await redis.wait_closed() - print("closed connection Redis on ", REDIS_HOST, REDIS_PORT) - - if __name__ == "__main__": import uvicorn print(dir(app)) diff --git a/docker-compose.yml b/docker-compose.yml index b5d7c42..61f64f1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,9 +8,12 @@ services: - "8082:8082" volumes: - .:/code - links: - - "redis" + environment: + - REDIS_HOST=redis + - REDIS_PORT=6379 + depends_on: + - redis redis: - image: "redis:6.0-rc2-alpine3.11" + image: "redis:7-alpine" ports: - 6379:6379 diff --git a/requirements.txt b/requirements.txt index 5ba3717..3d0d4f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,11 @@ -uvicorn==0.12.1 -websockets==9.1 -fastapi==0.65.2 -aioredis==1.3.1 -redis==4.5.4 -uvloop==0.15.2 -jinja2==2.11.3 -aiofiles==0.6.0 -httpx==0.23.0 -itsdangerous==1.1.0 -databases[sqlite]==0.4.3 -sqlalchemy==1.3.0 \ No newline at end of file +uvicorn==0.30.6 +websockets==12.0 +fastapi==0.115.0 +redis==5.0.8 +uvloop==0.20.0 +jinja2==3.1.4 +aiofiles==24.1.0 +httpx==0.27.2 +itsdangerous==2.2.0 +databases[sqlite]==0.9.0 +sqlalchemy==2.0.35 \ No newline at end of file