
Praxisleitfaden: Open-Source-Agenten-Frameworks und ElevenAgents
Anbindung von Open-Source-Agenten-Frameworks an ElevenLabs-Stimme über Custom LLM.
In unserem vorherigen Beitrag zu Integration externer Agenten mit ElevenLabs Voice Orchestration haben wir beschrieben, wie Teams ihre bestehende textbasierte Agenten-Orchestrierung über das Individuelles LLM mit ElevenLabs verbinden können. Aufbauend darauf zeigt dieser Leitfaden, wie führende Open-Source-Agenten-Frameworks hinter der Custom LLM-Schnittstelle angepasst und bereitgestellt werden können. Das Ergebnis ist eine flexible Architektur, bei der Sprache auf ausgereifte Agentensysteme aufgesetzt wird, ohne das Zustandsmanagement, die Tool-Orchestrierung oder die anwendungsspezifische Steuerung zu beeinträchtigen. Framework-übergreifend folgen wir immer demselben Drei-Schritte-Muster: Generierungsanfrage erstellen, finale Textantwort extrahieren und im OpenAI-kompatiblen Server-Sent Events (SSE)-Format umwandeln. ElevenLabs unterstützt sowohl das Chat-Antwortenals auch das Antworten-Format. Dieser Leitfaden behandelt vier weit verbreitete Frameworks, das Muster lässt sich jedoch auf jede Laufzeit übertragen, die OpenAI-kompatiblen Streaming-Output erzeugen kann.
.webp&w=3840&q=95)
Allgemeine Einrichtung
Die Beispiele in diesem Abschnitt verwenden Python und FastAPI, aber jeder Stack, der HTTP-POST-Anfragen und Streaming-SSE-Antworten verarbeiten kann, ist geeignet. Erkennt die ElevenLabs Voice Orchestration ein wahrscheinliches Gesprächsende, sendet sie eine Generierungsanfrage an den konfigurierten Custom LLM-Endpunkt. Dieser Abschnitt erläutert die Kernkomponenten dieser Übersetzungsschicht – die Brücke oder den Proxy, der Voice Orchestration und Agenten-Framework miteinander verbindet.
Jedes Framework wird verständlicherweise von Kunden entweder aus Gewohnheit oder wegen spezieller Anforderungen gewählt. LlamaIndex wurde beispielsweise entwickelt, um Retrieval-Augmented Generation (RAG) zu vereinfachen, während CrewAI für die Automatisierung definierter Aufgaben konzipiert wurde. Unterschiedliche Designziele führen zu unterschiedlichen Antwortstrukturen, die jeweils spezifisch behandelt werden müssen. Das Streamen von Chunks, sobald das LLM sie generiert, statt auf eine vollständige Antwort zu warten, ist entscheidend, da das Text-to-Speech (TTS)-Modell so früher mit der Sprachausgabe beginnen kann und die wahrgenommene Latenz sinkt. Wir konzentrieren uns auf die vier populären Frameworks: LangGraph, Google ADK, CrewAI und LlamaIndex.
Hinweis zum gemeinsamen Code
Jedes Framework muss Antworten als OpenAI-kompatible SSE-Chunks streamen. Wir stellen eine kleine Hilfsfunktion vor, die in allen Beispielen zur Erstellung dieser Chunks verwendet wird.
def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:
payload = {
"id": response_id,
"object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
}
return f"data: {json.dumps(payload)}\n\n"
Mit dieser Grundlage beginnen wir mit LangGraph.
LangGraph
LangGraph modelliert Agenten als Graphen, wobei Knoten einzelne Schritte und Kanten den Kontrollfluss zwischen ihnen darstellen. Die minimale Einrichtung ist einfach: Chat-Modell initialisieren, Agenten-Tools definieren und die Agenten-Graph-Laufzeit erstellen.
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
api_key=os.getenv("OPENAI_API_KEY"),
llm, tools=tool_list, system_prompt=system_prompt,)
Bei jeder Generierungsanfrage erhält der LangGraph-Agent den gesamten Gesprächsverlauf und kann so den benötigten Zustand intern verwalten. LangGraph unterstützt serverseitige Persistenz über Checkpoints, die wir hier jedoch nicht behandeln, um die Implementierung schlank zu halten.
Mit dem Zustandsmanagement als Basis folgt die nächste LangGraph-spezifische Entscheidung: der Streaming-Modus. LangGraph bietet zwei Optionen, die jeweils für unterschiedliche Anwendungsfälle geeignet sind:
- stream_mode="values" liefert Snapshots des Graph-Zustands. Die Implementierung ist einfacher, aber jede Antwort enthält den vollständigen Nachrichtenstatus, was die Latenz bei Echtzeitgesprächen erhöht.
- stream_mode="messages" streamt inkrementelle Nachrichten-Chunks aus dem Modell. Dies ist für Echtzeit-Sprachinteraktionen meist vorzuziehen, da die Zeit bis zur ersten Audioausgabe in der ElevenLabs-Orchestrierung reduziert wird.
Im Detail enthält die Nachrichten-Implementierung der Agenten-Schleife Zwischenschritte wie Tool-Updates, die nicht gesprochen werden sollen. Der Proxy filtert diese heraus und gibt nur benutzerrelevanten Antworttext an die TTS-Schicht weiter. Hier ein Beispiel für eine Tool-gestützte Runde.
[1] Modell entscheidet, ein Tool aufzurufen (tool_calls=["get_price"])
[2] Tool wird ausgeführt und liefert Daten zurück (result="$24.99")
[3] Modell erzeugt Antwort mit dem Ergebnis (content="It costs $24.99")
Nur die Chunks aus Schritt 3 sollten im SSE-Stream weitergeleitet werden. In der Praxis übernehmen zwei Prüfungen diese Filterung in der Streaming-Schleife: Eine prüft, ob langgraph_node == "model" ist, die andere überspringt leere Inhalte. So wird sichergestellt, dass nur benutzerrelevanter Assistententext als SSE an ElevenLabs weitergegeben wird. Im Folgenden eine schlanke Proxy-Implementierung.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
input = {"messages": req.messages}
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):
# Nur Modell-Text-Chunks weiterleiten; Tool-Updates und Nicht-Text-Ereignisse überspringen.
if metadata.get("langgraph_node") != "model":
continue
content = getattr(message_chunk, "content", None)
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
# Inkrementelle Token-Chunks im OpenAI-Format an ElevenLabs senden.
yield sse_chunk(response_id, {"content": content})
# Natürliches Ende signalisieren, bevor finish_reason: "stop" [DONE] verwendet wird
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
So werden nur benutzerrelevante Modell-Chunks an ElevenLabs weitergeleitet. Da LangGraph die interne Tool-Ausführung im Zustandsstream sichtbar macht, ist das Filtern explizit und wird vom Proxy gesteuert.
Als Nächstes betrachten wir die Besonderheiten der Arbeit mit Googles Agent Development Kit (ADK).
Google ADK
Googles ADK abstrahiert die Laufzeitschleife über wenige Kernkomponenten: Agent, Runner und SessionService. Der Runner sitzt zwischen HTTP-Schicht und Agentendefinition. Er übernimmt Nachrichtenrouting, Tool-Orchestrierung, Sitzungsverwaltung und Event-Streaming.
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import InMemorySessionService
from google.genai import types as genai_types
agent = Agent(
name=name,
model=model,
instruction=instruction,
tools=[tool_list],
)
session_service = InMemorySessionService()
runner = Runner(
agent=agent,
app_name=app_name,
session_service=session_service
)
Mit Agent, Sitzungs-Backend und Runner initialisiert, löst der Proxy für jede eingehende Anfrage eine ADK-Session auf oder erstellt sie. In ADK steuert session_id die Speicherpersistenz: Wird dieselbe session_id über mehrere Runden verwendet, bleiben Verlauf, Tool-Aufrufe und vorherige Antworten erhalten. Da die Gesprächsidentität upstream bei ElevenLabs liegt, übernimmt der Proxy diese Zuordnung explizit. Durch die Übergabe des richtigen Identifikators bei der Generierungsanfrage kann das SDK den Kontext intern verwalten. Der beliebige Identifikator wird beim Start des Gesprächs über zusätzliche Parameter im Request-Body übergeben.
Mit vorbereiteter Nachricht und Session kann der Runner aufgerufen werden. Tool-Aufrufe und -Ergebnisse erscheinen weiterhin als interne ADK-Events, werden aber als Zwischenschritte behandelt und nicht als benutzerrelevante Ausgabe. Dadurch entfällt das manuelle Filtern wie bei Frameworks, bei denen Tool-Aufrufe als sichtbarer Text erscheinen.
Der folgende Handler ist eine vereinfachte Implementierung mit Session-Auflösung und get-or-create-Logik.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest, request: Request):
# In der Produktion sollte ein stabiler Identifikator aus dem Upstream-System verwendet werden.
session_id = req.elevenlabs_extra_body.arbitrary_identifier
session = await session_service.get_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
if not session:
session = await session_service.create_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")
content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for event in runner.run_async(
user_id="user",
session_id=session.id,
new_message=content,
run_config=RunConfig(streaming_mode=StreamingMode.SSE),
):
if not event.content or not event.content.parts:
continue
# Im SSE-Modus sendet ADK partielle (inkrementelle) und finale (vollständige) Events.
# Nur partielle Events weiterleiten, um doppelten Text zu vermeiden. # Hinweis: SSE-Streaming ist in ADK experimentell. In der Produktion sollten
# beide Event-Typen berücksichtigt werden, falls das Modell keine Partials sendet.
if not getattr(event, "partial", False):
continue
text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)
if not text:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": text})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Als Nächstes betrachten wir CrewAI, das von Haus aus stärker auf Aufgaben fokussiert ist.
CrewAI
CrewAI wurde entwickelt, um Multi-Agenten-Workflows rund um strukturierte Aufgaben (recherchieren, schreiben, zusammenfassen) zu orchestrieren, nicht für offene Dialogschleifen. Agenten werden mit Rolle, Ziel und Hintergrund definiert. Die Ausführung dreht sich um Task-Objekte mit klarer Beschreibung und erwarteter Ausgabe.
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import tool
from crewai.types.streaming import StreamChunkType
llm = LLM(
model=model_id,
api_key=os.getenv("OPENAI_API_KEY")
)
store_agent = Agent(
role=role,
goal=goal,
backstory=backstory,
tools=tools,
llm=llm,
verbose=False,
)
Im Gegensatz zum Agenten-Loop-Modell von LangGraph und ADK werden bei CrewAI Task und Crew meist pro Anfrage erstellt, um die Arbeitseinheit für diese Gesprächsrunde zu definieren. Den Gesprächskontext führen wir fort, indem wir vorherige Runden als Platzhalter in die nächste Aufgabe einfügen. Die Variable {crew_chat_messages} wird bei jeder Anfrage mit dem aktuellen Gesprächsverlauf befüllt und zur Ausführungszeit in die Aufgabenbeschreibung interpoliert. Um sauberen, sprechfertigen Text zu erzeugen, filtern wir explizit Zwischenmuster (Thought, Action, Action Input, Observation) heraus und geben nur den finalen Antworttext aus.
Der folgende Handler vereint Aufgabenbau pro Anfrage, Verlaufseinbindung, Crew-Streaming, Trace-Filterung und Ausgabeformatierung.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Task und Crew werden pro Anfrage erstellt (nicht beim Start).
task = Task(
description=(
"Gesprächsverlauf:\n{crew_chat_messages}\n\n"
"Antworten Sie auf die letzte Nachricht des Nutzers."
),
expected_output=expected_output,
agent=store_agent,
)
# stream=True liefert CrewStreamingOutput statt eines einzelnen CrewOutput.
crew = Crew(
agents=[store_agent],
tasks=[task],
process=Process.sequential,
verbose=False,
stream=True,
)
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
final_marker = "final answer:"
marker_buffer = ""
marker_found = False
emitted_any_content = False
streaming = await crew.kickoff_async(
inputs={"crew_chat_messages": json.dumps(req.messages)}
)
async for chunk in streaming:
# Nicht-Text-Events (z.B. Tool-Aufrufe) überspringen.
if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:
continue
# Nur Text nach dem "Final Answer:"-Marker weiterleiten
if not marker_found:
marker_buffer += chunk.content
idx = marker_buffer.lower().find("final answer:")
if idx == -1:
continue
marker_found = True
content = marker_buffer[idx + 13:].lstrip()
marker_buffer = ""
else:
content = chunk.content
# Eventuelle Markdown-Artefakte am Ende der CrewAI-Ausgabe entfernen.
content = content.rstrip("`").rstrip()
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": content})
# Fallback für kurze Antworten ohne "Final Answer:"-Marker
if not sent_role:
raw = getattr(streaming, "result", None)
fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()
if fallback:
yield sse_chunk(response_id, {"role": "assistant"})
yield sse_chunk(response_id, {"content": fallback})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
Als Nächstes betrachten wir LlamaIndex, das einen anderen Ansatz mit einem nativen, ereignisgesteuerten Streaming-Modell verfolgt.
LlamaIndex
Im Gegensatz zu den anderen Frameworks in diesem Beitrag wurde LlamaIndex entwickelt, um LLMs mit externen Datenquellen (Dokumentenspeicher, Indizes, Retrieval-Pipelines) zu verbinden. Die Agenten-Schicht, FunctionAgent, baut darauf auf, um strukturierten Kontext abzurufen und zu verarbeiten, nicht für offene Dialoge oder Aufgabenbearbeitung.
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent, AgentStream
from llama_index.core.base.llms.types import ChatMessage, MessageRole
llm = OpenAI(
model=model,
api_key=os.getenv("OPENAI_API_KEY")
)
agent = FunctionAgent(
tools=[list_inventory, get_item_price],
llm=llm,
system_prompt=system_prompt,
)
Um den Gesprächsfluss zu erhalten, wandelt der Proxy eingehende Nachrichten in LlamaIndex-Chatnachrichten um und trennt sie in die aktuelle Nutzereingabe (user_msg) und den bisherigen Verlauf (chat_history). Das event.delta-Feld jedes AgentStream-Events enthält das nächste Textfragment, das direkt als OpenAI-Delta-Content-Chunk weitergegeben werden kann. Nicht-leere Deltas werden direkt weitergeleitet, was diese Streaming-Bridge zur einfachsten im Leitfaden macht. Der Stream enthält sowohl Orchestrierungsereignisse (Tool-Aufrufe, Ergebnisse) als auch Sprachevents (Assistenten-Textdeltas). Um die Sprachausgabe sauber zu halten, gibt der Proxy nur AgentStream-Events weiter und überspringt leere Deltas.
[1] AgentStream (delta='') ← ignoriert
[2] ToolCall ← ignoriert
[3] ToolCallResult ← ignoriert
[4] AgentStream (delta='It') ← weitergeleitet ✓
[5] AgentStream (delta=' costs') ← weitergeleitet ✓
[6] AgentStream (delta=' $49.99')← weitergeleitet ✓
Diese Trennung hält Zwischenschritte der Tool-Mechanik aus der Sprachausgabe heraus und ermöglicht gleichzeitig eine niedrige Latenz bei der inkrementellen Sprachausgabe. Der folgende Handler fasst diese Schritte zusammen.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Es wird angenommen, dass die letzte Nachricht immer eine Nutzereingabe mit String-Inhalt ist.
# In der Produktion sollte eine defensive Prüfung für Rollen/Inhalte bei Nicht-Text-Payloads erfolgen.
chat_history = [
ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")
for m in req.messages
]
user_text = chat_history.pop().content
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
handler = agent.run(user_msg=user_text, chat_history=chat_history)
async for event in handler.stream_events():
if not isinstance(event, AgentStream):
continue
if not event.delta:
continue
yield sse_chunk(response_id, {"content": event.delta})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
LlamaIndex gibt weniger Vorgaben für End-to-End-Gesprächsmuster als Frameworks mit umfangreicher Orchestrierung. Für produktive Einsätze müssen Kunden daher meist Session-Handling, Antwort-Filter, Tool-Orchestrierung und Tracing selbst implementieren.
Fazit
Jedes Framework in diesem Leitfaden verbindet sich über denselben Mechanismus mit ElevenLabs: Es akzeptiert eine OpenAI-ähnliche Completions- oder Responses-Anfrage und streamt SSE-Chunks zurück. So können Teams Sprach-Orchestrierung mit minimalen Änderungen auf bestehende Agentenlösungen aufsetzen und ihre bisherigen Investitionen erhalten, während sie Echtzeit-KI-Gespräche ermöglichen. Diese Modularität ist ein Grundprinzip der ElevenAgents-Plattform. Egal ob bestehende Agenten erweitert oder Voice-native Lösungen aufgebaut werden: Die Voice Orchestration von ElevenAgents ist darauf ausgelegt, beide Ansätze zu unterstützen.
Wenn Sie bereits einen Agenten mit einem Open-Source-Framework betreiben und Sprache aktivieren möchten, probieren Sie diesen Ansatz aus und teilen Sie uns Ihr Feedback mit.



