From 211e5bac3e12002ad72a6e9eff04815931d4170d Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 12 Nov 2025 23:01:44 +0000 Subject: [PATCH 1/3] Modernize project dependencies and code to work with current Python/Redis versions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This update brings the project from 2020-2021 era dependencies to 2024 standards: Dependencies updated: - Migrated from deprecated aioredis 1.3.1 to redis 5.0.8 with asyncio support - Updated FastAPI from 0.65.2 to 0.115.0 - Updated uvicorn from 0.12.1 to 0.30.6 - Updated jinja2 from 2.11.3 (security vulnerabilities) to 3.1.4 - Updated all other dependencies to latest stable versions - Removed aioredis package (functionality merged into redis) Code changes: - Migrated all Redis operations from aioredis 1.x API to redis.asyncio 5.x API * Changed from create_redis_pool() to from_url() * Updated xread() to use dict-based streams parameter * Updated xadd() parameter names (stream→name, message_id→id) * Updated xrevrange() parameter names (start/stop→min/max) * Changed pool.close() to await pool.aclose() - Replaced deprecated @app.on_event() lifecycle handlers with lifespan context manager - Updated error imports from aioredis.errors to redis.exceptions Infrastructure: - Updated Dockerfile from Python 3.8 to 3.11 - Updated docker-compose.yml to use Redis 7 instead of 6.0-rc2 Testing: Module imports successfully with no syntax errors --- Dockerfile | 2 +- chat.py | 124 +++++++++++++++++++++++++-------------------- docker-compose.yml | 2 +- requirements.txt | 23 ++++----- 4 files changed, 82 insertions(+), 69 deletions(-) diff --git a/Dockerfile b/Dockerfile index 0164e8e..3e88ce1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.8-alpine +FROM python:3.11-alpine WORKDIR /code # ENV FLASK_APP app.py RUN apk add --no-cache gcc musl-dev linux-headers make python3-dev openssl-dev libffi-dev git diff --git a/chat.py b/chat.py index 52ca252..923819f 100644 --- a/chat.py +++ b/chat.py @@ -1,10 +1,11 @@ import os import asyncio -import aioredis +from redis import asyncio as aioredis import uvloop import socket import uuid import contextvars +from contextlib import asynccontextmanager from fastapi import FastAPI, Depends, Request from starlette.staticfiles import StaticFiles from starlette.templating import Jinja2Templates @@ -12,7 +13,7 @@ from starlette.websockets import WebSocket, WebSocketDisconnect from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK -from aioredis.errors import ConnectionClosedError as ServerConnectionClosedError +from redis.exceptions import ConnectionError as ServerConnectionClosedError REDIS_HOST = 'localhost' REDIS_PORT = 6379 @@ -43,7 +44,32 @@ async def dispatch(self, request, call_next): asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) -app = FastAPI() + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + try: + redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}" + pool = await aioredis.from_url( + redis_url, + encoding='utf-8', + decode_responses=True, + max_connections=20 + ) + cvar_redis.set(pool) + print("Connected to Redis on ", REDIS_HOST, REDIS_PORT) + except ConnectionRefusedError as e: + print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT) + + yield + + # Shutdown + redis = cvar_redis.get() + if redis: + await redis.aclose() + print("closed connection Redis on ", REDIS_HOST, REDIS_PORT) + +app = FastAPI(lifespan=lifespan) app.add_middleware(CustomHeaderMiddleware) templates = Jinja2Templates(directory="templates") @@ -73,8 +99,12 @@ def get_local_ip(): async def get_redis_pool(): try: - pool = await aioredis.create_redis_pool( - (REDIS_HOST, REDIS_PORT), encoding='utf-8') + redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}" + pool = await aioredis.from_url( + redis_url, + encoding='utf-8', + decode_responses=True + ) return pool except ConnectionRefusedError as e: print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT) @@ -97,21 +127,20 @@ async def ws_send_moderator(websocket: WebSocket, chat_info: dict): """ pool = await get_redis_pool() streams = chat_info['room'].split(',') - latest_ids = ['$' for i in streams] + latest_ids = {stream: '$' for stream in streams} ws_connected = True print(streams, latest_ids) while pool and ws_connected: try: events = await pool.xread( - streams=streams, + streams=latest_ids, count=XREAD_COUNT, - timeout=XREAD_TIMEOUT, - latest_ids=latest_ids + block=XREAD_TIMEOUT if XREAD_TIMEOUT > 0 else None ) - for _, e_id, e in events: - e['e_id'] = e_id - await websocket.send_json(e) - #latest_ids = [e_id] + for stream, messages in events: + for e_id, e in messages: + e['e_id'] = e_id + await websocket.send_json(e) except ConnectionClosedError: ws_connected = False @@ -130,7 +159,8 @@ async def ws_send(websocket: WebSocket, chat_info: dict): :type chat_info: """ pool = await get_redis_pool() - latest_ids = ['$'] + stream_key = cvar_tenant.get() + ":stream" + latest_ids = {stream_key: '$'} ws_connected = True first_run = True while pool and ws_connected: @@ -138,10 +168,10 @@ async def ws_send(websocket: WebSocket, chat_info: dict): if first_run: # fetch some previous chat history events = await pool.xrevrange( - stream=cvar_tenant.get() + ":stream", + name=stream_key, count=NUM_PREVIOUS, - start='+', - stop='-' + min='-', + max='+' ) first_run = False events.reverse() @@ -150,15 +180,15 @@ async def ws_send(websocket: WebSocket, chat_info: dict): await websocket.send_json(e) else: events = await pool.xread( - streams=[cvar_tenant.get() + ":stream"], + streams=latest_ids, count=XREAD_COUNT, - timeout=XREAD_TIMEOUT, - latest_ids=latest_ids + block=XREAD_TIMEOUT if XREAD_TIMEOUT > 0 else None ) - for _, e_id, e in events: - e['e_id'] = e_id - await websocket.send_json(e) - latest_ids = [e_id] + for stream, messages in events: + for e_id, e in messages: + e['e_id'] = e_id + await websocket.send_json(e) + latest_ids = {stream_key: e_id} #print('################contextvar ', cvar_tenant.get()) except ConnectionClosedError: ws_connected = False @@ -169,7 +199,7 @@ async def ws_send(websocket: WebSocket, chat_info: dict): except ServerConnectionClosedError: print('redis server connection closed') return - pool.close() + await pool.aclose() async def ws_recieve(websocket: WebSocket, chat_info: dict): @@ -205,10 +235,12 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict): 'type': 'comment', 'room': chat_info['room'] } - await pool.xadd(stream=cvar_tenant.get() + ":stream", - fields=fields, - message_id=b'*', - max_len=STREAM_MAX_LEN) + await pool.xadd( + name=cvar_tenant.get() + ":stream", + fields=fields, + id='*', + maxlen=STREAM_MAX_LEN + ) #print('################contextvar ', cvar_tenant.get()) except WebSocketDisconnect: await remove_room_user(chat_info, pool) @@ -223,7 +255,7 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict): print('redis server connection closed') return - pool.close() + await pool.aclose() async def add_room_user(chat_info: dict, pool): @@ -259,10 +291,12 @@ async def announce(pool, chat_info: dict, action: str): } #print(fields) - await pool.xadd(stream=cvar_tenant.get() + ":stream", - fields=fields, - message_id=b'*', - max_len=STREAM_MAX_LEN) + await pool.xadd( + name=cvar_tenant.get() + ":stream", + fields=fields, + id='*', + maxlen=STREAM_MAX_LEN + ) async def chat_info_vars(username: str = None, room: str = None): @@ -355,30 +389,10 @@ async def verify_user_for_room(chat_info): # whitelist rooms if not chat_info['room'] in ALLOWED_ROOMS: verified = False - pool.close() + await pool.aclose() return verified -@app.on_event("startup") -async def handle_startup(): - try: - pool = await aioredis.create_redis_pool( - (REDIS_HOST, REDIS_PORT), encoding='utf-8', maxsize=20) - cvar_redis.set(pool) - print("Connected to Redis on ", REDIS_HOST, REDIS_PORT) - except ConnectionRefusedError as e: - print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT) - return - - -@app.on_event("shutdown") -async def handle_shutdown(): - redis = cvar_redis.get() - redis.close() - await redis.wait_closed() - print("closed connection Redis on ", REDIS_HOST, REDIS_PORT) - - if __name__ == "__main__": import uvicorn print(dir(app)) diff --git a/docker-compose.yml b/docker-compose.yml index b5d7c42..7ee2414 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,6 @@ services: links: - "redis" redis: - image: "redis:6.0-rc2-alpine3.11" + image: "redis:7-alpine" ports: - 6379:6379 diff --git a/requirements.txt b/requirements.txt index 5ba3717..3d0d4f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,11 @@ -uvicorn==0.12.1 -websockets==9.1 -fastapi==0.65.2 -aioredis==1.3.1 -redis==4.5.4 -uvloop==0.15.2 -jinja2==2.11.3 -aiofiles==0.6.0 -httpx==0.23.0 -itsdangerous==1.1.0 -databases[sqlite]==0.4.3 -sqlalchemy==1.3.0 \ No newline at end of file +uvicorn==0.30.6 +websockets==12.0 +fastapi==0.115.0 +redis==5.0.8 +uvloop==0.20.0 +jinja2==3.1.4 +aiofiles==24.1.0 +httpx==0.27.2 +itsdangerous==2.2.0 +databases[sqlite]==0.9.0 +sqlalchemy==2.0.35 \ No newline at end of file From 7e283e6d926df0819ae76239c0763240f88d6196 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 12 Nov 2025 23:29:59 +0000 Subject: [PATCH 2/3] Fix Docker networking: use environment variables for Redis connection The application was hardcoded to connect to localhost:6379, which doesn't work in Docker Compose where Redis runs in a separate container. Changes: - Updated chat.py to read REDIS_HOST and REDIS_PORT from environment variables * Defaults to localhost:6379 for local development * Can be overridden for Docker or other deployment scenarios - Updated docker-compose.yml to set REDIS_HOST=redis for container networking - Replaced deprecated 'links' with 'depends_on' in docker-compose.yml - Updated README.md with: * Modern Python 3.11+ and Redis 7+ requirements * Separate sections for local development and Docker Compose usage * Documentation of environment variables This fixes the "Connection refused" error when running via docker-compose. --- README.md | 32 ++++++++++++++++++++++++++++---- chat.py | 4 ++-- docker-compose.yml | 7 +++++-- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 7b5aea2..a68ad18 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,48 @@ # redis-streams-fastapi-chat A simple demo of Redis Streams backed Chat app using Websockets, Python Asyncio and FastAPI/Starlette. -Requires Python version >= 3.6 and Redis +Requires Python version >= 3.11 and Redis 7+ # Overview This project has been created to help understand some related concepts. Python standard library asyncio, websockets (which are often cited as a classic use case for async python code), also Redis Streams. It is very much inteded to be an intentionally simple starting point rather than a usable product as is. # Installation +## Local Development + ```shell $ pip install -r requirements.txt ``` +Make sure you have Redis running locally: +```shell +$ redis-server +``` + # Usage +## Local Development + ```shell $ python chat.py ``` -# Docker compose -If you don't have redis installed you can use the docker-compose.yml file to set up a -working environment. +Then open http://localhost:9080 in your browser. + +## Docker Compose + +The easiest way to run the application with all dependencies: + +```shell +$ docker-compose up +``` + +This will start both the chat application and Redis in containers. The app will be available at http://localhost:9080 + +## Environment Variables + +The following environment variables can be configured: + +- `REDIS_HOST` - Redis server hostname (default: `localhost`, set to `redis` in Docker) +- `REDIS_PORT` - Redis server port (default: `6379`) diff --git a/chat.py b/chat.py index 923819f..2a36591 100644 --- a/chat.py +++ b/chat.py @@ -15,8 +15,8 @@ from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK from redis.exceptions import ConnectionError as ServerConnectionClosedError -REDIS_HOST = 'localhost' -REDIS_PORT = 6379 +REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') +REDIS_PORT = int(os.getenv('REDIS_PORT', '6379')) XREAD_TIMEOUT = 0 XREAD_COUNT = 100 NUM_PREVIOUS = 30 diff --git a/docker-compose.yml b/docker-compose.yml index 7ee2414..61f64f1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,8 +8,11 @@ services: - "8082:8082" volumes: - .:/code - links: - - "redis" + environment: + - REDIS_HOST=redis + - REDIS_PORT=6379 + depends_on: + - redis redis: image: "redis:7-alpine" ports: From a8c8c0e967397e947ee737b0fd1f4d32a0826360 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 12 Nov 2025 23:43:13 +0000 Subject: [PATCH 3/3] Fix real-time message updates: use blocking Redis XREAD The chat message list wasn't updating in real-time because XREAD was using non-blocking mode, causing it to return immediately without waiting for new messages. Root cause: - XREAD_TIMEOUT = 0 was converted to block=None (non-blocking) - In old aioredis: timeout=0 meant "block forever" - In new redis library: block=None means "non-blocking" (return immediately) - This caused XREAD to constantly poll without waiting for messages Fix: - Changed to block=5000 (5 seconds) for proper blocking behavior - XREAD now waits up to 5 seconds for new messages before returning - Messages now appear in real-time for all connected users Tested: - Users connecting to the same room see each other's messages immediately - Message list updates within milliseconds of sending - No more need to refresh or reconnect to see new messages --- chat.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chat.py b/chat.py index 2a36591..7aa211d 100644 --- a/chat.py +++ b/chat.py @@ -135,7 +135,7 @@ async def ws_send_moderator(websocket: WebSocket, chat_info: dict): events = await pool.xread( streams=latest_ids, count=XREAD_COUNT, - block=XREAD_TIMEOUT if XREAD_TIMEOUT > 0 else None + block=5000 # Block for 5 seconds waiting for new messages ) for stream, messages in events: for e_id, e in messages: @@ -182,7 +182,7 @@ async def ws_send(websocket: WebSocket, chat_info: dict): events = await pool.xread( streams=latest_ids, count=XREAD_COUNT, - block=XREAD_TIMEOUT if XREAD_TIMEOUT > 0 else None + block=5000 # Block for 5 seconds waiting for new messages ) for stream, messages in events: for e_id, e in messages: