# LangGraph Channels **Version:** 1.0.0 **Last Updated:** 2026-02-23 --- ## Overview Channels are LangGraph's mechanism for inter-node communication and state storage. Each channel is a typed container with specific semantics for how values are written and read. --- ## Channel Types ### From `channels/__init__.py` ```python __all__ = [ "BaseChannel", "LastValue", "AnyValue", "Topic", "NamedBarrier", "BinOp", "EphemeralValue", "UntrackedValue", ] ``` --- ## Channel Implementation Details ### 1. LastValue Channel **File:** `channels/last_value.py` **Behavior:** Most recent write wins. Reading returns the last value written. ```python class LastValue(Generic[Value]): """Channel that keeps the last value written.""" def __init__(self, typ: type[Value]): self.typ = typ self.value = None def get(self) -> Value: if self.value is None: raise EmptyChannelError() return self.value def update(self, values: Sequence[Value]) -> bool: if values: self.value = values[-1] # Last wins return True return False ``` **Use Case:** Single-value state fields, like `counter`, `status`, `current_step`. --- ### 2. AnyValue Channel **File:** `channels/any_value.py` **Behavior:** First non-empty value wins. Reading returns the first value that was written and is still available. ```python class AnyValue(Generic[Value]): """Channel that returns the first available value.""" def get(self) -> Value: if self.value is None: raise EmptyChannelError() return self.value def update(self, values: Sequence[Value]) -> bool: if values and self.value is None: self.value = values[0] # First wins return True return False ``` **Use Case:** Optional fields, fallback values. --- ### 3. Topic Channel **File:** `channels/topic.py` **Behavior:** Pub/sub. Nodes can publish to topics, subscribers receive all messages. ```python class Topic(Generic[Value]): """Pub/sub channel for broadcasting.""" def __init__(self, typ: type[Value], selector: Callable = None): self.typ = typ self.selector = selector or (lambda x: x) self.subscriptions: dict[str, set] = defaultdict(set) def get(self) -> list[Value]: # Return all values for subscribed topic ... def update(self, values: Sequence[Value]) -> bool: # Add values to topic ... ``` **Use Case:** Broadcasting to multiple nodes, event systems. --- ### 4. NamedBarrier Channel **File:** `channels/named_barrier_value.py` **Behavior:** Blocks until all named tasks complete. Used for synchronization. ```python class NamedBarrier: """Synchronization point - blocks until all expected tasks arrive.""" def get(self) -> None: # Block until all tasks arrive ... def update(self, values: Sequence[str]) -> bool: # Register task completion ... ``` **Use Case:** Wait for parallel branches to complete. --- ### 5. BinOp Channel **File:** `channels/binop.py` **Behavior:** Applies binary operation to combine values. ```python class BinOp(Generic[Value]): """Binary operation channel.""" def __init__(self, typ: type[Value], op: Callable[[Value, Value], Value]): self.op = op self.value = None def get(self) -> Value: return self.value def update(self, values: Sequence[Value]) -> bool: for v in values: if self.value is None: self.value = v else: self.value = self.op(self.value, v) return True ``` **Use Case:** Aggregations (sum, max, min, union). --- ### 6. EphemeralValue Channel **File:** `channels/ephemeral_value.py` **Behavior:** One-time use. Value is consumed after reading. ```python class EphemeralValue: """One-time use value - consumed after read.""" def get(self) -> Value: value = self.value self.value = None # Consume return value ``` **Use Case:** One-time signals, commands. --- ### 7. UntrackedValue Channel **File:** `channels/untracked_value.py` **Behavior:** Value is not checkpointed. Used for transient data. ```python class UntrackedValue: """Value that doesn't participate in checkpointing.""" pass ``` **Use Case:** Temporary data, debugging info. --- ## Channel Configuration ### Declaring Channels ```python from langgraph.graph import StateGraph from typing import TypedDict class GraphState(TypedDict): messages: list counter: int graph = StateGraph(GraphState) # Default channels: # - list fields -> LastValue[list] # - other fields -> LastValue[type] ``` ### Custom Channels ```python from langgraph.channels import BaseChannel class Accumulate(BaseChannel): def __init__(self, typ: type): self.typ = typ self.values = [] def get(self) -> list: return self.values def update(self, values) -> bool: self.values.extend(values) return True ``` --- ## Channel vs State | Concept | Description | |---------|-------------| | **State** | TypedDict defining all fields | | **Channel** | Storage mechanism per field | | **Reducer** | How updates are merged | --- ## Checkpointing Channels ### What Gets Persisted - All channel values are checkpointed - Except `UntrackedValue` channels - Checkpoint includes `channel_values` and `channel_versions` ### Checkpoint Format ```python checkpoint = { "channel_values": { "messages": [...], "counter": 5, }, "channel_versions": { "messages": 3, "counter": 5, }, "metadata": {...} } ``` --- *Generated from source code analysis*