Salta al contenido

Guía práctica: frameworks de agentes open-source y ElevenAgents

Conecta frameworks de agentes open-source con la voz de ElevenLabs usando Custom LLM.

Black square with some squiggly lines.

En nuestra publicación anterior sobre Integrar agentes externos con la orquestación de voz de ElevenLabs, explicamos cómo los equipos pueden conectar su orquestación de agentes basada en texto con ElevenLabs a través de LLM personalizado. Sobre esa base, en esta guía mostramos cómo adaptar y desplegar los principales frameworks de agentes open-source detrás de la interfaz de Custom LLM. El resultado es una arquitectura flexible donde la voz se añade a sistemas de agentes consolidados sin perder el control de estado, la orquestación de herramientas o el control específico de cada aplicación. En todos los frameworks seguimos el mismo patrón de tres pasos: crear una petición de generación, extraer la respuesta final en texto y reformatearla en formato Server-Sent Events (SSE) compatible con OpenAI. ElevenLabs es compatible con los formatos Completado de chaty Respuestas. Aunque aquí cubrimos cuatro frameworks muy usados, este patrón se puede aplicar a cualquier runtime que genere streaming compatible con OpenAI.

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

Configuración general

Los ejemplos de esta sección usan Python y FastAPI, aunque cualquier stack que gestione peticiones HTTP POST y respuestas SSE en streaming funcionará. Cuando la orquestación de voz de ElevenLabs detecta el final probable de un turno, lanza una petición de generación a la ruta de Custom LLM configurada. Aquí repasamos los componentes clave de esa capa de traducción, el puente o proxy que hace que la orquestación de voz y el framework de agentes hablen el mismo idioma.

Es lógico que cada framework se elija por familiaridad o por su capacidad para cumplir un objetivo concreto. LlamaIndex, por ejemplo, se creó para facilitar la configuración de Retrieval-Augmented Generation (RAG), mientras que CrewAI se diseñó para automatizar tareas definidas en la era de los agentes. Diferentes objetivos de diseño generan respuestas distintas y cada uno requiere un manejo específico. Hacer streaming de los fragmentos a medida que el LLM los genera, en vez de esperar a que termine todo el turno, es clave porque permite que el modelo Texto a Voz (TTS) empiece a generar audio antes y así se reduce la latencia percibida. Nos centramos en cuatro frameworks populares: LangGraph, Google ADK, CrewAI y LlamaIndex.

Nota sobre el código compartido

Cada framework debe enviar las respuestas como fragmentos SSE compatibles con OpenAI. Aquí presentamos una pequeña función de ayuda que se usa en todos los ejemplos para construir estos fragmentos.

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

Con esa base lista, vamos a empezar con LangGraph.

LangGraph

LangGraph modela agentes como grafos, donde los nodos representan pasos individuales y los enlaces definen el flujo de control entre ellos. La configuración mínima es sencilla: inicializa un modelo de chat, define herramientas del agente y crea el runtime del grafo de agentes.

    }

    return f"data: {json.dumps(payload)}\n\n"

En cada petición de generación, el Agente de LangGraph recibe todo el historial de la conversación, lo que le permite mantener el estado necesario internamente. LangGraph permite persistencia en el servidor mediante

Una vez gestionado el estado, la siguiente decisión específica de LangGraph es el modo de streaming, donde LangGraph ofrece dos opciones, cada una pensada para un caso de uso distinto:

LangGraph modela los agentes como grafos, donde los nodos representan pasos individuales y los enlaces definen el flujo de control entre ellos. La configuración mínima es sencilla: inicializa un modelo de chat, define las herramientas del agente y crea el runtime del grafo de agentes.

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

En cada petición de generación, el agente de LangGraph recibe todo el historial de la conversación, lo que le permite mantener el estado necesario internamente. LangGraph permite persistencia en servidor mediante Puntos de control, aunque aquí no los tratamos para mantener la implementación sencilla.

Con la gestión de estado resuelta, la siguiente decisión específica de LangGraph es el modo de streaming, donde LangGraph ofrece dos opciones, cada una pensada para un caso de uso distinto:

  • stream_mode="values" da instantáneas del estado del grafo. Es más fácil de implementar pero incluye el estado completo del mensaje en cada respuesta, lo que añade latencia en flujos conversacionales en tiempo real.
  • parámetros extra

Con el mensaje y la sesión preparados, se puede invocar el runner. Las llamadas y resultados de herramientas siguen apareciendo como eventos internos de ADK durante la ejecución, pero se tratan como pasos intermedios de orquestación y no como salida para el usuario. Así se evita tener que filtrar manualmente, a diferencia de otros frameworks donde las llamadas a herramientas aparecen como texto visible para el usuario.

El siguiente handler es una implementación simplificada que incluye la resolución de sesión y la lógica de crear o recuperar sesión en línea.

[2] La herramienta se ejecuta y devuelve datos (result="$24.99")

[3] El modelo genera la respuesta usando el resultado (content="Cuesta $24.99")

Ahora vamos a ver CrewAI, que por diseño está más centrado en tareas.

CrewAI

CrewAI está pensado para orquestar flujos de trabajo multiagente en torno a tareas estructuradas (investigar, redactar, resumir) en vez de bucles de diálogo abiertos. Los agentes se definen con un rol, objetivo e historia. La ejecución gira en torno a objetos Task, cada uno con una descripción clara y un resultado esperado.

    input = {"messages": req.messages}

    async def stream():

A diferencia del modelo de bucle de agente usado en LangGraph y ADK, CrewAI normalmente crea Task y Crew por cada petición para definir la unidad de trabajo de ese turno en la conversación. Mantenemos el contexto conversacional añadiendo los turnos previos a la siguiente tarea mediante un placeholder. La variable {crew_chat_messages} se rellena en cada petición con el historial de la conversación y luego se inserta en la descripción de la tarea al ejecutarla. Además, filtramos explícitamente patrones intermedios de trazado (Thought, Action, Action Input, Observation) para generar solo texto final listo para voz.

El handler siguiente reúne la construcción de tareas por petición, la interpolación del historial, el streaming a nivel Crew, el filtrado de trazas y el formato de la salida.

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # Solo envía fragmentos de texto del modelo; omite actualizaciones de herramientas y eventos no textuales.

Ahora vamos a ver LlamaIndex, que sigue un enfoque diferente centrado en un modelo nativo de streaming basado en eventos.

LlamaIndex

A diferencia de los otros frameworks de esta guía, LlamaIndex está pensado para conectar LLMs con fuentes de datos externas (almacenamiento de documentos, índices, pipelines de recuperación). Su capa de agente, FunctionAgent, se apoya en esa base para recuperar y razonar sobre contexto estructurado, en vez de ejecutar tareas o mantener diálogos abiertos.

            if not content:

                continue

Para mantener la continuidad de la conversación, el proxy transforma los mensajes entrantes en mensajes de chat de LlamaIndex y los divide en el último turno del usuario (user_msg) y los turnos previos (chat_history). Cada evento AgentStream tiene en event.delta el siguiente fragmento de texto, que se corresponde directamente con un chunk delta.content al estilo OpenAI. Los deltas no vacíos pueden enviarse tal cual, lo que hace que este sea el puente de streaming más sencillo de la guía. El stream contiene tanto eventos de orquestación (llamadas a herramientas, resultados) como de voz (deltas de texto del asistente). Para mantener la voz limpia, el proxy solo mantiene eventos AgentStream y omite deltas vacíos.

Para mantener la continuidad de la conversación, el proxy transforma los mensajes entrantes en mensajes de chat de LlamaIndex, luego los divide en el turno más reciente del usuario (user_msg) y los turnos anteriores (chat_history). Cada campo event.delta de AgentStream contiene el siguiente fragmento de texto, que se corresponde directamente con un chunk delta.content al estilo OpenAI. Los deltas no vacíos pueden reenviarse tal cual, lo que convierte este método en el puente de streaming más sencillo de la guía. El stream incluye tanto eventos de orquestación (llamadas a herramientas, resultados) como eventos de voz (deltas de texto del asistente). Para mantener el audio limpio, el proxy solo conserva eventos AgentStream y omite los deltas vacíos.

[1] AgentStream (delta='')       ← ignorado

Esta separación mantiene la mecánica de las herramientas fuera del audio, pero permite un habla incremental y con baja latencia. El handler que ves abajo une todos estos pasos.

            yield sse_chunk(response_id, {"content": content})

LlamaIndex es menos estricto con los patrones de ejecución conversacional de principio a fin que los frameworks con capas de orquestación más complejas. En producción, normalmente requiere que los clientes implementen la gestión de sesiones, límites de respuesta, orquestación de herramientas y trazado.

LlamaIndex es menos estricto con los patrones de ejecución conversacional de principio a fin que los frameworks con capas de orquestación más complejas. En entornos de producción, esto suele requerir que los clientes implementen gestión de sesiones, límites en las respuestas, orquestación de herramientas y trazabilidad.

Conclusión

Cada framework de esta guía se conecta a ElevenLabs mediante el mismo contrato: acepta una petición de Completions o Responses al estilo OpenAI y devuelve fragmentos SSE en streaming. Así, los equipos pueden añadir orquestación de voz sobre una implementación de agente ya existente con cambios mínimos, manteniendo lo que ya han creado y desbloqueando

Si ya tienes un agente funcionando con un framework open source y quieres activar la voz, prueba este método y cuéntanos qué te parece.

Ahora pasamos a los matices de trabajar con el Agent Development Kit (ADK) de Google

Google ADK

El ADK de Google abstrae el bucle de ejecución tras unos pocos elementos clave: Agent, Runner y SessionService. El Runner de ADK se sitúa entre la capa HTTP y la definición del agente. Gestiona el enrutamiento de mensajes, la orquestación de herramientas, el ciclo de vida de la sesión y el streaming de eventos.

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

Con el agente, el backend de sesión y el runner inicializados, el proxy resuelve o crea una sesión ADK para cada petición entrante. En ADK, session_id controla la persistencia de memoria: reutilizar el mismo session_id entre turnos mantiene el historial, las llamadas a herramientas y las respuestas previas. Como la identidad de la conversación está aguas arriba en ElevenLabs, el proxy gestiona este mapeo de forma explícita. Al pasar el identificador correcto en la petición de generación, el SDK puede gestionar el contexto anterior internamente. El identificador arbitrario se pasa al iniciar la conversación mediante parámetros extra incluidos en el cuerpo de la petición.

Con el mensaje y la sesión listos, se puede invocar el runner. Las llamadas y resultados de herramientas siguen apareciendo como eventos internos de ADK durante la ejecución, pero se tratan como pasos intermedios de orquestación y no como salida para el usuario. Así se evita tener que filtrar manualmente, a diferencia de otros frameworks donde las llamadas a herramientas aparecen como texto visible para el usuario.

El siguiente handler es una implementación simplificada que incluye la resolución y creación de sesión en línea.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # En producción, usa un identificador estable de tu sistema principal.

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # En modo SSE, ADK emite eventos parciales (incrementales) y finales (completos).

            # Reenviar solo los parciales evita duplicar el texto completo.
            # Nota: el streaming SSE es experimental en ADK. En producción, gestiona

            # ambos tipos de evento por si el backend del modelo no emite parciales.

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Ahora vamos con CrewAI, que está más orientado a tareas por diseño.

CrewAI

CrewAI se diseñó para orquestar flujos de trabajo multiagente en torno a tareas estructuradas (investigar, redactar, resumir) en vez de bucles de diálogo abiertos. Los agentes se definen con un rol, objetivo y backstory. La ejecución gira en torno a objetos Task, cada uno con una descripción clara y una salida esperada.

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

A diferencia del modelo de bucle de agente de LangGraph y ADK, CrewAI suele crear Task y Crew por cada petición para definir la unidad de trabajo de ese turno en la conversación. El contexto conversacional se mantiene inyectando los turnos previos en la siguiente tarea mediante un placeholder. La variable {crew_chat_messages} se rellena en cada petición con el historial de la conversación y luego se inserta en la descripción de la tarea al ejecutarla. Además, filtramos explícitamente los patrones intermedios de trazado (Thought, Action, Action Input, Observation) y solo emitimos el texto final para que la salida sea limpia y lista para voz.

El siguiente handler reúne la construcción de tareas por petición, la interpolación del historial, el streaming a nivel Crew, el filtrado de trazas y el formateo de la salida.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Task y Crew se crean por cada petición (no al arrancar la app).

    task = Task(

        description=(

            "Historial de la conversación:\n{crew_chat_messages}\n\n"

            "Responde al último mensaje del usuario."

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=True devuelve CrewStreamingOutput en vez de un único CrewOutput.

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # Omite eventos no textuales (por ejemplo, llamadas a herramientas).

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # Solo envía texto después del marcador "Final Answer:"

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # Limpia posibles restos de markdown del output de CrewAI.

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # Fallback para respuestas cortas sin el marcador "Final Answer:"

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

Ahora vamos con LlamaIndex, que sigue un enfoque diferente centrado en un modelo nativo de streaming basado en eventos.

LlamaIndex

A diferencia de los otros frameworks de esta guía, LlamaIndex se diseñó para conectar LLMs con fuentes de datos externas (almacenamiento de documentos, índices, pipelines de recuperación). Su capa de agentes, FunctionAgent, se apoya en esa base para recuperar y razonar sobre contexto estructurado, en vez de diálogo abierto o ejecución de tareas.

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

Para mantener la continuidad de la conversación, el proxy transforma los mensajes entrantes en mensajes de chat de LlamaIndex y los divide en el último turno del usuario (user_msg) y los turnos previos (chat_history). Cada evento AgentStream tiene en event.delta el siguiente fragmento de texto, que se mapea directamente a un chunk delta.content al estilo OpenAI. Los deltas no vacíos se pueden reenviar tal cual, lo que hace que este sea el puente de streaming más sencillo de la guía. El stream contiene tanto eventos de orquestación (llamadas a herramientas, resultados) como eventos de voz (fragmentos de texto del asistente). Para que la salida de voz sea limpia, el proxy solo mantiene eventos AgentStream y omite los deltas vacíos.

[1] AgentStream (delta='')       ← ignorado

[2] ToolCall                     ← ignorado

[3] ToolCallResult               ← ignorado

[4] AgentStream (delta='It')     ← se reenvía ✓

[5] AgentStream (delta=' costs') ← se reenvía ✓

[6] AgentStream (delta=' $49.99')← se reenvía ✓

Esta separación mantiene la mecánica de herramientas fuera del audio y permite una voz incremental y de baja latencia. El siguiente handler reúne todos estos pasos.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Se asume que el último mensaje siempre es del usuario y tiene contenido de tipo string.

    # En producción, añade comprobaciones para roles/contenido no textual.

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

LlamaIndex es menos estricto con los patrones de runtime conversacional de principio a fin que los frameworks con capas de orquestación más pesadas. En despliegues en producción, normalmente esto requiere que el cliente implemente la gestión de sesiones, controles de respuesta, orquestación de herramientas y trazabilidad.

Conclusión

Todos los frameworks de esta guía se conectan a ElevenLabs mediante el mismo contrato: aceptar una petición tipo Completions o Responses al estilo OpenAI y devolver fragmentos SSE en streaming. Así, los equipos pueden añadir orquestación de voz sobre una implementación de agentes ya existente con cambios mínimos, manteniendo lo que ya tienen y desbloqueando IA conversacional en tiempo real. Esta modularidad es clave en la plataforma ElevenAgents. Tanto si amplías un agente existente como si creas uno nativo de voz desde cero, la orquestación de voz de ElevenAgents se adapta a tu caso.

Si ya usas un agente con un framework open-source y quieres añadir voz, prueba este enfoque y cuéntanos tu experiencia.

Descubre artículos del equipo de ElevenLabs

Crea con el audio IA de la más alta calidad