Gå till innehåll

Praktisk guide: open source-agentramverk och ElevenAgents

Koppla open source-agentramverk till ElevenLabs-röst via Custom LLM.

Black square with some squiggly lines.

I vårt tidigare inlägg om Integrera externa agenter med ElevenLabs Voice Orchestration beskrev vi hur team kan koppla sin befintliga textbaserade agentorkestrering till ElevenLabs via Anpassad LLM. Med det som grund visar vi här hur ledande open source-agentramverk kan anpassas och användas bakom Custom LLM-gränssnittet. Resultatet är en flexibel arkitektur där röst läggs på etablerade agentsystem utan att tumma på tillståndshantering, verktygsorkestrering eller applikationsspecifik kontroll. Oavsett ramverk följer vi samma trestegsmönster: skapa en generation request, extrahera det slutgiltiga textsvaret och formatera det i OpenAI-kompatibelt Server-Sent Events (SSE)-format. ElevenLabs stödjer både Chat-kompletteringaroch Svar-formaten. Även om guiden täcker fyra populära ramverk kan mönstren användas för alla miljöer som kan producera OpenAI-kompatibel strömmande output.

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

Allmän setup

Exemplen i det här avsnittet använder Python och FastAPI, men vilken stack som helst som hanterar HTTP POST och strömmande SSE-svar fungerar. När ElevenLabs voice orchestration upptäcker att en tur troligen är slut skickas en generation request till den konfigurerade Custom LLM-endpointen. Här går vi igenom kärnkomponenterna i det översättningslagret – bron eller proxyn som gör att voice orchestration och agentramverket kan prata med varandra.

Olika ramverk väljs förstås av kunder beroende på vana eller förmåga att lösa ett visst behov. LlamaIndex togs till exempel fram för att förenkla Retrieval-Augmented Generation (RAG), medan CrewAI byggdes för att automatisera definierade uppgifter i agenternas tidsålder. Olika designmål ger olika svarstrukturer, och varje ramverk kräver särskild hantering. Att strömma ut textbitar medan LLM:en genererar dem, istället för att vänta på ett helt svar, är viktigt eftersom Text to Speech (TTS)-modellen då kan börja läsa upp tidigare och därmed minska upplevd fördröjning. Vi fokuserar på fyra populära ramverk: LangGraph, Google ADK, CrewAI och LlamaIndex.

Om delad kod

Varje ramverk måste strömma svar som OpenAI-kompatibla SSE-bitar. Vi introducerar en liten hjälpfunktion som används i exemplen för att bygga dessa bitar.

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

        "id": response_id,

        "object": "chat.completion.chunk",

        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],

    }

    return f"data: {json.dumps(payload)}\n\n"

Med det på plats börjar vi med LangGraph.

LangGraph

LangGraph modellerar agenter som grafer där noder är enskilda steg och kanter styr flödet mellan dem. Grundinställningen är enkel: initiera en chat-modell, definiera agentverktyg och skapa agentgrafens runtime.

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

Vid varje generation request får LangGraph-agenten hela konversationshistoriken, vilket gör att den kan hålla rätt tillstånd internt. LangGraph stödjer serverlagring via Checkpoints, men vi går inte igenom det här för att hålla det enkelt.

När tillståndshanteringen är löst är nästa LangGraph-specifika val strömningsläge, där LangGraph erbjuder två alternativ beroende på användningsområde:

  • stream_mode="values" ger ögonblicksbilder av grafens tillstånd. Det är enklare att implementera men skickar med hela meddelandetillståndet varje gång, vilket ökar fördröjningen i realtidsdialoger.
  • stream_mode="messages" strömmar ut inkrementella meddelandebitar från modellen. Det är oftast bäst för röstinteraktion i realtid, eftersom det minskar tiden till första ljud i ElevenLabs-orkestreringen.

Mer specifikt innehåller messages-implementeringen av agentloopen mellanliggande steg, som verktygsanrop, som inte ska läsas upp. Proxyn filtrerar bort dessa och skickar bara svarstext till TTS-lagret. Här är ett exempel på en tur med verktygsanrop.

[1] Modellen väljer att anropa ett verktyg (tool_calls=["get_price"])

[2] Verktyget körs och returnerar data (result="$24.99")

[3] Modellen ger svar med resultatet (content="It costs $24.99")

Endast bitarna från steg 3 ska skickas vidare i SSE-strömmen. I praktiken hanteras detta av två kontroller i strömningsloopen: en som bara släpper igenom langgraph_node == "model"-händelser, och en som hoppar över tomt innehåll. Tillsammans ser de till att bara svarstext till användaren skickas till ElevenLabs som SSE. Här är en enkel implementation av proxyn.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    input = {"messages": req.messages}

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # Skicka bara vidare modellens textbitar; hoppa över verktygsuppdateringar och icke-text.

            if metadata.get("langgraph_node") != "model":

                continue

            content = getattr(message_chunk, "content", None)

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            # Skicka inkrementella textbitar till ElevenLabs i OpenAI-format.

            yield sse_chunk(response_id, {"content": content})

         # Signalera naturligt avslut innan finish_reason: "stop" [DONE]

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Detta gör att bara modellens svar till användaren skickas till ElevenLabs. Eftersom LangGraph visar sina interna verktygsanrop i tillståndsströmmen är filtreringen tydlig och styrs av proxyn.

Nu går vi vidare till detaljerna kring Google Agent Development Kit (ADK)

Google ADK

Googles ADK abstraherar runtime-loopen med några kärnkomponenter: Agent, Runner och SessionService. ADK:s Runner sitter mellan HTTP-lagret och agentdefinitionen. Den hanterar meddelanderouting, verktygsorkestrering, sessioner och händelseströmning.

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

Med agent, session-backend och runner på plats löser proxyn eller skapar en ADK-session för varje inkommande request. I ADK styr session_id minneshanteringen: om samma session_id återanvänds mellan turer sparas automatiskt historik, verktygsanrop och tidigare svar. Eftersom konversationsidentiteten finns uppströms i ElevenLabs hanterar proxyn denna koppling. Genom att skicka rätt identifierare för generation request kan SDK:n hantera tidigare kontext internt. Vi skickar den godtyckliga identifieraren vid konversationsstart via extra parametrar som skickas i requestens body.

När meddelandet och sessionen är förberedda kan runnern köras. Verktygsanrop och resultat syns fortfarande som interna ADK-händelser under körning, men de behandlas som mellanliggande steg och inte som svar till användaren. Det gör att man slipper manuell filtrering jämfört med ramverk där verktygsanrop syns som text. 

Handlern nedan är en förenklad implementation med sessionhantering och get-or-create-logik direkt i koden.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # I produktion, använd en stabil identifierare från ditt uppströmsystem.

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # I SSE-läge skickar ADK ut både partial (inkrementella) och final (slutgiltiga) händelser.

            # Skicka bara partial events för att undvika dubblerad text.
            # Obs: SSE-streaming är experimentellt i ADK. I produktion, hantera

            # båda händelsetyperna om modellbackend inte skickar partials.

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Nu tittar vi på CrewAI, som är mer uppgiftsfokuserad i sin design.

CrewAI

CrewAI är byggt för att orkestrera multi-agent-workflows kring strukturerade uppgifter (research, skriva, sammanfatta) snarare än öppna dialoger. Agenter definieras med roll, mål och bakgrund. Utförandet kretsar kring Task-objekt, var och en med tydlig beskrivning och förväntat resultat.

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

Till skillnad från agentloop-modellen i LangGraph och ADK skapar CrewAI vanligtvis Task och Crew per request för att definiera arbetsenheten för den turen. Vi för över kontext mellan turer genom att lägga in tidigare meddelanden i nästa task via en variabel. Variabeln {crew_chat_messages} fylls på varje request med konversationshistoriken och läggs in i taskbeskrivningen vid körning. För att få ren, talvänlig text filtrerar vi dessutom bort mellanliggande spårningsmönster (Thought, Action, Action Input, Observation) och skickar bara slutsvaret.

Handlern nedan samlar ihop task per request, historikinjektion, Crew-nivåns streaming, spårningsfiltrering och outputformatering.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Task och Crew sätts ihop per request (inte vid start).

    task = Task(

        description=(

            "Konversationshistorik:\n{crew_chat_messages}\n\n"

            "Svara på användarens senaste meddelande."

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=True ger CrewStreamingOutput istället för ett CrewOutput.

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # Hoppa över icke-text (t.ex. verktygsanrop).

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # Skicka bara text efter "Final Answer:"-markören

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # Ta bort eventuella markdown-rester från CrewAI-output.

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # Fallback för korta svar utan "Final Answer:"-markör

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

Nu tittar vi på LlamaIndex, som har ett annat fokus med en händelsedriven strömningsmodell.

LlamaIndex

Till skillnad från de andra ramverken här är LlamaIndex byggt för att koppla LLM:er till externa datakällor (dokumentlager, index, sökrör). Agentlagret, FunctionAgent, ligger ovanpå och hämtar och resonerar över strukturerad kontext snarare än att hantera öppen dialog eller uppgifter.

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

För att bevara kontexten omvandlar proxyn inkommande meddelanden till LlamaIndex chat-meddelanden, och delar upp dem i senaste användartur (user_msg) och tidigare turer (chat_history). Varje AgentStream-events event.delta innehåller nästa textbit, som direkt kan skickas vidare som en OpenAI-liknande delta.content. Icke-tomma deltas kan skickas som de är, vilket gör detta till den enklaste strömningsbron i guiden. Strömmen innehåller både orkestreringshändelser (verktygsanrop, resultat) och talhändelser (assistentens textbitar). För att hålla röstutdata ren skickar proxyn bara AgentStream-händelser och hoppar över tomma deltas.

[1] AgentStream (delta='')       ← ignoreras

[2] ToolCall                     ← ignoreras

[3] ToolCallResult               ← ignoreras

[4] AgentStream (delta='It')     ← skickas ✓

[5] AgentStream (delta=' costs') ← skickas ✓

[6] AgentStream (delta=' $49.99')← skickas ✓

Denna separation håller mellanliggande verktygslogik borta från talet och ger låg fördröjning. Handlern nedan visar stegen tillsammans.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Antar att sista meddelandet alltid är en användartur med text.

    # I produktion, lägg till hantering för andra roller/innehållstyper.

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

LlamaIndex är mindre styrande kring hela konversationsflödet än ramverk med mer inbyggd orkestrering. I produktion behöver kunder ofta själva bygga sessionhantering, svarskontroller, verktygsorkestrering och spårning.

Slutsats

Alla ramverk i denna guide kopplas till ElevenLabs på samma sätt: ta emot en OpenAI-liknande Completions- eller Responses-request och strömma tillbaka SSE-bitar. Det gör att team kan lägga på voice orchestration ovanpå sin befintliga agentlösning med minimala ändringar, så att det de redan byggt kan användas vidare – nu med realtidskonversation via AI. Denna modularitet är en grundpelare i ElevenAgents. Oavsett om du bygger vidare på en befintlig agent eller skapar voice-native från början är ElevenAgents voice orchestration byggd för att möta dig där du är.

Om du redan kör en agent med ett open source-ramverk och vill lägga till röst, testa gärna detta och berätta vad du tycker.

Utforska artiklar av ElevenLabs-teamet

Skapa med AI-ljud av högsta kvalitet