Pular para o conteúdo

Guia prático: frameworks de agentes open-source e ElevenAgents

Conectando frameworks de agentes open-source à voz da ElevenLabs via Custom LLM.

Black square with some squiggly lines.

No nosso post anterior sobre Integração de Agentes Externos com Orquestração de Voz da ElevenLabs, mostramos como equipes podem conectar sua orquestração de agentes baseada em texto à ElevenLabs usando o LLM personalizado. Com base nesse conceito, este guia mostra como frameworks de agentes open-source podem ser adaptados e usados por trás da interface do Custom LLM. O resultado é uma arquitetura flexível, onde a voz é adicionada a sistemas de agentes já maduros sem comprometer o controle de estado, orquestração de ferramentas ou lógica específica da aplicação. Em todos os frameworks, seguimos o mesmo padrão de três etapas: criar uma requisição de geração, extrair a resposta final em texto e reformatar no padrão Server-Sent Events (SSE) compatível com OpenAI. A ElevenLabs suporta tanto os formatos Chat Completionsquanto Respostas. Embora este guia cubra quatro frameworks populares, o padrão pode ser aplicado a qualquer runtime que produza saída em streaming compatível com OpenAI.

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

Configuração geral

Os exemplos desta seção usam Python e FastAPI, mas qualquer stack que aceite requisições HTTP POST e respostas SSE em streaming funciona. Quando a orquestração de voz da ElevenLabs detecta o fim provável de uma fala, ela faz uma requisição de geração para o endpoint Custom LLM configurado. Aqui, mostramos os componentes principais dessa camada de tradução, que serve como ponte entre a orquestração de voz e o framework de agentes.

Cada framework pode ser escolhido por clientes por familiaridade ou por atender a um objetivo específico. O LlamaIndex, por exemplo, foi criado para facilitar a configuração de RAG (Retrieval-Augmented Generation), enquanto o CrewAI foi feito para automatizar tarefas definidas. Objetivos diferentes geram estruturas de resposta diferentes, e cada uma exige um tratamento específico. Fazer o streaming dos pedaços de resposta conforme o LLM gera, em vez de esperar a resposta completa, é fundamental para que o modelo de Transformar Texto em Áudio (TTS) comece a gerar áudio mais cedo, reduzindo a latência percebida. Aqui focamos nos quatro frameworks populares: LangGraph, Google ADK, CrewAI e LlamaIndex.

Nota sobre código compartilhado

Cada framework precisa enviar respostas em pedaços SSE compatíveis com OpenAI. Apresentamos uma função auxiliar usada nos exemplos para montar esses pedaços.

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

Com essa base pronta, vamos começar com o LangGraph.

LangGraph

O LangGraph modela agentes como grafos, onde os nós representam etapas individuais e as arestas definem o fluxo de controle entre elas. A configuração mínima é simples: inicialize um modelo de chat, defina as ferramentas do agente e crie o runtime do grafo do agente.

    }

    return f"data: {json.dumps(payload)}\n\n"

Para cada solicitação de geração, o Agente LangGraph recebe todo o histórico da conversa, o que permite manter o estado necessário internamente. O LangGraph oferece persistência no servidor via

Com a gestão de estado resolvida, a próxima decisão específica do LangGraph é o modo de streaming, onde o LangGraph oferece duas opções, cada uma adequada para um caso de uso diferente:

O LangGraph modela agentes como grafos, onde nós representam etapas individuais e as conexões definem o fluxo de controle entre elas. A configuração mínima é simples: inicialize um modelo de chat, defina as ferramentas do agente e crie o runtime do grafo do agente.

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

Para cada requisição de geração, o agente LangGraph recebe todo o histórico da conversa, permitindo manter o estado necessário internamente. O LangGraph suporta persistência do lado do servidor via Checkpoints, mas não abordamos isso aqui para manter a implementação simples.

Com o controle de estado resolvido, a próxima decisão específica do LangGraph é o modo de streaming, onde há duas opções, cada uma adequada para um caso de uso:

  • stream_mode="values" fornece snapshots do estado do grafo. É mais simples de implementar, mas inclui o estado completo da mensagem em cada resposta, o que aumenta a latência em conversas em tempo real.
  • parâmetros extras

Com a mensagem e a sessão prontas, o runner pode ser chamado. As chamadas e resultados das ferramentas ainda aparecem como eventos internos do ADK durante a execução, mas são tratados como etapas intermediárias de orquestração, não como saída para o usuário. Isso elimina a necessidade de um filtro manual, diferente de frameworks onde as chamadas de ferramentas aparecem como texto visível para o usuário.

O handler abaixo é uma implementação simplificada, incluindo a resolução da sessão e a lógica de criar ou recuperar a sessão.

[2] A ferramenta executa e retorna dados (result="$24.99")

[3] O modelo gera a resposta usando o resultado (content="Custa $24.99")

Agora, vamos ver o CrewAI, que é mais focado em tarefas por natureza.

CrewAI

O CrewAI foi criado para orquestrar fluxos de trabalho com múltiplos agentes em tarefas estruturadas (pesquisa, redação, resumo), em vez de loops de diálogo abertos. Os agentes são definidos com papel, objetivo e histórico. A execução gira em torno de objetos Task, cada um com uma descrição clara e resultado esperado.

    input = {"messages": req.messages}

    async def stream():

Diferente do modelo de loop de agente usado no LangGraph e ADK, o CrewAI normalmente constrói Task e Crew por solicitação, definindo a unidade de trabalho para aquela rodada da conversa. Mantemos o contexto da conversa ao inserir as interações anteriores na próxima tarefa usando um placeholder. A variável {crew_chat_messages} é preenchida a cada solicitação com o histórico da conversa e depois inserida na descrição da tarefa no momento da execução. Também buscamos gerar texto limpo e pronto para fala, filtrando explicitamente padrões intermediários de rastreamento (Thought, Action, Action Input, Observation) e emitindo apenas o texto da resposta final.

O handler abaixo reúne a construção de tarefas por solicitação, interpolação do histórico, streaming no nível do Crew, filtragem de rastreamento e formatação da saída.

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # Só encaminhar pedaços de texto do modelo; ignorar atualizações de ferramentas e eventos não textuais.

Agora, vamos ver o LlamaIndex, que segue um caminho diferente, focado em um modelo nativo de streaming orientado a eventos.

LlamaIndex

Diferente dos outros frameworks abordados aqui, o LlamaIndex foi criado para conectar LLMs a fontes externas de dados (repositórios de documentos, índices, pipelines de busca). Sua camada de agente, FunctionAgent, atua sobre essa base para buscar e raciocinar sobre contexto estruturado, em vez de executar diálogos abertos ou tarefas.

            if not content:

                continue

Para manter a continuidade da conversa, o proxy transforma as mensagens recebidas em mensagens de chat do LlamaIndex, separando a última mensagem do usuário (user_msg) das anteriores (chat_history). Cada evento AgentStream tem um campo event.delta com o próximo fragmento de texto, que corresponde diretamente a um chunk delta.content no estilo OpenAI. Deltas não vazios podem ser enviados como estão, tornando esse o streaming mais simples do guia. O stream inclui eventos de orquestração (chamadas de ferramentas, resultados) e eventos de fala (deltas de texto do assistente). Para manter a saída de voz limpa, o proxy mantém apenas eventos AgentStream e ignora deltas vazios.

Para manter a continuidade da conversa, o proxy transforma as mensagens recebidas em mensagens de chat do LlamaIndex, depois separa na última fala do usuário (user_msg) e nas anteriores (chat_history). Cada campo event.delta do AgentStream contém o próximo fragmento de texto, que corresponde diretamente a um delta.content no estilo OpenAI. Deltas não vazios podem ser encaminhados como estão, tornando essa a ponte de streaming mais simples deste guia. O stream inclui eventos de orquestração (chamadas de ferramentas, resultados) e eventos de fala (deltas de texto do assistente). Para manter a saída de voz limpa, o proxy mantém apenas eventos AgentStream e ignora deltas vazios.

[1] AgentStream (delta='')       ← ignorado

Essa separação mantém a mecânica intermediária das ferramentas fora do áudio falado, preservando a fala incremental com baixa latência. O handler abaixo reúne esses passos.

            yield sse_chunk(response_id, {"content": content})

O LlamaIndex é menos prescritivo quanto aos padrões de execução de conversas do que frameworks com camadas de orquestração mais robustas. Em ambientes de produção, normalmente é necessário que o cliente implemente o gerenciamento de sessões, limites de resposta, orquestração de ferramentas e rastreamento.

O LlamaIndex é menos prescritivo sobre padrões de execução de conversas de ponta a ponta do que frameworks com camadas de orquestração mais robustas. Em ambientes de produção, isso normalmente exige que os clientes implementem controle de sessões, limites de resposta, orquestração de ferramentas e rastreamento.

Conclusão

Cada framework neste guia se conecta à ElevenLabs pelo mesmo contrato: aceita uma requisição de Completions ou Responses no estilo OpenAI e retorna blocos SSE em streaming. Isso permite que equipes adicionem orquestração de voz em cima de uma implementação de agente já existente com poucas mudanças, preservando o que já foi construído e liberando o uso em tempo real de

Se você já usa um agente com um framework open-source e quer ativar voz, experimente essa abordagem e conte pra gente o que achou.

Agora, vamos para as particularidades do Google Agent Development Kit (ADK)

Google ADK

O ADK do Google abstrai o loop de execução em alguns componentes principais: Agent, Runner e SessionService. O Runner do ADK fica entre a camada HTTP e a definição do agente. Ele gerencia o roteamento de mensagens, orquestração de ferramentas, ciclo de vida da sessão e streaming de eventos.

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

Com agente, backend de sessão e runner inicializados, o proxy resolve ou cria uma sessão ADK para cada requisição recebida. No ADK, o session_id controla a persistência da memória: reutilizar o mesmo session_id entre turnos mantém o histórico, chamadas de ferramentas e respostas anteriores. Como a identidade da conversa está na ElevenLabs, o proxy faz esse mapeamento de forma explícita. Ao passar o identificador correto na requisição de geração, o SDK consegue lidar com o contexto anterior internamente. Passamos esse identificador arbitrário no início da conversa através de parâmetros extras enviados no corpo da requisição.

Com a mensagem e a sessão prontas, o runner pode ser chamado. Chamadas de ferramentas e resultados ainda aparecem como eventos internos do ADK durante a execução, mas são tratados como etapas intermediárias de orquestração, não como saída para o usuário. Isso elimina a necessidade de filtro manual, diferente de frameworks onde chamadas de ferramentas aparecem como texto visível para o usuário.

O handler abaixo é uma implementação simplificada, incluindo resolução e criação de sessão no próprio código.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # Em produção, prefira um identificador estável do seu sistema de origem.

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # No modo SSE, o ADK emite eventos parciais (incrementais) e finais (completos).

            # Encaminhar apenas eventos parciais evita duplicar o texto completo.
            # Observação: o streaming SSE é experimental no ADK. Em produção, trate

            # ambos os tipos de evento caso o backend do modelo não envie parciais.

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Agora, vamos ver o CrewAI, que é mais centrado em tarefas.

CrewAI

O CrewAI foi criado para orquestrar fluxos de trabalho multiagente em tarefas estruturadas (pesquisar, escrever, resumir), e não em diálogos abertos. Os agentes são definidos com papel, objetivo e contexto. A execução gira em torno de objetos Task, cada um com descrição clara e saída esperada.

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

Diferente do modelo de loop de agente usado no LangGraph e ADK, o CrewAI normalmente monta Task e Crew por requisição, definindo a unidade de trabalho daquele turno da conversa. Mantemos o contexto da conversa injetando os turnos anteriores na próxima tarefa via um placeholder. A variável {crew_chat_messages} é preenchida a cada requisição com o histórico da conversa, e interpolada na descrição da tarefa na execução. Para garantir um texto limpo e pronto para fala, filtramos explicitamente padrões intermediários (Thought, Action, Action Input, Observation) e emitimos apenas o texto da resposta final.

O handler abaixo reúne a construção da tarefa por requisição, interpolação do histórico, streaming no nível Crew, filtragem de trace e formatação da saída.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Task e Crew são montados por requisição (não no início da aplicação).

    task = Task(

        description=(

            "Histórico da conversa:\n{crew_chat_messages}\n\n"

            "Responda à última mensagem do usuário."

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=True retorna CrewStreamingOutput em vez de CrewOutput único.

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # Ignorar eventos não textuais (ex: chamadas de ferramentas).

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # Só encaminhar texto após o marcador "Final Answer:"

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # Limpar possíveis resíduos de markdown do CrewAI.

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # Fallback para respostas curtas sem o marcador "Final Answer:"

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

Agora, vamos para o LlamaIndex, que segue um modelo nativo de streaming orientado a eventos.

LlamaIndex

Diferente dos outros frameworks deste post, o LlamaIndex foi criado para conectar LLMs a fontes externas de dados (bancos de documentos, índices, pipelines de busca). Sua camada de agente, FunctionAgent, trabalha em cima disso para buscar e raciocinar sobre contexto estruturado, e não para diálogo aberto ou execução de tarefas.

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

Para manter a continuidade da conversa, o proxy transforma as mensagens recebidas em mensagens de chat do LlamaIndex, separando o último turno do usuário (user_msg) dos turnos anteriores (chat_history). Cada evento AgentStream tem o próximo fragmento de texto em event.delta, que vai direto para um chunk delta.content no padrão OpenAI. Deltas não vazios podem ser enviados como estão, tornando esse o streaming mais simples do guia. O stream contém tanto eventos de orquestração (chamadas de ferramentas, resultados) quanto eventos de fala (deltas de texto do assistente). Para manter o áudio limpo, o proxy mantém apenas eventos AgentStream e ignora deltas vazios.

[1] AgentStream (delta='')       ← ignorado

[2] ToolCall                     ← ignorado

[3] ToolCallResult               ← ignorado

[4] AgentStream (delta='It')     ← encaminhado ✓

[5] AgentStream (delta=' costs') ← encaminhado ✓

[6] AgentStream (delta=' $49.99')← encaminhado ✓

Essa separação mantém a mecânica intermediária de ferramentas fora do áudio, garantindo baixa latência e fala incremental. O handler abaixo reúne esses passos.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Aqui assumimos que a última mensagem é sempre do usuário e contém texto.

    # Em produção, trate casos de payloads não textuais ou papéis diferentes.

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

O LlamaIndex é menos prescritivo sobre padrões de runtime de conversação do que frameworks com camadas de orquestração mais robustas. Em produção, normalmente é preciso implementar controle de sessão, filtros de resposta, orquestração de ferramentas e tracing.

Conclusão

Cada framework deste guia se conecta à ElevenLabs pelo mesmo contrato: aceitar uma requisição de Completions ou Responses no padrão OpenAI e devolver pedaços SSE em streaming. Isso permite adicionar orquestração de voz a uma implementação de agente já existente com poucas mudanças, preservando o que já foi construído e liberando conversas em tempo real com IA. Essa modularidade é um princípio central da plataforma ElevenAgents. Seja para ampliar um agente já existente ou criar algo nativo em voz desde o início, a orquestração de voz do ElevenAgents está pronta para atender você.

Se você já usa um agente com framework open-source e quer ativar voz, experimente este caminho e conte para a gente o que achou.

Explore artigos da equipe ElevenLabs

Crie com o áudio de IA da mais alta qualidade