Files
langgraph-architecture/CHANNELS.md
T

284 lines
5.7 KiB
Markdown
Raw Normal View History

2026-02-23 13:11:51 -06:00
# LangGraph Channels
**Version:** 1.0.0
**Last Updated:** 2026-02-23
---
## Overview
Channels are LangGraph's mechanism for inter-node communication and state storage. Each channel is a typed container with specific semantics for how values are written and read.
---
## Channel Types
### From `channels/__init__.py`
```python
__all__ = [
"BaseChannel",
"LastValue",
"AnyValue",
"Topic",
"NamedBarrier",
"BinOp",
"EphemeralValue",
"UntrackedValue",
]
```
---
## Channel Implementation Details
### 1. LastValue Channel
**File:** `channels/last_value.py`
**Behavior:** Most recent write wins. Reading returns the last value written.
```python
class LastValue(Generic[Value]):
"""Channel that keeps the last value written."""
def __init__(self, typ: type[Value]):
self.typ = typ
self.value = None
def get(self) -> Value:
if self.value is None:
raise EmptyChannelError()
return self.value
def update(self, values: Sequence[Value]) -> bool:
if values:
self.value = values[-1] # Last wins
return True
return False
```
**Use Case:** Single-value state fields, like `counter`, `status`, `current_step`.
---
### 2. AnyValue Channel
**File:** `channels/any_value.py`
**Behavior:** First non-empty value wins. Reading returns the first value that was written and is still available.
```python
class AnyValue(Generic[Value]):
"""Channel that returns the first available value."""
def get(self) -> Value:
if self.value is None:
raise EmptyChannelError()
return self.value
def update(self, values: Sequence[Value]) -> bool:
if values and self.value is None:
self.value = values[0] # First wins
return True
return False
```
**Use Case:** Optional fields, fallback values.
---
### 3. Topic Channel
**File:** `channels/topic.py`
**Behavior:** Pub/sub. Nodes can publish to topics, subscribers receive all messages.
```python
class Topic(Generic[Value]):
"""Pub/sub channel for broadcasting."""
def __init__(self, typ: type[Value], selector: Callable = None):
self.typ = typ
self.selector = selector or (lambda x: x)
self.subscriptions: dict[str, set] = defaultdict(set)
def get(self) -> list[Value]:
# Return all values for subscribed topic
...
def update(self, values: Sequence[Value]) -> bool:
# Add values to topic
...
```
**Use Case:** Broadcasting to multiple nodes, event systems.
---
### 4. NamedBarrier Channel
**File:** `channels/named_barrier_value.py`
**Behavior:** Blocks until all named tasks complete. Used for synchronization.
```python
class NamedBarrier:
"""Synchronization point - blocks until all expected tasks arrive."""
def get(self) -> None:
# Block until all tasks arrive
...
def update(self, values: Sequence[str]) -> bool:
# Register task completion
...
```
**Use Case:** Wait for parallel branches to complete.
---
### 5. BinOp Channel
**File:** `channels/binop.py`
**Behavior:** Applies binary operation to combine values.
```python
class BinOp(Generic[Value]):
"""Binary operation channel."""
def __init__(self, typ: type[Value], op: Callable[[Value, Value], Value]):
self.op = op
self.value = None
def get(self) -> Value:
return self.value
def update(self, values: Sequence[Value]) -> bool:
for v in values:
if self.value is None:
self.value = v
else:
self.value = self.op(self.value, v)
return True
```
**Use Case:** Aggregations (sum, max, min, union).
---
### 6. EphemeralValue Channel
**File:** `channels/ephemeral_value.py`
**Behavior:** One-time use. Value is consumed after reading.
```python
class EphemeralValue:
"""One-time use value - consumed after read."""
def get(self) -> Value:
value = self.value
self.value = None # Consume
return value
```
**Use Case:** One-time signals, commands.
---
### 7. UntrackedValue Channel
**File:** `channels/untracked_value.py`
**Behavior:** Value is not checkpointed. Used for transient data.
```python
class UntrackedValue:
"""Value that doesn't participate in checkpointing."""
pass
```
**Use Case:** Temporary data, debugging info.
---
## Channel Configuration
### Declaring Channels
```python
from langgraph.graph import StateGraph
from typing import TypedDict
class GraphState(TypedDict):
messages: list
counter: int
graph = StateGraph(GraphState)
# Default channels:
# - list fields -> LastValue[list]
# - other fields -> LastValue[type]
```
### Custom Channels
```python
from langgraph.channels import BaseChannel
class Accumulate(BaseChannel):
def __init__(self, typ: type):
self.typ = typ
self.values = []
def get(self) -> list:
return self.values
def update(self, values) -> bool:
self.values.extend(values)
return True
```
---
## Channel vs State
| Concept | Description |
|---------|-------------|
| **State** | TypedDict defining all fields |
| **Channel** | Storage mechanism per field |
| **Reducer** | How updates are merged |
---
## Checkpointing Channels
### What Gets Persisted
- All channel values are checkpointed
- Except `UntrackedValue` channels
- Checkpoint includes `channel_values` and `channel_versions`
### Checkpoint Format
```python
checkpoint = {
"channel_values": {
"messages": [...],
"counter": 5,
},
"channel_versions": {
"messages": 3,
"counter": 5,
},
"metadata": {...}
}
```
---
*Generated from source code analysis*