W poprzednim poradniku omówiliśmy, jak zbudować agenta AI zdolnego do odpowiadania na pytania poprzez przeszukiwanie internetu. Jednak w kontekście bardziej złożonych i długoterminowych zadań, kluczowe stają się dwa aspekty: utrzymywanie stanu (persistence) oraz strumieniowanie danych (streaming). Utrzymywanie stanu pozwala na zapisanie aktualnego stanu agenta, co umożliwia wznowienie pracy w dowolnym momencie w przyszłości. Z kolei strumieniowanie daje możliwość dostarczania w czasie rzeczywistym informacji o działaniach agenta, co zapewnia przejrzystość i kontrolę nad jego działaniami. W tym artykule pokażemy, jak wzbogacić agenta o te funkcje w kontekście bardziej zaawansowanych zastosowań.

Konfiguracja agenta

Na początek odtworzymy naszego agenta, instalując niezbędne biblioteki, konfigurując środowisko oraz definiując strukturę agenta. Agent zostanie wyposażony w narzędzie do przeszukiwania zasobów online, a jego architektura umożliwi dynamiczne przełączanie się między różnymi zadaniami.

Kod instalacji bibliotek:

shell
pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1

Konfiguracja środowiska:

python
import os
os.environ['TAVILY_API_KEY'] = ""
os.environ['GROQ_API_KEY'] = ""

Kluczowym elementem agenta jest jego zdolność do interpretowania wiadomości i podejmowania działań na podstawie dostarczonych danych. Wykorzystując odpowiednie biblioteki, zdefiniowano agenta jako obiekt zdolny do obsługi różnych narzędzi i podejmowania logicznych decyzji w oparciu o zdefiniowane reguły.

Dodanie funkcji utrzymywania stanu

Aby umożliwić zapis i przywracanie stanu agenta, wykorzystamy funkcję checkpointer dostępną w bibliotece LangGraph. Jest to narzędzie pozwalające na zapisywanie stanu agenta pomiędzy poszczególnymi etapami jego działania. Do tego celu zastosujemy prostą bazę danych SQLite:

Kod inicjalizacji SQLite:

python
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3

sqlite_conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(sqlite_conn)

W nowej implementacji agent zostanie zmodyfikowany tak, aby przyjąć obiekt checkpointera, co umożliwi zapisanie historii działań. Dzięki temu, nawet w przypadku przerwania procesu, możliwe będzie wznowienie pracy agenta w dowolnym momencie.

Przykład inicjalizacji agenta z utrzymywaniem stanu:

python
class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        self.graph = graph.compile(checkpointer=checkpointer)

prompt = "Jesteś inteligentnym asystentem badawczym. Korzystaj z wyszukiwarki, by znajdować informacje."
model = ChatGroq(model="Llama-3.3-70b-Specdec")
bot = Agent(model, [tool], system=prompt, checkpointer=memory)

Dodanie funkcji strumieniowania

Aby poprawić przejrzystość działań agenta, wprowadzimy funkcję strumieniowania, która pozwoli na monitorowanie wyników w czasie rzeczywistym. Strumieniowanie może obejmować dwa typy danych:

1. Strumieniowanie wiadomości – pozwala na obserwowanie decyzji AI oraz wyników działań.
2. Strumieniowanie tokenów – umożliwia wyświetlanie odpowiedzi modelu w postaci przepływu pojedynczych jednostek tekstowych.

Przykład strumieniowania wiadomości:

python
messages = [HumanMessage(content="Jaka jest pogoda w Teksasie?")]
thread = {"configurable": {"thread_id": "1"}}

for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

Wynik działania agenta to seria zdarzeń, które najpierw wskazują decyzję o wykonaniu wyszukiwania, a następnie dostarczają szczegółowe wyniki.

Zarządzanie wieloma wątkami konwersacji

Aby umożliwić agentowi prowadzenie wielu równoczesnych konwersacji, można wykorzystać unikalne identyfikatory wątków (thread_id). Dzięki temu każde zapytanie jest przypisane do odpowiedniego kontekstu, co pozwala na oddzielanie interakcji użytkowników.

Przykład konwersacji z tym samym wątkiem:

python
messages = [HumanMessage(content="A co z pogodą w Los Angeles?")]
thread = {"configurable": {"thread_id": "1"}}

for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

W tym przypadku agent jest w stanie zachować kontekst rozmowy. Jednak zmiana thread_id spowoduje, że agent zapomni poprzednie pytania:

python
messages = [HumanMessage(content="Które miasto jest cieplejsze?")]
thread = {"configurable": {"thread_id": "2"}}

Wynik: „Potrzebuję więcej informacji, proszę doprecyzować pytanie.”

Strumieniowanie tokenów

Aby uzyskać pełny wgląd w proces generowania odpowiedzi, można zastosować asynchroniczne strumieniowanie tokenów:

Przykład kodu:

python
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    messages = [HumanMessage(content="Jaka jest pogoda w San Francisco?")]
    thread = {"configurable": {"thread_id": "4"}}
    
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                print(content, end="")

Podsumowanie

Wprowadzenie funkcji utrzymywania stanu oraz strumieniowania danych znacząco zwiększa funkcjonalność agenta AI. Dzięki utrzymywaniu stanu agent jest w stanie efektywnie zarządzać długoterminowymi zadaniami i wieloma równoległymi konwersacjami. Strumieniowanie natomiast umożliwia bieżący wgląd w działania agenta, co jest szczególnie przydatne w produkcyjnych aplikacjach AI.

Rozwiązania te są kluczowe dla budowy zaawansowanych systemów, które współpracują z użytkownikami w czasie rzeczywistym. Kolejny krok to implementacja interakcji człowiek-maszyna, które jeszcze bardziej zoptymalizują współpracę pomiędzy użytkownikiem a sztuczną inteligencją.