
Praktisk guide: open source-agentramverk och ElevenAgents
Koppla open source-agentramverk till ElevenLabs-röst via Custom LLM.
I vårt tidigare inlägg om Integrera externa agenter med ElevenLabs Voice Orchestration beskrev vi hur team kan koppla sin befintliga textbaserade agentorkestrering till ElevenLabs via Anpassad LLM. Med det som grund visar vi här hur ledande open source-agentramverk kan anpassas och användas bakom Custom LLM-gränssnittet. Resultatet är en flexibel arkitektur där röst läggs på etablerade agentsystem utan att tumma på tillståndshantering, verktygsorkestrering eller applikationsspecifik kontroll. Oavsett ramverk följer vi samma trestegsmönster: skapa en generation request, extrahera det slutgiltiga textsvaret och formatera det i OpenAI-kompatibelt Server-Sent Events (SSE)-format. ElevenLabs stödjer både Chat-kompletteringaroch Svar-formaten. Även om guiden täcker fyra populära ramverk kan mönstren användas för alla miljöer som kan producera OpenAI-kompatibel strömmande output.
.webp&w=3840&q=95)
Allmän setup
Exemplen i det här avsnittet använder Python och FastAPI, men vilken stack som helst som hanterar HTTP POST och strömmande SSE-svar fungerar. När ElevenLabs voice orchestration upptäcker att en tur troligen är slut skickas en generation request till den konfigurerade Custom LLM-endpointen. Här går vi igenom kärnkomponenterna i det översättningslagret – bron eller proxyn som gör att voice orchestration och agentramverket kan prata med varandra.
Olika ramverk väljs förstås av kunder beroende på vana eller förmåga att lösa ett visst behov. LlamaIndex togs till exempel fram för att förenkla Retrieval-Augmented Generation (RAG), medan CrewAI byggdes för att automatisera definierade uppgifter i agenternas tidsålder. Olika designmål ger olika svarstrukturer, och varje ramverk kräver särskild hantering. Att strömma ut textbitar medan LLM:en genererar dem, istället för att vänta på ett helt svar, är viktigt eftersom Text to Speech (TTS)-modellen då kan börja läsa upp tidigare och därmed minska upplevd fördröjning. Vi fokuserar på fyra populära ramverk: LangGraph, Google ADK, CrewAI och LlamaIndex.
Om delad kod
Varje ramverk måste strömma svar som OpenAI-kompatibla SSE-bitar. Vi introducerar en liten hjälpfunktion som används i exemplen för att bygga dessa bitar.
def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:
payload = {
"id": response_id,
"object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
}
return f"data: {json.dumps(payload)}\n\n"
Med det på plats börjar vi med LangGraph.
LangGraph
LangGraph modellerar agenter som grafer där noder är enskilda steg och kanter styr flödet mellan dem. Grundinställningen är enkel: initiera en chat-modell, definiera agentverktyg och skapa agentgrafens runtime.
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
api_key=os.getenv("OPENAI_API_KEY"),
llm, tools=tool_list, system_prompt=system_prompt,)
Vid varje generation request får LangGraph-agenten hela konversationshistoriken, vilket gör att den kan hålla rätt tillstånd internt. LangGraph stödjer serverlagring via Checkpoints, men vi går inte igenom det här för att hålla det enkelt.
När tillståndshanteringen är löst är nästa LangGraph-specifika val strömningsläge, där LangGraph erbjuder två alternativ beroende på användningsområde:
- stream_mode="values" ger ögonblicksbilder av grafens tillstånd. Det är enklare att implementera men skickar med hela meddelandetillståndet varje gång, vilket ökar fördröjningen i realtidsdialoger.
- stream_mode="messages" strömmar ut inkrementella meddelandebitar från modellen. Det är oftast bäst för röstinteraktion i realtid, eftersom det minskar tiden till första ljud i ElevenLabs-orkestreringen.
Mer specifikt innehåller messages-implementeringen av agentloopen mellanliggande steg, som verktygsanrop, som inte ska läsas upp. Proxyn filtrerar bort dessa och skickar bara svarstext till TTS-lagret. Här är ett exempel på en tur med verktygsanrop.
[1] Modellen väljer att anropa ett verktyg (tool_calls=["get_price"])
[2] Verktyget körs och returnerar data (result="$24.99")
[3] Modellen ger svar med resultatet (content="It costs $24.99")
Endast bitarna från steg 3 ska skickas vidare i SSE-strömmen. I praktiken hanteras detta av två kontroller i strömningsloopen: en som bara släpper igenom langgraph_node == "model"-händelser, och en som hoppar över tomt innehåll. Tillsammans ser de till att bara svarstext till användaren skickas till ElevenLabs som SSE. Här är en enkel implementation av proxyn.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
input = {"messages": req.messages}
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):
# Skicka bara vidare modellens textbitar; hoppa över verktygsuppdateringar och icke-text.
if metadata.get("langgraph_node") != "model":
continue
content = getattr(message_chunk, "content", None)
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
# Skicka inkrementella textbitar till ElevenLabs i OpenAI-format.
yield sse_chunk(response_id, {"content": content})
# Signalera naturligt avslut innan finish_reason: "stop" [DONE]
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Detta gör att bara modellens svar till användaren skickas till ElevenLabs. Eftersom LangGraph visar sina interna verktygsanrop i tillståndsströmmen är filtreringen tydlig och styrs av proxyn.
Nu går vi vidare till detaljerna kring Google Agent Development Kit (ADK)
Google ADK
Googles ADK abstraherar runtime-loopen med några kärnkomponenter: Agent, Runner och SessionService. ADK:s Runner sitter mellan HTTP-lagret och agentdefinitionen. Den hanterar meddelanderouting, verktygsorkestrering, sessioner och händelseströmning.
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import InMemorySessionService
from google.genai import types as genai_types
agent = Agent(
name=name,
model=model,
instruction=instruction,
tools=[tool_list],
)
session_service = InMemorySessionService()
runner = Runner(
agent=agent,
app_name=app_name,
session_service=session_service
)
Med agent, session-backend och runner på plats löser proxyn eller skapar en ADK-session för varje inkommande request. I ADK styr session_id minneshanteringen: om samma session_id återanvänds mellan turer sparas automatiskt historik, verktygsanrop och tidigare svar. Eftersom konversationsidentiteten finns uppströms i ElevenLabs hanterar proxyn denna koppling. Genom att skicka rätt identifierare för generation request kan SDK:n hantera tidigare kontext internt. Vi skickar den godtyckliga identifieraren vid konversationsstart via extra parametrar som skickas i requestens body.
När meddelandet och sessionen är förberedda kan runnern köras. Verktygsanrop och resultat syns fortfarande som interna ADK-händelser under körning, men de behandlas som mellanliggande steg och inte som svar till användaren. Det gör att man slipper manuell filtrering jämfört med ramverk där verktygsanrop syns som text.
Handlern nedan är en förenklad implementation med sessionhantering och get-or-create-logik direkt i koden.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest, request: Request):
# I produktion, använd en stabil identifierare från ditt uppströmsystem.
session_id = req.elevenlabs_extra_body.arbitrary_identifier
session = await session_service.get_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
if not session:
session = await session_service.create_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")
content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for event in runner.run_async(
user_id="user",
session_id=session.id,
new_message=content,
run_config=RunConfig(streaming_mode=StreamingMode.SSE),
):
if not event.content or not event.content.parts:
continue
# I SSE-läge skickar ADK ut både partial (inkrementella) och final (slutgiltiga) händelser.
# Skicka bara partial events för att undvika dubblerad text. # Obs: SSE-streaming är experimentellt i ADK. I produktion, hantera
# båda händelsetyperna om modellbackend inte skickar partials.
if not getattr(event, "partial", False):
continue
text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)
if not text:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": text})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Nu tittar vi på CrewAI, som är mer uppgiftsfokuserad i sin design.
CrewAI
CrewAI är byggt för att orkestrera multi-agent-workflows kring strukturerade uppgifter (research, skriva, sammanfatta) snarare än öppna dialoger. Agenter definieras med roll, mål och bakgrund. Utförandet kretsar kring Task-objekt, var och en med tydlig beskrivning och förväntat resultat.
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import tool
from crewai.types.streaming import StreamChunkType
llm = LLM(
model=model_id,
api_key=os.getenv("OPENAI_API_KEY")
)
store_agent = Agent(
role=role,
goal=goal,
backstory=backstory,
tools=tools,
llm=llm,
verbose=False,
)
Till skillnad från agentloop-modellen i LangGraph och ADK skapar CrewAI vanligtvis Task och Crew per request för att definiera arbetsenheten för den turen. Vi för över kontext mellan turer genom att lägga in tidigare meddelanden i nästa task via en variabel. Variabeln {crew_chat_messages} fylls på varje request med konversationshistoriken och läggs in i taskbeskrivningen vid körning. För att få ren, talvänlig text filtrerar vi dessutom bort mellanliggande spårningsmönster (Thought, Action, Action Input, Observation) och skickar bara slutsvaret.
Handlern nedan samlar ihop task per request, historikinjektion, Crew-nivåns streaming, spårningsfiltrering och outputformatering.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Task och Crew sätts ihop per request (inte vid start).
task = Task(
description=(
"Konversationshistorik:\n{crew_chat_messages}\n\n"
"Svara på användarens senaste meddelande."
),
expected_output=expected_output,
agent=store_agent,
)
# stream=True ger CrewStreamingOutput istället för ett CrewOutput.
crew = Crew(
agents=[store_agent],
tasks=[task],
process=Process.sequential,
verbose=False,
stream=True,
)
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
final_marker = "final answer:"
marker_buffer = ""
marker_found = False
emitted_any_content = False
streaming = await crew.kickoff_async(
inputs={"crew_chat_messages": json.dumps(req.messages)}
)
async for chunk in streaming:
# Hoppa över icke-text (t.ex. verktygsanrop).
if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:
continue
# Skicka bara text efter "Final Answer:"-markören
if not marker_found:
marker_buffer += chunk.content
idx = marker_buffer.lower().find("final answer:")
if idx == -1:
continue
marker_found = True
content = marker_buffer[idx + 13:].lstrip()
marker_buffer = ""
else:
content = chunk.content
# Ta bort eventuella markdown-rester från CrewAI-output.
content = content.rstrip("`").rstrip()
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": content})
# Fallback för korta svar utan "Final Answer:"-markör
if not sent_role:
raw = getattr(streaming, "result", None)
fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()
if fallback:
yield sse_chunk(response_id, {"role": "assistant"})
yield sse_chunk(response_id, {"content": fallback})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
Nu tittar vi på LlamaIndex, som har ett annat fokus med en händelsedriven strömningsmodell.
LlamaIndex
Till skillnad från de andra ramverken här är LlamaIndex byggt för att koppla LLM:er till externa datakällor (dokumentlager, index, sökrör). Agentlagret, FunctionAgent, ligger ovanpå och hämtar och resonerar över strukturerad kontext snarare än att hantera öppen dialog eller uppgifter.
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent, AgentStream
from llama_index.core.base.llms.types import ChatMessage, MessageRole
llm = OpenAI(
model=model,
api_key=os.getenv("OPENAI_API_KEY")
)
agent = FunctionAgent(
tools=[list_inventory, get_item_price],
llm=llm,
system_prompt=system_prompt,
)
För att bevara kontexten omvandlar proxyn inkommande meddelanden till LlamaIndex chat-meddelanden, och delar upp dem i senaste användartur (user_msg) och tidigare turer (chat_history). Varje AgentStream-events event.delta innehåller nästa textbit, som direkt kan skickas vidare som en OpenAI-liknande delta.content. Icke-tomma deltas kan skickas som de är, vilket gör detta till den enklaste strömningsbron i guiden. Strömmen innehåller både orkestreringshändelser (verktygsanrop, resultat) och talhändelser (assistentens textbitar). För att hålla röstutdata ren skickar proxyn bara AgentStream-händelser och hoppar över tomma deltas.
[1] AgentStream (delta='') ← ignoreras
[2] ToolCall ← ignoreras
[3] ToolCallResult ← ignoreras
[4] AgentStream (delta='It') ← skickas ✓
[5] AgentStream (delta=' costs') ← skickas ✓
[6] AgentStream (delta=' $49.99')← skickas ✓
Denna separation håller mellanliggande verktygslogik borta från talet och ger låg fördröjning. Handlern nedan visar stegen tillsammans.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Antar att sista meddelandet alltid är en användartur med text.
# I produktion, lägg till hantering för andra roller/innehållstyper.
chat_history = [
ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")
for m in req.messages
]
user_text = chat_history.pop().content
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
handler = agent.run(user_msg=user_text, chat_history=chat_history)
async for event in handler.stream_events():
if not isinstance(event, AgentStream):
continue
if not event.delta:
continue
yield sse_chunk(response_id, {"content": event.delta})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
LlamaIndex är mindre styrande kring hela konversationsflödet än ramverk med mer inbyggd orkestrering. I produktion behöver kunder ofta själva bygga sessionhantering, svarskontroller, verktygsorkestrering och spårning.
Slutsats
Alla ramverk i denna guide kopplas till ElevenLabs på samma sätt: ta emot en OpenAI-liknande Completions- eller Responses-request och strömma tillbaka SSE-bitar. Det gör att team kan lägga på voice orchestration ovanpå sin befintliga agentlösning med minimala ändringar, så att det de redan byggt kan användas vidare – nu med realtidskonversation via AI. Denna modularitet är en grundpelare i ElevenAgents. Oavsett om du bygger vidare på en befintlig agent eller skapar voice-native från början är ElevenAgents voice orchestration byggd för att möta dig där du är.
Om du redan kör en agent med ett open source-ramverk och vill lägga till röst, testa gärna detta och berätta vad du tycker.



