W poprzednim poradniku omówiliśmy, jak zbudować agenta AI zdolnego do odpowiadania na pytania poprzez przeszukiwanie internetu. Jednak w kontekście bardziej złożonych i długoterminowych zadań, kluczowe stają się dwa aspekty: utrzymywanie stanu (persistence) oraz strumieniowanie danych (streaming). Utrzymywanie stanu pozwala na zapisanie aktualnego stanu agenta, co umożliwia wznowienie pracy w dowolnym momencie w przyszłości. Z kolei strumieniowanie daje możliwość dostarczania w czasie rzeczywistym informacji o działaniach agenta, co zapewnia przejrzystość i kontrolę nad jego działaniami. W tym artykule pokażemy, jak wzbogacić agenta o te funkcje w kontekście bardziej zaawansowanych zastosowań.
—
Konfiguracja agenta
Na początek odtworzymy naszego agenta, instalując niezbędne biblioteki, konfigurując środowisko oraz definiując strukturę agenta. Agent zostanie wyposażony w narzędzie do przeszukiwania zasobów online, a jego architektura umożliwi dynamiczne przełączanie się między różnymi zadaniami.
Kod instalacji bibliotek:
shell
pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1
Konfiguracja środowiska:
python
import os
os.environ['TAVILY_API_KEY'] = ""
os.environ['GROQ_API_KEY'] = ""
Kluczowym elementem agenta jest jego zdolność do interpretowania wiadomości i podejmowania działań na podstawie dostarczonych danych. Wykorzystując odpowiednie biblioteki, zdefiniowano agenta jako obiekt zdolny do obsługi różnych narzędzi i podejmowania logicznych decyzji w oparciu o zdefiniowane reguły.
—
Dodanie funkcji utrzymywania stanu
Aby umożliwić zapis i przywracanie stanu agenta, wykorzystamy funkcję checkpointer dostępną w bibliotece LangGraph. Jest to narzędzie pozwalające na zapisywanie stanu agenta pomiędzy poszczególnymi etapami jego działania. Do tego celu zastosujemy prostą bazę danych SQLite:
Kod inicjalizacji SQLite:
python
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(sqlite_conn)
W nowej implementacji agent zostanie zmodyfikowany tak, aby przyjąć obiekt checkpointera, co umożliwi zapisanie historii działań. Dzięki temu, nawet w przypadku przerwania procesu, możliwe będzie wznowienie pracy agenta w dowolnym momencie.
Przykład inicjalizacji agenta z utrzymywaniem stanu:
python
class Agent:
def __init__(self, model, tools, checkpointer, system=""):
self.graph = graph.compile(checkpointer=checkpointer)
prompt = "Jesteś inteligentnym asystentem badawczym. Korzystaj z wyszukiwarki, by znajdować informacje."
model = ChatGroq(model="Llama-3.3-70b-Specdec")
bot = Agent(model, [tool], system=prompt, checkpointer=memory)
—
Dodanie funkcji strumieniowania
Aby poprawić przejrzystość działań agenta, wprowadzimy funkcję strumieniowania, która pozwoli na monitorowanie wyników w czasie rzeczywistym. Strumieniowanie może obejmować dwa typy danych:
1. Strumieniowanie wiadomości – pozwala na obserwowanie decyzji AI oraz wyników działań.
2. Strumieniowanie tokenów – umożliwia wyświetlanie odpowiedzi modelu w postaci przepływu pojedynczych jednostek tekstowych.
Przykład strumieniowania wiadomości:
python
messages = [HumanMessage(content="Jaka jest pogoda w Teksasie?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
for v in event.values():
print(v['messages'])
Wynik działania agenta to seria zdarzeń, które najpierw wskazują decyzję o wykonaniu wyszukiwania, a następnie dostarczają szczegółowe wyniki.
—
Zarządzanie wieloma wątkami konwersacji
Aby umożliwić agentowi prowadzenie wielu równoczesnych konwersacji, można wykorzystać unikalne identyfikatory wątków (thread_id). Dzięki temu każde zapytanie jest przypisane do odpowiedniego kontekstu, co pozwala na oddzielanie interakcji użytkowników.
Przykład konwersacji z tym samym wątkiem:
python
messages = [HumanMessage(content="A co z pogodą w Los Angeles?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
for v in event.values():
print(v)
W tym przypadku agent jest w stanie zachować kontekst rozmowy. Jednak zmiana thread_id
spowoduje, że agent zapomni poprzednie pytania:
python
messages = [HumanMessage(content="Które miasto jest cieplejsze?")]
thread = {"configurable": {"thread_id": "2"}}
Wynik: „Potrzebuję więcej informacji, proszę doprecyzować pytanie.”
—
Strumieniowanie tokenów
Aby uzyskać pełny wgląd w proces generowania odpowiedzi, można zastosować asynchroniczne strumieniowanie tokenów:
Przykład kodu:
python
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
messages = [HumanMessage(content="Jaka jest pogoda w San Francisco?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="")
—
Podsumowanie
Wprowadzenie funkcji utrzymywania stanu oraz strumieniowania danych znacząco zwiększa funkcjonalność agenta AI. Dzięki utrzymywaniu stanu agent jest w stanie efektywnie zarządzać długoterminowymi zadaniami i wieloma równoległymi konwersacjami. Strumieniowanie natomiast umożliwia bieżący wgląd w działania agenta, co jest szczególnie przydatne w produkcyjnych aplikacjach AI.
Rozwiązania te są kluczowe dla budowy zaawansowanych systemów, które współpracują z użytkownikami w czasie rzeczywistym. Kolejny krok to implementacja interakcji człowiek-maszyna, które jeszcze bardziej zoptymalizują współpracę pomiędzy użytkownikiem a sztuczną inteligencją.