
Practical guide: open-source agent frameworks and ElevenAgents
Connecting open-source agent frameworks to ElevenLabs voice via Custom LLM.
In our previous post on Integrating External Agents with ElevenLabs Voice Orchestration, we outlined how teams can connect their existing text-based agent orchestration to ElevenLabs via the Custom LLM. Building on that foundation, this guide demonstrates how leading open-source agent frameworks can be adapted and deployed behind the Custom LLM interface. The result is a flexible architecture where voice is layered onto mature agent systems without compromising state management, tool orchestration, or application-specific control. Across frameworks, the integration follows the same three-step pattern: create a generation request, extract the final text response, and reformat it as OpenAI-compatible Server-Sent Events (SSE). ElevenLabs supports both the Chat Completions and Responses formats. While this guide covers four widely adopted frameworks, the patterns generalize to any runtime that can produce OpenAI-compatible streaming output.
General setup
The examples in this section use Python and FastAPI, though any stack that handles HTTP POST requests and streaming SSE responses will work. When ElevenLabs' voice orchestration detects a likely turn end, it fires a generation request to the configured Custom LLM endpoint. This section walks through the core components of that translation layer, the bridge or proxy that makes the voice orchestration and agent framework speak the same language.
Each framework is typically chosen for its familiarity or its fit for a specific purpose. LlamaIndex, for example, was originally developed to simplify Retrieval-Augmented Generation (RAG), whereas CrewAI was built for automating defined tasks in an era of agents. Different design goals produce different response structures, and each requires specific handling. Streaming chunks as the LLM generates them, rather than waiting for a complete turn, is critical: it allows the Text-to-Speech (TTS) model to begin generating speech earlier, reducing perceived latency. We focus on four popular frameworks, namely LangGraph, Google ADK, CrewAI, and LlamaIndex.
A note on shared code
Each framework must stream responses as OpenAI-compatible SSE chunks. We introduce a small helper function used across examples to construct these chunks.
import json

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:
    payload = {
        "id": response_id,
        "object": "chat.completion.chunk",
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
    return f"data: {json.dumps(payload)}\n\n"
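On the wire, a complete turn rendered through this helper looks like the following sequence of SSE frames (ids shortened for readability):

```
data: {"id": "chatcmpl-abc", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": null}]}

data: {"id": "chatcmpl-abc", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": null}]}

data: {"id": "chatcmpl-abc", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}

data: [DONE]
```

Every framework-specific handler in this guide emits exactly this shape: an initial role delta, incremental content deltas, a `finish_reason: "stop"` chunk, and the `[DONE]` sentinel.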
With that foundation in place, let’s start with LangGraph.
LangGraph
LangGraph models agents as graphs, where nodes represent individual steps and edges define control flow between them. The minimal setup is straightforward: initialize a chat model, define agent tools, and create the agent graph runtime.
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

llm = ChatOpenAI(model=model_id, api_key=os.getenv("OPENAI_API_KEY"))
agent = create_agent(llm, tools=tool_list, system_prompt=system_prompt)
For every generation request, the LangGraph Agent receives the full conversation history, which allows it to maintain the required state internally. LangGraph supports server-side persistence via Checkpoints, although we do not cover them here in an effort to keep the implementation minimal.
With state management handled, the next LangGraph-specific decision point is the streaming mode. LangGraph provides two options, each suited to a different use case:
- stream_mode="values" gives graph state snapshots. It is simpler to implement but includes a fuller message state on each response, which adds latency in real-time conversational flows.
- stream_mode="messages" streams incremental message chunks from the model. This is generally preferred for realtime voice interactions, since it reduces time-to-first-audio in the ElevenLabs orchestration layer.
More specifically, in messages mode the agent loop emits intermediate steps, such as tool-calling updates, that should not be spoken aloud. The proxy filters these out, passing only user-facing response text to the TTS layer. Consider a tool-enabled turn:
[1] Model decides to call a tool (tool_calls=["get_price"])
[2] Tool executes and returns data (result="$24.99")
[3] Model produces response using result (content="It costs $24.99")
Naturally, only the chunks from step 3 should be forwarded in the SSE stream. In practice, two guard checks handle this filtering in the streaming loop: one to keep only langgraph_node == "model" events, and one that skips empty content. Together, these checks ensure only user-facing assistant text is forwarded to ElevenLabs as SSE. Putting these concepts together, we provide a lightweight implementation of the request proxy.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
    input = {"messages": req.messages}

    async def stream():
        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        sent_role = False
        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):
            # Only forward model text chunks; skip tool updates and non-text events.
            if metadata.get("langgraph_node") != "model":
                continue
            content = getattr(message_chunk, "content", None)
            if not content:
                continue
            if not sent_role:
                yield sse_chunk(response_id, {"role": "assistant"})
                sent_role = True
            # Send incremental token-like chunks to ElevenLabs in OpenAI format.
            yield sse_chunk(response_id, {"content": content})
        # Signal completion with finish_reason "stop", then terminate with [DONE].
        yield sse_chunk(response_id, {}, finish_reason="stop")
        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")
This ensures only user-facing model chunks are forwarded to ElevenLabs. Because LangGraph makes its internal tool execution visible through the state stream, filtering is explicit and controlled by the proxy.
Next, we dive into the nuances of working with Google’s Agent Development Kit (ADK).
Google ADK
Google's ADK abstracts the runtime loop behind a few core primitives: Agent, Runner, and SessionService. ADK’s Runner sits between the HTTP layer and agent definition. It handles message routing, tool orchestration, session lifecycle, and event streaming.
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import InMemorySessionService
from google.genai import types as genai_types

agent = Agent(
    name=name,
    model=model,
    instruction=instruction,
    tools=tool_list,
)
session_service = InMemorySessionService()
runner = Runner(
    agent=agent,
    app_name=app_name,
    session_service=session_service,
)
With the agent, session backend, and runner initialized, the proxy resolves or creates an ADK session for each incoming request. In ADK, session_id controls memory persistence: reusing the same session_id across turns automatically carries forward history, tool calls, and prior responses. Since conversation identity lives upstream in ElevenLabs, the proxy handles this mapping explicitly. By passing the correct identifier with each generation request, the SDK can handle prior context internally. We supply this identifier at conversation initiation through extra parameters in the request body.
With the message and session prepared, the runner can be invoked. Tool calls and tool results still appear as internal ADK events during execution, but they are treated as intermediary orchestration steps rather than user-facing output. This removes the need for a manual filter compared to frameworks where tool calls appear as user-visible text.
The handler below is a simplified implementation including session resolution and get-or-create logic inline.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest, request: Request):
    # In production, prefer a stable identifier from your upstream system.
    session_id = req.elevenlabs_extra_body.arbitrary_identifier
    session = await session_service.get_session(
        app_name="elevenlabs", user_id="user", session_id=session_id
    )
    if not session:
        session = await session_service.create_session(
            app_name="elevenlabs", user_id="user", session_id=session_id
        )
    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")
    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

    async def stream():
        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        sent_role = False
        async for event in runner.run_async(
            user_id="user",
            session_id=session.id,
            new_message=content,
            run_config=RunConfig(streaming_mode=StreamingMode.SSE),
        ):
            if not event.content or not event.content.parts:
                continue
            # In SSE mode, ADK emits partial (incremental) and final (complete) events.
            # Forwarding only partial events avoids duplicating the full text.
            # Note: SSE streaming is experimental in ADK. For production, reconcile
            # both event types in case the model backend doesn't emit partials.
            if not getattr(event, "partial", False):
                continue
            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)
            if not text:
                continue
            if not sent_role:
                yield sse_chunk(response_id, {"role": "assistant"})
                sent_role = True
            yield sse_chunk(response_id, {"content": text})
        yield sse_chunk(response_id, {}, finish_reason="stop")
        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")
Next, we’ll look at CrewAI, which is more task-centric by design.
CrewAI
CrewAI was designed to orchestrate multi-agent workflows around structured tasks (research, write, summarize) rather than open-ended dialogue loops. Agents are defined with a role, goal, and backstory. Execution is centered around Task objects, each with a clear description and expected output.
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import tool
from crewai.types.streaming import StreamChunkType

llm = LLM(
    model=model_id,
    api_key=os.getenv("OPENAI_API_KEY"),
)
store_agent = Agent(
    role=role,
    goal=goal,
    backstory=backstory,
    tools=tools,
    llm=llm,
    verbose=False,
)
Unlike the agent-loop model used in LangGraph and ADK, CrewAI typically constructs Task and Crew per request to define the unit of work for that turn in the conversation. We carry conversational context forward by injecting prior turns into the next task via a placeholder: the {crew_chat_messages} variable is populated on each request with the running conversation history, then interpolated into the task description at execution time. We further aim to produce clean, speech-ready text by explicitly filtering out intermediary tracing patterns (Thought, Action, Action Input, Observation) and emitting only final-answer text.
The handler below brings together per-request task construction, history interpolation, Crew-level streaming, trace filtering, and output formatting.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
    # Task and Crew are assembled per request (not at startup).
    task = Task(
        description=(
            "Conversation history:\n{crew_chat_messages}\n\n"
            "Respond to the user's latest message."
        ),
        expected_output=expected_output,
        agent=store_agent,
    )
    # stream=True returns CrewStreamingOutput instead of a single CrewOutput.
    crew = Crew(
        agents=[store_agent],
        tasks=[task],
        process=Process.sequential,
        verbose=False,
        stream=True,
    )

    async def stream():
        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        sent_role = False
        final_marker = "final answer:"
        marker_buffer = ""
        marker_found = False
        streaming = await crew.kickoff_async(
            inputs={"crew_chat_messages": json.dumps(req.messages)}
        )
        async for chunk in streaming:
            # Skip non-text events (e.g. tool calls).
            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:
                continue
            # Only forward text after the "Final Answer:" marker.
            if not marker_found:
                marker_buffer += chunk.content
                idx = marker_buffer.lower().find(final_marker)
                if idx == -1:
                    continue
                marker_found = True
                content = marker_buffer[idx + len(final_marker):].lstrip()
                marker_buffer = ""
            else:
                content = chunk.content
            # Clean up any trailing markdown artifacts from CrewAI output.
            content = content.rstrip("`").rstrip()
            if not content:
                continue
            if not sent_role:
                yield sse_chunk(response_id, {"role": "assistant"})
                sent_role = True
            yield sse_chunk(response_id, {"content": content})
        # Fallback to handle short responses without the "Final Answer:" marker.
        if not sent_role:
            raw = getattr(streaming, "result", None)
            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()
            if fallback:
                yield sse_chunk(response_id, {"role": "assistant"})
                yield sse_chunk(response_id, {"content": fallback})
        yield sse_chunk(response_id, {}, finish_reason="stop")
        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")
Next, we look at LlamaIndex, which takes a different path with a native event-driven streaming model.
LlamaIndex
Unlike the other frameworks covered in this post, LlamaIndex was designed to connect LLMs to external data sources (document stores, indexes, retrieval pipelines). Its agent layer, FunctionAgent, sits on top of that foundation, built to retrieve and reason over structured context rather than to drive open-ended dialogue or task execution.
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent, AgentStream
from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(
    model=model,
    api_key=os.getenv("OPENAI_API_KEY"),
)
agent = FunctionAgent(
    tools=[list_inventory, get_item_price],
    llm=llm,
    system_prompt=system_prompt,
)
To preserve conversational continuity, the proxy transforms incoming messages into LlamaIndex chat messages, then splits them into the latest user turn (user_msg) and prior turns (chat_history). Each AgentStream event's event.delta field contains the next text fragment, which maps directly into an OpenAI-style delta.content chunk. Non-empty deltas can be forwarded as-is, making this the most straightforward streaming bridge in the guide. The stream contains both orchestration events (tool calls, results) and speech events (assistant text deltas). To keep voice output clean, the proxy keeps only AgentStream events and skips empty deltas.
[1] AgentStream (delta='')        ← ignored
[2] ToolCall                      ← ignored
[3] ToolCallResult                ← ignored
[4] AgentStream (delta='It')      ← forwarded ✓
[5] AgentStream (delta=' costs')  ← forwarded ✓
[6] AgentStream (delta=' $49.99') ← forwarded ✓
This separation keeps intermediary tool mechanics out of spoken output while preserving low-latency incremental speech. The drop-in handler below brings these steps together.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
    # This assumes the last message is always a user turn with string content.
    # For production, add defensive role/content handling for non-text payloads.
    chat_history = [
        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")
        for m in req.messages
    ]
    user_text = chat_history.pop().content

    async def stream():
        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
        handler = agent.run(user_msg=user_text, chat_history=chat_history)
        async for event in handler.stream_events():
            if not isinstance(event, AgentStream):
                continue
            if not event.delta:
                continue
            yield sse_chunk(response_id, {"content": event.delta})
        yield sse_chunk(response_id, {}, finish_reason="stop")
        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")
LlamaIndex is less prescriptive about end-to-end conversational runtime patterns than frameworks with heavier built-in orchestration layers. For production deployments, teams typically need to implement session handling, response guardrails, tool orchestration, and tracing themselves.
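As an example, session handling can start as a small in-memory history store keyed by a conversation identifier. A minimal sketch (the class name, eviction policy, and identifier source are all assumptions to adapt per deployment; a production system would persist histories externally):

```python
from collections import OrderedDict

class ChatHistoryStore:
    """Per-conversation chat history with LRU-style eviction."""

    def __init__(self, max_conversations: int = 1000):
        self.max_conversations = max_conversations
        self._histories: OrderedDict[str, list] = OrderedDict()

    def get(self, conversation_id: str) -> list:
        # Fetch (or lazily create) the history, marking it most recently used.
        history = self._histories.pop(conversation_id, [])
        self._histories[conversation_id] = history
        return history

    def append(self, conversation_id: str, message) -> None:
        self.get(conversation_id).append(message)
        # Evict least recently used conversations beyond the cap.
        while len(self._histories) > self.max_conversations:
            self._histories.popitem(last=False)
```

In the handler above, the proxy would look up `chat_history` from this store using the upstream conversation identifier, append the new user turn and the assistant's final text, and pass the history into `agent.run`.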
Conclusion
Each framework in this guide connects to ElevenLabs through the same contract: accept an OpenAI-style Chat Completions or Responses request and stream back SSE chunks. This allows teams to layer voice orchestration on top of an existing agent implementation with minimal changes, preserving what they have already built while unlocking real-time conversational AI. This modularity is a core tenet of the ElevenAgents platform. Whether organizations are extending an existing agent or building voice-native from the start, ElevenAgents' voice orchestration is built to meet them where they are.
If you are already running an agent with an open-source framework and want to enable voice, try this approach and let us know what you think.



