Passer au contenu

Guide pratique : frameworks d'agents open source et ElevenAgents

Connecter des frameworks d'agents open source à la voix ElevenLabs via Custom LLM.

Black square with some squiggly lines.

Dans notre précédent article sur Intégrer des agents externes avec l’orchestration vocale ElevenLabs, nous avons expliqué comment les équipes peuvent connecter leur orchestration d’agents textuels existante à ElevenLabs via le LLM personnalisé. En s’appuyant sur cette base, ce guide montre comment les principaux frameworks d’agents open source peuvent être adaptés et déployés derrière l’interface Custom LLM. Le résultat : une architecture flexible où la voix s’ajoute à des systèmes d’agents éprouvés, sans compromettre la gestion d’état, l’orchestration des outils ou le contrôle spécifique à l’application. Quel que soit le framework, nous suivons toujours le même schéma en trois étapes : créer une requête de génération, extraire la réponse texte finale et la reformater au format Server-Sent Events (SSE) compatible OpenAI. ElevenLabs prend en charge les formats Complétions de chatet Réponses. Ce guide couvre quatre frameworks largement utilisés, mais la logique s’applique à tout environnement capable de produire un flux compatible OpenAI.

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

Configuration générale

Les exemples de cette section utilisent Python et FastAPI, mais toute stack capable de gérer des requêtes HTTP POST et des réponses SSE en streaming conviendra. Lorsque l’orchestration vocale ElevenLabs détecte une fin de tour probable, elle envoie une requête de génération à l’endpoint Custom LLM configuré. Cette section détaille les éléments clés de cette couche de traduction, le pont ou proxy qui permet à l’orchestration vocale et au framework d’agent de communiquer.

Chaque framework peut être choisi par les clients pour sa familiarité ou sa capacité à répondre à un besoin précis. LlamaIndex, par exemple, a été conçu pour simplifier la mise en place du Retrieval-Augmented Generation (RAG), tandis que CrewAI a été pensé pour automatiser des tâches définies à l’ère des agents. Des objectifs de conception différents produisent des structures de réponse différentes, chacune nécessitant un traitement spécifique. Diffuser les fragments au fur et à mesure de leur génération par le LLM, plutôt que d’attendre la fin du tour, est essentiel : cela permet au modèle Text to Speech (TTS) de commencer à générer la voix plus tôt et donc de réduire la latence perçue. Nous nous concentrons ici sur quatre frameworks populaires : LangGraph, Google ADK, CrewAI et LlamaIndex.

Note sur le code partagé

Chaque framework doit diffuser les réponses sous forme de fragments SSE compatibles OpenAI. Nous présentons une petite fonction utilitaire utilisée dans tous les exemples pour construire ces fragments.

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

        "id": response_id,

        "object": "chat.completion.chunk",

        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],

    }

    return f"data: {json.dumps(payload)}\n\n"

Avec cette base, commençons par LangGraph.

LangGraph

LangGraph modélise les agents sous forme de graphes, où chaque nœud représente une étape et chaque lien définit le flux de contrôle. La configuration minimale est simple : initialiser un modèle de chat, définir les outils de l’agent et créer le runtime du graphe d’agents.

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

À chaque requête de génération, l’agent LangGraph reçoit tout l’historique de la conversation, ce qui lui permet de gérer l’état nécessaire en interne. LangGraph prend en charge la persistance côté serveur via les Points de contrôle, mais nous ne les abordons pas ici pour garder l’implémentation simple.

Une fois la gestion d’état assurée, le choix suivant propre à LangGraph concerne le mode de streaming, qui propose deux options selon le cas d’usage :

  • stream_mode="values" fournit des instantanés de l’état du graphe. C’est plus simple à mettre en place mais chaque réponse inclut l’état complet du message, ce qui ajoute de la latence dans les échanges en temps réel.
  • stream_mode="messages" diffuse des fragments de message incrémentaux depuis le modèle. C’est généralement préférable pour les interactions vocales en temps réel, car cela réduit le délai avant la première sortie audio dans l’orchestration ElevenLabs.

Plus précisément, l’implémentation messages de la boucle agent inclut des étapes intermédiaires comme les mises à jour d’appels d’outils, qui ne doivent pas être prononcées. Le proxy filtre ces éléments et ne transmet que le texte destiné à l’utilisateur à la couche TTS. Voici un exemple de tour avec outil.

[1] Le modèle décide d’appeler un outil (tool_calls=["get_price"])

[2] L’outil s’exécute et renvoie une donnée (result="$24.99")

[3] Le modèle produit une réponse avec ce résultat (content="It costs $24.99")

Seuls les fragments de l’étape 3 doivent être transmis dans le flux SSE. En pratique, deux vérifications assurent ce filtrage dans la boucle de streaming : une pour ne garder que les événements langgraph_node == "model", et une pour ignorer les contenus vides. Ensemble, elles garantissent que seul le texte assistant destiné à l’utilisateur est transmis à ElevenLabs en SSE. Voici une implémentation légère du proxy de requête.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    input = {"messages": req.messages}

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # Ne transmettre que les fragments texte du modèle ; ignorer les mises à jour d’outils et les événements non textuels.

            if metadata.get("langgraph_node") != "model":

                continue

            content = getattr(message_chunk, "content", None)

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            # Envoyer les fragments incrémentaux à ElevenLabs au format OpenAI.

            yield sse_chunk(response_id, {"content": content})

         # Signaler la fin naturelle avant d’utiliser finish_reason: "stop" [DONE]

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Cela garantit que seuls les fragments destinés à l’utilisateur sont transmis à ElevenLabs. Comme LangGraph rend l’exécution interne des outils visible dans le flux d’état, le filtrage est explicite et géré par le proxy.

Passons maintenant aux spécificités du travail avec l’Agent Development Kit (ADK) de Google

Google ADK

L’ADK de Google abstrait la boucle d’exécution derrière quelques primitives : Agent, Runner et SessionService. Le Runner d’ADK fait le lien entre la couche HTTP et la définition de l’agent. Il gère le routage des messages, l’orchestration des outils, le cycle de vie des sessions et le streaming des événements.

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

Une fois l’agent, le backend de session et le runner initialisés, le proxy résout ou crée une session ADK pour chaque requête entrante. Dans ADK, session_id contrôle la persistance de la mémoire : réutiliser le même session_id d’un tour à l’autre permet de conserver l’historique, les appels d’outils et les réponses précédentes. Comme l’identité de la conversation est gérée en amont dans ElevenLabs, le proxy s’occupe explicitement de cette correspondance. En transmettant le bon identifiant lors de la requête de génération, le SDK peut gérer le contexte précédent en interne. On passe cet identifiant arbitraire lors de l’initiation de la conversation via les paramètres supplémentaires transmis dans le corps de la requête.

Une fois le message et la session prêts, le runner peut être appelé. Les appels d’outils et leurs résultats apparaissent toujours comme des événements internes ADK pendant l’exécution, mais ils sont considérés comme des étapes d’orchestration intermédiaires et non comme des sorties destinées à l’utilisateur. Cela évite d’avoir à filtrer manuellement, contrairement aux frameworks où les appels d’outils apparaissent comme du texte visible par l’utilisateur.

Le handler ci-dessous est une implémentation simplifiée qui inclut la résolution de session et la logique de création si besoin.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # En production, privilégiez un identifiant stable provenant de votre système amont.

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # En mode SSE, ADK émet des événements partiels (incrémentaux) et finaux (complets).

            # Ne transmettre que les événements partiels évite de dupliquer le texte complet.
            # Remarque : le streaming SSE est expérimental dans ADK. En production, gérez

            # les deux types d’événements au cas où le backend ne produirait pas de partiels.

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Passons maintenant à CrewAI, qui est conçu autour des tâches.

CrewAI

CrewAI a été conçu pour orchestrer des workflows multi-agents autour de tâches structurées (recherche, rédaction, synthèse) plutôt que des boucles de dialogue ouvertes. Les agents sont définis avec un rôle, un objectif et une histoire. L’exécution s’articule autour d’objets Task, chacun avec une description claire et un résultat attendu.

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

Contrairement au modèle de boucle d’agent utilisé dans LangGraph et ADK, CrewAI construit généralement Task et Crew à chaque requête pour définir l’unité de travail du tour de conversation. Nous conservons le contexte conversationnel en injectant les tours précédents dans la tâche suivante via un placeholder. La variable {crew_chat_messages} est remplie à chaque requête avec l’historique de la conversation, puis interpolée dans la description de la tâche au moment de l’exécution. Pour produire un texte propre et prêt à être vocalisé, nous filtrons explicitement les traces intermédiaires (Thought, Action, Action Input, Observation) et ne diffusons que la réponse finale.

Le handler ci-dessous regroupe la construction de la tâche à chaque requête, l’injection de l’historique, le streaming au niveau Crew, le filtrage des traces et la mise en forme de la sortie.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Task et Crew sont assemblés à chaque requête (pas au démarrage).

    task = Task(

        description=(

            "Historique de la conversation :\n{crew_chat_messages}\n\n"

            "Répondez au dernier message de l’utilisateur."

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=True retourne CrewStreamingOutput au lieu d’un CrewOutput unique.

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # Ignorer les événements non textuels (ex : appels d’outils).

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # Ne transmettre le texte qu’après le marqueur "Final Answer:"

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # Nettoyer les éventuels artefacts markdown à la fin de la sortie CrewAI.

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # Gestion de secours pour les réponses courtes sans le marqueur "Final Answer:"

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

Voyons maintenant LlamaIndex, qui adopte une approche différente centrée sur un modèle de streaming piloté par les événements.

LlamaIndex

Contrairement aux autres frameworks présentés ici, LlamaIndex a été conçu pour connecter les LLM à des sources de données externes (bases de documents, index, pipelines de recherche). Sa couche agent, FunctionAgent, s’appuie sur cette base pour récupérer et raisonner sur du contexte structuré, plutôt que pour gérer du dialogue ouvert ou l’exécution de tâches.

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

Pour préserver la continuité de la conversation, le proxy transforme les messages entrants en messages de chat LlamaIndex, puis les sépare en dernier tour utilisateur (user_msg) et historique précédent (chat_history). Le champ event.delta de chaque événement AgentStream contient le prochain fragment de texte, qui correspond directement à un fragment delta.content au format OpenAI. Les deltas non vides peuvent être transmis tels quels, ce qui fait de ce pont de streaming le plus simple du guide. Le flux contient à la fois des événements d’orchestration (appels d’outils, résultats) et des événements de parole (fragments assistant). Pour garder la sortie vocale propre, le proxy ne conserve que les événements AgentStream et ignore les deltas vides.

[1] AgentStream (delta='')       ← ignoré

[2] ToolCall                     ← ignoré

[3] ToolCallResult               ← ignoré

[4] AgentStream (delta='It')     ← transmis ✓

[5] AgentStream (delta=' costs') ← transmis ✓

[6] AgentStream (delta=' $49.99')← transmis ✓

Cette séparation permet d’exclure la mécanique intermédiaire des outils de la sortie vocale tout en conservant une latence minimale. Le handler ci-dessous rassemble ces étapes.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # On suppose ici que le dernier message est toujours un tour utilisateur avec un contenu texte.

    # En production, ajoutez une gestion défensive des rôles/contenus pour les payloads non textuels.

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

LlamaIndex est moins directif sur les schémas d’exécution conversationnelle de bout en bout que les frameworks avec des couches d’orchestration plus lourdes. En production, cela implique souvent de gérer la session, les garde-fous sur les réponses, l’orchestration des outils et le suivi.

Conclusion

Chaque framework présenté ici se connecte à ElevenLabs via le même principe : accepter une requête Completions ou Responses au format OpenAI et renvoyer des fragments SSE en streaming. Cela permet aux équipes d’ajouter l’orchestration vocale à une implémentation d’agent existante avec très peu de modifications, tout en conservant ce qui a déjà été construit et en ouvrant la voie à l’IA conversationnelle en temps réel. Cette modularité est au cœur de la philosophie ElevenAgents. Que vous adaptiez un agent existant ou que vous construisiez une expérience vocale native, l’orchestration vocale ElevenAgents est conçue pour s’adapter à vos besoins.

Si vous utilisez déjà un agent avec un framework open source et souhaitez activer la voix, essayez cette approche et dites-nous ce que vous en pensez.

Découvrez les articles de l'équipe ElevenLabs

Créez avec l'audio IA de la plus haute qualité