Skip to content

Python API Reference

This page contains auto-generated API documentation for the Stimm Python modules, produced by mkdocstrings.

get_provider_constants_endpoint() async

Serve provider constants to JavaScript frontend

Source code in src/main.py
@app.get("/api/provider-constants")
async def get_provider_constants_endpoint():
    """Return the provider constants consumed by the JavaScript frontend.

    Any failure while loading the constants is logged and reported in the
    response body as ``{"error": ...}`` instead of being raised.
    """
    try:
        return get_provider_constants()
    except Exception as exc:
        logger.error(f"Failed to load provider constants: {exc}")
        return {"error": "Failed to load provider constants"}

health_check()

Basic health check

Source code in src/main.py
@app.get("/health")
def health_check():
    """Basic liveness check: always reports a static healthy status.

    No dependencies (RAG, SIP bridge, database) are inspected here; see the
    dedicated ``/health/...`` endpoints for those.
    """
    payload = {"status": "healthy"}
    return payload

rag_preloading_health() async

Health check for RAG preloading status

Source code in src/main.py
@app.get("/health/rag-preloading")
async def rag_preloading_health():
    """Report the state of the background RAG preloading.

    The ``rag_preloading`` field of the response is one of:

    * ``completed``     - ``is_preloaded`` is set (``status: healthy``)
    * ``failed``        - a ``preload_error`` was recorded (``status: degraded``)
    * ``in_progress``   - neither of the above yet (``status: loading``)
    * ``not_available`` - the preloader module could not be imported
    * ``unknown``       - any other exception while inspecting the preloader

    Errors are returned in the response body rather than raised.
    """
    try:
        from services.rag.rag_preloader import rag_preloader as preloader

        # Result intentionally ignored; the call is kept for its side effects.
        preloader.get_status()

        if preloader.is_preloaded:
            return {
                "status": "healthy",
                "rag_preloading": "completed",
                "preload_time": preloader.preload_time,
                "preload_start_time": preloader.preload_start_time,
                "rag_state_available": preloader.rag_state is not None,
            }

        if preloader.preload_error:
            return {
                "status": "degraded",
                "rag_preloading": "failed",
                "preload_error": preloader.preload_error,
                "preload_time": preloader.preload_time,
                "rag_state_available": preloader.rag_state is not None,
            }

        # Not finished and no error recorded: still loading.
        return {
            "status": "loading",
            "rag_preloading": "in_progress",
            "preload_start_time": preloader.preload_start_time,
            "rag_state_available": preloader.rag_state is not None,
        }

    except ImportError:
        return {
            "status": "degraded",
            "rag_preloading": "not_available",
            "message": "RAG preloader module not available, using lazy loading",
        }
    except Exception as exc:
        return {"status": "error", "rag_preloading": "unknown", "error": str(exc)}

sip_bridge_health() async

Health check for SIP Bridge status

Source code in src/main.py
@app.get("/health/sip-bridge")
async def sip_bridge_health():
    """Report whether the SIP Bridge integration is enabled and running.

    ``status`` values:

    * ``disabled`` - the integration reports it is not enabled
    * ``healthy``  - enabled and running
    * ``degraded`` - enabled but not running
    * ``error``    - module missing, or inspecting it raised

    Errors are returned in the response body rather than raised.
    """
    try:
        from services.sip_bridge_integration import sip_bridge_integration as bridge

        if not bridge.is_enabled():
            return {
                "status": "disabled",
                "sip_bridge": "not_enabled",
                "message": "SIP Bridge is disabled (ENABLE_SIP_BRIDGE=false)",
            }

        if not bridge.is_running():
            return {
                "status": "degraded",
                "sip_bridge": "not_running",
                "message": "SIP Bridge is enabled but not running",
            }

        return {"status": "healthy", "sip_bridge": "running", "message": "SIP Bridge is running normally"}

    except ImportError:
        return {
            "status": "error",
            "sip_bridge": "not_available",
            "message": "SIP Bridge Integration module not available",
        }
    except Exception as exc:
        return {"status": "error", "sip_bridge": "error", "error": str(exc)}

sip_bridge_status() async

Detailed status of SIP Bridge

Source code in src/main.py
@app.get("/health/sip-bridge-status")
async def sip_bridge_status():
    """Return the detailed SIP Bridge status produced by the integration module.

    Failures (including a missing module) are reported as ``{"error": ...}``
    in the response body instead of being raised.
    """
    try:
        from services.sip_bridge_integration import get_sip_bridge_status as fetch_status

        status = fetch_status()
        return status
    except ImportError:
        return {"error": "SIP Bridge Integration module not available"}
    except Exception as exc:
        return {"error": str(exc)}

startup_event() async

Preload RAG models and initialize agent system at server startup

Source code in src/main.py
def _redact_database_url(url) -> str:
    """Return ``str(url)`` with any password in the URL replaced by ``***``.

    Lets the startup log show which database is targeted without writing
    credentials to the logs. URLs without a password (e.g. SQLite file URLs)
    are returned unchanged.
    """
    from urllib.parse import urlsplit, urlunsplit

    text = str(url)
    try:
        parts = urlsplit(text)
        if parts.password is None:
            return text
        # netloc is "user:password@host[:port]"; keep user and host, mask the password.
        userinfo, _, hostinfo = parts.netloc.rpartition("@")
        username = userinfo.split(":", 1)[0]
        return urlunsplit(parts._replace(netloc=f"{username}:***@{hostinfo}"))
    except ValueError:
        # Malformed URL: do not risk echoing credentials verbatim.
        return "<unparseable database URL>"


@app.on_event("startup")
async def startup_event():
    """Preload RAG models and initialize the agent system at server startup.

    Runs three independent steps. Each is guarded so that a failure is logged
    and does not prevent the others:

    1. Schedule RAG preloading as a background task (startup is not blocked).
    2. Load the environment config and create the default development agent.
    3. Start the SIP Bridge integration.
    """
    try:
        logger.info("Starting RAG preloading and agent initialization at server startup...")

        # Import here to avoid circular imports
        try:
            from services.rag.rag_preloader import rag_preloader

            async def preload_rag():
                # Catch here: an exception escaping a background task would
                # otherwise only surface as "Task exception was never retrieved".
                try:
                    success = await rag_preloader.preload_all()
                except Exception as e:
                    logger.exception(f"❌ RAG preloading raised an exception: {e}")
                    return
                if success:
                    logger.info("✅ RAG preloading completed successfully")
                else:
                    logger.error(f"❌ RAG preloading failed: {rag_preloader.preload_error}")

            # The event loop keeps only weak references to tasks; hold a strong
            # reference on app.state so the preload task cannot be garbage
            # collected before it finishes.
            app.state.rag_preload_task = asyncio.create_task(preload_rag())

        except ImportError as e:
            logger.warning(f"RAG preloader not available, using lazy loading: {e}")
        except Exception as e:
            logger.error(f"Failed to initialize RAG preloader: {e}")

        # Initialize agent system (default agent only; no global provider config)
        try:
            # Force environment config loading first to ensure correct database URL
            from environment_config import get_environment_config

            env_config = get_environment_config()
            logger.info(f"Environment detected as: {os.getenv('ENVIRONMENT', 'local')}")
            # Never log the raw URL: it may embed the database password.
            logger.info(f"Database URL: {_redact_database_url(env_config.database_url)}")

            # Now initialize agent system
            from database.session import get_db
            from services.agents_admin.dev_agent_creator import initialize_default_agent

            # get_db() is a generator dependency; close() runs its cleanup.
            db_gen = get_db()
            db = next(db_gen)
            try:
                success = initialize_default_agent(db)
                if success:
                    logger.info("✅ Default development agent initialized successfully")
                else:
                    logger.error("❌ Failed to initialize default development agent")
            finally:
                db_gen.close()

        except ImportError as e:
            logger.warning(f"Agent system not available: {e}")
        except Exception as e:
            logger.error(f"Failed to initialize agent system: {e}")

        # Initialize SIP Bridge Integration if enabled
        try:
            from services.sip_bridge_integration import start_sip_bridge

            # Start SIP Bridge in background (singleton ensures no duplicates)
            start_sip_bridge()
            logger.info("✅ SIP Bridge Integration initialized (robust singleton)")

        except ImportError as e:
            logger.warning(f"SIP Bridge Integration not available: {e}")
        except Exception as e:
            logger.error(f"Failed to initialize SIP Bridge Integration: {e}")

        # Note: Stimm services are now initialized per-session in LiveKit service
        # to avoid concurrency issues with providers like Deepgram

    except Exception as e:
        logger.error(f"Failed to start startup procedures: {e}")

options: show_root_heading: true show_source: true

Stimm Service - Main service class for voice assistant functionality.

This service wraps the StimmEventLoop and provides a simple interface for managing stimm conversations.

StimmService

Main stimm service class.

Provides a high-level interface for managing stimm interactions using the event-driven StimmEventLoop architecture.

Source code in src/services/agents/stimm_service.py
class StimmService:
    """
    Main stimm service class.

    Provides a high-level interface for managing stimm interactions
    using the event-driven StimmEventLoop architecture.

    Each session is keyed by ``conversation_id``. It consists of a started
    ``StimmEventLoop`` and a background task. The task forwards the events the
    loop puts on its output queue to the handler registered for each event's
    ``"type"``.
    """

    def __init__(self, stt_service, chatbot_service, tts_service, vad_service, agent_id: Optional[str] = None):
        self.stt_service = stt_service
        self.chatbot_service = chatbot_service
        self.tts_service = tts_service
        self.vad_service = vad_service
        self.agent_id = agent_id

        # Active sessions
        self.active_sessions: Dict[str, StimmEventLoop] = {}

        # Event-forwarding task per session. The reference is kept so the task
        # cannot be garbage-collected mid-run and can be cancelled on close.
        self._forward_tasks: Dict[str, "asyncio.Task"] = {}

        # Event handlers for real-time updates (one handler per event type)
        self.event_handlers: Dict[str, Callable] = {}

    async def create_session(self, conversation_id: str, session_id: str = None) -> "StimmEventLoop":
        """
        Create a new stimm session.

        If a session already exists for ``conversation_id``, it is returned
        unchanged and a warning is logged.

        Args:
            conversation_id: Unique identifier for the conversation
            session_id: Optional session identifier

        Returns:
            StimmEventLoop instance for the new session
        """
        if conversation_id in self.active_sessions:
            logger.warning(f"Session {conversation_id} already exists")
            return self.active_sessions[conversation_id]

        # Create output queue for the session
        output_queue = asyncio.Queue()

        # Create event loop for the session
        event_loop = StimmEventLoop(
            conversation_id=conversation_id,
            output_queue=output_queue,
            stt_service=self.stt_service,
            chatbot_service=self.chatbot_service,
            tts_service=self.tts_service,
            vad_service=self.vad_service,
            agent_id=self.agent_id,
            session_id=session_id,
        )

        # Start the event loop
        await event_loop.start()

        # Store in active sessions
        self.active_sessions[conversation_id] = event_loop

        # Forward events to registered handlers; keep the task so close_session can cancel it.
        self._forward_tasks[conversation_id] = asyncio.create_task(self._forward_events(conversation_id, output_queue))

        logger.info(f"Created stimm session: {conversation_id}")
        return event_loop

    async def close_session(self, conversation_id: str):
        """
        Close a stimm session.

        Stops the session's event loop, removes it from ``active_sessions`` and
        cancels its event-forwarding task. Removal and cancellation happen even
        if ``stop()`` raises; that exception is then propagated to the caller.

        Args:
            conversation_id: ID of the session to close
        """
        if conversation_id not in self.active_sessions:
            logger.warning(f"Session {conversation_id} not found")
            return

        event_loop = self.active_sessions[conversation_id]
        try:
            await event_loop.stop()
        finally:
            self.active_sessions.pop(conversation_id, None)
            forward_task = self._forward_tasks.pop(conversation_id, None)
            if forward_task is not None:
                # The forwarder waits on the session's queue forever; stop it so
                # it does not outlive the session.
                forward_task.cancel()

        logger.info(f"Closed stimm session: {conversation_id}")

    async def process_audio(self, conversation_id: str, audio_chunk: bytes) -> bool:
        """
        Process audio chunk for a specific session.

        Args:
            conversation_id: ID of the conversation
            audio_chunk: Raw audio data bytes

        Returns:
            True if processed successfully, False if session not found
        """
        if conversation_id not in self.active_sessions:
            logger.warning(f"Session {conversation_id} not found for audio processing")
            return False

        event_loop = self.active_sessions[conversation_id]
        await event_loop.process_audio_chunk(audio_chunk)
        return True

    def get_session_state(self, conversation_id: str) -> Optional[Dict[str, Any]]:
        """
        Get the current state of a session.

        Args:
            conversation_id: ID of the conversation

        Returns:
            Dictionary with session state or None if session not found
        """
        if conversation_id not in self.active_sessions:
            return None

        event_loop = self.active_sessions[conversation_id]
        return {
            "conversation_id": event_loop.conversation_id,
            "state": event_loop.state.value,
            "agent_id": event_loop.agent_id,
            "session_id": event_loop.session_id,
            "is_recording": event_loop.is_recording,
        }

    def register_event_handler(self, event_type: str, handler: Callable[[Dict[str, Any]], None]):
        """
        Register an event handler for real-time updates.

        Only one handler is kept per event type; registering again replaces it.

        Args:
            event_type: Type of event to handle
            handler: Async function called with the event dict (it is awaited
                by the event forwarder).
        """
        self.event_handlers[event_type] = handler

    def unregister_event_handler(self, event_type: str):
        """
        Unregister an event handler (no-op if none is registered).

        Args:
            event_type: Type of event to unregister
        """
        self.event_handlers.pop(event_type, None)

    async def _forward_events(self, conversation_id: str, output_queue: asyncio.Queue):
        """
        Forward events from output queue to registered handlers.

        Runs until cancelled (``close_session`` cancels it). A malformed event or
        a failing handler is logged and skipped. It does not stop forwarding
        for the rest of the session.

        Args:
            conversation_id: ID of the conversation
            output_queue: Queue to listen for events
        """
        try:
            while True:
                event = await output_queue.get()
                try:
                    # Add conversation context to event
                    event["conversation_id"] = conversation_id
                    event["timestamp"] = event.get("timestamp", asyncio.get_running_loop().time())

                    # .get(): an event without "type" must not kill the forwarder.
                    event_type = event.get("type")
                    handler = self.event_handlers.get(event_type)
                    if handler:
                        try:
                            await handler(event)
                        except Exception as e:
                            logger.error(f"Error in event handler for {event_type}: {e}")
                    elif event_type in ("transcript_update", "assistant_response", "vad_update"):
                        # Log missing handler only for important events to avoid noise
                        logger.warning(f"⚠️ No handler registered for event: {event_type}")
                except Exception as e:
                    logger.error(f"Error forwarding event for session {conversation_id}: {e}")
                finally:
                    output_queue.task_done()

        except asyncio.CancelledError:
            logger.info(f"Event forwarding cancelled for session {conversation_id}")
        except Exception as e:
            logger.error(f"Error in event forwarding for session {conversation_id}: {e}")

    async def cleanup_all_sessions(self):
        """Clean up all active sessions."""
        # Snapshot the keys: close_session mutates active_sessions.
        for conversation_id in list(self.active_sessions.keys()):
            await self.close_session(conversation_id)

    def get_active_sessions(self) -> Dict[str, Dict[str, Any]]:
        """
        Get information about all active sessions.

        Returns:
            Dictionary mapping conversation IDs to session info
        """
        return {conv_id: self.get_session_state(conv_id) for conv_id in self.active_sessions.keys()}

cleanup_all_sessions() async

Clean up all active sessions.

Source code in src/services/agents/stimm_service.py
async def cleanup_all_sessions(self):
    """Close every session currently tracked in ``self.active_sessions``.

    The conversation IDs are snapshotted first, because ``close_session``
    removes entries from the dict while this loop runs.
    """
    pending_ids = [*self.active_sessions]
    for conv_id in pending_ids:
        await self.close_session(conv_id)

close_session(conversation_id) async

Close a stimm session.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `conversation_id` | `str` | ID of the session to close | *required* |
Source code in src/services/agents/stimm_service.py
async def close_session(self, conversation_id: str):
    """
    Close a stimm session.

    Stops the session's event loop and removes it from ``self.active_sessions``.
    The session is removed even if ``stop()`` raises, and the exception still
    propagates. This keeps a failed shutdown from leaving a broken loop
    registered, where later ``create_session`` / ``process_audio`` calls would
    reuse it.

    Args:
        conversation_id: ID of the session to close
    """
    if conversation_id not in self.active_sessions:
        logger.warning(f"Session {conversation_id} not found")
        return

    event_loop = self.active_sessions[conversation_id]
    try:
        await event_loop.stop()
    finally:
        # pop() rather than del: tolerate a concurrent close that already removed it.
        self.active_sessions.pop(conversation_id, None)

    logger.info(f"Closed stimm session: {conversation_id}")

create_session(conversation_id, session_id=None) async

Create a new stimm session.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `conversation_id` | `str` | Unique identifier for the conversation | *required* |
| `session_id` | `str` | Optional session identifier | `None` |

Returns:

| Type | Description |
|------|-------------|
| `StimmEventLoop` | StimmEventLoop instance for the new session |

Source code in src/services/agents/stimm_service.py
async def create_session(self, conversation_id: str, session_id: str = None) -> StimmEventLoop:
    """
    Create and start a stimm session, or return the one already registered.

    Args:
        conversation_id: Unique identifier for the conversation; also the key
            under which the session is stored in ``self.active_sessions``.
        session_id: Optional session identifier, passed to the event loop.

    Returns:
        The started StimmEventLoop for ``conversation_id``. If the conversation
        already had a session, that existing loop is returned.
    """
    if conversation_id in self.active_sessions:
        logger.warning(f"Session {conversation_id} already exists")
        return self.active_sessions[conversation_id]

    # The loop publishes its events on this queue; _forward_events consumes it.
    events: asyncio.Queue = asyncio.Queue()
    loop_kwargs = dict(
        conversation_id=conversation_id,
        output_queue=events,
        stt_service=self.stt_service,
        chatbot_service=self.chatbot_service,
        tts_service=self.tts_service,
        vad_service=self.vad_service,
        agent_id=self.agent_id,
        session_id=session_id,
    )
    session_loop = StimmEventLoop(**loop_kwargs)

    await session_loop.start()
    self.active_sessions[conversation_id] = session_loop

    # NOTE(review): fire-and-forget. No reference to this task is kept, so it is
    # never cancelled on close and may be garbage-collected mid-run.
    asyncio.create_task(self._forward_events(conversation_id, events))

    logger.info(f"Created stimm session: {conversation_id}")
    return session_loop

get_active_sessions()

Get information about all active sessions.

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Dict[str, Any]]` | Dictionary mapping conversation IDs to session info |

Source code in src/services/agents/stimm_service.py
def get_active_sessions(self) -> Dict[str, Dict[str, Any]]:
    """
    Summarize every active session.

    Returns:
        Mapping of conversation ID to the dict produced by
        ``get_session_state`` for that conversation.
    """
    summary = {}
    for conv_id in self.active_sessions:
        summary[conv_id] = self.get_session_state(conv_id)
    return summary

get_session_state(conversation_id)

Get the current state of a session.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `conversation_id` | `str` | ID of the conversation | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Optional[Dict[str, Any]]` | Dictionary with session state, or `None` if the session is not found |

Source code in src/services/agents/stimm_service.py
def get_session_state(self, conversation_id: str) -> Optional[Dict[str, Any]]:
    """
    Snapshot the public state of one session.

    Args:
        conversation_id: ID of the conversation to inspect.

    Returns:
        A dict with the loop's conversation_id, state value, agent_id,
        session_id and is_recording flag, or None for an unknown conversation.
    """
    if conversation_id in self.active_sessions:
        session = self.active_sessions[conversation_id]
        return dict(
            conversation_id=session.conversation_id,
            state=session.state.value,
            agent_id=session.agent_id,
            session_id=session.session_id,
            is_recording=session.is_recording,
        )
    return None

process_audio(conversation_id, audio_chunk) async

Process audio chunk for a specific session.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `conversation_id` | `str` | ID of the conversation | *required* |
| `audio_chunk` | `bytes` | Raw audio data bytes | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | `True` if processed successfully, `False` if the session is not found |

Source code in src/services/agents/stimm_service.py
async def process_audio(self, conversation_id: str, audio_chunk: bytes) -> bool:
    """
    Feed one chunk of raw audio to a session's event loop.

    Args:
        conversation_id: Conversation whose event loop receives the audio.
        audio_chunk: Raw audio bytes.

    Returns:
        True once the chunk has been handed to the loop; False (with a
        warning logged) when no session exists for ``conversation_id``.
    """
    if conversation_id in self.active_sessions:
        await self.active_sessions[conversation_id].process_audio_chunk(audio_chunk)
        return True

    logger.warning(f"Session {conversation_id} not found for audio processing")
    return False

register_event_handler(event_type, handler)

Register an event handler for real-time updates.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `event_type` | `str` | Type of event to handle | *required* |
| `handler` | `Callable[[Dict[str, Any]], None]` | Async function to call when the event occurs | *required* |
Source code in src/services/agents/stimm_service.py
def register_event_handler(self, event_type: str, handler: Callable[[Dict[str, Any]], None]):
    """
    Install the handler for one event type, replacing any previous one.

    Args:
        event_type: Event ``"type"`` value the handler should receive.
        handler: Async callable taking the event dict. It is awaited by the
            event forwarder, so despite the annotation it must return an
            awaitable.
    """
    handlers = self.event_handlers
    handlers[event_type] = handler

unregister_event_handler(event_type)

Unregister an event handler.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `event_type` | `str` | Type of event to unregister | *required* |
Source code in src/services/agents/stimm_service.py
def unregister_event_handler(self, event_type: str):
    """
    Remove the handler for ``event_type``; silently does nothing if none is set.

    Args:
        event_type: Event type whose handler should be dropped.
    """
    if event_type in self.event_handlers:
        del self.event_handlers[event_type]

get_stimm_service(stt_service=None, chatbot_service=None, tts_service=None, vad_service=None, agent_id=None)

Get or create the global stimm service instance.

Parameters:

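| Name | Type | Description | Default |
|------|------|-------------|---------|
| `stt_service` | | STT service instance | `None` |
| `chatbot_service` | | Chatbot service instance | `None` |
| `tts_service` | | TTS service instance | `None` |
| `vad_service` | | VAD service instance | `None` |
| `agent_id` | `str` | Optional agent ID | `None` |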

Returns:

| Type | Description |
|------|-------------|
| `StimmService` | StimmService instance |

Source code in src/services/agents/stimm_service.py
def get_stimm_service(stt_service=None, chatbot_service=None, tts_service=None, vad_service=None, agent_id: str = None) -> StimmService:
    """
    Return the process-wide StimmService, creating it on first use.

    The arguments are only used by the call that creates the instance. Once
    the module-level ``stimm_service`` is set, later calls return it unchanged
    and ignore whatever services / agent_id they pass.

    Args:
        stt_service: STT service instance
        chatbot_service: Chatbot service instance
        tts_service: TTS service instance
        vad_service: VAD service instance
        agent_id: Optional agent ID

    Returns:
        The shared StimmService instance.
    """
    global stimm_service

    if stimm_service is not None:
        return stimm_service

    stimm_service = StimmService(
        stt_service=stt_service,
        chatbot_service=chatbot_service,
        tts_service=tts_service,
        vad_service=vad_service,
        agent_id=agent_id,
    )
    return stimm_service

options: show_root_heading: true show_source: true

RAG Service Core Logic

This module contains the core service logic for conversation management and document handling.

options: show_root_heading: true show_source: true

Language Model Service Module with Agent Support

LLMService

Service for handling Language Model operations with agent support

Source code in src/services/llm/llm.py
class LLMService:
    """Service for handling Language Model operations with agent support.

    The provider is chosen once, at construction time, from the resolved
    agent configuration (``llm_provider`` / ``llm_config``).
    """

    def __init__(self, agent_id: Optional[UUID] = None, session_id: Optional[str] = None):
        """
        Initialize LLM Service with agent support.

        Args:
            agent_id: Specific agent ID to use (if None, uses default agent)
            session_id: Session ID for agent resolution; its agent is tried
                before ``agent_id``.
        """
        self.agent_manager = get_agent_manager()
        self.agent_id = agent_id
        self.session_id = session_id
        self.agent_config = None
        self.provider = self._initialize_provider()

    def _initialize_provider(self):
        """Initialize the appropriate LLM provider based on agent configuration.

        The agent configuration is resolved in this order:

        1. the agent bound to ``session_id``, if it is a well-formed UUID and
           the lookup succeeds;
        2. the agent identified by ``agent_id``;
        3. the default agent.

        The resolved configuration is stored on ``self.agent_config``.

        Returns:
            The provider instance created for ``agent_config.llm_provider``.

        Raises:
            ValueError: if the configured provider name is not supported.
        """
        agent_config = None

        if self.session_id:
            try:
                # Only query sessions with a well-formed UUID. str() also
                # accepts a UUID instance passed in as session_id.
                UUID(str(self.session_id))
            except ValueError as e:
                logger.warning(f"Invalid session_id '{self.session_id}', falling back to agent_id: {e}")
            else:
                try:
                    agent_config = self.agent_manager.get_session_agent(self.session_id)
                except Exception as e:
                    # Best-effort: a lookup failure falls back to agent_id / default agent.
                    logger.warning(
                        f"Failed to resolve agent for session_id '{self.session_id}', falling back to agent_id: {e}"
                    )

        if not agent_config and self.agent_id:
            agent_config = self.agent_manager.get_agent_config(self.agent_id)

        if not agent_config:
            agent_config = self.agent_manager.get_agent_config()

        # Store agent configuration for later use (e.g., system prompt)
        self.agent_config = agent_config

        provider_name = agent_config.llm_provider
        provider_config = agent_config.llm_config

        logger.debug(f"Initializing LLM provider: {provider_name} with agent configuration")
        logger.debug(f"🔍 LLM provider config for {provider_name}: {provider_config}")

        # Initialize provider - mapping is now handled within each provider
        if provider_name == "groq.com":
            return create_groq_provider(provider_config)
        elif provider_name == "mistral.ai":
            return create_mistral_provider(provider_config)
        elif provider_name == "openrouter.ai":
            return create_openrouter_provider(provider_config)
        elif provider_name == "llama-cpp.local":
            return create_llama_cpp_provider(provider_config)
        else:
            raise ValueError(f"Unsupported LLM provider: {provider_name}")

    async def generate(self, prompt: str, **kwargs) -> str:
        """
        Generate text using the configured LLM provider

        Args:
            prompt: Input text prompt
            **kwargs: Additional parameters for the provider

        Returns:
            str: Generated text
        """
        return await self.provider.generate(prompt, **kwargs)

    async def generate_stream(self, prompt: str, **kwargs) -> AsyncIterator[str]:
        """
        Stream text generation using the configured LLM provider

        Args:
            prompt: Input text prompt
            **kwargs: Additional parameters for the provider

        Yields:
            str: Generated text chunks
        """
        async for chunk in self.provider.generate_stream(prompt, **kwargs):
            yield chunk

    async def close(self):
        """Close the provider session, if the provider exposes ``close()``."""
        if hasattr(self.provider, "close"):
            await self.provider.close()

__init__(agent_id=None, session_id=None)

Initialize LLM Service with agent support.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `agent_id` | `Optional[UUID]` | Specific agent ID to use (if `None`, uses the default agent) | `None` |
| `session_id` | `Optional[str]` | Session ID for agent resolution | `None` |
Source code in src/services/llm/llm.py
def __init__(self, agent_id: Optional[UUID] = None, session_id: Optional[str] = None):
    """
    Set up the service and eagerly resolve its LLM provider.

    Args:
        agent_id: Agent to use when no agent is resolved from the session
            (the default agent is used if this is also None).
        session_id: Session whose bound agent is tried first.
    """
    self.agent_id = agent_id
    self.session_id = session_id
    self.agent_config = None  # filled in by _initialize_provider()
    self.agent_manager = get_agent_manager()
    self.provider = self._initialize_provider()

close() async

Close the provider session

Source code in src/services/llm/llm.py
async def close(self):
    """Release the provider's resources when the provider exposes ``close()``."""
    if not hasattr(self.provider, "close"):
        return
    await self.provider.close()

generate(prompt, **kwargs) async

Generate text using the configured LLM provider

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `prompt` | `str` | Input text prompt | *required* |
| `**kwargs` | | Additional parameters for the provider | `{}` |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `str` | `str` | Generated text |

Source code in src/services/llm/llm.py
async def generate(self, prompt: str, **kwargs) -> str:
    """
    Produce a complete completion for ``prompt`` via the configured provider.

    Args:
        prompt: Input text prompt.
        **kwargs: Passed through unchanged to the provider's ``generate``.

    Returns:
        str: Whatever text the provider returns.
    """
    provider = self.provider
    completion = await provider.generate(prompt, **kwargs)
    return completion

generate_stream(prompt, **kwargs) async

Stream text generation using the configured LLM provider

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `prompt` | `str` | Input text prompt | *required* |
| `**kwargs` | | Additional parameters for the provider | `{}` |

Yields:

| Name | Type | Description |
|------|------|-------------|
| `str` | `AsyncIterator[str]` | Generated text chunks |

Source code in src/services/llm/llm.py
async def generate_stream(self, prompt: str, **kwargs) -> AsyncIterator[str]:
    """
    Relay the provider's streamed completion for ``prompt`` chunk by chunk.

    Args:
        prompt: Input text prompt.
        **kwargs: Passed through unchanged to the provider's ``generate_stream``.

    Yields:
        str: Each chunk exactly as the provider yields it.
    """
    stream = self.provider.generate_stream(prompt, **kwargs)
    async for piece in stream:
        yield piece

options: show_root_heading: true show_source: true

Speech-to-Text Service Module

STTService

Service for handling Speech-to-Text operations

Source code in src/services/stt/stt.py
class STTService:
    """Speech-to-Text service that delegates to a provider chosen from agent configuration.

    The provider is selected once, in ``__init__``. The agent configuration
    comes from ``agent_id`` if given, otherwise from ``session_id``, otherwise
    from the default agent.
    """

    def __init__(self, agent_id: Optional[str] = None, session_id: Optional[str] = None):
        self.agent_id = agent_id
        self.session_id = session_id
        self.provider = None
        self._initialize_provider()

    def _initialize_provider(self):
        """Select and construct the STT provider named by the agent configuration.

        Raises:
            ValueError: if the agent's ``stt_provider`` is not supported.
        """
        # Always use agent-based configuration
        manager = get_agent_manager()
        if self.agent_id:
            config = manager.get_agent_config(self.agent_id)
        elif self.session_id:
            config = manager.get_session_agent(self.session_id)
        else:
            config = manager.get_agent_config()

        name = config.stt_provider
        settings = config.stt_config or {}  # a missing config becomes an empty dict
        logger.info(f"Initialized STT provider from agent configuration: {name}")
        # NOTE(review): this logs the full provider config at INFO level; if it
        # carries credentials (e.g. an API key), they end up in the logs. Confirm.
        logger.info(f"🔍 STT provider config for {name}: {settings}")

        if name == "deepgram.com":
            self.provider = DeepgramProvider(settings)
        elif name == "whisper.local":
            self.provider = WhisperLocalProvider(settings)
        else:
            raise ValueError(f"Unsupported STT provider: {name}")

    async def transcribe_streaming(self, audio_generator: AsyncGenerator[bytes, None]) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Stream transcription results for audio read from ``audio_generator``.

        Args:
            audio_generator: Async generator yielding raw audio chunks.

        Yields:
            Each result produced by the provider's ``stream_audio_chunks``.

        Raises:
            RuntimeError: if no provider has been initialized.
        """
        provider = self.provider
        if not provider:
            raise RuntimeError("STT provider not initialized")

        try:
            async for result in provider.stream_audio_chunks(audio_generator):
                yield result
        except Exception as e:
            # Log for diagnostics, then let the caller see the original error.
            logger.error(f"Streaming transcription failed: {e}")
            raise

transcribe_streaming(audio_generator) async

Transcribe from a streaming audio generator

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `audio_generator` | `AsyncGenerator[bytes, None]` | Async generator yielding audio chunks | *required* |

Yields:

| Type | Description |
|------|-------------|
| `AsyncGenerator[Dict[str, Any], None]` | Transcription results |

Source code in src/services/stt/stt.py
async def transcribe_streaming(self, audio_generator: AsyncGenerator[bytes, None]) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Stream transcription results for audio read from ``audio_generator``.

    Args:
        audio_generator: Async generator yielding raw audio chunks.

    Yields:
        Each result produced by the provider's ``stream_audio_chunks``.

    Raises:
        RuntimeError: if no provider has been initialized.
    """
    provider = self.provider
    if not provider:
        raise RuntimeError("STT provider not initialized")

    try:
        async for result in provider.stream_audio_chunks(audio_generator):
            yield result
    except Exception as e:
        # Log for diagnostics, then let the caller see the original error.
        logger.error(f"Streaming transcription failed: {e}")
        raise

options: show_root_heading: true show_source: true

Text-to-Speech Service Module with provider-based streaming support.

TTSService

Service for handling Text-to-Speech operations

Source code in src/services/tts/tts.py
class TTSService:
    """Text-to-Speech service that delegates synthesis to an agent-configured provider.

    The provider is picked at construction time from the agent configuration:
    the explicit ``agent_id`` wins, then the agent bound to ``session_id``,
    otherwise the agent manager's default agent.
    """

    def __init__(self, agent_id: Optional[str] = None, session_id: Optional[str] = None):
        self.agent_id = agent_id
        self.session_id = session_id
        self.provider = None
        self._initialize_provider()

    def _initialize_provider(self):
        # Provider selection is always driven by the agent configuration.
        manager = get_agent_manager()
        if self.agent_id:
            config = manager.get_agent_config(self.agent_id)
        elif self.session_id:
            config = manager.get_session_agent(self.session_id)
        else:
            config = manager.get_agent_config()

        name = config.tts_provider
        settings = config.tts_config

        logger.info(f"Initializing TTS provider from agent configuration: {name}")
        # NOTE(review): this logs the whole provider config at INFO level; confirm it
        # never contains credentials such as API keys.
        logger.info(f"🔍 TTS provider config for {name}: {settings}")

        try:
            # Provider identifier -> implementation; each provider maps the raw
            # config to its own settings.
            provider_classes = {
                "async.ai": AsyncAIProvider,
                "kokoro.local": KokoroLocalProvider,
                "deepgram.com": DeepgramProvider,
                "elevenlabs.io": ElevenLabsProvider,
                "hume.ai": HumeProvider,
            }
            provider_cls = provider_classes.get(name)
            if provider_cls is None:
                raise ValueError(f"Unsupported TTS provider: {name}")
            self.provider = provider_cls(settings)

            logger.info(f"TTS provider initialized: {type(self.provider).__name__}")
        except Exception as exc:
            logger.error(f"Failed to initialize TTS provider '{name}': {exc}")
            raise

    async def stream_synthesis(self, text_generator: AsyncGenerator[str, None]) -> AsyncGenerator[bytes, None]:
        """Yield audio chunks synthesized by the configured provider, unchanged.

        Raises:
            RuntimeError: if no provider is set (raised on first iteration).
        """
        engine = self.provider
        if not engine:
            raise RuntimeError("TTS provider not initialized")

        async for chunk in engine.stream_synthesis(text_generator):
            yield chunk

stream_synthesis(text_generator) async

Stream synthesis using the configured provider.

Source code in src/services/tts/tts.py
async def stream_synthesis(self, text_generator: AsyncGenerator[str, None]) -> AsyncGenerator[bytes, None]:
    """Yield audio chunks synthesized from ``text_generator`` by the configured provider.

    Chunks from ``self.provider.stream_synthesis`` are passed through unchanged.

    Raises:
        RuntimeError: if no provider is set (raised on first iteration).
    """
    engine = self.provider
    if not engine:
        raise RuntimeError("TTS provider not initialized")

    audio_stream = engine.stream_synthesis(text_generator)
    async for chunk in audio_stream:
        yield chunk

options: show_root_heading: true show_source: true

Database models for the Stimm application.

Agent

Bases: Base

Agent model representing a stimm configuration.

Source code in src/database/models.py
class Agent(Base):
    """Agent model representing a stimm configuration.

    A row stores, for one owning user, the LLM/TTS/STT provider chosen for the
    agent together with each provider's JSON configuration, the agent flags,
    an optional system prompt and an optional link to a RAG configuration.
    """

    __tablename__ = "agents"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Agents are removed together with their owning user (ON DELETE CASCADE).
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    name = Column(String(255), nullable=False)
    description = Column(Text)

    # Provider selections
    llm_provider = Column(String(50), nullable=False)
    tts_provider = Column(String(50), nullable=False)
    stt_provider = Column(String(50), nullable=False)

    # Provider configurations (JSONB). `default=dict` is a callable, so each
    # inserted row gets its own empty dict rather than a shared object.
    llm_config = Column(JSONB, nullable=False, default=dict)
    tts_config = Column(JSONB, nullable=False, default=dict)
    stt_config = Column(JSONB, nullable=False, default=dict)

    # Agent settings
    is_default = Column(Boolean, default=False)
    is_active = Column(Boolean, default=True)
    is_system_agent = Column(Boolean, default=False)

    # System prompt (instructions)
    system_prompt = Column(Text)

    # RAG configuration (optional). Deleting the referenced RAG config clears
    # this link (ON DELETE SET NULL) instead of deleting the agent.
    rag_config_id = Column(UUID(as_uuid=True), ForeignKey("rag_configs.id", ondelete="SET NULL"), nullable=True)

    # Metadata: both timestamps default to the database's now(); updated_at is
    # also refreshed via `onupdate` when the row is updated through SQLAlchemy.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    __table_args__ = (
        Index("idx_agents_user_id", "user_id"),
        # PostgreSQL partial indexes covering only default / only active agents.
        Index("idx_agents_is_default", "is_default", postgresql_where=(is_default.is_(True))),
        Index("idx_agents_is_active", "is_active", postgresql_where=(is_active.is_(True))),
        # Partial UNIQUE index on user_id over default rows: at most one default agent per user.
        Index("idx_agents_user_default", "user_id", unique=True, postgresql_where=(is_default.is_(True))),
    )

    def __repr__(self):
        """Return a short debug representation (id, name and LLM provider)."""
        return f"<Agent(id={self.id}, name='{self.name}', llm='{self.llm_provider}')>"

    def to_dict(self):
        """Convert agent to dictionary for API responses.

        UUIDs are rendered as strings and timestamps as ISO-8601 strings; an
        unset ``rag_config_id`` or timestamp is returned as ``None``.
        """
        return {
            "id": str(self.id),
            "user_id": str(self.user_id),
            "name": self.name,
            "description": self.description,
            "llm_provider": self.llm_provider,
            "tts_provider": self.tts_provider,
            "stt_provider": self.stt_provider,
            "llm_config": self.llm_config,
            "tts_config": self.tts_config,
            "stt_config": self.stt_config,
            "is_default": self.is_default,
            "is_active": self.is_active,
            "is_system_agent": self.is_system_agent,
            "system_prompt": self.system_prompt,
            "rag_config_id": str(self.rag_config_id) if self.rag_config_id else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }

to_dict()

Convert agent to dictionary for API responses.

Source code in src/database/models.py
def to_dict(self):
    """Serialize an agent into a JSON-friendly ``dict`` for API responses.

    ``id``/``user_id`` become strings, ``rag_config_id`` becomes a string or
    ``None``, and timestamps become ISO-8601 strings or ``None`` when unset.
    Every other field is copied as-is.
    """

    def _iso(moment):
        return moment.isoformat() if moment else None

    passthrough = (
        "name",
        "description",
        "llm_provider",
        "tts_provider",
        "stt_provider",
        "llm_config",
        "tts_config",
        "stt_config",
        "is_default",
        "is_active",
        "is_system_agent",
        "system_prompt",
    )

    payload = {"id": str(self.id), "user_id": str(self.user_id)}
    for attr in passthrough:
        payload[attr] = getattr(self, attr)

    rag_link = self.rag_config_id
    payload["rag_config_id"] = str(rag_link) if rag_link else None
    payload["created_at"] = _iso(self.created_at)
    payload["updated_at"] = _iso(self.updated_at)
    return payload

AgentSession

Bases: Base

Agent session model for runtime agent switching.

Source code in src/database/models.py
class AgentSession(Base):
    """Agent session model for runtime agent switching.

    Each row links a user to the agent used for one session and records the
    session type, basic client details, and an optional expiry timestamp.
    """

    __tablename__ = "agent_sessions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Sessions are removed when either the user or the agent is deleted (ON DELETE CASCADE).
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    agent_id = Column(UUID(as_uuid=True), ForeignKey("agents.id", ondelete="CASCADE"), nullable=False)
    session_type = Column(String(50), nullable=False)  # 'stimm', 'chat', 'tts', 'stt'
    ip_address = Column(String(45))  # IPv6 support
    user_agent = Column(Text)

    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Nullable; presumably a NULL value means the session has no expiry — confirm with callers.
    expires_at = Column(DateTime(timezone=True))

    __table_args__ = (
        # Composite index for lookups by (user, agent).
        Index("idx_agent_sessions_user_agent", "user_id", "agent_id"),
        Index("idx_agent_sessions_expires", "expires_at"),
    )

    def __repr__(self):
        """Return a short debug representation (id, session type and agent id)."""
        return f"<AgentSession(id={self.id}, type='{self.session_type}', agent='{self.agent_id}')>"

Document

Bases: Base

Document model for tracking ingested documents in RAG configurations.

Source code in src/database/models.py
class Document(Base):
    """Document model for tracking ingested documents in RAG configurations.

    One row per ingested file: its name, type, size, the number of chunks it
    was split into, and the vector-store point IDs of those chunks.
    """

    __tablename__ = "documents"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # Documents are removed together with their RAG configuration (ON DELETE CASCADE).
    rag_config_id = Column(UUID(as_uuid=True), ForeignKey("rag_configs.id", ondelete="CASCADE"), nullable=False)
    filename = Column(String(500), nullable=False)
    file_type = Column(String(50), nullable=False)  # 'pdf', 'docx', 'markdown', 'text'
    file_size_bytes = Column(Integer, nullable=True)
    chunk_count = Column(Integer, nullable=False)
    chunk_ids = Column(ARRAY(Text), nullable=False)  # Array of Qdrant point IDs
    namespace = Column(String(255), nullable=True)
    doc_metadata = Column(JSONB, nullable=True)  # Renamed from 'metadata' to avoid SQLAlchemy conflict
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    __table_args__ = (
        Index("idx_documents_rag_config", "rag_config_id"),
        Index("idx_documents_created_at", "created_at"),
    )

    def __repr__(self):
        """Return a short debug representation (id, filename and file type)."""
        return f"<Document(id={self.id}, filename='{self.filename}', type='{self.file_type}')>"

    def to_dict(self):
        """Convert document to dictionary for API responses.

        IDs become strings, timestamps become ISO-8601 strings (or ``None``),
        and ``doc_metadata`` is exposed under the API key ``"metadata"``.
        """
        return {
            "id": str(self.id),
            "rag_config_id": str(self.rag_config_id),
            "filename": self.filename,
            "file_type": self.file_type,
            "file_size_bytes": self.file_size_bytes,
            "chunk_count": self.chunk_count,
            "chunk_ids": self.chunk_ids,
            "namespace": self.namespace,
            "metadata": self.doc_metadata,  # Return as 'metadata' in API
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }

to_dict()

Convert document to dictionary for API responses.

Source code in src/database/models.py
def to_dict(self):
    """Serialize a document record into a JSON-friendly ``dict`` for API responses.

    IDs become strings, timestamps become ISO-8601 strings (or ``None`` when
    unset), and ``doc_metadata`` is exposed under the ``"metadata"`` key.
    """

    def _iso(moment):
        return moment.isoformat() if moment else None

    copied_fields = ("filename", "file_type", "file_size_bytes", "chunk_count", "chunk_ids", "namespace")

    record = {"id": str(self.id), "rag_config_id": str(self.rag_config_id)}
    record.update((field, getattr(self, field)) for field in copied_fields)
    # The ORM attribute is doc_metadata, but the API keeps the "metadata" name.
    record["metadata"] = self.doc_metadata
    record["created_at"] = _iso(self.created_at)
    record["updated_at"] = _iso(self.updated_at)
    return record

RagConfig

Bases: Base

RAG configuration model representing a retrievable knowledge base.

Source code in src/database/models.py
class RagConfig(Base):
    """RAG configuration model representing a retrievable knowledge base.

    Each row names a retrieval backend (``provider_type`` / ``provider``) and
    stores that backend's JSON configuration for one owning user.
    """

    __tablename__ = "rag_configs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # RAG configs are removed together with their owning user (ON DELETE CASCADE).
    user_id = Column(UUID(as_uuid=True), ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    name = Column(String(255), nullable=False)
    description = Column(Text)
    provider_type = Column(String(20), nullable=False)  # 'vectorbase', 'saas_rag'
    provider = Column(String(50), nullable=False)  # e.g., 'qdrant.internal', 'pinecone.io', 'rag.saas'
    # `default=dict` is a callable, so each inserted row gets its own empty dict.
    provider_config = Column(JSONB, nullable=False, default=dict)
    is_default = Column(Boolean, default=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    __table_args__ = (
        Index("idx_rag_configs_user_id", "user_id"),
        # PostgreSQL partial index covering only default configs.
        Index("idx_rag_configs_is_default", "is_default", postgresql_where=(is_default.is_(True))),
        # Partial UNIQUE index on user_id over default rows: at most one default RAG config per user.
        Index("idx_rag_configs_user_default", "user_id", unique=True, postgresql_where=(is_default.is_(True))),
    )

    def __repr__(self):
        """Return a short debug representation (id, name and provider)."""
        return f"<RagConfig(id={self.id}, name='{self.name}', provider='{self.provider}')>"

    def to_dict(self):
        """Convert RAG config to dictionary for API responses.

        IDs become strings and timestamps ISO-8601 strings (or ``None`` when unset).
        """
        return {
            "id": str(self.id),
            "user_id": str(self.user_id),
            "name": self.name,
            "description": self.description,
            "provider_type": self.provider_type,
            "provider": self.provider,
            "provider_config": self.provider_config,
            "is_default": self.is_default,
            "is_active": self.is_active,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }

to_dict()

Convert RAG config to dictionary for API responses.

Source code in src/database/models.py
def to_dict(self):
    """Serialize a RAG configuration into a JSON-friendly ``dict`` for API responses.

    ``id`` and ``user_id`` become strings; timestamps become ISO-8601 strings
    or ``None`` when unset; all other fields are copied unchanged.
    """

    def _iso(moment):
        return moment.isoformat() if moment else None

    identity = [("id", str(self.id)), ("user_id", str(self.user_id))]
    settings = [
        (key, getattr(self, key))
        for key in (
            "name",
            "description",
            "provider_type",
            "provider",
            "provider_config",
            "is_default",
            "is_active",
        )
    ]
    timestamps = [("created_at", _iso(self.created_at)), ("updated_at", _iso(self.updated_at))]
    return dict(identity + settings + timestamps)

User

Bases: Base

User model for future IAM support.

Source code in src/database/models.py
class User(Base):
    """User model for future IAM support.

    Stores only identity fields; both ``username`` and ``email`` carry UNIQUE
    constraints.
    """

    __tablename__ = "users"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    username = Column(String(255), unique=True, nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    # Both timestamps default to the database's now(); updated_at is also
    # refreshed via `onupdate` when the row is updated through SQLAlchemy.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

    def __repr__(self):
        """Return a short debug representation (id and username)."""
        return f"<User(id={self.id}, username='{self.username}')>"

options: show_root_heading: true show_source: true

CLI tool for testing voice agents from the command line.

clear_rooms(args) async

Delete all LiveKit rooms

Source code in src/cli/main.py
async def clear_rooms(args):
    """Delete every room on the configured LiveKit server.

    A per-room failure is printed and skipped so the remaining rooms are still
    processed. LiveKit's "could not find object" error is reported as a warning.

    Args:
        args: Parsed CLI arguments (not used).

    Returns:
        0 once all rooms were attempted (even if some deletions failed), or 1
        if connecting to LiveKit or listing rooms raised (traceback printed).
    """
    try:
        from livekit import api
        from livekit.api import TwirpError

        from environment_config import config

        client = api.LiveKitAPI(
            url=config.livekit_url.replace("ws://", "http://"),
            api_key=config.livekit_api_key,
            api_secret=config.livekit_api_secret,
        )
        try:
            listing = await client.room.list_rooms(api.ListRoomsRequest())
            print(f"Found {len(listing.rooms)} rooms")
            removed = 0
            for entry in listing.rooms:
                room_name = entry.name
                try:
                    await client.room.delete_room(api.DeleteRoomRequest(room=room_name))
                    print(f"✅ Deleted room: {room_name}")
                    removed += 1
                except TwirpError as err:
                    # "could not find object" is downgraded to a warning; anything
                    # else from LiveKit is a real failure.
                    if "could not find object" not in str(err):
                        print(f"❌ Failed to delete room {room_name}: {err}")
                    else:
                        print(f"⚠️  Room {room_name} cannot be deleted (may be protected or virtual)")
                except Exception as err:
                    print(f"❌ Failed to delete room {room_name}: {err}")
            print(f"Deleted {removed} rooms")
        finally:
            # Always release the API client's HTTP session.
            await client.aclose()
        return 0
    except Exception as exc:
        print(f"❌ Error clearing rooms: {exc}")
        import traceback

        traceback.print_exc()
        return 1

clear_sip_bridge(args) async

Clean up all active SIP bridge rooms and processes.

Source code in src/cli/main.py
async def clear_sip_bridge(args):
    """Stop SIP bridge agent processes and delete LiveKit rooms created for SIP calls.

    The process cleanup only runs when the SIP bridge integration reports itself
    enabled. SIP rooms are identified by the ``sip-inbound`` name prefix. A
    per-room deletion failure is printed and skipped.

    Args:
        args: Parsed CLI arguments (not used).

    Returns:
        0 once all SIP rooms were attempted, or 1 if setup, process cleanup,
        connecting or listing rooms raised (traceback printed).
    """
    try:
        from livekit import api
        from livekit.api import TwirpError

        from environment_config import config
        from services.sip_bridge_integration import sip_bridge_integration

        # Step 1: terminate the agent processes managed by the SIP bridge.
        if not sip_bridge_integration.is_enabled():
            print("SIP bridge is disabled")
        else:
            print("Cleaning up SIP bridge agent processes...")
            sip_bridge_integration._cleanup_all_processes()
            print("✅ All agent processes terminated")

        # Step 2: delete the rooms created for inbound SIP calls.
        client = api.LiveKitAPI(
            url=config.livekit_url.replace("ws://", "http://"),
            api_key=config.livekit_api_key,
            api_secret=config.livekit_api_secret,
        )
        try:
            listing = await client.room.list_rooms(api.ListRoomsRequest())
            targets = [entry for entry in listing.rooms if entry.name.startswith("sip-inbound")]
            print(f"Found {len(targets)} SIP rooms")
            removed = 0
            for entry in targets:
                room_name = entry.name
                try:
                    await client.room.delete_room(api.DeleteRoomRequest(room=room_name))
                    print(f"✅ Deleted SIP room: {room_name}")
                    removed += 1
                except TwirpError as err:
                    if "could not find object" not in str(err):
                        print(f"❌ Failed to delete SIP room {room_name}: {err}")
                    else:
                        print(f"⚠️  SIP room {room_name} already deleted or not found")
                except Exception as err:
                    print(f"❌ Failed to delete SIP room {room_name}: {err}")
            print(f"Deleted {removed} SIP rooms")
        finally:
            # Always release the API client's HTTP session.
            await client.aclose()
        return 0
    except Exception as exc:
        print(f"❌ Error clearing SIP bridge: {exc}")
        import traceback

        traceback.print_exc()
        return 1

get_base_url(args)

Resolve base URL from args

Source code in src/cli/main.py
def get_base_url(args):
    """Return the backend base URL selected on the command line, or ``None``.

    A non-empty ``args.url`` (set when ``--http URL`` was rewritten) wins. A bare
    ``--http`` flag falls back to ``config.stimm_api_url``. Otherwise the CLI runs
    in local mode and ``None`` is returned.
    """
    explicit_url = args.url
    if explicit_url:
        return explicit_url
    return config.stimm_api_url if args.http else None

list_agents(args) async

List all available agents

Source code in src/cli/main.py
async def list_agents(args):
    """List the available agents from a remote backend or the local database.

    HTTP mode is used when ``get_base_url`` resolves a URL; otherwise the
    local lister runs.

    Returns:
        The chosen lister's exit code, or 1 if it raised (traceback printed).
    """
    try:
        backend_url = get_base_url(args)
        pending = list_agents_http(backend_url) if backend_url else list_agents_local()
        return await pending
    except Exception as exc:
        print(f"❌ Error listing agents: {exc}")
        import traceback

        traceback.print_exc()
        return 1

list_rooms(args) async

List all LiveKit rooms

Source code in src/cli/main.py
async def list_rooms(args):
    """Print every LiveKit room with its participant count, creation time and empty timeout.

    Args:
        args: Parsed CLI arguments (not used).

    Returns:
        0 on success, or 1 if connecting to LiveKit or listing rooms raised
        (traceback printed).
    """
    try:
        from livekit import api

        from environment_config import config

        client = api.LiveKitAPI(
            url=config.livekit_url.replace("ws://", "http://"),
            api_key=config.livekit_api_key,
            api_secret=config.livekit_api_secret,
        )
        try:
            listing = await client.room.list_rooms(api.ListRoomsRequest())
            print("\n📋 LiveKit Rooms:")
            print("=" * 80)
            for entry in listing.rooms:
                print(f"• {entry.name}")
                print(f"  Participants: {entry.num_participants}")
                print(f"  Creation time: {entry.creation_time}")
                print(f"  Empty timeout: {entry.empty_timeout}")
                print()
            print(f"Total rooms: {len(listing.rooms)}")
        finally:
            # Always release the API client's HTTP session.
            await client.aclose()
        return 0
    except Exception as exc:
        print(f"❌ Error listing rooms: {exc}")
        import traceback

        traceback.print_exc()
        return 1

main() async

Async main entry point

Source code in src/cli/main.py
async def main():
    """Parse the command line, configure logging and run the selected command.

    Returns:
        The exit code returned by the sub-command handler stored in
        ``args.func``, or 1 when no handler was bound.
    """
    args = parse_args()
    configure_logging(args.verbose)

    if not hasattr(args, "func"):
        return 1
    return await args.func(args)

parse_args()

Parse command line arguments

Source code in src/cli/main.py
def parse_args():
    """Build the CLI argument parser and parse the process arguments.

    The arguments are first rewritten by ``preprocess_argv`` so that
    ``--http URL`` becomes the hidden ``--url URL`` option. Each leaf
    sub-command stores its async handler in ``args.func``.

    Returns:
        argparse.Namespace: The parsed arguments.
    """
    usage_examples = """
Examples:
  # Talk with an agent in local mode (default)
  python -m src.cli.main talk --agent-name "ava"

  # Chat with an agent using a remote backend (default URL)
  python -m src.cli.main --http chat --agent-name "ava"

  # Chat with a specific backend URL
  python -m src.cli.main --http http://localhost:8001 chat --agent-name "ava"

  # List all agents from the local database
  python -m src.cli.main agents list
"""
    parser = argparse.ArgumentParser(
        description="CLI to interact with Voice Agents.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )

    # Global options
    parser.add_argument(
        "--http",
        action="store_true",
        help="Use HTTP mode. Can be used as a flag (default env URL) or with a value (custom URL).",
    )
    # Hidden: only ever populated by preprocess_argv() from "--http URL".
    parser.add_argument("--url", help=argparse.SUPPRESS)
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose logging")

    commands = parser.add_subparsers(dest="command", required=True, help="Available commands")

    # agents list
    agents_cmd = commands.add_parser("agents", help="Manage agents")
    agents_actions = agents_cmd.add_subparsers(dest="agents_command", required=True)
    agents_actions.add_parser("list", help="List available agents").set_defaults(func=list_agents)

    # chat
    chat_cmd = commands.add_parser("chat", help="Start a text-based chat session")
    chat_cmd.add_argument("--agent-name", help="Name of the agent to use")
    chat_cmd.add_argument("--disable-rag", action="store_true", help="Disable RAG for the session")
    chat_cmd.set_defaults(func=run_chat_mode)

    # talk
    talk_cmd = commands.add_parser("talk", help="Start a voice-based session")
    talk_cmd.add_argument("--agent-name", help="Name of the agent to use")
    talk_cmd.add_argument("--room-name", help="Custom room name for LiveKit")
    talk_cmd.add_argument("--disable-rag", action="store_true", help="Disable RAG for the session")
    talk_cmd.set_defaults(func=run_talk_mode)

    # test echo
    test_cmd = commands.add_parser("test", help="Run tests")
    test_actions = test_cmd.add_subparsers(dest="test_command", required=True)
    test_actions.add_parser("echo", help="Test LiveKit echo pipeline").set_defaults(func=test_echo_pipeline)

    # livekit list-rooms / clear-rooms / clear-sip-bridge
    livekit_cmd = commands.add_parser("livekit", help="Manage LiveKit rooms and SIP bridge")
    livekit_actions = livekit_cmd.add_subparsers(dest="livekit_command", required=True)
    for action_name, action_help, handler in (
        ("list-rooms", "List all LiveKit rooms", list_rooms),
        ("clear-rooms", "Delete all LiveKit rooms", clear_rooms),
        ("clear-sip-bridge", "Clean all SIP bridge active rooms and processes", clear_sip_bridge),
    ):
        livekit_actions.add_parser(action_name, help=action_help).set_defaults(func=handler)

    return parser.parse_args(preprocess_argv())

preprocess_argv()

Preprocess argv to handle the ambiguous --http [URL] argument. If --http is followed by a URL (not a command/flag), convert it to --url URL. This allows supporting both '--http command' and '--http URL command'.

Source code in src/cli/main.py
def preprocess_argv(argv=None):
    """
    Preprocess argv to handle the ambiguous --http [URL] argument.

    If --http is followed by a URL (not a command/flag), convert it to --url URL.
    This allows supporting both '--http command' and '--http URL command'.

    Args:
        argv: Arguments to rewrite, excluding the program name. Defaults to
            ``sys.argv[1:]``. Passing a list explicitly makes the rewrite usable
            and testable without touching ``sys.argv``.

    Returns:
        list[str]: A new list in which every ``--http <value>`` pair whose value
        is neither a flag nor a sub-command name is replaced by ``--url <value>``.
        All other arguments are copied unchanged.
    """
    if argv is None:
        argv = sys.argv[1:]

    # Sub-command names registered by parse_args(): "--http chat" is the flag
    # form, not a URL.
    commands = {"chat", "talk", "agents", "livekit", "test"}

    new_argv = []
    i = 0
    while i < len(argv):
        arg = argv[i]
        next_arg = argv[i + 1] if i + 1 < len(argv) else None
        if arg == "--http" and next_arg is not None and not next_arg.startswith("-") and next_arg not in commands:
            # It's a URL: rewrite to the hidden --url option and consume both tokens.
            new_argv.extend(("--url", next_arg))
            i += 2
        else:
            # Plain argument, or --http used as a flag (followed by a command,
            # another flag, or nothing).
            new_argv.append(arg)
            i += 1
    return new_argv

run_chat_mode(args) async

Run agent in text-only mode

Source code in src/cli/main.py
async def run_chat_mode(args):
    """Run an agent in text-only chat mode.

    The HTTP client is used when ``--http`` or an explicit URL was given;
    otherwise the agent runs locally.

    Returns:
        The exit code of the selected chat runner.
    """
    remote = args.http or args.url
    runner = run_chat_mode_http if remote else run_chat_mode_local
    return await runner(args)

run_talk_mode(args) async

Run agent in full audio mode via LiveKit

Source code in src/cli/main.py
async def run_talk_mode(args):
    """Run agent in full audio mode via LiveKit.

    Args:
        args: Parsed CLI arguments. Reads ``agent_name``, ``room_name``,
            ``verbose``, ``http`` and ``url``.

    Returns:
        0 on normal completion or user interruption, 1 when no agent name was
        given or the agent runner raised.
    """
    if not args.agent_name:
        print("❌ Agent name is required. Use --agent-name <name>")
        return 1

    is_local = not (args.http or args.url)
    logging.info(f"Starting full audio mode for agent: {args.agent_name} (Local: {is_local})")

    # A base URL is passed even in local mode; fall back to the configured API URL.
    base_url = get_base_url(args) or config.stimm_api_url

    try:
        agent_runner = AgentRunner(args.agent_name, args.room_name, verbose=args.verbose, is_local=is_local, base_url=base_url)
        await agent_runner.run()
    except KeyboardInterrupt:
        # NOTE(review): under asyncio.run, Ctrl+C may surface here as CancelledError
        # rather than KeyboardInterrupt, depending on the Python version — confirm.
        logging.info("Full mode interrupted by user")
    except Exception as e:
        # logging.exception keeps the traceback, matching the other CLI commands,
        # which call traceback.print_exc() on failure.
        logging.exception(f"Error in full mode: {e}")
        return 1

    return 0

test_echo_pipeline(args) async

Test LiveKit audio pipeline with echo server and client

Source code in src/cli/main.py
def _spawn_echo_process(module):
    """Start ``python -m <module>`` with stderr merged into a line-buffered text stdout pipe."""
    import os
    import subprocess  # nosec B404

    return subprocess.Popen(  # nosec B603
        [sys.executable, "-m", module],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,  # Line buffered
        env={**os.environ, "PYTHONUNBUFFERED": "1"},  # Force unbuffered output
    )


async def _pump_process_output(process, name, verbose):
    """Drain ``process.stdout`` until EOF, logging each line only when ``verbose`` is set.

    The pipe must be drained even when nothing is logged. An unread PIPE fills
    its OS buffer, and the child then blocks on its next write.
    """
    loop = asyncio.get_running_loop()
    while True:
        # readline() blocks, so run it off the event loop; it returns "" only at EOF.
        line = await loop.run_in_executor(None, process.stdout.readline)
        if not line:
            break
        if verbose:
            logging.info(f"[{name}] {line.strip()}")


def _stop_processes(*processes, timeout=5):
    """Terminate all still-running processes, then reap each one, killing any that outlive ``timeout``."""
    import subprocess  # nosec B404

    started = [proc for proc in processes if proc is not None]
    # Signal every child first so they shut down in parallel.
    for proc in started:
        if proc.poll() is None:
            proc.terminate()
    for proc in started:
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # The child ignored SIGTERM; force it so cleanup can continue.
            proc.kill()
            proc.wait()


async def test_echo_pipeline(args):
    """Test LiveKit audio pipeline with echo server and client.

    Starts the echo server, waits two seconds, then starts the echo client. It
    keeps both running until the user interrupts or either one exits.

    Args:
        args: Parsed CLI arguments. ``args.verbose`` controls whether the
            children's output is logged.

    Returns:
        0 when stopped by the user, or 1 if the echo server or client exited
        on its own.
    """
    logging.info("🚀 Starting LiveKit echo pipeline test")
    logging.info("This will start both echo server and client in parallel")
    logging.info("Speak into your microphone to hear yourself echoed back!")
    logging.info("Press Ctrl+C to stop both processes")

    server_process = None
    client_process = None
    tasks = []
    exit_code = 0

    try:
        logging.info("🔄 Starting echo server...")
        server_process = _spawn_echo_process("src.cli.echo_server")

        # Give the server a head start without blocking the event loop.
        await asyncio.sleep(2)

        logging.info("🎧 Starting echo client...")
        client_process = _spawn_echo_process("src.cli.echo_client")

        # Always drain both pipes; lines are only logged in verbose mode.
        tasks.append(asyncio.create_task(_pump_process_output(server_process, "SERVER", args.verbose)))
        tasks.append(asyncio.create_task(_pump_process_output(client_process, "CLIENT", args.verbose)))

        logging.info("✅ Echo pipeline running! Speak into your microphone to test.")
        logging.info("Press Ctrl+C to stop...")

        while True:
            if server_process.poll() is not None:
                logging.error("❌ Echo server crashed!")
                exit_code = 1
                break
            if client_process.poll() is not None:
                logging.error("❌ Echo client crashed!")
                exit_code = 1
                break
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        logging.info("🛑 Stopping echo pipeline...")
    finally:
        _stop_processes(server_process, client_process)

        for task in tasks:
            task.cancel()

        logging.info("✅ Echo pipeline stopped")
    return exit_code

options: show_root_heading: true show_source: true