rommapp_romm/backend/handler/socket_handler.py
2025-10-03 10:24:48 -04:00

85 lines
2.8 KiB
Python

import socketio # type: ignore
from config import REDIS_URL
from utils import json as json_module
class SocketHandler:
def __init__(self) -> None:
self.socket_server = socketio.AsyncServer(
cors_allowed_origins="*",
async_mode="asgi",
json=json_module,
logger=False,
engineio_logger=False,
client_manager=socketio.AsyncRedisManager(str(REDIS_URL)),
ping_timeout=60,
ping_interval=25,
max_http_buffer_size=1e6, # 1MB
cors_credentials=True,
)
self.socket_app = socketio.ASGIApp(
self.socket_server, socketio_path="/ws/socket.io"
)
# Register connection handlers
self.socket_server.on("connect")(self.on_connect) # type: ignore
self.socket_server.on("disconnect")(self.on_disconnect) # type: ignore
async def on_connect(
self, sid: str, environ: dict, auth: dict | None = None
) -> None:
"""Handle socket connection and store session data."""
# Extract session data from the request environment
session_data = {}
# Create a mock scope for session extraction
scope = {
"type": "websocket",
"headers": [
(k.encode("latin-1"), v.encode("latin-1"))
for k, v in environ.items()
if k.startswith("HTTP_")
],
}
# Try to extract session using the session middleware
try:
# Extract session from cookies
from starlette.requests import HTTPConnection
connection = HTTPConnection(scope)
# Manually extract session cookie
if "HTTP_COOKIE" in environ:
cookies = environ["HTTP_COOKIE"]
for cookie in cookies.split(";"):
if "romm_session=" in cookie:
cookie_value = (
cookie.strip().split("romm_session=")[1].split(";")[0]
)
# Store the raw cookie value for later JWT decoding
session_data["session_cookie"] = cookie_value
break
# Store the parsed session data
session_data["session"] = connection.session
session_data["headers"] = scope["headers"]
except Exception:
# If session extraction fails, store empty session
session_data["session"] = {}
session_data["headers"] = scope["headers"]
# Store session data in the socket manager
await self.socket_server.save_session(sid, session_data)
async def on_disconnect(self, sid: str) -> None:
"""Handle socket disconnection."""
# Clean up session data
await self.socket_server.save_session(sid, {})
socket_handler = SocketHandler()