
Guía práctica: frameworks de agentes open-source y ElevenAgents
Conecta frameworks de agentes open-source con la voz de ElevenLabs usando Custom LLM.
En nuestra publicación anterior sobre Integrar agentes externos con la orquestación de voz de ElevenLabs, explicamos cómo los equipos pueden conectar su orquestación de agentes basada en texto con ElevenLabs a través de LLM personalizado. Sobre esa base, en esta guía mostramos cómo adaptar y desplegar los principales frameworks de agentes open-source detrás de la interfaz de Custom LLM. El resultado es una arquitectura flexible donde la voz se añade a sistemas de agentes consolidados sin perder el control de estado, la orquestación de herramientas o el control específico de cada aplicación. En todos los frameworks seguimos el mismo patrón de tres pasos: crear una petición de generación, extraer la respuesta final en texto y reformatearla en formato Server-Sent Events (SSE) compatible con OpenAI. ElevenLabs es compatible con los formatos Completado de chaty Respuestas. Aunque aquí cubrimos cuatro frameworks muy usados, este patrón se puede aplicar a cualquier runtime que genere streaming compatible con OpenAI.
.webp&w=3840&q=95)
Configuración general
Los ejemplos de esta sección usan Python y FastAPI, aunque cualquier stack que gestione peticiones HTTP POST y respuestas SSE en streaming funcionará. Cuando la orquestación de voz de ElevenLabs detecta el final probable de un turno, lanza una petición de generación a la ruta de Custom LLM configurada. Aquí repasamos los componentes clave de esa capa de traducción, el puente o proxy que hace que la orquestación de voz y el framework de agentes hablen el mismo idioma.
Es lógico que cada framework se elija por familiaridad o por su capacidad para cumplir un objetivo concreto. LlamaIndex, por ejemplo, se creó para facilitar la configuración de Retrieval-Augmented Generation (RAG), mientras que CrewAI se diseñó para automatizar tareas definidas en la era de los agentes. Diferentes objetivos de diseño generan respuestas distintas y cada uno requiere un manejo específico. Hacer streaming de los fragmentos a medida que el LLM los genera, en vez de esperar a que termine todo el turno, es clave porque permite que el modelo Texto a Voz (TTS) empiece a generar audio antes y así se reduce la latencia percibida. Nos centramos en cuatro frameworks populares: LangGraph, Google ADK, CrewAI y LlamaIndex.
Nota sobre el código compartido
Cada framework debe enviar las respuestas como fragmentos SSE compatibles con OpenAI. Aquí presentamos una pequeña función de ayuda que se usa en todos los ejemplos para construir estos fragmentos.
def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:
payload = {
"id": response_id,
"object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
}
return f"data: {json.dumps(payload)}\n\n"
Con esto listo, empezamos con LangGraph.
LangGraph
LangGraph modela los agentes como grafos, donde los nodos representan pasos individuales y los enlaces definen el flujo de control entre ellos. La configuración mínima es sencilla: inicializa un modelo de chat, define las herramientas del agente y crea el runtime del grafo de agentes.
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
api_key=os.getenv("OPENAI_API_KEY"),
llm, tools=tool_list, system_prompt=system_prompt,)
En cada petición de generación, el agente de LangGraph recibe todo el historial de la conversación, lo que le permite mantener el estado necesario internamente. LangGraph permite persistencia en servidor mediante Puntos de control, aunque aquí no los tratamos para mantener la implementación sencilla.
Con la gestión de estado resuelta, la siguiente decisión específica de LangGraph es el modo de streaming, donde LangGraph ofrece dos opciones, cada una pensada para un caso de uso distinto:
- stream_mode="values" da instantáneas del estado del grafo. Es más fácil de implementar pero incluye el estado completo del mensaje en cada respuesta, lo que añade latencia en flujos conversacionales en tiempo real.
- stream_mode="messages" envía fragmentos incrementales de mensajes desde el modelo. Suele ser la mejor opción para interacciones de voz en tiempo real, ya que reduce el tiempo hasta el primer audio en la capa de orquestación de ElevenLabs.
En concreto, la implementación por mensajes del bucle del agente incluye pasos intermedios como actualizaciones de llamadas a herramientas, que no deben leerse en voz alta. El proxy filtra estos eventos y solo pasa el texto relevante para el usuario a la capa TTS. Aquí tienes un ejemplo de un turno con herramienta:
[1] El modelo decide llamar a una herramienta (tool_calls=["get_price"])
[2] La herramienta se ejecuta y devuelve datos (result="$24.99")
[3] El modelo genera la respuesta usando el resultado (content="Cuesta $24.99")
Solo los fragmentos del paso 3 deben enviarse en el stream SSE. En la práctica, dos comprobaciones filtran estos fragmentos en el bucle de streaming: una para quedarse solo con los eventos langgraph_node == "model" y otra que omite contenido vacío. Así, solo el texto relevante para el usuario llega a ElevenLabs como SSE. Juntando estos conceptos, aquí tienes una implementación ligera del proxy de peticiones.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
input = {"messages": req.messages}
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):
# Solo envía fragmentos de texto del modelo; omite actualizaciones de herramientas y eventos no textuales.
if metadata.get("langgraph_node") != "model":
continue
content = getattr(message_chunk, "content", None)
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
# Envía fragmentos incrementales tipo token a ElevenLabs en formato OpenAI.
yield sse_chunk(response_id, {"content": content})
# Señala la finalización natural antes de usar finish_reason: "stop" [DONE]
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Así solo se reenvían a ElevenLabs los fragmentos del modelo relevantes para el usuario. Como LangGraph hace visible la ejecución interna de herramientas a través del stream de estado, el filtrado es explícito y lo controla el proxy.
Ahora pasamos a los matices de trabajar con el Agent Development Kit (ADK) de Google
Google ADK
El ADK de Google abstrae el bucle de ejecución tras unos pocos elementos clave: Agent, Runner y SessionService. El Runner de ADK se sitúa entre la capa HTTP y la definición del agente. Gestiona el enrutamiento de mensajes, la orquestación de herramientas, el ciclo de vida de la sesión y el streaming de eventos.
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import InMemorySessionService
from google.genai import types as genai_types
agent = Agent(
name=name,
model=model,
instruction=instruction,
tools=[tool_list],
)
session_service = InMemorySessionService()
runner = Runner(
agent=agent,
app_name=app_name,
session_service=session_service
)
Con el agente, el backend de sesión y el runner inicializados, el proxy resuelve o crea una sesión ADK para cada petición entrante. En ADK, session_id controla la persistencia de memoria: reutilizar el mismo session_id entre turnos mantiene el historial, las llamadas a herramientas y las respuestas previas. Como la identidad de la conversación está aguas arriba en ElevenLabs, el proxy gestiona este mapeo de forma explícita. Al pasar el identificador correcto en la petición de generación, el SDK puede gestionar el contexto anterior internamente. El identificador arbitrario se pasa al iniciar la conversación mediante parámetros extra incluidos en el cuerpo de la petición.
Con el mensaje y la sesión listos, se puede invocar el runner. Las llamadas y resultados de herramientas siguen apareciendo como eventos internos de ADK durante la ejecución, pero se tratan como pasos intermedios de orquestación y no como salida para el usuario. Así se evita tener que filtrar manualmente, a diferencia de otros frameworks donde las llamadas a herramientas aparecen como texto visible para el usuario.
El siguiente handler es una implementación simplificada que incluye la resolución y creación de sesión en línea.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest, request: Request):
# En producción, usa un identificador estable de tu sistema principal.
session_id = req.elevenlabs_extra_body.arbitrary_identifier
session = await session_service.get_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
if not session:
session = await session_service.create_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")
content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for event in runner.run_async(
user_id="user",
session_id=session.id,
new_message=content,
run_config=RunConfig(streaming_mode=StreamingMode.SSE),
):
if not event.content or not event.content.parts:
continue
# En modo SSE, ADK emite eventos parciales (incrementales) y finales (completos).
# Reenviar solo los parciales evita duplicar el texto completo. # Nota: el streaming SSE es experimental en ADK. En producción, gestiona
# ambos tipos de evento por si el backend del modelo no emite parciales.
if not getattr(event, "partial", False):
continue
text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)
if not text:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": text})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Ahora vamos con CrewAI, que está más orientado a tareas por diseño.
CrewAI
CrewAI se diseñó para orquestar flujos de trabajo multiagente en torno a tareas estructuradas (investigar, redactar, resumir) en vez de bucles de diálogo abiertos. Los agentes se definen con un rol, objetivo y backstory. La ejecución gira en torno a objetos Task, cada uno con una descripción clara y una salida esperada.
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import tool
from crewai.types.streaming import StreamChunkType
llm = LLM(
model=model_id,
api_key=os.getenv("OPENAI_API_KEY")
)
store_agent = Agent(
role=role,
goal=goal,
backstory=backstory,
tools=tools,
llm=llm,
verbose=False,
)
A diferencia del modelo de bucle de agente de LangGraph y ADK, CrewAI suele crear Task y Crew por cada petición para definir la unidad de trabajo de ese turno en la conversación. El contexto conversacional se mantiene inyectando los turnos previos en la siguiente tarea mediante un placeholder. La variable {crew_chat_messages} se rellena en cada petición con el historial de la conversación y luego se inserta en la descripción de la tarea al ejecutarla. Además, filtramos explícitamente los patrones intermedios de trazado (Thought, Action, Action Input, Observation) y solo emitimos el texto final para que la salida sea limpia y lista para voz.
El siguiente handler reúne la construcción de tareas por petición, la interpolación del historial, el streaming a nivel Crew, el filtrado de trazas y el formateo de la salida.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Task y Crew se crean por cada petición (no al arrancar la app).
task = Task(
description=(
"Historial de la conversación:\n{crew_chat_messages}\n\n"
"Responde al último mensaje del usuario."
),
expected_output=expected_output,
agent=store_agent,
)
# stream=True devuelve CrewStreamingOutput en vez de un único CrewOutput.
crew = Crew(
agents=[store_agent],
tasks=[task],
process=Process.sequential,
verbose=False,
stream=True,
)
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
final_marker = "final answer:"
marker_buffer = ""
marker_found = False
emitted_any_content = False
streaming = await crew.kickoff_async(
inputs={"crew_chat_messages": json.dumps(req.messages)}
)
async for chunk in streaming:
# Omite eventos no textuales (por ejemplo, llamadas a herramientas).
if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:
continue
# Solo envía texto después del marcador "Final Answer:"
if not marker_found:
marker_buffer += chunk.content
idx = marker_buffer.lower().find("final answer:")
if idx == -1:
continue
marker_found = True
content = marker_buffer[idx + 13:].lstrip()
marker_buffer = ""
else:
content = chunk.content
# Limpia posibles restos de markdown del output de CrewAI.
content = content.rstrip("`").rstrip()
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": content})
# Fallback para respuestas cortas sin el marcador "Final Answer:"
if not sent_role:
raw = getattr(streaming, "result", None)
fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()
if fallback:
yield sse_chunk(response_id, {"role": "assistant"})
yield sse_chunk(response_id, {"content": fallback})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
Ahora vamos con LlamaIndex, que sigue un enfoque diferente centrado en un modelo nativo de streaming basado en eventos.
LlamaIndex
A diferencia de los otros frameworks de esta guía, LlamaIndex se diseñó para conectar LLMs con fuentes de datos externas (almacenamiento de documentos, índices, pipelines de recuperación). Su capa de agentes, FunctionAgent, se apoya en esa base para recuperar y razonar sobre contexto estructurado, en vez de diálogo abierto o ejecución de tareas.
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent, AgentStream
from llama_index.core.base.llms.types import ChatMessage, MessageRole
llm = OpenAI(
model=model,
api_key=os.getenv("OPENAI_API_KEY")
)
agent = FunctionAgent(
tools=[list_inventory, get_item_price],
llm=llm,
system_prompt=system_prompt,
)
Para mantener la continuidad de la conversación, el proxy transforma los mensajes entrantes en mensajes de chat de LlamaIndex y los divide en el último turno del usuario (user_msg) y los turnos previos (chat_history). Cada evento AgentStream tiene en event.delta el siguiente fragmento de texto, que se mapea directamente a un chunk delta.content al estilo OpenAI. Los deltas no vacíos se pueden reenviar tal cual, lo que hace que este sea el puente de streaming más sencillo de la guía. El stream contiene tanto eventos de orquestación (llamadas a herramientas, resultados) como eventos de voz (fragmentos de texto del asistente). Para que la salida de voz sea limpia, el proxy solo mantiene eventos AgentStream y omite los deltas vacíos.
[1] AgentStream (delta='') ← ignorado
[2] ToolCall ← ignorado
[3] ToolCallResult ← ignorado
[4] AgentStream (delta='It') ← se reenvía ✓
[5] AgentStream (delta=' costs') ← se reenvía ✓
[6] AgentStream (delta=' $49.99')← se reenvía ✓
Esta separación mantiene la mecánica de herramientas fuera del audio y permite una voz incremental y de baja latencia. El siguiente handler reúne todos estos pasos.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Se asume que el último mensaje siempre es del usuario y tiene contenido de tipo string.
# En producción, añade comprobaciones para roles/contenido no textual.
chat_history = [
ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")
for m in req.messages
]
user_text = chat_history.pop().content
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
handler = agent.run(user_msg=user_text, chat_history=chat_history)
async for event in handler.stream_events():
if not isinstance(event, AgentStream):
continue
if not event.delta:
continue
yield sse_chunk(response_id, {"content": event.delta})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
LlamaIndex es menos estricto con los patrones de runtime conversacional de principio a fin que los frameworks con capas de orquestación más pesadas. En despliegues en producción, normalmente esto requiere que el cliente implemente la gestión de sesiones, controles de respuesta, orquestación de herramientas y trazabilidad.
Conclusión
Todos los frameworks de esta guía se conectan a ElevenLabs mediante el mismo contrato: aceptar una petición tipo Completions o Responses al estilo OpenAI y devolver fragmentos SSE en streaming. Así, los equipos pueden añadir orquestación de voz sobre una implementación de agentes ya existente con cambios mínimos, manteniendo lo que ya tienen y desbloqueando IA conversacional en tiempo real. Esta modularidad es clave en la plataforma ElevenAgents. Tanto si amplías un agente existente como si creas uno nativo de voz desde cero, la orquestación de voz de ElevenAgents se adapta a tu caso.
Si ya usas un agente con un framework open-source y quieres añadir voz, prueba este enfoque y cuéntanos tu experiencia.



