본문 바로가기

실전 가이드: 오픈소스 에이전트 프레임워크와 ElevenAgents

오픈소스 에이전트 프레임워크를 Custom LLM을 통해 ElevenLabs 음성에 연결하기.

Black square with some squiggly lines.

이전 글 외부 에이전트와 ElevenLabs 음성 오케스트레이션 통합하기에서, 기존 텍스트 기반 에이전트 오케스트레이션을 ElevenLabs에 커스텀 LLM을 통해 연결하는 방법을 소개했습니다. 이번 가이드에서는 대표적인 오픈소스 에이전트 프레임워크를 Custom LLM 인터페이스 뒤에서 어떻게 적용하고 배포할 수 있는지 보여줍니다. 이를 통해 음성을 성숙한 에이전트 시스템에 유연하게 결합할 수 있으며, 상태 관리, 도구 오케스트레이션, 앱별 제어 기능도 그대로 유지할 수 있습니다. 프레임워크에 상관없이, 생성 요청 만들기, 최종 텍스트 응답 추출, 그리고 OpenAI 호환 SSE(Server-Sent Events) 형식으로 재구성하는 3단계 패턴을 따릅니다. ElevenLabs는 채팅 완성응답 형식을 모두 지원합니다. 이 가이드에서는 널리 사용되는 네 가지 프레임워크를 다루지만, 이 패턴은 OpenAI 호환 스트리밍 출력을 생성할 수 있는 모든 런타임에 적용할 수 있습니다.

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

기본 설정

이 섹션의 예시는 Python과 FastAPI를 사용하지만, HTTP POST 요청과 스트리밍 SSE 응답을 처리할 수 있는 어떤 스택도 사용할 수 있습니다. ElevenLabs의 음성 오케스트레이션이 대화의 끝을 감지하면, 설정된 Custom LLM 엔드포인트로 생성 요청을 보냅니다. 이 섹션에서는 음성 오케스트레이션과 에이전트 프레임워크가 같은 언어로 소통할 수 있도록 해주는 번역 계층, 즉 브릿지(프록시)의 핵심 구성 요소를 설명합니다.

각 프레임워크는 익숙함이나 특정 목적에 맞는 기능 때문에 고객이 선택할 수 있습니다. 예를 들어, LlamaIndex는 RAG(Retrieval-Augmented Generation) 설정을 쉽게 하기 위해 개발되었고, CrewAI는 에이전트 시대에 명확한 작업 자동화를 위해 만들어졌습니다. 설계 목표가 다르면 응답 구조도 달라지며, 각각에 맞는 처리가 필요합니다. LLM이 생성하는 대로 청크를 스트리밍하면, 한 턴이 끝날 때까지 기다리지 않고 텍스트 음성 변환(TTS) 모델이 더 빨리 음성 생성을 시작할 수 있어 지연이 줄어듭니다. 이번 가이드에서는 LangGraph, Google ADK, CrewAI, LlamaIndex 네 가지 프레임워크에 집중합니다.

공통 코드에 대한 참고

각 프레임워크는 OpenAI 호환 SSE 청크로 응답을 스트리밍해야 합니다. 예시에서 공통적으로 사용하는 간단한 헬퍼 함수를 소개합니다.

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

이제 기반이 마련되었으니, LangGraph부터 시작해 보겠습니다.

LangGraph

LangGraph는 에이전트를 그래프로 모델링하며, 각 노드는 개별 단계를, 엣지는 단계 간의 제어 흐름을 나타냅니다. 최소 설정은 간단합니다. 챗 모델을 초기화하고, 에이전트 도구를 정의한 뒤, 에이전트 그래프 런타임을 생성하면 됩니다.

    }

    return f"data: {json.dumps(payload)}\n\n"

각 생성 요청마다 LangGraph 에이전트는 전체 대화 기록을 받아 내부적으로 필요한 상태를 유지할 수 있습니다. LangGraph는 서버 측 지속성을

상태 관리가 처리되면, 다음 LangGraph의 주요 결정 포인트는 스트리밍 모드입니다. LangGraph는 각각 다른 용도에 맞는 두 가지 옵션을 제공합니다:

LangGraph는 에이전트를 그래프로 모델링하며, 노드는 개별 단계, 엣지는 단계 간의 제어 흐름을 나타냅니다. 최소 설정은 간단합니다: 챗 모델 초기화, 에이전트 도구 정의, 에이전트 그래프 런타임 생성.

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

생성 요청마다 LangGraph 에이전트는 전체 대화 기록을 받아 내부적으로 필요한 상태를 유지할 수 있습니다. LangGraph는 체크포인트를 통한 서버 측 상태 저장도 지원하지만, 여기서는 구현을 간단히 하기 위해 다루지 않습니다.

상태 관리가 처리되면, 다음 LangGraph의 주요 결정 포인트는 스트리밍 모드입니다. LangGraph는 각각 다른 용도에 맞는 두 가지 옵션을 제공합니다:

  • stream_mode="values"는 그래프 상태 스냅샷을 제공합니다. 구현이 더 간단하지만, 각 응답마다 전체 메시지 상태가 포함되어 실시간 대화에서는 지연이 늘어날 수 있습니다.
  • 추가 파라미터

메시지와 세션이 준비되면 러너를 호출할 수 있습니다. 도구 호출과 결과는 실행 중 내부 ADK 이벤트로 나타나지만, 사용자에게 보여지는 출력이 아니라 중간 오케스트레이션 단계로 처리됩니다. 이로 인해 도구 호출이 사용자에게 노출되는 프레임워크와 달리 별도의 필터링이 필요하지 않습니다.

아래 핸들러는 세션 확인 및 생성 로직을 포함한 간단한 구현 예시입니다.

[2] 도구 실행 후 데이터 반환 (result="$24.99")

[3] 모델이 결과를 사용해 응답 생성 (content="It costs $24.99")

다음으로, 더 과업 중심적으로 설계된 CrewAI를 살펴보겠습니다.

CrewAI

CrewAI는 구조화된 과업(리서치, 작성, 요약 등)을 중심으로 여러 에이전트의 워크플로우를 오케스트레이션하도록 설계되었습니다. 에이전트는 역할, 목표, 배경 스토리로 정의됩니다. 실행은 각기 명확한 설명과 기대 결과를 가진 Task 객체를 중심으로 이루어집니다.

    input = {"messages": req.messages}

    async def stream():

LangGraph와 ADK에서 사용하는 에이전트 루프 모델과 달리, CrewAI는 보통 요청마다 Task와 Crew를 생성해 해당 턴의 작업 단위를 정의합니다. 대화 맥락은 이전 턴을 플레이스홀더를 통해 다음 작업에 주입하는 방식으로 이어집니다. {crew_chat_messages} 변수에 현재 대화 기록이 요청마다 채워지고, 실행 시 작업 설명에 삽입됩니다. 또한, 중간 추적 패턴(Thought, Action, Action Input, Observation)을 명확히 걸러내고 최종 답변 텍스트만 출력해, 음성으로 읽기 좋은 깔끔한 텍스트를 생성하는 것을 목표로 합니다.

아래 핸들러는 요청별 작업 생성, 기록 삽입, Crew 단위 스트리밍, 추적 필터링, 출력 포맷팅을 모두 결합한 예시입니다.

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # 모델 텍스트 청크만 전달; 도구 업데이트 및 비텍스트 이벤트는 건너뜀.

다음으로, 네이티브 이벤트 기반 스트리밍 모델에 집중하는 LlamaIndex를 살펴보겠습니다.

LlamaIndex

이 글에서 다룬 다른 프레임워크와 달리, LlamaIndex는 LLM을 외부 데이터 소스(문서 저장소, 인덱스, 검색 파이프라인 등)와 연결하도록 설계되었습니다. 에이전트 계층인 FunctionAgent는 이러한 기반 위에서 구조화된 맥락을 검색하고 추론하는 데 중점을 두며, 열린 대화나 과업 실행이 목적이 아닙니다.

            if not content:

                continue

대화의 연속성을 유지하기 위해, 프록시는 들어오는 메시지를 LlamaIndex 챗 메시지로 변환한 뒤, 최신 사용자 턴(user_msg)과 이전 턴(chat_history)으로 분리합니다. 각 AgentStream 이벤트의 event.delta 필드에는 다음 텍스트 조각이 담겨 있으며, 이는 OpenAI 스타일의 delta.content 조각과 바로 매핑됩니다. 비어 있지 않은 델타는 그대로 전달할 수 있어, 이 가이드에서 가장 간단한 스트리밍 브릿지입니다. 스트림에는 오케스트레이션 이벤트(도구 호출, 결과)와 음성 이벤트(어시스턴트 텍스트 델타)가 모두 포함됩니다. 음성 출력의 깔끔함을 위해, 프록시는 AgentStream 이벤트만 남기고 비어 있는 델타는 건너뜁니다.

대화의 연속성을 유지하기 위해 프록시는 들어오는 메시지를 LlamaIndex 채팅 메시지로 변환한 뒤, 최신 사용자 턴(user_msg)과 이전 턴(chat_history)으로 분리합니다. 각 AgentStream 이벤트의 event.delta 필드에는 다음 텍스트 조각이 담기며, 이는 OpenAI 스타일의 delta.content 청크와 바로 매핑됩니다. 비어 있지 않은 델타는 그대로 전달할 수 있어, 이 가이드에서 가장 간단한 스트리밍 브리지입니다. 스트림에는 오케스트레이션 이벤트(도구 호출, 결과)와 음성 이벤트(어시스턴트 텍스트 델타)가 모두 포함됩니다. 음성 출력이 깔끔하게 유지되도록 프록시는 AgentStream 이벤트만 남기고 비어 있는 델타는 건너뜁니다.

[1] AgentStream (delta='')       ← 무시

이렇게 분리하면 중간 도구 처리 과정이 음성 출력에 포함되지 않으면서도, 저지연의 점진적 음성 출력을 유지할 수 있습니다. 아래 핸들러 예시는 이 과정을 하나로 모아 보여줍니다.

            yield sse_chunk(response_id, {"content": content})

LlamaIndex는 내장 오케스트레이션 계층이 무거운 프레임워크에 비해, 대화 런타임 패턴에 대한 제약이 덜합니다. 실제 서비스 환경에서는 세션 관리, 응답 가이드라인, 도구 오케스트레이션, 추적 등을 고객이 직접 구현해야 하는 경우가 많습니다.

LlamaIndex는 내장 오케스트레이션 레이어가 많은 프레임워크에 비해, 대화형 런타임 패턴에 대한 제약이 적습니다. 실제 운영 환경에서는 세션 관리, 응답 가이드라인, 도구 오케스트레이션, 트레이싱 등을 직접 구현해야 하는 경우가 많습니다.

결론

이 가이드의 각 프레임워크는 동일한 방식으로 ElevenLabs에 연결됩니다. OpenAI 스타일의 Completions 또는 Responses 요청을 받아 SSE 청크로 스트리밍합니다. 이를 통해 팀은 기존 에이전트 구현 위에 음성 오케스트레이션을 최소한의 변경만으로 추가할 수 있어, 이미 구축한 시스템을 그대로 유지하면서 실시간

이미 오픈소스 프레임워크로 에이전트를 운영 중이고 음성 기능을 추가하고 싶다면, 이 방법을 시도해보고 의견을 들려주세요.

다음으로, Google의 Agent Development Kit(ADK) 사용 시의 특징을 살펴봅니다.

Google ADK

Google의 ADK는 런타임 루프를 몇 가지 핵심 프리미티브(Agent, Runner, SessionService)로 추상화합니다. ADK의 Runner는 HTTP 계층과 에이전트 정의 사이에 위치하며, 메시지 라우팅, 도구 오케스트레이션, 세션 라이프사이클, 이벤트 스트리밍을 처리합니다.

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

에이전트, 세션 백엔드, 러너가 초기화되면, 프록시는 각 요청마다 ADK 세션을 찾거나 새로 만듭니다. ADK에서는 session_id가 메모리 지속성을 제어합니다. 같은 session_id를 여러 턴에 재사용하면 대화 기록, 도구 호출, 이전 응답이 자동으로 이어집니다. 대화의 식별자는 ElevenLabs 상위에서 관리되므로, 프록시가 이 매핑을 명확하게 처리합니다. 생성 요청 시 올바른 식별자를 전달하면 SDK가 이전 맥락을 내부적으로 처리할 수 있습니다. 대화 시작 시 추가 파라미터를 요청 본문에 전달해 임의 식별자를 넘깁니다.

메시지와 세션이 준비되면 러너를 호출할 수 있습니다. 도구 호출 및 결과는 실행 중 내부 ADK 이벤트로 나타나지만, 사용자에게 보여지는 출력이 아니라 중간 오케스트레이션 단계로 처리됩니다. 따라서 도구 호출이 사용자에게 노출되는 프레임워크와 달리 별도의 필터가 필요하지 않습니다.

아래 핸들러는 세션 확인 및 생성 로직을 포함한 간단한 구현 예시입니다.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # 실제 서비스에서는 상위 시스템에서 안정적인 식별자를 사용하는 것이 좋습니다.

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # SSE 모드에서는 ADK가 부분(증분) 및 최종(완료) 이벤트를 모두 보냅니다.

            # 부분 이벤트만 전달하면 전체 텍스트 중복을 피할 수 있습니다.
            # 참고: ADK의 SSE 스트리밍은 실험적입니다. 실제 서비스에서는

            # 모델 백엔드가 부분 이벤트를 보내지 않을 경우 두 이벤트 모두 처리 필요.

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

다음은 작업 중심으로 설계된 CrewAI를 살펴봅니다.

CrewAI

CrewAI는 구조화된 작업(리서치, 작성, 요약 등) 중심의 멀티 에이전트 워크플로우 오케스트레이션을 위해 설계되었습니다. 에이전트는 역할, 목표, 배경 스토리로 정의되며, 실행은 명확한 설명과 기대 결과를 가진 Task 객체를 중심으로 이루어집니다.

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

LangGraph, ADK와 같은 에이전트 루프 모델과 달리, CrewAI는 보통 요청마다 Task와 Crew를 생성해 해당 턴의 작업 단위를 정의합니다. 대화 맥락은 이전 턴을 플레이스홀더로 다음 작업에 주입해 이어집니다. {crew_chat_messages} 변수에 대화 기록이 요청마다 채워지고, 실행 시 작업 설명에 삽입됩니다. 또한 중간 추적 패턴(Thought, Action, Action Input, Observation)을 명시적으로 걸러내고, 최종 답변 텍스트만 전달해 음성에 적합한 깔끔한 텍스트를 생성합니다.

아래 핸들러는 요청별 작업 생성, 기록 삽입, Crew 단위 스트리밍, 추적 필터링, 출력 포맷팅을 모두 결합한 예시입니다.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # Task와 Crew는 요청마다 생성(시작 시점에 미리 생성하지 않음).

    task = Task(

        description=(

            "대화 기록:\n{crew_chat_messages}\n\n"

            "사용자의 최신 메시지에 답변하세요."

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=True로 CrewStreamingOutput을 반환(단일 CrewOutput이 아님).

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # 비텍스트 이벤트(예: 도구 호출)는 건너뜀.

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # "Final Answer:" 마커 이후 텍스트만 전달

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # CrewAI 출력의 불필요한 마크다운 기호 정리.

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # "Final Answer:" 마커가 없는 짧은 응답 처리용 Fallback

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

다음은 이벤트 기반 스트리밍 모델에 집중하는 LlamaIndex를 살펴봅니다.

LlamaIndex

이 글에서 다룬 다른 프레임워크와 달리, LlamaIndex는 LLM을 외부 데이터 소스(문서 저장소, 인덱스, 검색 파이프라인 등)와 연결하기 위해 설계되었습니다. 에이전트 계층인 FunctionAgent는 이 기반 위에서 구조화된 맥락을 검색하고 추론하는 역할을 하며, 오픈 대화나 작업 실행보다는 맥락 처리에 집중합니다.

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

대화 연속성을 유지하기 위해, 프록시는 들어오는 메시지를 LlamaIndex 챗 메시지로 변환한 뒤, 최신 사용자 턴(user_msg)과 이전 턴(chat_history)으로 분리합니다. 각 AgentStream 이벤트의 event.delta 필드에는 다음 텍스트 조각이 들어 있으며, 이는 OpenAI 스타일의 delta.content 청크로 바로 매핑됩니다. 비어 있지 않은 델타는 그대로 전달할 수 있어, 이 가이드에서 가장 간단한 스트리밍 브릿지입니다. 스트림에는 오케스트레이션 이벤트(도구 호출, 결과)와 음성 이벤트(어시스턴트 텍스트 델타)가 모두 포함됩니다. 음성 출력을 깔끔하게 유지하기 위해, 프록시는 AgentStream 이벤트만 남기고 비어 있는 델타는 건너뜁니다.

[1] AgentStream (delta='')       ← 무시

[2] ToolCall                     ← 무시

[3] ToolCallResult               ← 무시

[4] AgentStream (delta='It')     ← 전달 ✓

[5] AgentStream (delta=' costs') ← 전달 ✓

[6] AgentStream (delta=' $49.99')← 전달 ✓

이렇게 하면 중간 도구 처리 내용은 음성 출력에서 제외되고, 지연 없는 점진적 음성 생성이 가능합니다. 아래 핸들러는 이 과정을 하나로 모은 예시입니다.

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # 마지막 메시지는 항상 문자열로 된 사용자 턴이라고 가정합니다.

    # 실제 서비스에서는 비텍스트 페이로드에 대한 역할/내용 방어 코드 필요.

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

LlamaIndex는 내장 오케스트레이션 계층이 강한 프레임워크에 비해, 대화 런타임 패턴에 대한 제약이 적습니다. 실제 서비스에서는 세션 관리, 응답 가이드라인, 도구 오케스트레이션, 추적 등을 직접 구현해야 할 수 있습니다.

결론

이 가이드의 모든 프레임워크는 OpenAI 스타일의 Completions 또는 Responses 요청을 받아 SSE 청크로 스트리밍하는 동일한 방식으로 ElevenLabs와 연결됩니다. 이를 통해 기존 에이전트 구현 위에 음성 오케스트레이션을 최소한의 변경만으로 추가할 수 있어, 이미 구축한 시스템을 그대로 유지하면서 실시간 대화형 AI의 이점을 누릴 수 있습니다. 이러한 모듈성은 ElevenAgents 플랫폼의 핵심 원칙입니다. 기존 에이전트를 확장하든, 처음부터 음성 네이티브로 구축하든, ElevenAgent의 음성 오케스트레이션은 다양한 상황에 맞게 설계되어 있습니다.

이미 오픈소스 프레임워크로 에이전트를 운영 중이고 음성 기능을 추가하고 싶다면, 이 방식을 직접 적용해보고 의견을 들려주세요.

ElevenLabs 팀의 다른 글 보기

최고 품질의 AI 오디오로 창작하세요