LangGraphのSend APIを活用した動的並列処理 🚀

LLM

こんにちは!今回は LangGraphSend API を活用して、動的にノードを並列実行する方法 を解説します。

一般的に、LangGraphでは あらかじめ定義したノード を並列に実行できます。
しかし、処理の内容やデータ量に応じてノード数を変えたい 場合、静的な並列実行だけでは対応できません。

そこで役立つのが Send API です。Sendを使えば、動的にノードを増減しながら並列処理 を実現できます!

本記事では、
LangGraphの並列処理の基本
Send APIの動作原理
Sendを活用する具体的な方法
Send APIを使用する際の注意点と最適なユースケース

まで 徹底解説 します! 🔥

【本記事のもくじ】


1. LangGraphにおける並列実行の基本

1-1. LangGraphの標準的な並列処理

LangGraphでは、同じステップにあるノード並列に実行 されます。

例えば、以下のようなグラフを考えます。

graph_builder.add_node('start_node', start_node)
graph_builder.add_node('node_a', node_a)
graph_builder.add_node('node_b', node_b)
graph_builder.add_node('node_c', node_c)
graph_builder.add_node('end_node', end_node)

graph_builder.set_entry_point('start_node')
graph_builder.add_edge('start_node', 'node_a')
graph_builder.add_edge('start_node', 'node_b')
graph_builder.add_edge('start_node', 'node_c')
graph_builder.add_edge(['node_a', 'node_b', 'node_c'], 'end_node')
graph_builder.set_finish_point('end_node')

この場合、node_anode_bnode_c並列に実行され、結果が end_node に集約されます。

しかし、実行するノードの数が動的に変化する場合、この方法では対応できません。
そこで登場するのが Send API です!


2. Send APIとは?

2-1. Send APIの概要

Send APIは、LangGraphのノードを動的に追加・実行するための仕組み です。

通常の add_edge を使った並列実行は、事前に接続するノードを決めておく必要があります
しかし、以下のようなケースでは、実行ノードを動的に変更する必要があります

外部APIのレスポンス数に応じて処理ノードを変える
データの内容や条件によって並列処理の数を調整する
負荷を見ながら並列処理を柔軟にスケールする

こうした 柔軟な並列処理 を実現するために、Send APIを活用します。

2-2. Send APIの基本構造

Send APIの使い方はシンプルです。

Send('ノード名', {'渡したいデータ': '値'})

例えば、parallel_node を動的に3つ実行する場合は、次のように書けます。

def routing_parallel_node(state: OverallState):
    return [Send('parallel_node', {'node_path': f'parallel_node_{i}'}) for i in range(3)]

3. Send APIを活用した動的並列処理の実装

3-1. 動的に実行するノードを登録する

まず、動的に実行されるノードを add_node で登録します。

from langgraph.graph import StateGraph, Send
from typing import TypedDict
from annotated_types import Annotated
import operator

# グラフ全体のState
class OverallState(TypedDict):
    paths: Annotated[list[str], operator.add]

# 並列ノード専用のState
class ParallelState(TypedDict):
    node_path: str
    paths: Annotated[list[str], operator.add]

graph_builder = StateGraph(OverallState)

# 並列で実行されるノード
def parallel_node(state: ParallelState, config):
    print(f'parallel_node: {state}')
    return {'paths': [state['node_path']]}

graph_builder.add_node('parallel_node', parallel_node)

3-2. add_conditional_edges で Send API を使う

次に、動的にノードを実行するために add_conditional_edges を定義します。

def routing_parallel_node(state: OverallState):
    return [Send('parallel_node', state | {'node_path': f'parallel_node_{i}'}) for i in range(3)]

# 条件付きエッジを設定
graph_builder.add_conditional_edges('start_node', routing_parallel_node, ['parallel_node'])

これで start_node の実行後、3つの parallel_node が動的に並列実行 されます! 🚀


4. Send APIの注意点と最適なユースケース

4-1. Sendで実行されるノードにはStateが自動で渡されない

通常、LangGraphのノードは 前のステップのStateを引き継ぎます
しかし、Sendで実行するノードには自動的にStateが渡されません

そのため、Stateを明示的に指定する必要があります。

def routing_parallel_node(state: OverallState):
    return [Send('parallel_node', state | {'node_path': f'parallel_node_{i}'}) for i in range(3)]

4-2. Stateの更新にはReducerが必要

LangGraphでは、複数のノードが出力した結果を統合する場合、Reducerを指定する必要があります

4-2. Stateの更新にはReducerが必要

LangGraphでは、複数のノードが出力した結果を統合する場合、Reducerを指定する必要があります。

この operator.add により、各ノードの結果が統合されます。


5. まとめ

LangGraphでは通常、事前にノードを定義した並列処理を行う
Send APIを使うと、動的にノードの並列実行が可能になる
Stateの管理やReducerの設定に注意が必要


6. Send APIを今すぐ試してみよう!

LangGraphのSend APIを活用すれば、柔軟な並列処理が可能になります
APIレスポンスのデータ量に応じたノードの動的生成や、負荷分散の最適化に役立ちます。

実際にコードを動かして、あなたのプロジェクトに適用してみましょう! 🚀

最新情報をチェックしよう!