Pipecat Bot Gateway

The Pipecat Gateway is Astradial's AI voice bot platform. It hosts conversational voice bots powered by Google Gemini Live and serves them over WebSocket connections opened by AstraPBX (phone calls) or by browser clients.


What

The Pipecat Gateway is a FastAPI application that:

  • Accepts WebSocket connections from AstraPBX (phone calls) or browser clients
  • Authenticates callers using org-scoped API keys
  • Loads bot modules (Python flow definitions or JSON editor flows)
  • Runs Pipecat pipelines that process real-time bidirectional audio through an LLM
  • Supports two serializers: the Twilio protocol (browser clients) and the AstraPBX protocol (phone calls via AudioSocket)

Each bot is a conversational flow with nodes (states), transitions (functions), and actions. The LLM (Gemini Live) handles natural language understanding and voice synthesis natively.


Why

Traditional IVR systems use rigid DTMF menus ("Press 1 for..."). Pipecat bots provide:

  • Natural voice conversation — callers speak normally, the AI understands intent
  • Gemini Live native voice — no separate STT/TTS services, lower latency
  • Dynamic flows — conversation state machines with function-calling transitions
  • Multi-tenant — each org has its own bots, API keys, and configurations
  • Hot-reloadable — bot modules reload on file change without restarting the server
  • Dual transport — same bot works from phone calls (AstraPBX) and browser (WebSocket client)

How It Works

System Architecture

graph TB
    subgraph "Phone Call Path"
        A[Caller] --> B[Asterisk PBX]
        B -->|AudioSocket TCP| C[AstraPBX Node.js Relay]
        C -->|Binary WebSocket PCM| D
    end

    subgraph "Browser Path"
        E[Browser Client] -->|WebSocket Twilio Protocol| D
    end

    subgraph "Pipecat Gateway"
        D[FastAPI WebSocket] --> F[Transport Detection]
        F -->|AstraPBX| G[AstraPBXSerializer]
        F -->|Twilio| H[TwilioFrameSerializer]
        G --> I[Pipecat Pipeline]
        H --> I
        I --> J[VAD - Silero]
        J --> K[LLM Context Aggregator]
        K --> L[Gemini Live LLM]
        L --> M[Output Transport]
    end

    subgraph "Bot Module"
        N[Flow Nodes] --> O[Function Schemas]
        O --> P[Transition Handlers]
    end

    I -.-> N

Request Flow

  1. WebSocket connection arrives at /ws/{org_id}/{bot_id}?key=<api_key>
  2. Authentication validates the API key against the org
  3. Bot lookup finds the bot by ID, loads its module or JSON flow
  4. WebSocket accepted and handed to run_bot_pipeline()
  5. Transport detection via parse_telephony_websocket() reads the first two WebSocket messages
  6. Serializer selection based on provider (astrapbx → binary PCM, otherwise → Twilio ulaw/JSON)
  7. Pipeline runs until the call ends
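
Steps 1 and 2 above can be sketched with the standard library alone. This is an illustrative sketch, not the gateway's code: parse_ws_target, is_key_valid, and the in-memory ORG_KEYS table are hypothetical names, and the real gateway presumably checks keys against its database rather than a dict.

```python
import hmac
from typing import NamedTuple, Optional
from urllib.parse import parse_qs, urlsplit


class WsTarget(NamedTuple):
    org_id: str
    bot_id: str
    api_key: Optional[str]


def parse_ws_target(url: str) -> WsTarget:
    """Split /ws/{org_id}/{bot_id}?key=<api_key> into its parts."""
    parts = urlsplit(url)
    segments = [s for s in parts.path.split("/") if s]
    if len(segments) != 3 or segments[0] != "ws":
        raise ValueError(f"unexpected path: {parts.path!r}")
    keys = parse_qs(parts.query).get("key", [])
    return WsTarget(segments[1], segments[2], keys[0] if keys else None)


# Hypothetical stand-in for the per-org API key table.
ORG_KEYS = {"org-1": {"secret-abc"}}


def is_key_valid(target: WsTarget) -> bool:
    """Accept only a key that belongs to the org named in the path."""
    if target.api_key is None:
        return False
    supplied = target.api_key.encode()
    candidates = ORG_KEYS.get(target.org_id, set())
    # compare_digest avoids leaking key contents through timing differences.
    return any(hmac.compare_digest(supplied, k.encode()) for k in candidates)
```

Scoping the lookup by org_id means a valid key for one org is rejected on another org's path, which is what "org-scoped" implies.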

Pipeline Components

Input Transport → VAD → LLM Context Aggregator → Gemini Live LLM → Output Transport → Assistant Aggregator
| Component | Role |
|---|---|
| FastAPIWebsocketTransport | Receives/sends audio via WebSocket |
| SileroVADAnalyzer | Voice Activity Detection: detects when the caller starts/stops speaking |
| LLMContextAggregator | Accumulates user speech into LLM context |
| GeminiLiveLLMService | Google Gemini Live: real-time voice LLM with native audio I/O |
| FlowManager | Manages conversation state machine (nodes, transitions, functions) |

Audio Serializers

| Serializer | Transport | Format | Use Case |
|---|---|---|---|
| AstraPBXSerializer | Binary WebSocket | Raw 16-bit PCM, 8 kHz | Phone calls via AudioSocket |
| TwilioFrameSerializer | Text WebSocket | ulaw, base64, JSON | Browser clients, Twilio |

The serializer is auto-detected from the WebSocket handshake. AstraPBX sends provider: "astrapbx" in the customParameters of the Twilio-format start message.


Bot Module Structure

Bots live in bots/<module_name>/__init__.py. Each module must export a create_welcome_node() function that returns the initial NodeConfig.
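
A loader can enforce this export contract before any call reaches the bot. The sketch below is a minimal illustration; get_welcome_factory is a hypothetical helper, not a function from bot_loader.py.

```python
import types
from typing import Any, Callable


def get_welcome_factory(module: types.ModuleType) -> Callable[[], Any]:
    """Return the module's create_welcome_node, or raise if it is missing."""
    factory = getattr(module, "create_welcome_node", None)
    if not callable(factory):
        raise AttributeError(
            f"bot module {module.__name__!r} must export create_welcome_node()"
        )
    return factory
```

In practice the module argument would come from something like importlib.import_module("bots.hotel_concierge"); failing at load time gives a clearer error than failing mid-call.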

Key Concepts

| Concept | Description |
|---|---|
| NodeConfig | A conversation state: has a name, role message, task messages, and available functions |
| FlowsFunctionSchema | A function the LLM can call to transition between nodes |
| Handler | An async function that processes function arguments and returns the next node |
| FlowResult | Structured data collected during a flow (e.g., check-in details) |
| FlowManager | Orchestrates node transitions, manages LLM context updates |

Example: Hotel Concierge Bot

from pipecat_flows import FlowArgs, FlowsFunctionSchema, NodeConfig

CONCIERGE_ROLE = "You are an elite concierge at The Grand Astral..."

# Function handler: called when the LLM decides to route to check-in.
# create_checkin_node() is defined elsewhere in this module (omitted here).
async def handle_route_checkin(args: FlowArgs) -> tuple[dict, NodeConfig]:
    return {"guest_name": args.get("guest_name", "")}, create_checkin_node()

# Function schema — tells the LLM when/how to call this function
route_checkin_schema = FlowsFunctionSchema(
    name="route_to_checkin",
    description="Guest wants to check in to the hotel",
    properties={
        "guest_name": {
            "type": "string",
            "description": "The guest's name if provided"
        }
    },
    required=[],  # guest_name is optional ("if provided"); the handler defaults it to ""
    handler=handle_route_checkin,
)

# Welcome node — the entry point
def create_welcome_node() -> NodeConfig:
    return NodeConfig(
        name="welcome",
        role_message=CONCIERGE_ROLE,
        task_messages=[{
            "role": "user",
            "content": "Welcome the guest warmly. Ask how you may help."
        }],
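        # route_checkout_schema and route_helpdesk_schema are defined elsewhere in this module (omitted here)
        functions=[route_checkin_schema, route_checkout_schema, route_helpdesk_schema],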
        respond_immediately=True,
    )

Conversation Flow Diagram

The hotel concierge bot has this flow structure:

graph TD
    W[Welcome] -->|route_to_checkin| CI[Check-in]
    W -->|route_to_checkout| CO[Check-out]
    W -->|route_to_helpdesk| HD[Helpdesk]

    CI -->|process_checkin| CIC[Check-in Confirm]
    CO -->|process_checkout| COC[Check-out Confirm]

    HD -->|route_to_enquiry| EN[Enquiry]
    HD -->|route_to_raise_issue| RI[Raise Issue]

    EN -->|another_enquiry| EN
    EN -->|route_to_raise_issue| RI
    EN -->|end_call| END[Farewell]

    RI -->|submit_issue| RIC[Issue Confirm]

    CIC -->|guest_needs_more_help| HD
    CIC -->|end_call| END
    COC -->|guest_needs_more_help| HD
    COC -->|end_call| END
    RIC -->|guest_needs_more_help| HD
    RIC -->|end_call| END
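
The diagram above can also be written as plain data, which makes it easy to check that every node has a path to the farewell. This is a sketch for reasoning about the flow, not how pipecat_flows stores it; the lowercase node names and the FLOW, follow, and can_reach names are assumptions made for illustration.

```python
from collections import deque

# Transitions copied from the diagram: node -> {function_name: next_node}.
FLOW = {
    "welcome": {
        "route_to_checkin": "checkin",
        "route_to_checkout": "checkout",
        "route_to_helpdesk": "helpdesk",
    },
    "checkin": {"process_checkin": "checkin_confirm"},
    "checkout": {"process_checkout": "checkout_confirm"},
    "helpdesk": {
        "route_to_enquiry": "enquiry",
        "route_to_raise_issue": "raise_issue",
    },
    "enquiry": {
        "another_enquiry": "enquiry",
        "route_to_raise_issue": "raise_issue",
        "end_call": "farewell",
    },
    "raise_issue": {"submit_issue": "issue_confirm"},
    "checkin_confirm": {"guest_needs_more_help": "helpdesk", "end_call": "farewell"},
    "checkout_confirm": {"guest_needs_more_help": "helpdesk", "end_call": "farewell"},
    "issue_confirm": {"guest_needs_more_help": "helpdesk", "end_call": "farewell"},
    "farewell": {},
}


def follow(start: str, calls: list[str]) -> str:
    """Apply a sequence of function calls and return the node reached."""
    node = start
    for name in calls:
        if name not in FLOW[node]:
            raise KeyError(f"{name!r} is not available in node {node!r}")
        node = FLOW[node][name]
    return node


def can_reach(start: str, goal: str) -> bool:
    """Breadth-first search over the transition table."""
    seen, queue = {start}, deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            return True
        for nxt in FLOW[node].values():
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return False
```

For example, follow("welcome", ["route_to_checkin", "process_checkin", "end_call"]) walks the check-in path to the farewell node, and can_reach confirms that no node is a dead end.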

Gateway Configuration

Directory Structure

pipecat-flow/
  gateway/
    main.py                  # FastAPI app, CORS, lifespan
    config.py                # Environment config (host, port, admin key)
    router_ws.py             # WebSocket endpoint /ws/{org_id}/{bot_id}
    router_admin.py          # Admin API for managing bots/orgs
    pipeline.py              # Pipecat pipeline setup
    astrapbx_serializer.py   # Custom PCM serializer for AstraPBX
    bot_loader.py            # Hot-reloading bot module loader
    flow_converter.py        # JSON editor flow → NodeConfig converter
    auth.py                  # API key validation
    database.py              # SQLite database (gateway.db)
    models.py                # Database models
  bots/
    hotel_concierge/         # Example bot module
      __init__.py            # Bot flow definition

Environment Variables

| Variable | Default | Description |
|---|---|---|
| GATEWAY_HOST | 0.0.0.0 | Server bind address |
| GATEWAY_PORT | 7860 | Server port |
| GATEWAY_ADMIN_KEY | (none) | Admin API authentication key |
| GOOGLE_API_KEY | (none) | Google AI API key (per-org in DB) |
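
These variables and defaults could be read as follows. This is a sketch only: GatewayConfig and load_config are hypothetical names, and the actual config.py may be structured differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class GatewayConfig:
    host: str
    port: int
    admin_key: Optional[str]
    google_api_key: Optional[str]


def load_config(env: Mapping[str, str] = os.environ) -> GatewayConfig:
    """Read the gateway settings, falling back to the documented defaults."""
    return GatewayConfig(
        host=env.get("GATEWAY_HOST", "0.0.0.0"),
        port=int(env.get("GATEWAY_PORT", "7860")),
        admin_key=env.get("GATEWAY_ADMIN_KEY"),      # no default
        google_api_key=env.get("GOOGLE_API_KEY"),    # no default
    )
```

Passing the environment as a mapping keeps the function easy to test: load_config({}) yields the defaults without touching the real process environment.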

Running

cd pipecat-flow
uv run python -m gateway.main

The server starts on http://0.0.0.0:7860 and is exposed via Cloudflare at bots.astradial.com.


WebSocket API

Endpoint

ws://bots.astradial.com/ws/{org_id}/{bot_id}?key={api_key}

Handshake Protocol

After the WebSocket connection is established, the client must send two JSON messages to initialize the Twilio-compatible transport:

Message 1: Connected

{
  "event": "connected",
  "protocol": "Call",
  "version": "1.0.0"
}

Message 2: Start

{
  "event": "start",
  "start": {
    "streamSid": "unique_stream_id",
    "callSid": "unique_call_id",
    "customParameters": {
      "provider": "astrapbx"
    }
  }
}
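
A client could build the two handshake messages like this. The sketch is illustrative: build_handshake is a hypothetical helper, the random IDs are placeholders, and sending an empty customParameters object for non-AstraPBX clients is an assumption.

```python
import json
import uuid
from typing import Optional


def build_handshake(provider: Optional[str] = None) -> list[str]:
    """Return the 'connected' and 'start' messages as JSON strings, in send order."""
    connected = {"event": "connected", "protocol": "Call", "version": "1.0.0"}
    custom_parameters = {}
    if provider is not None:
        custom_parameters["provider"] = provider
    start = {
        "event": "start",
        "start": {
            "streamSid": uuid.uuid4().hex,  # placeholder unique stream ID
            "callSid": uuid.uuid4().hex,    # placeholder unique call ID
            "customParameters": custom_parameters,
        },
    }
    return [json.dumps(connected), json.dumps(start)]
```

The caller sends both strings as WebSocket text messages, in order, before any audio.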

Provider Detection

If customParameters.provider is "astrapbx", the gateway uses the AstraPBXSerializer (binary PCM). Otherwise, it uses the TwilioFrameSerializer (ulaw base64 JSON).
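
The selection rule amounts to a single lookup on the parsed start message. In this sketch pick_serializer is a hypothetical function that returns the serializer's class name as a string; the gateway itself would instantiate the class.

```python
def pick_serializer(start_message: dict) -> str:
    """Choose a serializer name from the parsed 'start' handshake message."""
    params = start_message.get("start", {}).get("customParameters") or {}
    if params.get("provider") == "astrapbx":
        return "AstraPBXSerializer"
    # Anything else, including a missing provider, falls back to Twilio.
    return "TwilioFrameSerializer"
```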

Audio Frames

After the handshake, audio flows as follows:

  • AstraPBX mode: Binary WebSocket frames containing raw 16-bit signed linear PCM at 8kHz
  • Twilio mode: Text WebSocket frames with JSON {"event": "media", "media": {"payload": "<base64 ulaw>"}}
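
The two framings differ only in how the audio bytes are wrapped. The sketch below uses hypothetical helper names and assumes a 20 ms frame for the size calculation; the message shape follows the Twilio-mode example above, and the ulaw encoding itself is out of scope here.

```python
import base64
import json

SAMPLE_RATE_HZ = 8000
BYTES_PER_SAMPLE = 2  # 16-bit signed linear PCM


def pcm_frame_bytes(duration_ms: int) -> int:
    """Size of a raw PCM frame in AstraPBX mode for the given duration."""
    return SAMPLE_RATE_HZ * BYTES_PER_SAMPLE * duration_ms // 1000


def twilio_media_message(ulaw_audio: bytes) -> str:
    """Wrap already-encoded ulaw bytes in a Twilio-style media message."""
    payload = base64.b64encode(ulaw_audio).decode("ascii")
    return json.dumps({"event": "media", "media": {"payload": payload}})


def decode_media_message(message: str) -> bytes:
    """Recover the ulaw bytes from a Twilio-style media message."""
    data = json.loads(message)
    return base64.b64decode(data["media"]["payload"])
```

With these numbers a 20 ms AstraPBX frame is 320 bytes of PCM, sent as one binary WebSocket frame, while the same 20 ms of 8 kHz ulaw is 160 bytes before base64 encoding.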

Creating a New Bot

1. Create the bot module

mkdir bots/my_bot

Create bots/my_bot/__init__.py:

from pipecat_flows import FlowArgs, FlowsFunctionSchema, NodeConfig

ROLE = "You are a helpful assistant for Acme Corp..."

async def handle_greeting(args: FlowArgs) -> tuple[dict, NodeConfig]:
    return {}, create_main_node()

greeting_schema = FlowsFunctionSchema(
    name="greet_customer",
    description="Customer has been greeted and wants to proceed",
    properties={},
    required=[],
    handler=handle_greeting,
)

def create_welcome_node() -> NodeConfig:
    return NodeConfig(
        name="welcome",
        role_message=ROLE,
        task_messages=[{
            "role": "user",
            "content": "Greet the customer and ask how you can help."
        }],
        functions=[greeting_schema],
        respond_immediately=True,
    )

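def create_main_node() -> NodeConfig:
    # Placeholder node: replace the task and add functions for your own flow.
    return NodeConfig(
        name="main",
        task_messages=[{
            "role": "user",
            "content": "Help the customer with their request."
        }],
        functions=[],
    )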

2. Register in the database

# Via admin API
curl -X POST 'http://localhost:7860/admin/bots' \
  -H 'Authorization: Bearer <admin_key>' \
  -H 'Content-Type: application/json' \
  -d '{
    "org_id": "<org_uuid>",
    "name": "My Bot",
    "module_path": "my_bot",
    "gemini_model": "gemini-3.1-flash-live-preview",
    "gemini_voice_id": "Kore"
  }'

3. Connect from AstraPBX

Create a user with routing_type: 'ai_agent' and the bot's WebSocket URL as routing_destination:

INSERT INTO users (org_id, username, extension, routing_type, routing_destination)
VALUES ('<org_id>', 'My Bot', '1003', 'ai_agent',
  'ws://bots.astradial.com/ws/<org_id>/<bot_id>?key=<api_key>');

Then regenerate the dialplan.


Gemini Live LLM

The gateway uses Gemini Live (gemini-3.1-flash-live-preview) for real-time voice conversations:

  • Native audio I/O — no separate STT/TTS services needed
  • Function calling — LLM calls functions to transition between flow nodes
  • Interruption support — users can interrupt the bot mid-sentence
  • Voice selection — configurable voice ID per bot (e.g., Kore, Puck, Charon)

VAD Settings

Voice Activity Detection uses Silero with these defaults:

| Parameter | Value | Description |
|---|---|---|
| confidence | 0.7 | VAD confidence threshold |
| start_secs | 0.2 | Seconds of speech to trigger start |
| stop_secs | 0.2 | Seconds of silence to trigger stop |
| min_volume | 0.6 | Minimum audio volume threshold |
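
To show how these four parameters interact, here is a deliberately simplified state machine. It is not Silero or Pipecat code: VadDefaults and speech_events are hypothetical names, the 20 ms frame length is an assumption, and each frame is reduced to a (confidence, volume) pair.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class VadDefaults:
    confidence: float = 0.7
    start_secs: float = 0.2
    stop_secs: float = 0.2
    min_volume: float = 0.6


def speech_events(frames, params=VadDefaults(), frame_secs=0.02):
    """Return ("start" | "stop", frame_index) events for (confidence, volume) frames."""
    start_frames = max(1, round(params.start_secs / frame_secs))
    stop_frames = max(1, round(params.stop_secs / frame_secs))
    events = []
    speaking = False
    run = 0  # consecutive frames that contradict the current state
    for index, (confidence, volume) in enumerate(frames):
        voiced = confidence >= params.confidence and volume >= params.min_volume
        if voiced == speaking:
            run = 0
            continue
        run += 1
        if run >= (stop_frames if speaking else start_frames):
            speaking = not speaking
            events.append(("start" if speaking else "stop", index))
            run = 0
    return events
```

Under these assumptions, speech shorter than start_secs never produces a start event, and pauses shorter than stop_secs do not end the user's turn. Raising stop_secs makes the bot wait longer before responding; lowering it makes the bot quicker but more likely to cut in during a pause.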

Date

Documentation created: 2026-03-31