
Praktyczny przewodnik: open-source agent frameworks i ElevenAgents
Jak połączyć open-source agent frameworks z głosem ElevenLabs przez Custom LLM.
W naszym poprzednim wpisie o Integracja zewnętrznych agentów z ElevenLabs Voice Orchestration pokazaliśmy, jak zespoły mogą połączyć swoje istniejące tekstowe orkiestracje agentów z ElevenLabs przez Własny LLM. Na tej bazie ten przewodnik pokazuje, jak popularne open-source agent frameworks można dostosować i uruchomić za Custom LLM. Dzięki temu powstaje elastyczna architektura, w której głos nakłada się na dojrzałe systemy agentów bez utraty kontroli nad stanem, narzędziami czy logiką aplikacji. Niezależnie od frameworka, zawsze stosujemy ten sam schemat: tworzymy zapytanie, wyciągamy końcową odpowiedź tekstową i formatujemy ją do OpenAI-kompatybilnego formatu Server-Sent Events (SSE). ElevenLabs obsługuje zarówno formaty Chat Completionsi Odpowiedzi. Choć opisujemy cztery popularne frameworki, ten schemat sprawdzi się w każdym środowisku, które potrafi generować OpenAI-kompatybilny streaming.
.webp&w=3840&q=95)
Konfiguracja ogólna
Przykłady w tej sekcji używają Pythona i FastAPI, ale dowolny stack obsługujący HTTP POST i streaming SSE też się sprawdzi. Gdy ElevenLabs wykryje koniec wypowiedzi, wysyła zapytanie do skonfigurowanego endpointu Custom LLM. W tej części omawiamy kluczowe elementy tej warstwy tłumaczącej – mostu, który sprawia, że voice orchestration i agent framework rozumieją się nawzajem.
Każdy framework może być wybierany przez klientów ze względu na znajomość lub konkretne zastosowanie. LlamaIndex powstał, by uprościć Retrieval-Augmented Generation (RAG), a CrewAI do automatyzacji zadań w erze agentów. Różne cele projektowe oznaczają różne struktury odpowiedzi i wymagają indywidualnego podejścia. Streamowanie fragmentów odpowiedzi na bieżąco, zamiast czekać na całość, pozwala modelowi Text-to-Speech (TTS) szybciej zacząć generować mowę i skraca opóźnienie. Skupiamy się na czterech popularnych frameworkach: LangGraph, Google ADK, CrewAI i LlamaIndex.
Wspólna funkcja pomocnicza
Każdy framework musi streamować odpowiedzi jako OpenAI-kompatybilne fragmenty SSE. Poniżej pokazujemy prostą funkcję pomocniczą, która buduje takie fragmenty.
def sse_chunk(response_id: str, delta: dict, finish_reason=None) -> str:
payload = {
"id": response_id,
"object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
}
return f"data: {json.dumps(payload)}\n\n"
Mając to gotowe, zaczynamy od LangGraph.
LangGraph
LangGraph modeluje agentów jako grafy, gdzie węzły to poszczególne kroki, a krawędzie określają przepływ między nimi. Minimalna konfiguracja jest prosta: inicjalizujesz model czatu, definiujesz narzędzia i tworzysz runtime grafu agenta.
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
api_key=os.getenv("OPENAI_API_KEY"),
llm, tools=tool_list, system_prompt=system_prompt,)
Przy każdym zapytaniu LangGraph Agent dostaje całą historię rozmowy, więc może samodzielnie zarządzać stanem. LangGraph obsługuje też zapisywanie stanu przez Checkpointy, ale nie omawiamy tego tutaj, żeby uprościć przykład.
Po stronie LangGraph kolejną decyzją jest tryb streamowania – są dwa warianty, każdy do innego zastosowania:
- stream_mode="values" daje snapshoty stanu grafu. Łatwiej to wdrożyć, ale każda odpowiedź zawiera pełny stan, co zwiększa opóźnienie w rozmowie na żywo.
- stream_mode="messages" streamuje kolejne fragmenty wiadomości z modelu. To lepsze do rozmów głosowych w czasie rzeczywistym, bo skraca czas do pierwszego dźwięku w ElevenLabs.
W praktyce, tryb messages w pętli agenta zawiera też kroki pośrednie, np. wywołania narzędzi, których nie powinno się czytać na głos. Proxy filtruje je i przekazuje do TTS tylko tekst dla użytkownika. Poniżej przykład tury z narzędziem.
[1] Model decyduje się wywołać narzędzie (tool_calls=["get_price"])
[2] Narzędzie zwraca dane (result="$24.99")
[3] Model generuje odpowiedź z wynikiem (content="It costs $24.99")
Oczywiście, tylko fragmenty z kroku 3 powinny trafić do streamu SSE. W praktyce dwa warunki w pętli streamingowej to zapewniają: jeden przepuszcza tylko zdarzenia langgraph_node == "model", drugi pomija puste treści. Dzięki temu do ElevenLabs trafia tylko tekst dla użytkownika. Poniżej lekka implementacja proxy.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
input = {"messages": req.messages}
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for message_chunk, metadata in agent.astream(input, stream_mode="messages"):
# Przekazuj tylko tekst modelu; pomiń aktualizacje narzędzi i inne zdarzenia.
if metadata.get("langgraph_node") != "model":
continue
content = getattr(message_chunk, "content", None)
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
# Wysyłaj kolejne fragmenty do ElevenLabs w formacie OpenAI.
yield sse_chunk(response_id, {"content": content})
# Sygnał zakończenia przed finish_reason: "stop" [DONE]
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
To sprawia, że do ElevenLabs trafiają tylko fragmenty tekstu dla użytkownika. Ponieważ LangGraph pokazuje wykonanie narzędzi w streamie stanu, filtrowanie jest jawne i kontrolowane przez proxy.
Teraz przechodzimy do pracy z Google Agent Development Kit (ADK)
Google ADK
Google ADK ukrywa pętlę wykonawczą za kilkoma podstawowymi elementami: Agent, Runner i SessionService. Runner ADK jest pośrednikiem między HTTP a agentem. Obsługuje przekazywanie wiadomości, narzędzia, cykl sesji i streaming zdarzeń.
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.sessions import InMemorySessionService
from google.genai import types as genai_types
agent = Agent(
name=name,
model=model,
instruction=instruction,
tools=[tool_list],
)
session_service = InMemorySessionService()
runner = Runner(
agent=agent,
app_name=app_name,
session_service=session_service
)
Po zainicjowaniu agenta, backendu sesji i runnera, proxy rozpoznaje lub tworzy sesję ADK dla każdego zapytania. W ADK session_id kontroluje zapamiętywanie historii: użycie tego samego session_id w kolejnych turach automatycznie przenosi historię, wywołania narzędzi i odpowiedzi. Ponieważ tożsamość rozmowy jest po stronie ElevenLabs, proxy jawnie mapuje identyfikatory. Przekazując właściwy identyfikator w zapytaniu, SDK obsługuje kontekst wewnętrznie. Identyfikator przekazujemy przy rozpoczęciu rozmowy przez dodatkowe parametry przekazywane w ciele zapytania.
Gdy wiadomość i sesja są gotowe, można wywołać runnera. Wywołania narzędzi i ich wyniki pojawiają się jako wewnętrzne zdarzenia ADK, traktowane jako kroki pośrednie, a nie tekst dla użytkownika. Dzięki temu nie trzeba ręcznie filtrować, jak w frameworkach, gdzie wywołania narzędzi są widoczne w odpowiedzi.
Poniżej uproszczony handler z obsługą sesji i logiką get-or-create.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest, request: Request):
# W produkcji lepiej użyć stabilnego identyfikatora z systemu nadrzędnego.
session_id = req.elevenlabs_extra_body.arbitrary_identifier
session = await session_service.get_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
if not session:
session = await session_service.create_session(
app_name="elevenlabs", user_id="user", session_id=session_id
)
user_text = next((m["content"] for m in reversed(req.messages) if m["role"] == "user"), "")
content = genai_types.Content(role="user", parts=[genai_types.Part(text=user_text)])
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
async for event in runner.run_async(
user_id="user",
session_id=session.id,
new_message=content,
run_config=RunConfig(streaming_mode=StreamingMode.SSE),
):
if not event.content or not event.content.parts:
continue
# W trybie SSE ADK emituje fragmenty (partial) i całość (final).
# Przekazujemy tylko partial, żeby nie powielać tekstu. # Uwaga: streaming SSE w ADK jest eksperymentalny. W produkcji obsłuż oba typy zdarzeń, jeśli backend nie emituje partial.
#
if not getattr(event, "partial", False):
continue
text = "".join((getattr(p, "text", "") or "") for p in event.content.parts)
if not text:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": text})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
Teraz CrewAI, które z założenia skupia się na zadaniach.
CrewAI
CrewAI powstało do orkiestracji pracy wielu agentów wokół konkretnych zadań (research, pisanie, podsumowanie), a nie do otwartych rozmów. Agenci mają rolę, cel i tło. Wykonanie opiera się na obiektach Task z opisem i oczekiwanym wynikiem.
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import tool
from crewai.types.streaming import StreamChunkType
llm = LLM(
model=model_id,
api_key=os.getenv("OPENAI_API_KEY")
)
store_agent = Agent(
role=role,
goal=goal,
backstory=backstory,
tools=tools,
llm=llm,
verbose=False,
)
W przeciwieństwie do pętli agenta w LangGraph i ADK, CrewAI zwykle tworzy Task i Crew na każde zapytanie, by określić jednostkę pracy na daną turę. Kontekst rozmowy przekazujemy, wstawiając poprzednie tury do kolejnego zadania przez placeholder. Zmienna {crew_chat_messages} jest uzupełniana przy każdym zapytaniu historią rozmowy i interpolowana do opisu zadania. Dodatkowo filtrujemy pośrednie wzorce (Thought, Action, Action Input, Observation), by przekazywać tylko końcową odpowiedź gotową do mowy.
Poniższy handler łączy budowanie zadania na żądanie, wstawianie historii, streaming na poziomie Crew, filtrowanie śladów i formatowanie wyjścia.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Task i Crew są tworzone na każde żądanie (nie przy starcie).
task = Task(
description=(
"Historia rozmowy:\n{crew_chat_messages}\n\n"
"Odpowiedz na ostatnią wiadomość użytkownika."
),
expected_output=expected_output,
agent=store_agent,
)
# stream=True zwraca CrewStreamingOutput zamiast pojedynczego CrewOutput.
crew = Crew(
agents=[store_agent],
tasks=[task],
process=Process.sequential,
verbose=False,
stream=True,
)
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
sent_role = False
final_marker = "final answer:"
marker_buffer = ""
marker_found = False
emitted_any_content = False
streaming = await crew.kickoff_async(
inputs={"crew_chat_messages": json.dumps(req.messages)}
)
async for chunk in streaming:
# Pomijaj zdarzenia inne niż tekst (np. wywołania narzędzi).
if chunk.chunk_type != StreamChunkType.TEXT or not chunk.content:
continue
# Przekazuj tekst dopiero po markerze "Final Answer:"
if not marker_found:
marker_buffer += chunk.content
idx = marker_buffer.lower().find("final answer:")
if idx == -1:
continue
marker_found = True
content = marker_buffer[idx + 13:].lstrip()
marker_buffer = ""
else:
content = chunk.content
# Usuwamy końcowe znaki markdown z odpowiedzi CrewAI.
content = content.rstrip("`").rstrip()
if not content:
continue
if not sent_role:
yield sse_chunk(response_id, {"role": "assistant"})
sent_role = True
yield sse_chunk(response_id, {"content": content})
# Fallback na krótkie odpowiedzi bez markera "Final Answer:"
if not sent_role:
raw = getattr(streaming, "result", None)
fallback = (raw.raw if raw else marker_buffer).strip().rstrip("`").rstrip()
if fallback:
yield sse_chunk(response_id, {"role": "assistant"})
yield sse_chunk(response_id, {"content": fallback})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
Na koniec LlamaIndex, które stawia na natywny model streamowania zdarzeń.
LlamaIndex
W przeciwieństwie do innych frameworków, LlamaIndex powstał, by łączyć LLM z zewnętrznymi źródłami danych (bazy dokumentów, indeksy, pipeline'y wyszukiwania). Warstwa agenta, FunctionAgent, pozwala korzystać z kontekstu strukturalnego, a nie tylko z dialogu czy zadań.
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent, AgentStream
from llama_index.core.base.llms.types import ChatMessage, MessageRole
llm = OpenAI(
model=model,
api_key=os.getenv("OPENAI_API_KEY")
)
agent = FunctionAgent(
tools=[list_inventory, get_item_price],
llm=llm,
system_prompt=system_prompt,
)
Aby zachować ciągłość rozmowy, proxy zamienia przychodzące wiadomości na chat messages LlamaIndex, a potem dzieli je na ostatnią turę użytkownika (user_msg) i wcześniejsze (chat_history). Każde event.delta z AgentStream to kolejny fragment tekstu, który trafia bezpośrednio do chunku OpenAI delta.content. Niepuste delty przekazujemy bez zmian, co czyni ten most streamingowy najprostszym w tym przewodniku. Stream zawiera zarówno zdarzenia orkiestracji (wywołania narzędzi, wyniki), jak i mowę (fragmenty tekstu asystenta). Żeby głos był czysty, proxy przepuszcza tylko eventy AgentStream i pomija puste delty.
[1] AgentStream (delta='') ← pomijane
[2] ToolCall ← pomijane
[3] ToolCallResult ← pomijane
[4] AgentStream (delta='It') ← przekazane ✓
[5] AgentStream (delta=' costs') ← przekazane ✓
[6] AgentStream (delta=' $49.99')← przekazane ✓
To rozdzielenie usuwa pośrednie mechanizmy narzędzi z mowy, a jednocześnie pozwala na szybkie generowanie dźwięku. Poniżej gotowy handler łączący te kroki.
@app.post("/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
# Zakładamy, że ostatnia wiadomość to zawsze tura użytkownika z tekstem.
# W produkcji dodaj obsługę wyjątków dla innych typów payload.
chat_history = [
ChatMessage(role=MessageRole(m["role"]), content=m.get("content") or "")
for m in req.messages
]
user_text = chat_history.pop().content
async def stream():
response_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"
handler = agent.run(user_msg=user_text, chat_history=chat_history)
async for event in handler.stream_events():
if not isinstance(event, AgentStream):
continue
if not event.delta:
continue
yield sse_chunk(response_id, {"content": event.delta})
yield sse_chunk(response_id, {}, finish_reason="stop")
yield "data: [DONE]\n\n"
return StreamingResponse(stream(), media_type="text/event-stream")
LlamaIndex mniej narzuca schemat rozmowy end-to-end niż frameworki z rozbudowaną orkiestracją. W produkcji zwykle trzeba samodzielnie zaimplementować obsługę sesji, zabezpieczenia odpowiedzi, narzędzia i śledzenie.
Podsumowanie
Każdy framework w tym przewodniku łączy się z ElevenLabs przez ten sam kontrakt: przyjmuje zapytanie Completions lub Responses w stylu OpenAI i streamuje fragmenty SSE. Dzięki temu można dodać voice orchestration do istniejących agentów bez dużych zmian – zachowując to, co już działa, a jednocześnie zyskując rozmowy AI w czasie rzeczywistym. Ta modułowość to podstawa ElevenAgents. Niezależnie czy rozwijasz istniejącego agenta, czy budujesz voice-native od zera, voice orchestration ElevenAgents dopasuje się do twoich potrzeb.
Jeśli masz już agenta na open-source frameworku i chcesz dodać głos, spróbuj tego podejścia i daj nam znać, jak poszło.



