Pular para o conteúdo

Guia prático: frameworks de agentes open-source e ElevenAgents

Conectando frameworks de agentes open-source à voz da ElevenLabs via Custom LLM.

Black square with some squiggly lines.

No nosso post anterior sobre Integração de Agentes Externos com Orquestração de Voz da ElevenLabs, mostramos como equipes podem conectar sua orquestração de agentes baseada em texto à ElevenLabs usando o LLM personalizado. Com base nesse conceito, este guia mostra como frameworks de agentes open-source podem ser adaptados e usados por trás da interface do Custom LLM. O resultado é uma arquitetura flexível, onde a voz é adicionada a sistemas de agentes já maduros sem comprometer o controle de estado, orquestração de ferramentas ou lógica específica da aplicação. Em todos os frameworks, seguimos o mesmo padrão de três etapas: criar uma requisição de geração, extrair a resposta final em texto e reformatar no padrão Server-Sent Events (SSE) compatível com OpenAI. A ElevenLabs suporta tanto os formatos Chat Completionsquanto Respostas. Embora este guia cubra quatro frameworks populares, o padrão pode ser aplicado a qualquer runtime que produza saída em streaming compatível com OpenAI.

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

Configuração geral

Os exemplos desta seção usam Python e FastAPI, mas qualquer stack que aceite requisições HTTP POST e respostas SSE em streaming funciona. Quando a orquestração de voz da ElevenLabs detecta o fim provável de uma fala, ela faz uma requisição de geração para o endpoint Custom LLM configurado. Aqui, mostramos os componentes principais dessa camada de tradução, que serve como ponte entre a orquestração de voz e o framework de agentes.

Cada framework pode ser escolhido por clientes por familiaridade ou por atender a um objetivo específico. O LlamaIndex, por exemplo, foi criado para facilitar a configuração de RAG (Retrieval-Augmented Generation), enquanto o CrewAI foi feito para automatizar tarefas definidas. Objetivos diferentes geram estruturas de resposta diferentes, e cada uma exige um tratamento específico. Fazer o streaming dos pedaços de resposta conforme o LLM gera, em vez de esperar a resposta completa, é fundamental para que o modelo de Transformar Texto em Áudio (TTS) comece a gerar áudio mais cedo, reduzindo a latência percebida. Aqui focamos nos quatro frameworks populares: LangGraph, Google ADK, CrewAI e LlamaIndex.

Nota sobre código compartilhado

Cada framework precisa enviar respostas em pedaços SSE compatíveis com OpenAI. Apresentamos uma função auxiliar usada nos exemplos para montar esses pedaços.

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

        "id": response_id,

        "object": "chat.completion.chunk",

        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],

    }

    return f"data: {json.dumps(payload)}\n\n"

Com essa base pronta, vamos começar pelo LangGraph.

LangGraph

O LangGraph modela agentes como grafos, onde nós representam etapas individuais e as conexões definem o fluxo de controle entre elas. A configuração mínima é simples: inicialize um modelo de chat, defina as ferramentas do agente e crie o runtime do grafo do agente.

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

Para cada requisição de geração, o agente LangGraph recebe todo o histórico da conversa, permitindo manter o estado necessário internamente. O LangGraph suporta persistência do lado do servidor via Checkpoints, mas não abordamos isso aqui para manter a implementação simples.

Com o controle de estado resolvido, a próxima decisão específica do LangGraph é o modo de streaming, onde há duas opções, cada uma adequada para um caso de uso:

  • stream_mode="values" fornece snapshots do estado do grafo. É mais simples de implementar, mas inclui o estado completo da mensagem em cada resposta, o que aumenta a latência em conversas em tempo real.
  • stream_mode="messages" faz o streaming de pedaços incrementais da mensagem do modelo. Geralmente é o preferido para interações de voz em tempo real, pois reduz o tempo até o primeiro áudio na orquestração da ElevenLabs.

Mais especificamente, a implementação baseada em mensagens inclui etapas intermediárias, como atualizações de chamadas de ferramentas, que não devem ser faladas. O proxy filtra essas etapas, enviando apenas o texto da resposta para o TTS. Veja um exemplo de turno com ferramenta:

[1] O modelo decide chamar uma ferramenta (tool_calls=["get_price"])

[2] A ferramenta executa e retorna dados (result="$24.99")

[3] O modelo gera a resposta usando o resultado (content="Custa $24.99")

Naturalmente, apenas os pedaços do passo 3 devem ser enviados no stream SSE. Na prática, dois filtros garantem isso no loop de streaming: um para manter apenas eventos com langgraph_node == "model" e outro que ignora conteúdo vazio. Assim, apenas o texto do assistente é enviado para a ElevenLabs como SSE. Veja uma implementação leve do proxy de requisição:

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    input = {"messages": req.messages}

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # Só encaminhar pedaços de texto do modelo; ignorar atualizações de ferramentas e eventos não textuais.

            if metadata.get("langgraph_node") != "model":

                continue

            content = getattr(message_chunk, "content", None)

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            # Enviar pedaços incrementais para a ElevenLabs no formato OpenAI.

            yield sse_chunk(response_id, {"content": content})

         # Sinalizar conclusão natural antes de usar finish_reason: "stop" [DONE]

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Assim, apenas os pedaços de texto do modelo são enviados para a ElevenLabs. Como o LangGraph expõe a execução interna de ferramentas pelo stream de estado, o filtro é explícito e controlado pelo proxy.

Agora, vamos para as particularidades do Google Agent Development Kit (ADK)

Google ADK

O ADK do Google abstrai o loop de execução em alguns componentes principais: Agent, Runner e SessionService. O Runner do ADK fica entre a camada HTTP e a definição do agente. Ele gerencia o roteamento de mensagens, orquestração de ferramentas, ciclo de vida da sessão e streaming de eventos.

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

Com agente, backend de sessão e runner inicializados, o proxy resolve ou cria uma sessão ADK para cada requisição recebida. No ADK, o session_id controla a persistência da memória: reutilizar o mesmo session_id entre turnos mantém o histórico, chamadas de ferramentas e respostas anteriores. Como a identidade da conversa está na ElevenLabs, o proxy faz esse mapeamento de forma explícita. Ao passar o identificador correto na requisição de geração, o SDK consegue lidar com o contexto anterior internamente. Passamos esse identificador arbitrário no início da conversa através de parâmetros extras enviados no corpo da requisição.

Com a mensagem e a sessão prontas, o runner pode ser chamado. Chamadas de ferramentas e resultados ainda aparecem como eventos internos do ADK durante a execução, mas são tratados como etapas intermediárias de orquestração, não como saída para o usuário. Isso elimina a necessidade de filtro manual, diferente de frameworks onde chamadas de ferramentas aparecem como texto visível para o usuário.

O handler abaixo é uma implementação simplificada, incluindo resolução e criação de sessão no próprio código.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # Em produção, prefira um identificador estável do seu sistema de origem.

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # No modo SSE, o ADK emite eventos parciais (incrementais) e finais (completos).

            # Encaminhar apenas eventos parciais evita duplicar o texto completo.
            # Observação: o streaming SSE é experimental no ADK. Em produção, trate

            # ambos os tipos de evento caso o backend do modelo não envie parciais.

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

Agora, vamos ver o CrewAI, que é mais centrado em tarefas.

CrewAI

O CrewAI foi criado para orquestrar fluxos de trabalho multiagente em tarefas estruturadas (pesquisar, escrever, resumir), e não em diálogos abertos. Os agentes são definidos com papel, objetivo e contexto. A execução gira em torno de objetos Task, cada um com descrição clara e saída esperada.

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

Diferente do modelo de loop de agente usado no LangGraph e ADK, o CrewAI normalmente monta Task e Crew por requisição, definindo a unidade de trabalho daquele turno da conversa. Mantemos o contexto da conversa injetando os turnos anteriores na próxima tarefa via um placeholder. A variável {crew_chat_messages} é preenchida a cada requisição com o histórico da conversa, e interpolada na descrição da tarefa na execução. Para garantir um texto limpo e pronto para fala, filtramos explicitamente padrões intermediários (Thought, Action, Action Input, Observation) e emitimos apenas o texto da resposta final.

O handler abaixo reúne a construção da tarefa por requisição, interpolação do histórico, streaming no nível Crew, filtragem de trace e formatação da saída.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Task e Crew são montados por requisição (não no início da aplicação).

    task = Task(

        description=(

            "Histórico da conversa:\n{crew_chat_messages}\n\n"

            "Responda à última mensagem do usuário."

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=True retorna CrewStreamingOutput em vez de CrewOutput único.

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # Ignorar eventos não textuais (ex: chamadas de ferramentas).

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # Só encaminhar texto após o marcador "Final Answer:"

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # Limpar possíveis resíduos de markdown do CrewAI.

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # Fallback para respostas curtas sem o marcador "Final Answer:"

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

Agora, vamos para o LlamaIndex, que segue um modelo nativo de streaming orientado a eventos.

LlamaIndex

Diferente dos outros frameworks deste post, o LlamaIndex foi criado para conectar LLMs a fontes externas de dados (bancos de documentos, índices, pipelines de busca). Sua camada de agente, FunctionAgent, trabalha em cima disso para buscar e raciocinar sobre contexto estruturado, e não para diálogo aberto ou execução de tarefas.

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

Para manter a continuidade da conversa, o proxy transforma as mensagens recebidas em mensagens de chat do LlamaIndex, separando o último turno do usuário (user_msg) dos turnos anteriores (chat_history). Cada evento AgentStream tem o próximo fragmento de texto em event.delta, que vai direto para um chunk delta.content no padrão OpenAI. Deltas não vazios podem ser enviados como estão, tornando esse o streaming mais simples do guia. O stream contém tanto eventos de orquestração (chamadas de ferramentas, resultados) quanto eventos de fala (deltas de texto do assistente). Para manter o áudio limpo, o proxy mantém apenas eventos AgentStream e ignora deltas vazios.

[1] AgentStream (delta='')       ← ignorado

[2] ToolCall                     ← ignorado

[3] ToolCallResult               ← ignorado

[4] AgentStream (delta='It')     ← encaminhado ✓

[5] AgentStream (delta=' costs') ← encaminhado ✓

[6] AgentStream (delta=' $49.99')← encaminhado ✓

Essa separação mantém a mecânica intermediária de ferramentas fora do áudio, garantindo baixa latência e fala incremental. O handler abaixo reúne esses passos.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Aqui assumimos que a última mensagem é sempre do usuário e contém texto.

    # Em produção, trate casos de payloads não textuais ou papéis diferentes.

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

O LlamaIndex é menos prescritivo sobre padrões de runtime de conversação do que frameworks com camadas de orquestração mais robustas. Em produção, normalmente é preciso implementar controle de sessão, filtros de resposta, orquestração de ferramentas e tracing.

Conclusão

Cada framework deste guia se conecta à ElevenLabs pelo mesmo contrato: aceitar uma requisição de Completions ou Responses no padrão OpenAI e devolver pedaços SSE em streaming. Isso permite adicionar orquestração de voz a uma implementação de agente já existente com poucas mudanças, preservando o que já foi construído e liberando conversas em tempo real com IA. Essa modularidade é um princípio central da plataforma ElevenAgents. Seja para ampliar um agente já existente ou criar algo nativo em voz desde o início, a orquestração de voz do ElevenAgents está pronta para atender você.

Se você já usa um agente com framework open-source e quer ativar voz, experimente este caminho e conte para a gente o que achou.

Explore artigos da equipe ElevenLabs

Crie com o áudio de IA da mais alta qualidade