# -*- coding: utf-8 -*- """ A single-client WebSocket and HTTP server for streaming screen captures. This script is a demonstration of the pixelflux library. It captures the screen and sends it to a single connected WebSocket client. All capture settings can be configured in the 'CONFIGURATION SETTINGS' block below. """ # Standard library imports import asyncio import http.server import os import socketserver import threading import websockets # Third-party library imports from pixelflux import CaptureSettings, ScreenCapture, create_memoryview_from_result, free_stripe_encode_result_data # ============================================================================== # --- CONFIGURATION SETTINGS --- # Modify the parameters below to test different capture and encoding options. # ============================================================================== HTTP_PORT = 9001 WS_PORT = 9000 capture_settings = CaptureSettings() # --- Core Capture --- capture_settings.capture_width = 1920 capture_settings.capture_height = 1080 capture_settings.capture_x = 0 capture_settings.capture_y = 0 capture_settings.target_fps = 60.0 capture_settings.capture_cursor = False # --- Encoding Mode --- # Sets the output codec. 0 for JPEG, 1 for H.264. capture_settings.output_mode = 1 # --- H.264 Quality Settings --- # Constant Rate Factor (0-51, lower is better quality & higher bitrate). # Good values are typically 18-28. capture_settings.h264_crf = 25 # Use I444 (full color) instead of I420. Better quality, higher CPU/bandwidth. capture_settings.h264_fullcolor = False # Encode full frames instead of just changed stripes. capture_settings.h264_fullframe = False # Pass a vaapi node index 0 = renderD128, -1 to disable capture_settings.vaapi_render_node_index = -1 # --- Change Detection & Optimization --- # Use a higher quality setting for static regions that haven't changed for a while. capture_settings.use_paint_over_quality = True # Number of frames of no motion in a stripe to trigger a high-quality "paint-over". capture_settings.paint_over_trigger_frames = 15 # Consecutive changes to a stripe to trigger a "damaged" state (uses base quality). capture_settings.damage_block_threshold = 10 # Number of frames a stripe stays "damaged" after being triggered. capture_settings.damage_block_duration = 30 # --- Watermarking --- # The path MUST be a byte string (b"") and point to a valid PNG file. #capture_settings.watermark_path = b"/path/to/image.png" # Sets the watermark location on the screen. Default is 0 (disabled). # Options: 0:None, 1:TopLeft, 2:TopRight, 3:BottomLeft, 4:BottomRight, 5:Middle, 6:Animated capture_settings.watermark_location_enum = 0 # ============================================================================== # --- Global State --- # ============================================================================== g_loop = None # The main asyncio event loop. g_module = None # The ScreenCapture instance. g_active_client = None # Holds the single active WebSocket client. g_is_capturing = False # Flag indicating if capture is active. g_h264_stripe_queue = None # asyncio.Queue for H.264 stripes. g_send_task = None # asyncio.Task for sending stripes. g_is_shutting_down = False async def cleanup(): """A single, race-proof function to shut down all capture resources.""" global g_is_shutting_down, g_is_capturing, g_module, g_send_task, g_active_client if g_is_shutting_down: return g_is_shutting_down = True print("Cleanup initiated...") if g_is_capturing: g_is_capturing = False if g_module: loop = asyncio.get_running_loop() await loop.run_in_executor(None, g_module.stop_capture) if g_send_task and not g_send_task.done(): g_send_task.cancel() try: await g_send_task except asyncio.CancelledError: pass g_active_client = None g_is_shutting_down = False print("Cleanup complete.") async def send_h264_stripes(): """Retrieves H.264 stripes from the queue and sends them to the active client.""" global g_h264_stripe_queue, g_active_client try: while True: mv, result_ptr = await g_h264_stripe_queue.get() if g_active_client: try: # Send memoryview directly for zero-copy await g_active_client.send(mv) except websockets.exceptions.ConnectionClosed: pass # Client disconnected, main handler will clean up. finally: # Manually free buffer after sending free_stripe_encode_result_data(result_ptr) else: free_stripe_encode_result_data(result_ptr) g_h264_stripe_queue.task_done() except asyncio.CancelledError: pass # Expected way for the task to be stopped. except Exception as e: print(f"[ERROR] An unexpected error occurred in the send task: {e}") finally: print("Stripe sending task has stopped.") async def websocket_handler(websocket, path=None): """Manages a single WebSocket connection and the screen capture lifecycle.""" global g_active_client, g_is_capturing, g_h264_stripe_queue, g_module, g_send_task if g_active_client is not None: print("Rejecting new connection: A client is already active.") await websocket.close(code=1013, reason="Server is busy with another client.") return g_active_client = websocket print("Client connected. Starting screen capture...") try: g_h264_stripe_queue = asyncio.Queue(maxsize=120) g_module.start_capture(capture_settings, stripe_callback_handler) g_is_capturing = True g_send_task = asyncio.create_task(send_h264_stripes()) print("Screen capture and stream started.") async for _ in websocket: pass except websockets.exceptions.ConnectionClosed: print("Client disconnected normally.") except Exception as e: print(f"[ERROR] WebSocket handler error: {e}") finally: if websocket is g_active_client: await cleanup() def stripe_callback_handler(result_ptr, user_data_ptr): """Callback invoked by pixelflux when a new video stripe is ready.""" if g_is_capturing and result_ptr and g_h264_stripe_queue is not None: # Get memoryview for zero-copy transfer mv = create_memoryview_from_result(result_ptr) if mv is None or mv.nbytes == 0: return if g_loop and not g_loop.is_closed(): # Put both memoryview and result_ptr for later freeing asyncio.run_coroutine_threadsafe( g_h264_stripe_queue.put((mv, result_ptr)), g_loop ) def start_http_server(host, port): """Starts a simple HTTP server in a separate thread to serve client files.""" script_dir = os.path.dirname(os.path.abspath(__file__)) class QuietHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): super().__init__(*args, directory=script_dir, **kwargs) def log_message(self, format, *args): pass class ReuseAddressTCPServer(socketserver.TCPServer): allow_reuse_address = True with ReuseAddressTCPServer((host, port), QuietHTTPRequestHandler) as httpd: print(f"HTTP server is serving files from '{script_dir}'") print(f"-> Open http://{host}:{port}/index.html in your browser.") httpd.serve_forever() async def main(): """Initializes resources and starts the WebSocket and HTTP servers.""" global g_loop, g_module, g_is_capturing, g_send_task g_loop = asyncio.get_running_loop() g_module = ScreenCapture() if not g_module: print("[FATAL] Failed to initialize pixelflux ScreenCapture module.") return print("Pixelflux capture module initialized.") http_thread = threading.Thread( target=start_http_server, args=('localhost', HTTP_PORT), daemon=True ) http_thread.start() ws_server = None try: ws_server = await websockets.serve( websocket_handler, 'localhost', WS_PORT, compression=None ) print(f"WebSocket server started on ws://localhost:{WS_PORT}") print("Waiting for a client connection... Press Ctrl+C to stop.") await asyncio.Event().wait() except OSError as e: print(f"[FATAL] Could not start server (is port {WS_PORT} in use?): {e}") except KeyboardInterrupt: print("\nShutdown signal received.") finally: await cleanup() if ws_server: ws_server.close() await ws_server.wait_closed() print("Cleanup complete.") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\nApplication exiting.")