Direkt zum Inhalt

Praxisleitfaden: Open-Source-Agenten-Frameworks und ElevenAgents

Anbindung von Open-Source-Agenten-Frameworks an ElevenLabs-Stimme über Custom LLM.

Black square with some squiggly lines.

In unserem vorherigen Beitrag zu Integration externer Agenten mit ElevenLabs Voice Orchestration haben wir beschrieben, wie Teams ihre bestehende textbasierte Agenten-Orchestrierung über das Individuelles LLM mit ElevenLabs verbinden können. Aufbauend darauf zeigt dieser Leitfaden, wie führende Open-Source-Agenten-Frameworks hinter der Custom LLM-Schnittstelle angepasst und bereitgestellt werden können. Das Ergebnis ist eine flexible Architektur, bei der Sprache auf ausgereifte Agentensysteme aufgesetzt wird, ohne das Zustandsmanagement, die Tool-Orchestrierung oder die anwendungsspezifische Steuerung zu beeinträchtigen. Framework-übergreifend folgen wir immer demselben Drei-Schritte-Muster: Generierungsanfrage erstellen, finale Textantwort extrahieren und im OpenAI-kompatiblen Server-Sent Events (SSE)-Format umwandeln. ElevenLabs unterstützt sowohl das Chat-Antwortenals auch das Antworten-Format. Dieser Leitfaden behandelt vier weit verbreitete Frameworks, das Muster lässt sich jedoch auf jede Laufzeit übertragen, die OpenAI-kompatiblen Streaming-Output erzeugen kann.

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

Allgemeine Einrichtung

Die Beispiele in diesem Abschnitt verwenden Python und FastAPI, aber jeder Stack, der HTTP-POST-Anfragen und Streaming-SSE-Antworten verarbeiten kann, ist geeignet. Erkennt die ElevenLabs Voice Orchestration ein wahrscheinliches Gesprächsende, sendet sie eine Generierungsanfrage an den konfigurierten Custom LLM-Endpunkt. Dieser Abschnitt erläutert die Kernkomponenten dieser Übersetzungsschicht – die Brücke oder den Proxy, der Voice Orchestration und Agenten-Framework miteinander verbindet.

Jedes Framework wird verständlicherweise von Kunden entweder aus Gewohnheit oder wegen spezieller Anforderungen gewählt. LlamaIndex wurde beispielsweise entwickelt, um Retrieval-Augmented Generation (RAG) zu vereinfachen, während CrewAI für die Automatisierung definierter Aufgaben konzipiert wurde. Unterschiedliche Designziele führen zu unterschiedlichen Antwortstrukturen, die jeweils spezifisch behandelt werden müssen. Das Streamen von Chunks, sobald das LLM sie generiert, statt auf eine vollständige Antwort zu warten, ist entscheidend, da das Text-to-Speech (TTS)-Modell so früher mit der Sprachausgabe beginnen kann und die wahrgenommene Latenz sinkt. Wir konzentrieren uns auf die vier populären Frameworks: LangGraph, Google ADK, CrewAI und LlamaIndex.

Hinweis zum gemeinsamen Code

Jedes Framework muss Antworten als OpenAI-kompatible SSE-Chunks streamen. Wir stellen eine kleine Hilfsfunktion vor, die in allen Beispielen zur Erstellung dieser Chunks verwendet wird.

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

        "id": response_id,

        "object": "chat.completion.chunk",

        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],

    }

    return f"data: {json.dumps(payload)}\n\n"

Mit dieser Grundlage beginnen wir mit LangGraph.

LangGraph

LangGraph modelliert Agenten als Graphen, wobei Knoten einzelne Schritte und Kanten den Kontrollfluss zwischen ihnen darstellen. Die minimale Einrichtung ist einfach: Chat-Modell initialisieren, Agenten-Tools definieren und die Agenten-Graph-Laufzeit erstellen.

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

Bei jeder Generierungsanfrage erhält der LangGraph-Agent den gesamten Gesprächsverlauf und kann so den benötigten Zustand intern verwalten. LangGraph unterstützt serverseitige Persistenz über Checkpoints, die wir hier jedoch nicht behandeln, um die Implementierung schlank zu halten.

Mit dem Zustandsmanagement als Basis folgt die nächste LangGraph-spezifische Entscheidung: der Streaming-Modus. LangGraph bietet zwei Optionen, die jeweils für unterschiedliche Anwendungsfälle geeignet sind:

  • stream_mode="values" liefert Snapshots des Graph-Zustands. Die Implementierung ist einfacher, aber jede Antwort enthält den vollständigen Nachrichtenstatus, was die Latenz bei Echtzeitgesprächen erhöht.
  • stream_mode="messages" streamt inkrementelle Nachrichten-Chunks aus dem Modell. Dies ist für Echtzeit-Sprachinteraktionen meist vorzuziehen, da die Zeit bis zur ersten Audioausgabe in der ElevenLabs-Orchestrierung reduziert wird.

Im Detail enthält die Nachrichten-Implementierung der Agenten-Schleife Zwischenschritte wie Tool-Updates, die nicht gesprochen werden sollen. Der Proxy filtert diese heraus und gibt nur benutzerrelevanten Antworttext an die TTS-Schicht weiter. Hier ein Beispiel für eine Tool-gestützte Runde.

[1] Modell entscheidet, ein Tool aufzurufen (tool_calls=["get_price"])

[2] Tool wird ausgeführt und liefert Daten zurück (result="$24.99")

[3] Modell erzeugt Antwort mit dem Ergebnis (content="It costs $24.99")

Nur die Chunks aus Schritt 3 sollten im SSE-Stream weitergeleitet werden. In der Praxis übernehmen zwei Prüfungen diese Filterung in der Streaming-Schleife: Eine prüft, ob langgraph_node == "model" ist, die andere überspringt leere Inhalte. So wird sichergestellt, dass nur benutzerrelevanter Assistententext als SSE an ElevenLabs weitergegeben wird. Im Folgenden eine schlanke Proxy-Implementierung.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    input = {"messages": req.messages}

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # Nur Modell-Text-Chunks weiterleiten; Tool-Updates und Nicht-Text-Ereignisse überspringen.

            if metadata.get("langgraph_node") != "model":

                continue

            content = getattr(message_chunk, "content", None)

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            # Inkrementelle Token-Chunks im OpenAI-Format an ElevenLabs senden.

            yield sse_chunk(response_id, {"content": content})

         # Natürliches Ende signalisieren, bevor finish_reason: "stop" [DONE] verwendet wird

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

So werden nur benutzerrelevante Modell-Chunks an ElevenLabs weitergeleitet. Da LangGraph die interne Tool-Ausführung im Zustandsstream sichtbar macht, ist das Filtern explizit und wird vom Proxy gesteuert.

Als Nächstes betrachten wir die Besonderheiten der Arbeit mit Googles Agent Development Kit (ADK).

Google ADK

Googles ADK abstrahiert die Laufzeitschleife über wenige Kernkomponenten: Agent, Runner und SessionService. Der Runner sitzt zwischen HTTP-Schicht und Agentendefinition. Er übernimmt Nachrichtenrouting, Tool-Orchestrierung, Sitzungsverwaltung und Event-Streaming.

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

Mit Agent, Sitzungs-Backend und Runner initialisiert, löst der Proxy für jede eingehende Anfrage eine ADK-Session auf oder erstellt sie. In ADK steuert session_id die Speicherpersistenz: Wird dieselbe session_id über mehrere Runden verwendet, bleiben Verlauf, Tool-Aufrufe und vorherige Antworten erhalten. Da die Gesprächsidentität upstream bei ElevenLabs liegt, übernimmt der Proxy diese Zuordnung explizit. Durch die Übergabe des richtigen Identifikators bei der Generierungsanfrage kann das SDK den Kontext intern verwalten. Der beliebige Identifikator wird beim Start des Gesprächs über zusätzliche Parameter im Request-Body übergeben.

Mit vorbereiteter Nachricht und Session kann der Runner aufgerufen werden. Tool-Aufrufe und -Ergebnisse erscheinen weiterhin als interne ADK-Events, werden aber als Zwischenschritte behandelt und nicht als benutzerrelevante Ausgabe. Dadurch entfällt das manuelle Filtern wie bei Frameworks, bei denen Tool-Aufrufe als sichtbarer Text erscheinen.

Der folgende Handler ist eine vereinfachte Implementierung mit Session-Auflösung und get-or-create-Logik.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # In der Produktion sollte ein stabiler Identifikator aus dem Upstream-System verwendet werden.

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # Im SSE-Modus sendet ADK partielle (inkrementelle) und finale (vollständige) Events.

            # Nur partielle Events weiterleiten, um doppelten Text zu vermeiden.
            # Hinweis: SSE-Streaming ist in ADK experimentell. In der Produktion sollten

            # beide Event-Typen berücksichtigt werden, falls das Modell keine Partials sendet.

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Als Nächstes betrachten wir CrewAI, das von Haus aus stärker auf Aufgaben fokussiert ist.

CrewAI

CrewAI wurde entwickelt, um Multi-Agenten-Workflows rund um strukturierte Aufgaben (recherchieren, schreiben, zusammenfassen) zu orchestrieren, nicht für offene Dialogschleifen. Agenten werden mit Rolle, Ziel und Hintergrund definiert. Die Ausführung dreht sich um Task-Objekte mit klarer Beschreibung und erwarteter Ausgabe.

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

Im Gegensatz zum Agenten-Loop-Modell von LangGraph und ADK werden bei CrewAI Task und Crew meist pro Anfrage erstellt, um die Arbeitseinheit für diese Gesprächsrunde zu definieren. Den Gesprächskontext führen wir fort, indem wir vorherige Runden als Platzhalter in die nächste Aufgabe einfügen. Die Variable {crew_chat_messages} wird bei jeder Anfrage mit dem aktuellen Gesprächsverlauf befüllt und zur Ausführungszeit in die Aufgabenbeschreibung interpoliert. Um sauberen, sprechfertigen Text zu erzeugen, filtern wir explizit Zwischenmuster (Thought, Action, Action Input, Observation) heraus und geben nur den finalen Antworttext aus.

Der folgende Handler vereint Aufgabenbau pro Anfrage, Verlaufseinbindung, Crew-Streaming, Trace-Filterung und Ausgabeformatierung.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Task und Crew werden pro Anfrage erstellt (nicht beim Start).

    task = Task(

        description=(

            "Gesprächsverlauf:\n{crew_chat_messages}\n\n"

            "Antworten Sie auf die letzte Nachricht des Nutzers."

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=True liefert CrewStreamingOutput statt eines einzelnen CrewOutput.

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # Nicht-Text-Events (z.B. Tool-Aufrufe) überspringen.

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # Nur Text nach dem "Final Answer:"-Marker weiterleiten

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # Eventuelle Markdown-Artefakte am Ende der CrewAI-Ausgabe entfernen.

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # Fallback für kurze Antworten ohne "Final Answer:"-Marker

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

Als Nächstes betrachten wir LlamaIndex, das einen anderen Ansatz mit einem nativen, ereignisgesteuerten Streaming-Modell verfolgt.

LlamaIndex

Im Gegensatz zu den anderen Frameworks in diesem Beitrag wurde LlamaIndex entwickelt, um LLMs mit externen Datenquellen (Dokumentenspeicher, Indizes, Retrieval-Pipelines) zu verbinden. Die Agenten-Schicht, FunctionAgent, baut darauf auf, um strukturierten Kontext abzurufen und zu verarbeiten, nicht für offene Dialoge oder Aufgabenbearbeitung.

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

Um den Gesprächsfluss zu erhalten, wandelt der Proxy eingehende Nachrichten in LlamaIndex-Chatnachrichten um und trennt sie in die aktuelle Nutzereingabe (user_msg) und den bisherigen Verlauf (chat_history). Das event.delta-Feld jedes AgentStream-Events enthält das nächste Textfragment, das direkt als OpenAI-Delta-Content-Chunk weitergegeben werden kann. Nicht-leere Deltas werden direkt weitergeleitet, was diese Streaming-Bridge zur einfachsten im Leitfaden macht. Der Stream enthält sowohl Orchestrierungsereignisse (Tool-Aufrufe, Ergebnisse) als auch Sprachevents (Assistenten-Textdeltas). Um die Sprachausgabe sauber zu halten, gibt der Proxy nur AgentStream-Events weiter und überspringt leere Deltas.

[1] AgentStream (delta='')       ← ignoriert

[2] ToolCall                     ← ignoriert

[3] ToolCallResult               ← ignoriert

[4] AgentStream (delta='It')     ← weitergeleitet ✓

[5] AgentStream (delta=' costs') ← weitergeleitet ✓

[6] AgentStream (delta=' $49.99')← weitergeleitet ✓

Diese Trennung hält Zwischenschritte der Tool-Mechanik aus der Sprachausgabe heraus und ermöglicht gleichzeitig eine niedrige Latenz bei der inkrementellen Sprachausgabe. Der folgende Handler fasst diese Schritte zusammen.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Es wird angenommen, dass die letzte Nachricht immer eine Nutzereingabe mit String-Inhalt ist.

    # In der Produktion sollte eine defensive Prüfung für Rollen/Inhalte bei Nicht-Text-Payloads erfolgen.

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

LlamaIndex gibt weniger Vorgaben für End-to-End-Gesprächsmuster als Frameworks mit umfangreicher Orchestrierung. Für produktive Einsätze müssen Kunden daher meist Session-Handling, Antwort-Filter, Tool-Orchestrierung und Tracing selbst implementieren.

Fazit

Jedes Framework in diesem Leitfaden verbindet sich über denselben Mechanismus mit ElevenLabs: Es akzeptiert eine OpenAI-ähnliche Completions- oder Responses-Anfrage und streamt SSE-Chunks zurück. So können Teams Sprach-Orchestrierung mit minimalen Änderungen auf bestehende Agentenlösungen aufsetzen und ihre bisherigen Investitionen erhalten, während sie Echtzeit-KI-Gespräche ermöglichen. Diese Modularität ist ein Grundprinzip der ElevenAgents-Plattform. Egal ob bestehende Agenten erweitert oder Voice-native Lösungen aufgebaut werden: Die Voice Orchestration von ElevenAgents ist darauf ausgelegt, beide Ansätze zu unterstützen.

Wenn Sie bereits einen Agenten mit einem Open-Source-Framework betreiben und Sprache aktivieren möchten, probieren Sie diesen Ansatz aus und teilen Sie uns Ihr Feedback mit.

Entdecken Sie Artikel des ElevenLabs-Teams

Erstellen Sie mit hochwertiger KI-Audio