mirror of
https://github.com/linuxserver/pixelflux.git
synced 2026-03-16 00:06:55 +08:00
271 lines
11 KiB
Python
271 lines
11 KiB
Python
# Standard library imports
|
|
import asyncio
|
|
import ctypes
|
|
import http.server
|
|
import socketserver
|
|
import threading
|
|
import websockets
|
|
import os # For path operations
|
|
|
|
# pixelflux library imports
|
|
from pixelflux import CaptureSettings, ScreenCapture, StripeCallback
|
|
|
|
# --- Global "Context" Variables ---
|
|
# These are effectively global state or shared resources for the demo.
|
|
# Using a more explicit naming convention like 'g_' for clarity.
|
|
g_loop = None # The asyncio event loop, set in main()
|
|
g_capture_settings = None # Capture settings, configured in main()
|
|
g_stripe_callback = None # CTypes callback, created in main()
|
|
g_module = None # ScreenCapture instance, created in main()
|
|
|
|
g_clients = set() # Set of connected WebSocket clients
|
|
g_is_capturing = False # Flag indicating if capture is active
|
|
g_h264_stripe_queue = None # asyncio.Queue for H.264 stripes, created when capture starts
|
|
g_send_task = None # asyncio.Task for broadcasting stripes
|
|
# --- End Global Variables ---
|
|
|
|
|
|
async def send_h264_stripes():
|
|
"""
|
|
Continuously retrieves H.264 stripe data from the global queue
|
|
and broadcasts it to all connected WebSocket clients.
|
|
"""
|
|
global g_h264_stripe_queue, g_clients
|
|
print("Stripe sending task started.")
|
|
try:
|
|
while True:
|
|
if g_h264_stripe_queue is None: # Should not happen if task is managed correctly
|
|
await asyncio.sleep(0.1)
|
|
continue
|
|
|
|
h264_bytes_with_prefix = await g_h264_stripe_queue.get()
|
|
|
|
if not g_clients:
|
|
g_h264_stripe_queue.task_done()
|
|
continue
|
|
|
|
# Create a list of tasks to send to current clients
|
|
active_clients = list(g_clients) # Avoid issues if set modified during iteration
|
|
tasks = [client.send(h264_bytes_with_prefix) for client in active_clients]
|
|
if tasks:
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
g_h264_stripe_queue.task_done()
|
|
except asyncio.CancelledError:
|
|
print("Stripe sending task cancelled.")
|
|
raise
|
|
except Exception as e:
|
|
print(f"Error in send_h264_stripes: {e}")
|
|
finally:
|
|
print("Stripe sending task finished.")
|
|
|
|
|
|
async def ws_handler(websocket, path=None):
|
|
"""
|
|
Handles new WebSocket connections, manages client set, and controls capture state.
|
|
"""
|
|
global g_clients, g_is_capturing, g_h264_stripe_queue, g_module, g_send_task
|
|
global g_capture_settings, g_stripe_callback # These are set in main()
|
|
|
|
g_clients.add(websocket)
|
|
print(f"Client connected: {websocket.remote_address}. Total clients: {len(g_clients)}")
|
|
|
|
if not g_is_capturing and g_module:
|
|
print("First client connected. Starting H.264 capture...")
|
|
g_h264_stripe_queue = asyncio.Queue() # Fresh queue for the new capture session
|
|
|
|
if g_capture_settings is None or g_stripe_callback is None:
|
|
print("Error: Critical server components (settings/callback) not initialized.")
|
|
await websocket.close(code=1011, reason="Server configuration error")
|
|
g_clients.remove(websocket)
|
|
return
|
|
|
|
g_module.start_capture(g_capture_settings, g_stripe_callback)
|
|
g_is_capturing = True
|
|
|
|
if g_send_task is None or g_send_task.done():
|
|
g_send_task = asyncio.create_task(send_h264_stripes())
|
|
print("H.264 capture process initiated.")
|
|
elif not g_module:
|
|
print("Error: Pixelflux module not initialized. Cannot start capture.")
|
|
try:
|
|
await websocket.send("ERROR: Server-side capture module not ready.")
|
|
await websocket.close(code=1011, reason="Server module error")
|
|
except websockets.exceptions.ConnectionClosed:
|
|
pass
|
|
if websocket in g_clients: g_clients.remove(websocket)
|
|
return
|
|
|
|
try:
|
|
async for _ in websocket:
|
|
# This demo doesn't expect messages from the client
|
|
pass
|
|
except websockets.exceptions.ConnectionClosedError as e:
|
|
print(f"Client {websocket.remote_address} disconnected: Code {e.code}, Reason: '{e.reason or 'N/A'}'")
|
|
except Exception as e:
|
|
print(f"WebSocket error for client {websocket.remote_address}: {e}")
|
|
finally:
|
|
if websocket in g_clients:
|
|
g_clients.remove(websocket)
|
|
print(f"Client connection closed: {websocket.remote_address}. Remaining clients: {len(g_clients)}")
|
|
|
|
if g_is_capturing and not g_clients and g_module: # If last client disconnects
|
|
print("Last client disconnected. Stopping H.264 capture...")
|
|
g_module.stop_capture()
|
|
g_is_capturing = False
|
|
if g_send_task and not g_send_task.done():
|
|
g_send_task.cancel()
|
|
# The task will be awaited in the main finally block if needed
|
|
g_send_task = None
|
|
print("H.264 capture process stopped.")
|
|
# g_h264_stripe_queue can be left for GC or explicitly set to None
|
|
g_h264_stripe_queue = None
|
|
|
|
|
|
def py_stripe_callback(result_ptr, user_data):
|
|
"""
|
|
Callback function invoked by the pixelflux C++ module from a separate thread
|
|
when an H.264 encoded stripe is ready.
|
|
"""
|
|
global g_is_capturing, g_h264_stripe_queue, g_loop
|
|
|
|
if g_is_capturing and result_ptr and g_h264_stripe_queue is not None:
|
|
result = result_ptr.contents
|
|
if result.data and result.size > 0:
|
|
data_bytes_ptr = ctypes.cast(
|
|
result.data, ctypes.POINTER(ctypes.c_ubyte * result.size)
|
|
)
|
|
h264_stripe_with_prefix = bytes(data_bytes_ptr.contents)
|
|
|
|
if g_loop and not g_loop.is_closed():
|
|
asyncio.run_coroutine_threadsafe(
|
|
g_h264_stripe_queue.put(h264_stripe_with_prefix),
|
|
g_loop
|
|
)
|
|
# Memory for `result.data` is managed by the C++ module.
|
|
|
|
def start_http_server(port=9001):
|
|
"""
|
|
Starts a simple HTTP server in a new thread to serve client-side files.
|
|
"""
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
class DirectoryServingHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, directory=script_dir, **kwargs)
|
|
def log_message(self, format, *args):
|
|
pass # Quieter HTTP logs
|
|
|
|
with socketserver.TCPServer(("localhost", port), DirectoryServingHTTPRequestHandler) as httpd:
|
|
print(f"HTTP server serving files from '{os.path.abspath(script_dir)}' on http://localhost:{port}/")
|
|
print(f" (Open http://localhost:{port}/index.html in browser to connect to WebSocket)")
|
|
httpd.serve_forever()
|
|
|
|
async def main_async_routine():
|
|
"""
|
|
Main asynchronous routine to set up and run the server.
|
|
"""
|
|
global g_loop, g_capture_settings, g_stripe_callback, g_module
|
|
global g_is_capturing, g_send_task # For cleanup
|
|
|
|
# asyncio.run() creates and manages the loop. Get a reference for the C callback.
|
|
g_loop = asyncio.get_running_loop()
|
|
|
|
# --- Configure Screen Capture Parameters ---
|
|
g_capture_settings = CaptureSettings()
|
|
g_capture_settings.capture_width = 1920
|
|
g_capture_settings.capture_height = 1080
|
|
g_capture_settings.capture_x = 0
|
|
g_capture_settings.capture_y = 0
|
|
g_capture_settings.target_fps = 60.0
|
|
g_capture_settings.output_mode = 1 # 1 for H.264
|
|
g_capture_settings.h264_crf = 25
|
|
g_capture_settings.use_paint_over_quality = True
|
|
g_capture_settings.paint_over_trigger_frames = 5
|
|
g_capture_settings.damage_block_threshold = 15
|
|
g_capture_settings.damage_block_duration = 30
|
|
g_capture_settings.h264_fullcolor = False
|
|
g_capture_settings.h264_fullframe = False
|
|
# --- End Capture Configuration ---
|
|
|
|
g_stripe_callback = StripeCallback(py_stripe_callback)
|
|
g_module = ScreenCapture()
|
|
|
|
ws_server_instance = None # To hold the websockets server object
|
|
|
|
if not g_module:
|
|
print("Fatal: Failed to initialize pixelflux ScreenCapture module.")
|
|
return
|
|
|
|
print("Pixelflux capture module initialized.")
|
|
|
|
# Start HTTP server in a daemon thread.
|
|
http_thread = threading.Thread(target=start_http_server, args=(9001,), daemon=True)
|
|
http_thread.start()
|
|
|
|
ws_port = 9000
|
|
shutdown_event = asyncio.Event() # Used to keep server running until interrupt
|
|
|
|
try:
|
|
# websockets.serve should use the currently running loop (managed by asyncio.run())
|
|
# No explicit `loop=` argument is typically needed here with asyncio.run()
|
|
ws_server_instance = await websockets.serve(
|
|
ws_handler,
|
|
'localhost',
|
|
ws_port,
|
|
compression=None
|
|
)
|
|
print(f"WebSocket server started (streaming H.264) on ws://localhost:{ws_port}")
|
|
print("Waiting for client connections... Press Ctrl+C to stop.")
|
|
|
|
await shutdown_event.wait() # Keep the server running until event is set (by KeyboardInterrupt)
|
|
|
|
except OSError as e:
|
|
print(f"Failed to start server (e.g., port in use?): {e}")
|
|
# KeyboardInterrupt will be handled by the outer asyncio.run() wrapper,
|
|
# or we can catch it here to set the shutdown_event.
|
|
except KeyboardInterrupt: # This allows graceful shutdown of this coroutine
|
|
print("\nKeyboardInterrupt caught in main_async_routine. Signaling shutdown...")
|
|
finally:
|
|
print("Cleaning up resources in main_async_routine...")
|
|
if g_is_capturing and g_module:
|
|
g_module.stop_capture() # Ensure capture is stopped
|
|
g_is_capturing = False # Update flag
|
|
print("Capture stopped.")
|
|
|
|
if ws_server_instance:
|
|
ws_server_instance.close()
|
|
await ws_server_instance.wait_closed()
|
|
print("WebSocket server closed.")
|
|
|
|
if g_send_task and not g_send_task.done():
|
|
g_send_task.cancel()
|
|
try:
|
|
await g_send_task
|
|
except asyncio.CancelledError:
|
|
pass # Expected
|
|
print("Stripe sending task cancelled and awaited.")
|
|
|
|
if g_module:
|
|
del g_module # Allow C++ destructor to run if applicable
|
|
g_module = None
|
|
print("Capture module resources released.")
|
|
|
|
# asyncio.run() handles loop cleanup and cancellation of remaining tasks.
|
|
print("Async routine cleanup finished.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
asyncio.run(main_async_routine())
|
|
except KeyboardInterrupt:
|
|
# This handles Ctrl+C if it occurs outside the main_async_routine's try/except,
|
|
# or if main_async_routine re-raises it.
|
|
print("\nApplication shutdown initiated by KeyboardInterrupt (at top level).")
|
|
except Exception as e:
|
|
print(f"Unhandled exception in __main__: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
finally:
|
|
print("Application exiting.")
|