4.3 オーケストレーション

実行が始まる場所

メインのオーケストレーションは plan_travel_internal() で行われます

def plan_travel_internal(
    origin: str,
    destination: str,
    user_request: str,
    travellers: int,
    ) -> Dict[str, object]:
    session_id = str(uuid4())
    departure, return_date = _compute_dates()

    initial_state: PlannerState = {
        "messages": [HumanMessage(content=user_request)],
        "user_request": user_request,
        "session_id": session_id,
        "origin": origin,
        "destination": destination,
        "departure": departure,
        "return_date": return_date,
        "travellers": travellers,
        "flight_summary": None,
        "hotel_summary": None,
        "activities_summary": None,
        "final_itinerary": None,
        "current_agent": "start",
    }

    workflow = build_workflow()
    compiled_app = workflow.compile()

    for step in compiled_app.stream(initial_state, config):
        node_name, node_state = next(iter(step.items()))
        final_state = node_state

この関数は以下のアプリケーションライフサイクルを実装しています

  • 初期ステートを構築する
  • グラフを構築する
  • コンパイルする
  • ステップごとにストリーム実行する

知識チェック

質問 1

なぜコードは単にグラフを一度呼び出して最終結果を取得するのではなく、compiled_app.stream(initial_state, config) を使用しているのですか?

ここをクリックして回答を確認

ストリーミングはワークフローを各ノードの実行ごとにステップバイステップで実行するためです。これにより、アプリケーションは中間ステートを観察し、どのノードが実行中かを追跡し、最終出力を待つだけでなくリアルタイムでワークフローを監視できます。

質問 2

なぜグラフを実行する前に initial_state を作成するのですか?

ここをクリックして回答を確認

LangGraph のワークフローは共有ステートオブジェクト上で動作するためです。initial_state は、ワークフローの進行に伴ってノードが読み取り、更新し、受け渡す開始データを提供します。