# Mirror of https://github.com/linuxserver/pixelflux.git
# Synced 2026-02-19 16:27:45 +08:00 - 231 lines, 8.8 KiB, Python
# -*- coding: utf-8 -*-
|
|
"""
A single-client WebSocket and HTTP server for streaming screen captures.

This script is a demonstration of the pixelflux library. It captures the screen
and sends it to a single connected WebSocket client. All capture settings can be
configured in the 'CONFIGURATION SETTINGS' block below.
"""
|
|
|
|
# Standard library imports
|
|
import asyncio
|
|
import http.server
|
|
import os
|
|
import socketserver
|
|
import threading
|
|
import websockets
|
|
|
|
# Third-party library imports
|
|
from pixelflux import CaptureSettings, ScreenCapture, create_memoryview_from_result, free_stripe_encode_result_data
|
|
|
|
# ==============================================================================
# --- CONFIGURATION SETTINGS ---
# Edit the values in this section to try out different capture and encoding
# options. Nothing below this section needs to change for normal use.
# ==============================================================================

# Port of the static-file HTTP server that serves the browser client.
HTTP_PORT = 9001
# Port of the WebSocket server that carries the encoded stripes.
WS_PORT = 9000

capture_settings = CaptureSettings()

# --- Core Capture ---
# Size and origin of the captured region, target frame rate, and whether the
# mouse cursor is included in the capture.
capture_settings.capture_width = 1920
capture_settings.capture_height = 1080
capture_settings.capture_x = 0
capture_settings.capture_y = 0
capture_settings.target_fps = 60.0
capture_settings.capture_cursor = False

# --- Encoding Mode ---
# Output codec selector: 0 = JPEG, 1 = H.264.
capture_settings.output_mode = 1

# --- H.264 Quality Settings ---
# Constant Rate Factor, range 0-51. Lower values mean better quality and a
# higher bitrate; typical good values are 18-28.
capture_settings.h264_crf = 25
# True selects I444 (full color) instead of I420: better quality at the cost
# of more CPU and bandwidth.
capture_settings.h264_fullcolor = False
# True encodes complete frames rather than only the stripes that changed.
capture_settings.h264_fullframe = False
# VAAPI render node index (0 = renderD128); -1 disables VAAPI.
capture_settings.vaapi_render_node_index = -1

# --- Change Detection & Optimization ---
# Re-encode regions that have been static for a while at a higher quality.
capture_settings.use_paint_over_quality = True
# How many motionless frames a stripe needs before the high-quality
# "paint-over" is triggered.
capture_settings.paint_over_trigger_frames = 15
# How many consecutive changes put a stripe into the "damaged" state (which
# uses the base quality).
capture_settings.damage_block_threshold = 10
# How many frames a stripe remains "damaged" once triggered.
capture_settings.damage_block_duration = 30

# --- Watermarking ---
# To enable a watermark, set the path below. It MUST be a byte string (b"")
# pointing to a valid PNG file.
#capture_settings.watermark_path = b"/path/to/image.png"

# Watermark placement. Default is 0 (disabled).
# Options: 0:None, 1:TopLeft, 2:TopRight, 3:BottomLeft, 4:BottomRight, 5:Middle, 6:Animated
capture_settings.watermark_location_enum = 0
|
|
|
|
# ==============================================================================
# --- Global State ---
# Module-level state shared by the coroutines and the capture callback below.
# Everything starts out unset and is filled in at runtime.
# ==============================================================================

# The main asyncio event loop (set in main()).
g_loop = None

# The ScreenCapture instance (set in main()).
g_module = None

# The single active WebSocket client, or None when nobody is connected.
g_active_client = None

# True while a capture is running.
g_is_capturing = False

# asyncio.Queue carrying H.264 stripes from the capture callback to the sender.
g_h264_stripe_queue = None

# asyncio.Task that drains the stripe queue and sends to the client.
g_send_task = None
|
|
|
|
# Re-entrancy latch: True only while a cleanup() call is in progress.
g_is_shutting_down = False


async def cleanup():
    """Stop the capture, stop the sender task and release the client slot.

    This is called from both the WebSocket handler and ``main``. A call that
    arrives while another cleanup is still running returns immediately.

    Teardown steps, in order:

    1. If a capture is active, clear ``g_is_capturing`` and run
       ``g_module.stop_capture`` in the default executor so the event loop is
       not blocked. A failure here is printed rather than raised so that the
       remaining steps still run.
    2. Cancel the stripe sender task and wait for it to finish.
    3. Free every stripe still sitting in the queue. The sender frees each
       item it dequeues, so items left behind after it stops would otherwise
       never be released.
    4. Always (even on error or cancellation) clear ``g_active_client`` and
       reset the shutdown latch. Without this, a single failed cleanup would
       leave the latch set and the client slot occupied, and the server would
       reject every later connection.
    """
    global g_is_shutting_down, g_is_capturing, g_module, g_send_task, g_active_client
    if g_is_shutting_down:
        return
    g_is_shutting_down = True
    print("Cleanup initiated...")
    try:
        if g_is_capturing:
            g_is_capturing = False
            if g_module:
                loop = asyncio.get_running_loop()
                try:
                    await loop.run_in_executor(None, g_module.stop_capture)
                except Exception as e:
                    # Best effort: keep tearing down the rest of the session.
                    print(f"[ERROR] Failed to stop capture: {e}")

        if g_send_task and not g_send_task.done():
            g_send_task.cancel()
            try:
                await g_send_task
            except asyncio.CancelledError:
                pass

        # Release stripes that were queued but never picked up by the sender.
        pending = g_h264_stripe_queue
        if pending is not None:
            while not pending.empty():
                _, result_ptr = pending.get_nowait()
                free_stripe_encode_result_data(result_ptr)
                pending.task_done()
    finally:
        g_active_client = None
        g_is_shutting_down = False
    print("Cleanup complete.")
|
|
|
|
async def send_h264_stripes():
    """Forward queued H.264 stripes to the active client until cancelled.

    Each queue item is a ``(memoryview, result_ptr)`` pair. The memoryview is
    sent to ``g_active_client`` if one is set; in every case the underlying
    buffer is released with ``free_stripe_encode_result_data`` and the item is
    marked done on the queue.

    Cancellation is the normal way to stop this coroutine and is absorbed
    silently. Any other exception is printed and ends the loop.
    """
    try:
        while True:
            frame_view, native_result = await g_h264_stripe_queue.get()
            if not g_active_client:
                # Nobody to send to: just release the buffer.
                free_stripe_encode_result_data(native_result)
            else:
                try:
                    # The memoryview is passed as-is to avoid copying.
                    await g_active_client.send(frame_view)
                except websockets.exceptions.ConnectionClosed:
                    # The client went away; the connection handler cleans up.
                    pass
                finally:
                    # The buffer is freed manually once the send has finished.
                    free_stripe_encode_result_data(native_result)
            g_h264_stripe_queue.task_done()
    except asyncio.CancelledError:
        # Expected: this is how the task gets stopped.
        pass
    except Exception as e:
        print(f"[ERROR] An unexpected error occurred in the send task: {e}")
    finally:
        print("Stripe sending task has stopped.")
|
|
|
|
|
|
async def websocket_handler(websocket, path=None):
    """Serve one WebSocket client and tie the capture lifetime to it.

    Only one client is served at a time: while ``g_active_client`` is set,
    any further connection is closed with code 1013 and the handler returns.

    For the accepted client a fresh stripe queue is created, the capture is
    started with ``capture_settings`` and ``stripe_callback_handler``, and the
    sender task is launched. Incoming messages are read and discarded until
    the connection ends, after which ``cleanup()`` is awaited.

    Args:
        websocket: The connection object supplied by the websockets server.
        path: Unused. Presumably kept for websockets versions that pass the
            request path as a second argument - verify against the installed
            version.
    """
    global g_active_client, g_is_capturing, g_h264_stripe_queue, g_module, g_send_task

    # Guard: the single client slot is already taken.
    if g_active_client is not None:
        print("Rejecting new connection: A client is already active.")
        await websocket.close(code=1013, reason="Server is busy with another client.")
        return

    g_active_client = websocket
    print("Client connected. Starting screen capture...")

    try:
        g_h264_stripe_queue = asyncio.Queue(maxsize=120)
        # NOTE(review): the callback ignores stripes until g_is_capturing is
        # True, which is only set after start_capture returns - confirm that
        # no stripe can be delivered in between.
        g_module.start_capture(capture_settings, stripe_callback_handler)
        g_is_capturing = True
        g_send_task = asyncio.create_task(send_h264_stripes())
        print("Screen capture and stream started.")

        # Client messages are not used; iterating keeps the handler alive
        # until the connection ends.
        async for _message in websocket:
            continue
    except websockets.exceptions.ConnectionClosed:
        print("Client disconnected normally.")
    except Exception as e:
        print(f"[ERROR] WebSocket handler error: {e}")
    finally:
        # Only the connection that owns the slot tears the session down.
        if websocket is g_active_client:
            await cleanup()
|
|
|
|
def stripe_callback_handler(result_ptr, user_data_ptr):
    """Queue a freshly encoded stripe for delivery to the client.

    Registered with ``start_capture`` as the stripe callback. The stripe is
    wrapped in a memoryview and scheduled onto ``g_loop`` together with
    ``result_ptr``, so that the consumer can free the buffer after sending.

    The stripe is silently ignored when no capture is active, when
    ``result_ptr`` is falsy, when there is no queue, when the memoryview is
    missing or empty, or when the event loop is unset or closed.

    Args:
        result_ptr: Handle to the encoded stripe result.
        user_data_ptr: Unused.
    """
    # NOTE(review): none of the early returns below frees result_ptr. Whether
    # the library reclaims it in that case is not visible here - confirm.
    if not (g_is_capturing and result_ptr and g_h264_stripe_queue is not None):
        return

    # Wrap the result in a memoryview so the payload is not copied.
    frame_view = create_memoryview_from_result(result_ptr)
    if frame_view is None or frame_view.nbytes == 0:
        return

    if not g_loop or g_loop.is_closed():
        return

    # The pointer travels with the view so it can be freed after sending.
    asyncio.run_coroutine_threadsafe(
        g_h264_stripe_queue.put((frame_view, result_ptr)), g_loop
    )
|
|
|
|
|
|
def start_http_server(host, port):
    """Serve the directory containing this script over HTTP.

    This call blocks in ``serve_forever()``; ``main`` runs it on a daemon
    thread. Request logging is suppressed and the listening socket is created
    with address reuse enabled.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
    """
    serve_root = os.path.dirname(os.path.abspath(__file__))

    class _SilentFileHandler(http.server.SimpleHTTPRequestHandler):
        """Static-file handler rooted at the script directory, without logging."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, directory=serve_root, **kwargs)

        def log_message(self, format, *args):
            # Intentionally a no-op: keep per-request lines off the console.
            pass

    class _ReusableTCPServer(socketserver.TCPServer):
        """TCP server that sets ``allow_reuse_address``."""

        allow_reuse_address = True

    server = _ReusableTCPServer((host, port), _SilentFileHandler)
    with server:
        print(f"HTTP server is serving files from '{serve_root}'")
        print(f"-> Open http://{host}:{port}/index.html in your browser.")
        server.serve_forever()
|
|
|
|
|
|
async def main():
    """Initialize pixelflux, start both servers and run until interrupted.

    Steps:

    1. Record the running event loop in ``g_loop`` (used by the capture
       callback to schedule work) and create the ``ScreenCapture`` instance.
    2. Start the static-file HTTP server on a daemon thread.
    3. Start the WebSocket server and wait indefinitely.
    4. On the way out, always run ``cleanup()`` and close the WebSocket
       server.

    A failure to bind the WebSocket port (``OSError``) is reported and the
    function returns normally. When the program is interrupted while this
    coroutine is suspended, the interruption reaches it as
    ``asyncio.CancelledError`` rather than ``KeyboardInterrupt``; that case is
    handled explicitly so the shutdown message is actually shown, and the
    cancellation is re-raised so ``asyncio.run`` can finish unwinding.
    """
    global g_loop, g_module

    g_loop = asyncio.get_running_loop()

    g_module = ScreenCapture()
    if not g_module:
        print("[FATAL] Failed to initialize pixelflux ScreenCapture module.")
        return
    print("Pixelflux capture module initialized.")

    http_thread = threading.Thread(
        target=start_http_server, args=('localhost', HTTP_PORT), daemon=True
    )
    http_thread.start()

    ws_server = None
    try:
        ws_server = await websockets.serve(
            websocket_handler, 'localhost', WS_PORT, compression=None
        )
        print(f"WebSocket server started on ws://localhost:{WS_PORT}")
        print("Waiting for a client connection... Press Ctrl+C to stop.")
        # Park here forever; only cancellation or an interrupt ends the wait.
        await asyncio.Event().wait()
    except OSError as e:
        print(f"[FATAL] Could not start server (is port {WS_PORT} in use?): {e}")
    except KeyboardInterrupt:
        print("\nShutdown signal received.")
    except asyncio.CancelledError:
        # Ctrl+C under asyncio.run() cancels this task instead of raising
        # KeyboardInterrupt inside it.
        print("\nShutdown signal received.")
        raise
    finally:
        await cleanup()
        if ws_server:
            ws_server.close()
            await ws_server.wait_closed()
        print("Cleanup complete.")
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the server until the user interrupts it.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nApplication exiting.")
|