Pipecat Bot Gateway¶
The Pipecat Gateway is Astradial's AI voice bot platform. It hosts conversational voice bots powered by Google Gemini Live and serves them over WebSocket connections from AstraPBX (phone calls) or browser clients.
What¶
The Pipecat Gateway is a FastAPI application that:
- Accepts WebSocket connections from AstraPBX (phone calls) or browser clients
- Authenticates callers using org-scoped API keys
- Loads bot modules (Python flow definitions or JSON editor flows)
- Runs Pipecat pipelines that process real-time bidirectional audio through an LLM
- Selects one of two serializers per connection: the Twilio protocol (browser clients) or the AstraPBX protocol (phone calls via AudioSocket)
Each bot is a conversational flow with nodes (states), transitions (functions), and actions. The LLM (Gemini Live) handles natural language understanding and voice synthesis natively.
Why¶
Traditional IVR systems use rigid DTMF menus ("Press 1 for..."). Pipecat bots provide:
- Natural voice conversation — callers speak normally, the AI understands intent
- Gemini Live native voice — no separate STT/TTS services, lower latency
- Dynamic flows — conversation state machines with function-calling transitions
- Multi-tenant — each org has its own bots, API keys, and configurations
- Hot-reloadable — bot modules reload on file change without restarting the server
- Dual transport — same bot works from phone calls (AstraPBX) and browser (WebSocket client)
How It Works¶
System Architecture¶
graph TB
subgraph "Phone Call Path"
A[Caller] --> B[Asterisk PBX]
B -->|AudioSocket TCP| C[AstraPBX Node.js Relay]
C -->|Binary WebSocket PCM| D
end
subgraph "Browser Path"
E[Browser Client] -->|WebSocket Twilio Protocol| D
end
subgraph "Pipecat Gateway"
D[FastAPI WebSocket] --> F[Transport Detection]
F -->|AstraPBX| G[AstraPBXSerializer]
F -->|Twilio| H[TwilioFrameSerializer]
G --> I[Pipecat Pipeline]
H --> I
I --> J[VAD - Silero]
J --> K[LLM Context Aggregator]
K --> L[Gemini Live LLM]
L --> M[Output Transport]
end
subgraph "Bot Module"
N[Flow Nodes] --> O[Function Schemas]
O --> P[Transition Handlers]
end
I -.-> N
Request Flow¶
- WebSocket connection arrives at /ws/{org_id}/{bot_id}?key=<api_key>
- Authentication validates the API key against the org
- Bot lookup finds the bot by ID and loads its module or JSON flow
- WebSocket is accepted and handed to run_bot_pipeline()
- Transport detection via parse_telephony_websocket() reads the first two WebSocket messages
- Serializer selection is based on the provider (astrapbx → binary PCM, otherwise → Twilio ulaw/JSON)
- Pipeline runs until the call ends
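As a small illustration of the first step, the sketch below builds the URL a client would connect to. Only the /ws/{org_id}/{bot_id}?key=<api_key> shape comes from this page; the helper name build_bot_ws_url and the base argument are assumptions made for the example.

```python
from urllib.parse import quote, urlencode


def build_bot_ws_url(base: str, org_id: str, bot_id: str, api_key: str) -> str:
    """Return <base>/ws/<org_id>/<bot_id>?key=<api_key> with URL-safe escaping.

    Hypothetical helper: the gateway itself only defines the route shape.
    """
    # Escape each path segment so an unusual ID cannot add extra path parts.
    path = f"/ws/{quote(org_id, safe='')}/{quote(bot_id, safe='')}"
    # urlencode escapes the key for use in the query string.
    return f"{base.rstrip('/')}{path}?{urlencode({'key': api_key})}"
```

Escaping the key matters because API keys may contain characters such as "/" or "+" that are not safe in a query string.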
Pipeline Components¶
Input Transport → VAD → LLM Context Aggregator → Gemini Live LLM → Output Transport → Assistant Aggregator
| Component | Role |
|---|---|
| FastAPIWebsocketTransport | Receives/sends audio via WebSocket |
| SileroVADAnalyzer | Voice Activity Detection — detects when caller starts/stops speaking |
| LLMContextAggregator | Accumulates user speech into LLM context |
| GeminiLiveLLMService | Google Gemini Live — real-time voice LLM with native audio I/O |
| FlowManager | Manages conversation state machine (nodes, transitions, functions) |
Audio Serializers¶
| Serializer | Transport | Format | Use Case |
|---|---|---|---|
| AstraPBXSerializer | Binary WebSocket | Raw 16-bit PCM 8kHz | Phone calls via AudioSocket |
| TwilioFrameSerializer | Text WebSocket | ulaw base64 JSON | Browser clients, Twilio |
The serializer is auto-detected from the WebSocket handshake. AstraPBX sends provider: "astrapbx" in the customParameters of the Twilio-format start message.
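The detection rule can be sketched in a few lines. This is not the gateway's code (which relies on parse_telephony_websocket()); detect_provider is a hypothetical name, and the sketch assumes the start message arrives as JSON text in the shape described on this page.

```python
import json


def detect_provider(start_message: str) -> str:
    """Return "astrapbx" or "twilio" for a Twilio-format start message.

    Hypothetical helper illustrating the rule only.
    """
    msg = json.loads(start_message)
    # customParameters may be missing or empty for plain Twilio clients.
    params = msg.get("start", {}).get("customParameters") or {}
    return "astrapbx" if params.get("provider") == "astrapbx" else "twilio"
```

Anything other than an explicit "astrapbx" value falls back to the Twilio path, which matches the "otherwise" branch in the request flow.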
Bot Module Structure¶
Bots live in bots/<module_name>/__init__.py. Each module must export a create_welcome_node() function that returns the initial NodeConfig.
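To make the module contract concrete, here is a minimal sketch of a loader that imports bots/<module_name>/__init__.py, checks for create_welcome_node(), and re-imports the file when its modification time changes. It is an assumption about how such a loader could work, not the contents of bot_loader.py; load_bot_module and the module-level cache are hypothetical.

```python
import importlib.util
from pathlib import Path
from types import ModuleType

# Hypothetical cache: file path -> (mtime in ns at load time, loaded module)
_cache: dict[str, tuple[int, ModuleType]] = {}


def load_bot_module(bots_dir: str, module_name: str) -> ModuleType:
    """Load a bot module from disk, reloading it if the file has changed."""
    init_file = Path(bots_dir) / module_name / "__init__.py"
    mtime = init_file.stat().st_mtime_ns
    cached = _cache.get(str(init_file))
    if cached and cached[0] == mtime:
        return cached[1]  # unchanged on disk, reuse the loaded module

    spec = importlib.util.spec_from_file_location(f"bot_{module_name}", str(init_file))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load bot module {module_name!r}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    # Enforce the contract described above.
    if not callable(getattr(module, "create_welcome_node", None)):
        raise AttributeError(f"{module_name} must export create_welcome_node()")

    _cache[str(init_file)] = (mtime, module)
    return module
```

Checking the modification time on each lookup keeps the sketch simple; a real loader might use a file watcher instead.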
Key Concepts¶
| Concept | Description |
|---|---|
| NodeConfig | A conversation state — has a name, role message, task messages, and available functions |
| FlowsFunctionSchema | A function the LLM can call to transition between nodes |
| Handler | An async function that processes function arguments and returns the next node |
| FlowResult | Structured data collected during a flow (e.g., check-in details) |
| FlowManager | Orchestrates node transitions, manages LLM context updates |
Example: Hotel Concierge Bot¶
from pipecat_flows import FlowArgs, FlowsFunctionSchema, NodeConfig

CONCIERGE_ROLE = "You are an elite concierge at The Grand Astral..."

# create_checkin_node(), route_checkout_schema, and route_helpdesk_schema are
# defined elsewhere in the module and are not shown here.

# Function handler: called when the LLM decides to route to check-in
async def handle_route_checkin(args: FlowArgs) -> tuple[dict, NodeConfig]:
    return {"guest_name": args.get("guest_name", "")}, create_checkin_node()

# Function schema: tells the LLM when and how to call this function
route_checkin_schema = FlowsFunctionSchema(
    name="route_to_checkin",
    description="Guest wants to check in to the hotel",
    properties={
        "guest_name": {
            "type": "string",
            "description": "The guest's name if provided"
        }
    },
    required=["guest_name"],
    handler=handle_route_checkin,
)

# Welcome node: the entry point
def create_welcome_node() -> NodeConfig:
    return NodeConfig(
        name="welcome",
        role_message=CONCIERGE_ROLE,
        task_messages=[{
            "role": "user",
            "content": "Welcome the guest warmly. Ask how you may help."
        }],
        functions=[route_checkin_schema, route_checkout_schema, route_helpdesk_schema],
        respond_immediately=True,
    )
Conversation Flow Diagram¶
The hotel concierge bot has this flow structure:
graph TD
W[Welcome] -->|route_to_checkin| CI[Check-in]
W -->|route_to_checkout| CO[Check-out]
W -->|route_to_helpdesk| HD[Helpdesk]
CI -->|process_checkin| CIC[Check-in Confirm]
CO -->|process_checkout| COC[Check-out Confirm]
HD -->|route_to_enquiry| EN[Enquiry]
HD -->|route_to_raise_issue| RI[Raise Issue]
EN -->|another_enquiry| EN
EN -->|route_to_raise_issue| RI
EN -->|end_call| END[Farewell]
RI -->|submit_issue| RIC[Issue Confirm]
CIC -->|guest_needs_more_help| HD
CIC -->|end_call| END
COC -->|guest_needs_more_help| HD
COC -->|end_call| END
RIC -->|guest_needs_more_help| HD
RIC -->|end_call| END
Gateway Configuration¶
Directory Structure¶
pipecat-flow/
  gateway/
    main.py                  # FastAPI app, CORS, lifespan
    config.py                # Environment config (host, port, admin key)
    router_ws.py             # WebSocket endpoint /ws/{org_id}/{bot_id}
    router_admin.py          # Admin API for managing bots/orgs
    pipeline.py              # Pipecat pipeline setup
    astrapbx_serializer.py   # Custom PCM serializer for AstraPBX
    bot_loader.py            # Hot-reloading bot module loader
    flow_converter.py        # JSON editor flow → NodeConfig converter
    auth.py                  # API key validation
    database.py              # SQLite database (gateway.db)
    models.py                # Database models
  bots/
    hotel_concierge/         # Example bot module
      __init__.py            # Bot flow definition
Environment Variables¶
| Variable | Default | Description |
|---|---|---|
| GATEWAY_HOST | 0.0.0.0 | Server bind address |
| GATEWAY_PORT | 7860 | Server port |
| GATEWAY_ADMIN_KEY | (none) | Admin API authentication key |
| GOOGLE_API_KEY | (none) | Google AI API key (per-org in DB) |
Running¶
The server starts on http://0.0.0.0:7860 and is exposed via Cloudflare at bots.astradial.com.
WebSocket API¶
Endpoint¶
Bots are served at /ws/{org_id}/{bot_id}?key=<api_key>, where key is an org-scoped API key.
Handshake Protocol¶
After WebSocket connection, the client must send two messages to initialize the Twilio-compatible transport:
Message 1: Connected
Message 2: Start
{
"event": "start",
"start": {
"streamSid": "unique_stream_id",
"callSid": "unique_call_id",
"customParameters": {
"provider": "astrapbx"
}
}
}
Provider Detection
If customParameters.provider is "astrapbx", the gateway uses the AstraPBXSerializer (binary PCM). Otherwise, it uses the TwilioFrameSerializer (ulaw base64 JSON).
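The two handshake messages can be produced with a short helper. This is a sketch under assumptions: build_handshake is a hypothetical name, and the fields of the connected message follow Twilio's Media Streams format rather than anything this page specifies for Message 1. The start message mirrors the JSON shown above.

```python
import json
from typing import Optional


def build_handshake(stream_sid: str, call_sid: str,
                    provider: Optional[str] = None) -> list[str]:
    """Return the two text messages a client sends right after connecting."""
    # Message 1: field values assumed from Twilio's Media Streams protocol.
    connected = {"event": "connected", "protocol": "Call", "version": "1.0.0"}
    # Message 2: same shape as the start message documented above.
    start = {
        "event": "start",
        "start": {
            "streamSid": stream_sid,
            "callSid": call_sid,
            "customParameters": {},
        },
    }
    if provider:
        start["start"]["customParameters"]["provider"] = provider
    return [json.dumps(connected), json.dumps(start)]
```

A phone-call relay would pass provider="astrapbx"; a browser client would leave it unset and get the Twilio serializer.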
Audio Frames¶
After handshake, audio flows as:
- AstraPBX mode: Binary WebSocket frames containing raw 16-bit signed linear PCM at 8kHz
- Twilio mode: Text WebSocket frames with JSON {"event": "media", "media": {"payload": "<base64 ulaw>"}}
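To show how the two formats relate, the sketch below decodes one Twilio-mode media frame into the 16-bit little-endian PCM used in AstraPBX mode. It uses the standard G.711 mu-law expansion in pure Python; the function names are hypothetical and this is an illustration of the wire formats, not the serializers' actual code.

```python
import base64
import json
import struct


def ulaw_byte_to_pcm16(u: int) -> int:
    """Expand one G.711 mu-law byte to a signed 16-bit linear sample."""
    u = ~u & 0xFF                      # mu-law bytes are stored inverted
    sign = u & 0x80
    exponent = (u >> 4) & 0x07
    mantissa = u & 0x0F
    magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84
    return -magnitude if sign else magnitude


def media_frame_to_pcm(frame_text: str) -> bytes:
    """Convert a Twilio-style media frame to raw 16-bit little-endian PCM."""
    msg = json.loads(frame_text)
    if msg.get("event") != "media":
        raise ValueError("not a media frame")
    ulaw = base64.b64decode(msg["media"]["payload"])
    samples = [ulaw_byte_to_pcm16(b) for b in ulaw]
    return struct.pack(f"<{len(samples)}h", *samples)
```

Each mu-law byte becomes two PCM bytes, so a decoded frame is twice the size of its base64-decoded payload.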
Creating a New Bot¶
1. Create the bot module¶
Create bots/my_bot/__init__.py:
from pipecat_flows import FlowArgs, FlowsFunctionSchema, NodeConfig

ROLE = "You are a helpful assistant for Acme Corp..."

async def handle_greeting(args: FlowArgs) -> tuple[dict, NodeConfig]:
    return {}, create_main_node()

greeting_schema = FlowsFunctionSchema(
    name="greet_customer",
    description="Customer has been greeted and wants to proceed",
    properties={},
    required=[],
    handler=handle_greeting,
)

def create_welcome_node() -> NodeConfig:
    return NodeConfig(
        name="welcome",
        role_message=ROLE,
        task_messages=[{
            "role": "user",
            "content": "Greet the customer and ask how you can help."
        }],
        functions=[greeting_schema],
        respond_immediately=True,
    )

def create_main_node() -> NodeConfig:
    # ... define your conversation flow
    pass
2. Register in the database¶
# Via admin API
curl -X POST 'http://localhost:7860/admin/bots' \
-H 'Authorization: Bearer <admin_key>' \
-H 'Content-Type: application/json' \
-d '{
"org_id": "<org_uuid>",
"name": "My Bot",
"module_path": "my_bot",
"gemini_model": "gemini-3.1-flash-live-preview",
"gemini_voice_id": "Kore"
}'
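The same registration call can be prepared from Python with the standard library. The sketch below only builds the request; the endpoint, headers, and field names are taken from the curl command above, while build_register_bot_request is a hypothetical helper and the response format is not documented here.

```python
import json
import urllib.request


def build_register_bot_request(base_url: str, admin_key: str, org_id: str,
                               name: str, module_path: str,
                               gemini_model: str,
                               gemini_voice_id: str) -> urllib.request.Request:
    """Build (but do not send) the POST /admin/bots request."""
    payload = {
        "org_id": org_id,
        "name": name,
        "module_path": module_path,
        "gemini_model": gemini_model,
        "gemini_voice_id": gemini_voice_id,
    }
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/admin/bots",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {admin_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen() would send it to a running gateway.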
3. Connect from AstraPBX¶
Create a user with routing_type: 'ai_agent' and the bot's WebSocket URL as routing_destination:
INSERT INTO users (org_id, username, extension, routing_type, routing_destination)
VALUES ('<org_id>', 'My Bot', '1003', 'ai_agent',
'ws://bots.astradial.com/ws/<org_id>/<bot_id>?key=<api_key>');
Then regenerate the dialplan.
Gemini Live LLM¶
The gateway uses Gemini Live (gemini-3.1-flash-live-preview) for real-time voice conversations:
- Native audio I/O: no separate STT/TTS services needed
- Function calling: the LLM calls functions to transition between flow nodes
- Interruption support: users can interrupt the bot mid-sentence
- Voice selection: configurable voice ID per bot (e.g., Kore, Puck, Charon)
VAD Settings¶
Voice Activity Detection uses Silero with these defaults:
| Parameter | Value | Description |
|---|---|---|
| confidence | 0.7 | VAD confidence threshold |
| start_secs | 0.2 | Seconds of speech to trigger start |
| stop_secs | 0.2 | Seconds of silence to trigger stop |
| min_volume | 0.6 | Minimum audio volume threshold |
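The following simplified sketch illustrates how these four parameters can interact. It is not Silero's or Pipecat's implementation: VadSettings, speech_events, and the 20 ms frame length are assumptions, and the per-frame confidence and volume values are taken as given inputs.

```python
from dataclasses import dataclass


@dataclass
class VadSettings:
    confidence: float = 0.7   # minimum model confidence for a voiced frame
    start_secs: float = 0.2   # voiced time needed before "started"
    stop_secs: float = 0.2    # unvoiced time needed before "stopped"
    min_volume: float = 0.6   # minimum volume for a voiced frame


def speech_events(frames, settings=VadSettings(), frame_secs=0.02):
    """Turn (confidence, volume) frames into ("started"/"stopped", index) events."""
    start_frames = round(settings.start_secs / frame_secs)
    stop_frames = round(settings.stop_secs / frame_secs)
    speaking = False
    run = 0  # consecutive frames that disagree with the current state
    events = []
    for i, (conf, vol) in enumerate(frames):
        voiced = conf >= settings.confidence and vol >= settings.min_volume
        if voiced == speaking:
            run = 0
            continue
        run += 1
        needed = stop_frames if speaking else start_frames
        if run >= needed:
            speaking = voiced
            run = 0
            events.append(("started" if speaking else "stopped", i))
    return events
```

With the defaults and 20 ms frames, ten consecutive voiced frames are needed before speech is considered started, and ten consecutive unvoiced frames before it is considered stopped. A frame that is confident but too quiet counts as unvoiced.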
Date¶
Documentation created: 2026-03-31