こんにちは!今回は LangGraph の Send API を活用して、動的にノードを並列実行する方法 を解説します。
一般的に、LangGraphでは あらかじめ定義したノード を並列に実行できます。
しかし、処理の内容やデータ量に応じてノード数を変えたい 場合、静的な並列実行だけでは対応できません。
そこで役立つのが Send API です。Sendを使えば、動的にノードを増減しながら並列処理 を実現できます!
本記事では、
✅ LangGraphの並列処理の基本
✅ Send APIの動作原理
✅ Sendを活用する具体的な方法
✅ Send APIを使用する際の注意点と最適なユースケース
まで 徹底解説 します! 🔥
【本記事のもくじ】
1. LangGraphにおける並列実行の基本
1-1. LangGraphの標準的な並列処理
LangGraphでは、同じステップにあるノード は 並列に実行 されます。
例えば、以下のようなグラフを考えます。
graph_builder.add_node('start_node', start_node)
graph_builder.add_node('node_a', node_a)
graph_builder.add_node('node_b', node_b)
graph_builder.add_node('node_c', node_c)
graph_builder.add_node('end_node', end_node)
graph_builder.set_entry_point('start_node')
graph_builder.add_edge('start_node', 'node_a')
graph_builder.add_edge('start_node', 'node_b')
graph_builder.add_edge('start_node', 'node_c')
graph_builder.add_edge(['node_a', 'node_b', 'node_c'], 'end_node')
graph_builder.set_finish_point('end_node')
この場合、node_a
、node_b
、node_c
が 並列に実行され、結果が end_node
に集約されます。
しかし、実行するノードの数が動的に変化する場合、この方法では対応できません。
そこで登場するのが Send API です!
2. Send APIとは?
2-1. Send APIの概要
Send APIは、LangGraphのノードを動的に追加・実行するための仕組み です。
通常の add_edge
を使った並列実行は、事前に接続するノードを決めておく必要があります。
しかし、以下のようなケースでは、実行ノードを動的に変更する必要があります。
✅ 外部APIのレスポンス数に応じて処理ノードを変える
✅ データの内容や条件によって並列処理の数を調整する
✅ 負荷を見ながら並列処理を柔軟にスケールする
こうした 柔軟な並列処理 を実現するために、Send APIを活用します。
2-2. Send APIの基本構造
Send APIの使い方はシンプルです。
Send('ノード名', {'渡したいデータ': '値'})
例えば、parallel_node
を動的に3つ実行する場合は、次のように書けます。
def routing_parallel_node(state: OverallState):
return [Send('parallel_node', {'node_path': f'parallel_node_{i}'}) for i in range(3)]
3. Send APIを活用した動的並列処理の実装
3-1. 動的に実行するノードを登録する
まず、動的に実行されるノードを add_node で登録します。
from langgraph.graph import StateGraph, Send
from typing import TypedDict
from annotated_types import Annotated
import operator
# グラフ全体のState
class OverallState(TypedDict):
paths: Annotated[list[str], operator.add]
# 並列ノード専用のState
class ParallelState(TypedDict):
node_path: str
paths: Annotated[list[str], operator.add]
graph_builder = StateGraph(OverallState)
# 並列で実行されるノード
def parallel_node(state: ParallelState, config):
print(f'parallel_node: {state}')
return {'paths': [state['node_path']]}
graph_builder.add_node('parallel_node', parallel_node)
3-2. add_conditional_edges
で Send API を使う
次に、動的にノードを実行するために add_conditional_edges を定義します。
def routing_parallel_node(state: OverallState):
return [Send('parallel_node', state | {'node_path': f'parallel_node_{i}'}) for i in range(3)]
# 条件付きエッジを設定
graph_builder.add_conditional_edges('start_node', routing_parallel_node, ['parallel_node'])
これで start_node
の実行後、3つの parallel_node
が動的に並列実行 されます! 🚀
4. Send APIの注意点と最適なユースケース
4-1. Sendで実行されるノードにはStateが自動で渡されない
通常、LangGraphのノードは 前のステップのStateを引き継ぎます。
しかし、Sendで実行するノードには自動的にStateが渡されません!
そのため、Stateを明示的に指定する必要があります。
def routing_parallel_node(state: OverallState):
return [Send('parallel_node', state | {'node_path': f'parallel_node_{i}'}) for i in range(3)]
4-2. Stateの更新にはReducerが必要
LangGraphでは、複数のノードが出力した結果を統合する場合、Reducerを指定する必要があります。
4-2. Stateの更新にはReducerが必要
LangGraphでは、複数のノードが出力した結果を統合する場合、Reducerを指定する必要があります。
この operator.add
により、各ノードの結果が統合されます。
5. まとめ
✅ LangGraphでは通常、事前にノードを定義した並列処理を行う
✅ Send APIを使うと、動的にノードの並列実行が可能になる
✅ Stateの管理やReducerの設定に注意が必要
6. Send APIを今すぐ試してみよう!
LangGraphのSend APIを活用すれば、柔軟な並列処理が可能になります。
APIレスポンスのデータ量に応じたノードの動的生成や、負荷分散の最適化に役立ちます。
実際にコードを動かして、あなたのプロジェクトに適用してみましょう! 🚀