コンテンツにスキップ

実践ガイド:オープンソースエージェントフレームワークとElevenAgents

オープンソースエージェントフレームワークをCustom LLM経由でElevenLabsの音声と接続する方法。

Black square with some squiggly lines.

前回の記事「外部エージェントとElevenLabs音声オーケストレーションの統合」では、既存のテキストベースのエージェントオーケストレーションをElevenLabsに接続する方法を、カスタムLLMを使ってご紹介しました。今回はその基礎をもとに、主要なオープンソースエージェントフレームワークをCustom LLMインターフェースの裏側で適用・展開する方法を解説します。これにより、状態管理やツールのオーケストレーション、アプリ固有の制御を損なうことなく、成熟したエージェントシステムに音声を柔軟に重ねることができます。どのフレームワークでも、生成リクエストの作成→最終テキストレスポンスの抽出→OpenAI互換のServer-Sent Events(SSE)形式への変換、という3ステップの流れは共通です。ElevenLabsは、Chat Completionsレスポンスの両フォーマットに対応しています。本ガイドでは4つの主要フレームワークを扱いますが、このパターンはOpenAI互換のストリーミング出力を生成できるあらゆるランタイムに応用できます。

A proxy layer translates between ElevenLabs voice orchestration and an agent framework, converting OpenAI-style messages into framework inputs and streaming SSE chunks back as agent voice output.

全体のセットアップ

このセクションの例ではPythonとFastAPIを使っていますが、HTTP POSTリクエストとSSEストリーミングレスポンスを扱える技術スタックなら何でも利用可能です。ElevenLabsの音声オーケストレーションが発話の区切りを検知すると、設定されたCustom LLMエンドポイントに生成リクエストを送信します。このセクションでは、その翻訳レイヤー(音声オーケストレーションとエージェントフレームワークをつなぐブリッジやプロキシ)の主要コンポーネントを解説します。

各フレームワークは、ユーザーの慣れや特定の目的に応じて選ばれることが多いです。たとえばLlamaIndexはRAG(検索拡張生成)のセットアップを簡単にするために開発され、CrewAIはエージェント時代のタスク自動化を目的に作られました。設計思想が異なればレスポンス構造も異なり、それぞれに合わせた処理が必要です。LLMが生成するたびにチャンクをストリーミングすることで、テキスト読み上げ(TTS)モデルが早く音声生成を開始でき、体感遅延を減らせます。ここではLangGraph、Google ADK、CrewAI、LlamaIndexの4つの人気フレームワークを中心に解説します。

共通コードについて

各フレームワークはOpenAI互換のSSEチャンクとしてレスポンスをストリーミングする必要があります。ここでは、各例で使う小さなヘルパー関数を紹介します。

def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:

    payload = {

この基礎が整ったところで、まずはLangGraphから始めましょう。

LangGraph

LangGraphは、エージェントをグラフとしてモデル化します。ノードが個々のステップを表し、エッジがその間の制御フローを定義します。最小限のセットアップはシンプルで、チャットモデルの初期化、エージェントツールの定義、エージェントグラフランタイムの作成だけです。

    }

    return f"data: {json.dumps(payload)}\n\n"

各生成リクエストごとに、LangGraphエージェントは会話履歴全体を受け取り、必要な状態を内部で保持できます。LangGraphはサーバーサイドでの永続化を

状態管理ができたら、次はLangGraph特有のストリーミングモードの選択です。LangGraphでは、用途に応じて2つのオプションがあります:

LangGraphは、エージェントをグラフとしてモデル化し、ノードが個々のステップ、エッジがその間の制御フローを表します。最小限のセットアップはシンプルで、チャットモデルの初期化、エージェントツールの定義、エージェントグラフランタイムの作成を行います。

from langchain.agents import create_agent

from langchain_openai import ChatOpenAI

from langchain_core.tools import tool

llm = ChatOpenAI(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY"),

)

agent = create_agent(

    llm,
    tools=tool_list,
    system_prompt=system_prompt,
)

各生成リクエストごとに、LangGraphエージェントは会話履歴全体を受け取り、必要な状態を内部で保持できます。LangGraphはサーバー側の永続化としてチェックポイントもサポートしていますが、ここでは実装をシンプルにするため割愛します。

状態管理ができたら、次はLangGraph特有のストリーミングモードの選択です。LangGraphには用途に応じて2つのオプションがあります:

  • stream_mode="values" はグラフ状態のスナップショットを返します。実装は簡単ですが、各レスポンスに完全なメッセージ状態が含まれるため、リアルタイム会話では遅延が増えます。
  • 追加パラメータ

メッセージとセッションの準備ができたら、Runnerを呼び出せます。ツール呼び出しやその結果は実行中に内部ADKイベントとして現れますが、ユーザー向け出力ではなく中間的なオーケストレーションステップとして扱われます。そのため、ツール呼び出しがユーザー向けテキストとして現れる他のフレームワークと比べ、手動のフィルタは不要です。

以下のハンドラーは、セッション解決と作成ロジックを含むシンプルな実装例です。

[2] ツールが実行されデータを返す(result="$24.99")

[3] モデルが結果を使ってレスポンスを生成(content="It costs $24.99")

次は、よりタスク中心に設計されたCrewAIを見ていきます。

CrewAI

CrewAIは、自由な対話ループではなく、構造化されたタスク(調査、執筆、要約など)を複数エージェントでオーケストレーションするために設計されました。エージェントは役割、目標、バックストーリーで定義されます。実行は、明確な説明と期待される出力を持つTaskオブジェクトを中心に進みます。

    input = {"messages": req.messages}

    async def stream():

LangGraphやADKのようなエージェントループモデルと異なり、CrewAIでは通常、リクエストごとにTaskとCrewを構築し、そのターンの作業単位を定義します。会話の文脈は、前のターンをプレースホルダー経由で次のタスクに挿入することで引き継ぎます。{crew_chat_messages}変数にはリクエストごとに会話履歴が格納され、実行時にタスク説明へ挿入されます。また、中間的なトレースパターン(Thought、Action、Action Input、Observation)を明示的に除外し、最終的な回答テキストのみを出力することで、音声化しやすいクリーンなテキストを目指しています。

以下のハンドラーは、リクエストごとのタスク構築、履歴の挿入、Crew単位のストリーミング、トレースのフィルタリング、出力フォーマットをまとめたものです。

        async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):

            # モデルのテキストチャンクのみ転送。ツール更新や非テキストイベントはスキップ。

次は、イベント駆動型のストリーミングモデルに特化したLlamaIndexを見ていきます。

LlamaIndex

この投稿で紹介した他のフレームワークと異なり、LlamaIndexはLLMを外部データソース(ドキュメントストア、インデックス、検索パイプライン)に接続するために設計されています。エージェント層であるFunctionAgentは、その基盤の上に構築され、自由対話やタスク実行ではなく、構造化されたコンテキストの取得と推論を行います。

            if not content:

                continue

会話の連続性を保つため、プロキシは受信メッセージをLlamaIndexチャットメッセージに変換し、最新のユーザーターン(user_msg)と過去のターン(chat_history)に分割します。各AgentStreamイベントのevent.deltaフィールドには次のテキスト断片が入り、OpenAI形式のdelta.contentチャンクに直接対応します。空でないデルタはそのまま転送できるため、このガイドで最もシンプルなストリーミングブリッジとなります。ストリームにはオーケストレーションイベント(ツール呼び出し、結果)と音声イベント(アシスタントテキストのデルタ)が含まれます。音声出力をクリーンに保つため、プロキシはAgentStreamイベントのみを残し、空のデルタはスキップします。

会話の連続性を保つため、プロキシは受信メッセージをLlamaIndexのチャットメッセージに変換し、最新のユーザー発話(user_msg)とそれ以前の履歴(chat_history)に分割します。各AgentStreamイベントのevent.deltaフィールドには次のテキスト断片が入り、OpenAI形式のdelta.contentチャンクに直接対応します。空でないdeltaはそのまま転送できるため、このガイド内で最もシンプルなストリーミングブリッジです。ストリームにはオーケストレーションイベント(ツール呼び出しや結果)と音声イベント(アシスタントのテキストdelta)が含まれます。音声出力をクリアに保つため、プロキシはAgentStreamイベントのみ保持し、空のdeltaはスキップします。

[1] AgentStream(delta='')       ← 無視

この分離により、中間的なツール処理が音声出力に混ざらず、低遅延の増分音声が維持されます。下記のハンドラーでこれらの手順をまとめています。

            yield sse_chunk(response_id, {"content": content})

LlamaIndexは、他のオーケストレーション層が重いフレームワークほど会話ランタイムのパターンを厳密に定めていません。実運用では、セッション管理、レスポンスのガードレール、ツールのオーケストレーション、トレースの実装がユーザー側で必要になることが多いです。

LlamaIndexは、より重厚なオーケストレーション層を持つフレームワークと比べて、会話ランタイムのパターンについて厳密な指定はありません。実運用では、セッション管理や応答のガードレール、ツールのオーケストレーション、トレーシングなどをユーザー側で実装する必要があります。

まとめ

このガイドで紹介した各フレームワークは、ElevenLabsと同じ契約で接続します。つまり、OpenAI形式のCompletionsまたはResponsesリクエストを受け取り、SSEチャンクをストリームで返します。これにより、既存のエージェント実装に最小限の変更で音声オーケストレーションを追加でき、今まで構築したものを活かしつつリアルタイム会話型AIを実現できます。このモジュール性はElevenAgentsプラットフォームの重要な特徴です。既存エージェントの拡張でも、最初から音声ネイティブで構築する場合でも、ElevenAgentsの音声オーケストレーションは柔軟に対応します。

すでにオープンソースフレームワークでエージェントを運用していて音声対応を検討している場合は、ぜひこの方法をお試しください。ご意見もお待ちしています。

次は、GoogleのAgent Development Kit(ADK)を使う際のポイントを見ていきます。

Google ADK

GoogleのADKは、エージェント、Runner、SessionServiceというコア要素でランタイムループを抽象化します。ADKのRunnerはHTTP層とエージェント定義の間に位置し、メッセージルーティング、ツールオーケストレーション、セッション管理、イベントストリーミングを担当します。

from google.adk.agents import Agent

from google.adk.runners import Runner

from google.adk.agents.run_config import RunConfig, StreamingMode

from google.adk.sessions import InMemorySessionService

from google.genai import types as genai_types

agent = Agent(

    name=name,

    model=model,

    instruction=instruction,

    tools=[tool_list],

)

session_service = InMemorySessionService()

runner = Runner(

agent=agent,

app_name=app_name,

session_service=session_service

)

エージェント、セッションバックエンド、Runnerを初期化したら、プロキシはリクエストごとにADKセッションを解決または作成します。ADKではsession_idがメモリの永続化を制御し、同じsession_idを使い回すことで履歴やツール呼び出し、過去のレスポンスが自動的に引き継がれます。会話の識別はElevenLabs側で管理されているため、プロキシがこのマッピングを明示的に行います。生成リクエストに正しい識別子を渡すことで、SDKが過去のコンテキストを内部で処理できます。会話開始時に追加パラメータとしてリクエストボディに渡します。

メッセージとセッションの準備ができたら、Runnerを呼び出します。ツール呼び出しやその結果は実行中に内部ADKイベントとして現れますが、ユーザー向け出力ではなく中間的なオーケストレーションステップとして扱われます。そのため、ツール呼び出しがユーザー向けテキストとして現れる他のフレームワークと違い、手動でのフィルタリングは不要です。

以下のハンドラーは、セッション解決とget-or-createロジックを含む簡易実装例です。

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest, request: Request):

    # 本番環境では、上流システムから安定した識別子を使うのがおすすめです。

    session_id = req.elevenlabs_extra_body.arbitrary_identifier

    session = await session_service.get_session(

        app_name="elevenlabs", user_id="user", session_id=session_id

    )

    if not session:

        session = await session_service.create_session(

            app_name="elevenlabs", user_id="user", session_id=session_id

        )

    user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")

    content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])

   async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        async for event in runner.run_async(

            user_id="user",

            session_id=session.id,

            new_message=content,

            run_config=RunConfig(streaming_mode=StreamingMode.SSE),

        ):

            if not event.content or not event.content.parts:

                continue

            # SSEモードでは、ADKは部分(インクリメンタル)イベントと最終(完全)イベントを出力します。

            # 部分イベントのみ転送することで、全文の重複送信を防ぎます。
            # 注意:SSEストリーミングはADKで実験的機能です。本番ではモデルバックエンドが部分イベントを出さない場合に備えて両方のイベント型を考慮してください。

            #

            if not getattr(event, "partial", False):

                continue

            text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)

            if not text:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": text})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

次は、よりタスク指向の設計であるCrewAIを見ていきます。

CrewAI

CrewAIは、オープンエンドな対話ループではなく、構造化されたタスク(調査、執筆、要約など)を中心に複数エージェントのワークフローをオーケストレーションするために設計されました。エージェントは役割、目標、バックストーリーで定義され、各タスクオブジェクトには明確な説明と期待される出力が設定されます。

from crewai import Agent, Task, Crew, Process, LLM

from crewai.tools import tool

from crewai.types.streaming import StreamChunkType

llm = LLM(

    model=model_id,

    api_key=os.getenv("OPENAI_API_KEY")

)

store_agent = Agent(

    role=role,

    goal=goal,

    backstory=backstory,

    tools=tools,

    llm=llm,

    verbose=False,

)

LangGraphやADKのようなエージェントループモデルとは異なり、CrewAIでは通常、リクエストごとにTaskとCrewを構築し、そのターンの作業単位を定義します。会話の文脈は、前回までのターンをプレースホルダー経由で次のタスクに注入することで引き継ぎます。{crew_chat_messages}変数に会話履歴を毎回格納し、実行時にタスク説明へ挿入します。また、「Thought」「Action」「Action Input」「Observation」などの中間トレースパターンを明示的に除外し、最終回答テキストのみを音声用に出力するようにしています。

以下のハンドラーは、リクエストごとのタスク構築、履歴挿入、Crewレベルのストリーミング、トレースフィルタリング、出力フォーマットをまとめたものです。

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # TaskとCrewはリクエストごとに組み立てます(起動時ではありません)。

    task = Task(

        description=(

            "会話履歴:\n{crew_chat_messages}\n\n"

            "ユーザーの最新メッセージに返答してください。"

        ),

        expected_output=expected_output,

        agent=store_agent,

    )

    # stream=TrueでCrewStreamingOutputが返ります(単一CrewOutputではありません)。

    crew = Crew(

        agents=[store_agent],

        tasks=[task],

        process=Process.sequential,

        verbose=False,

        stream=True,

    )

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        sent_role = False

        final_marker = "final answer:"

        marker_buffer = ""

        marker_found = False

        emitted_any_content = False

        streaming = await crew.kickoff_async(

            inputs={"crew_chat_messages": json.dumps(req.messages)}

        )

       async for chunk in streaming:

            # テキスト以外のイベント(例:ツール呼び出し)はスキップ。

            if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:

                continue

            # "Final Answer:"マーカー以降のテキストのみ転送

            if not marker_found:

                marker_buffer += chunk.content

                idx = marker_buffer.lower().find("final answer:")

                if idx == -1:

                    continue

                marker_found = True

                content = marker_buffer[idx + 13:].lstrip()

                marker_buffer = ""

            else:

                content = chunk.content

            # CrewAI出力の末尾のマークダウン記号などを除去

            content = content.rstrip("`").rstrip()

            if not content:

                continue

            if not sent_role:

                yield sse_chunk(response_id, {"role": "assistant"})

                sent_role = True

            yield sse_chunk(response_id, {"content": content})

        # "Final Answer:"マーカーがない短いレスポンスへのフォールバック

        if not sent_role:

            raw = getattr(streaming, "result", None)

            fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()

            if fallback:

                yield sse_chunk(response_id, {"role": "assistant"})

                yield sse_chunk(response_id, {"content": fallback})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

次は、独自のイベント駆動型ストリーミングモデルに特化したLlamaIndexを見ていきます。

LlamaIndex

本記事で紹介した他のフレームワークと異なり、LlamaIndexはLLMを外部データソース(ドキュメントストア、インデックス、検索パイプラインなど)と接続するために設計されています。そのエージェント層であるFunctionAgentは、オープンな対話やタスク実行ではなく、構造化されたコンテキストの取得と推論に特化しています。

from llama_index.llms.openai import OpenAI

from llama_index.core.agent.workflow import FunctionAgent, AgentStream

from llama_index.core.base.llms.types import ChatMessage, MessageRole

llm = OpenAI(

    model=model,

    api_key=os.getenv("OPENAI_API_KEY")

)

agent = FunctionAgent(

    tools=[list_inventory, get_item_price],

    llm=llm,

    system_prompt=system_prompt,

)

会話の連続性を保つため、プロキシは受信メッセージをLlamaIndexのチャットメッセージ形式に変換し、最新のユーザーターン(user_msg)と過去の履歴(chat_history)に分割します。各AgentStreamイベントのevent.deltaフィールドには次のテキスト断片が入り、OpenAI形式のdelta.contentチャンクにそのままマッピングできます。空でないdeltaはそのまま転送できるため、本ガイドで最もシンプルなストリーミングブリッジです。ストリームにはオーケストレーションイベント(ツール呼び出しや結果)と音声イベント(アシスタントテキストdelta)が混在しますが、プロキシはAgentStreamイベントのみを残し、空のdeltaはスキップします。

[1] AgentStream(delta='')       ← 無視

[2] ToolCall                     ← 無視

[3] ToolCallResult               ← 無視

[4] AgentStream(delta='It')     ← 転送 ✓

[5] AgentStream(delta=' costs') ← 転送 ✓

[6] AgentStream(delta=' $49.99')← 転送 ✓

この分離により、中間的なツール処理を音声出力から除外しつつ、低遅延のインクリメンタル音声を実現できます。以下のドロップインハンドラーでこれらの手順をまとめています。

@app.post("/chat/completions")

async def chat_completions(req: ChatCompletionRequest):

    # 最後のメッセージが常にユーザーターンかつ文字列である前提です。

    # 本番では非テキストペイロードへの防御的なrole/content処理を追加してください。

    chat_history = [

        ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")

        for m in req.messages

    ]

    user_text = chat_history.pop().content

    async def stream():

        response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

        handler = agent.run(user_msg=user_text, chat_history=chat_history)

        async for event in handler.stream_events():

            if not isinstance(event, AgentStream):

                continue

            if not event.delta:

                continue

            yield sse_chunk(response_id, {"content": event.delta})

        yield sse_chunk(response_id, {}, finish_reason="stop")

        yield "data: [DONE]\n\n"

    return StreamingResponse(stream(), media_type="text/event-stream")

LlamaIndexは、他のフレームワークのような強力なオーケストレーション層よりも、会話ランタイムパターンに対して柔軟です。そのため、本番運用ではセッション管理やレスポンスのガードレール、ツールオーケストレーション、トレースの実装がユーザー側に求められることが多いです。

まとめ

本ガイドで紹介した各フレームワークは、OpenAI形式のCompletionsまたはResponsesリクエストを受け取り、SSEチャンクをストリーミングで返すという同じ仕組みでElevenLabsと連携します。これにより、既存のエージェント実装に最小限の変更で音声オーケストレーションを重ねることができ、今まで構築したものを活かしつつリアルタイム会話型AIを実現できます。このモジュール性はElevenAgentsプラットフォームの重要な特徴です。既存エージェントの拡張でも、最初から音声ネイティブで構築する場合でも、ElevenAgentの音声オーケストレーションは柔軟に対応します。

すでにオープンソースフレームワークでエージェントを運用していて音声対応を検討中の方は、ぜひこの方法をお試しください。ご意見もお待ちしています。

ElevenLabsチームによる記事をもっと見る

最高品質のAIオーディオで創造する