Advanced Data Transformation in LangChain: Mastering RunnableAssign for Dynamic Pipeline Construction
In the world of LLM application development, data manipulation and transformation are critical operations that can make or break your application’s functionality. LangChain provides a powerful tool for these operations: RunnableAssign. This component allows developers to elegantly transform and augment data within their LangChain pipelines, making it an essential tool for advanced LangChain applications.
What is RunnableAssign?
RunnableAssign is a specialized runnable in LangChain that takes dictionary inputs, applies transformations through a RunnableParallel instance, and then combines the transformed data with the original input. In essence, it allows you to maintain your original data while adding new key-value pairs based on your specified transformations.
This class inherits from RunnableSerializable and implements the standard Runnable interface, giving it access to all the powerful methods available to runnables, such as with_config, with_retry, bind, and more.
Why Use RunnableAssign?
Before diving into implementation details, let’s understand why RunnableAssign is so valuable:
- Data Preservation: It maintains the original input data while adding new fields
- Parallel Processing: Leverages parallel execution for efficiency
- Pipeline Integration: Seamlessly fits into LangChain’s composable pipeline architecture
- Flexibility: Works with any transformation that can be expressed as a runnable
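Conceptually, the behavior is easy to picture in plain Python: run each mapper over the input dictionary and merge the resulting keys into a copy of the original. The sketch below illustrates that idea only; the `assign` helper and the field names are invented for this example and are not LangChain code.

```python
def assign(mappers, data):
    """Sketch of RunnableAssign semantics: apply each mapper to the
    input dict and merge the results with the original keys."""
    transformed = {key: fn(data) for key, fn in mappers.items()}
    return {**data, **transformed}  # original keys preserved, new keys added

# The original "price" and "qty" keys survive; "total" is added.
result = assign(
    {"total": lambda d: d["price"] * d["qty"]},
    {"price": 2.5, "qty": 4},
)
print(result)  # {'price': 2.5, 'qty': 4, 'total': 10.0}
```

Note that if a mapper reuses an existing key, the merge lets the new value win — the same precedence you get from the real component.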
Basic Usage
Let’s start with a simple example to demonstrate how RunnableAssign works:
from langchain_core.runnables import RunnableAssign, RunnableParallel
# Create a simple transformation that adds a greeting
def create_greeting(data):
    return f"Hello, {data['name']}!"
# Create a RunnableParallel that will add a greeting field
greeting_mapper = RunnableParallel(greeting=create_greeting)
# Create a RunnableAssign that will add the greeting to the input
greeting_assigner = RunnableAssign(greeting_mapper)
# Use the RunnableAssign
result = greeting_assigner.invoke({"name": "Alice", "age": 30})
print(result)
# Output: {"name": "Alice", "age": 30, "greeting": "Hello, Alice!"}
In this example, the original input dictionary with name and age is preserved, and a new greeting field is added based on the transformation. As a convenient shorthand, RunnablePassthrough.assign(greeting=create_greeting) builds the same RunnableAssign in a single call.
Advanced Transformations
RunnableAssign becomes even more powerful when combined with other LangChain components. Here’s an example using a language model to generate summaries:
from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI
# Create a chat model
model = ChatOpenAI()
# Create a summarizer function
def summarize_text(data):
    prompt = f"Summarize the following text in one sentence: {data['text']}"
    return model.invoke(prompt).content  # .content extracts the string from the AIMessage
# Create a RunnableParallel that will add a summary field
summary_mapper = RunnableParallel(summary=summarize_text)
# Create a RunnableAssign that will add the summary to the input
summary_assigner = RunnableAssign(summary_mapper)
# Use the RunnableAssign
result = summary_assigner.invoke({
    "text": "Machine learning is a field of study that gives computers the ability to learn without being explicitly programmed. It focuses on developing algorithms that can access data and use it to learn for themselves.",
    "source": "Introduction to ML"
})
print(result)
# Output includes original data plus a "summary" field
Chaining Multiple Transformations
One of the strengths of LangChain’s pipeline architecture is the ability to chain operations. You can chain multiple RunnableAssign instances to perform sequential transformations:
from langchain_core.runnables import RunnableAssign, RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
# First transformation: Add a summary
summary_mapper = RunnableParallel(summary=lambda x: model.invoke(f"Summarize: {x['content']}"))
summary_assigner = RunnableAssign(summary_mapper)
# Second transformation: Add sentiment analysis
sentiment_mapper = RunnableParallel(sentiment=lambda x: model.invoke(f"What's the sentiment of this text? {x['content']}"))
sentiment_assigner = RunnableAssign(sentiment_mapper)
# Chain the transformations
pipeline = summary_assigner | sentiment_assigner
# Use the pipeline
result = pipeline.invoke({"content": "I absolutely loved the new movie. The plot was engaging and the characters were well-developed."})
print(result)
# Output includes original content plus summary and sentiment fields
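A useful property of this kind of chaining is that each step sees every key added by the steps before it. The plain-Python sketch below makes that visible; the `assign_step` helper and the toy field logic are invented for illustration (the real pipeline above does the same thing with LLM calls).

```python
def assign_step(key, fn):
    """Return a function that enriches a dict with one computed key."""
    return lambda data: {**data, key: fn(data)}

# Two toy enrichment steps standing in for the LLM-backed ones above.
add_summary = assign_step("summary", lambda d: d["content"][:20] + "...")
add_sentiment = assign_step(
    "sentiment",
    # The second step can read the original content *and* the summary.
    lambda d: "positive" if "loved" in d["content"] else "neutral",
)

result = add_sentiment(add_summary({"content": "I absolutely loved the new movie."}))
print(sorted(result))  # ['content', 'sentiment', 'summary']
```

Because later steps receive the enriched dictionary, you can order your transformations so that cheap metadata is computed first and expensive LLM calls can consume it.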
Integrating with LLM Chains
RunnableAssign truly shines when integrated into larger LLM chains. Here’s an example of a document processing pipeline:
from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
# Create components
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template(
    "Extract the main topics from this document: {document}"
)
# Create a chain for topic extraction
topic_chain = prompt | model | StrOutputParser()
# Create a mapper for adding topics
topic_mapper = RunnableParallel(topics=lambda x: topic_chain.invoke({"document": x["document"]}))
topic_assigner = RunnableAssign(topic_mapper)
# Create a mapper for adding metadata
metadata_mapper = RunnableParallel(
    metadata=lambda x: {
        "word_count": len(x["document"].split()),
        "processed_date": "2023-11-15"
    }
)
metadata_assigner = RunnableAssign(metadata_mapper)
# Combine into a document processing pipeline
document_pipeline = topic_assigner | metadata_assigner
# Process a document
result = document_pipeline.invoke({
    "document": "Artificial intelligence is transforming industries worldwide. From healthcare to finance, AI applications are creating new opportunities and challenges.",
    "source": "AI Trends Report"
})
print(result)
# Output includes original document, extracted topics, and metadata
Advanced Features of RunnableAssign
Beyond the basic functionality, RunnableAssign inherits all the powerful features of the Runnable interface:
Asynchronous Execution
For high-performance applications, you can use the async methods:
import asyncio
from langchain_core.runnables import RunnableAssign, RunnableParallel
# Define an async transformation
async def async_transform(data):
    await asyncio.sleep(1)  # Simulate async work
    return f"Processed {data['field']}"
# Create an async-compatible RunnableParallel
async_mapper = RunnableParallel(result=async_transform)
async_assigner = RunnableAssign(async_mapper)
# Use the async method
async def main():
    result = await async_assigner.ainvoke({"field": "test data"})
    print(result)
asyncio.run(main())
Streaming Support
RunnableAssign also supports streaming for real-time data processing:
from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI
model = ChatOpenAI(streaming=True)
def generate_content(data):
    return model.invoke(f"Write a short story about {data['topic']}")
content_mapper = RunnableParallel(content=generate_content)
content_assigner = RunnableAssign(content_mapper)
# Stream the results
for chunk in content_assigner.stream({"topic": "space exploration"}):
    print(chunk)
Error Handling and Retries
You can add robust error handling using the with_retry method:
from langchain_core.runnables import RunnableAssign, RunnableParallel
def flaky_operation(data):
    # Simulate an operation that might fail
    import random
    if random.random() < 0.5:
        raise Exception("Operation failed")
    return f"Successfully processed {data['input']}"
mapper = RunnableParallel(result=flaky_operation)
assigner = RunnableAssign(mapper)
# Add retry logic
robust_assigner = assigner.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)
# The operation will retry up to 3 times if it fails
result = robust_assigner.invoke({"input": "test data"})
print(result)
Practical Use Cases
Let’s explore some practical scenarios where RunnableAssign shines:
Document Processing Pipeline
from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI()
# Create a document enrichment pipeline
enrichment_pipeline = (
    RunnableAssign(RunnableParallel(
        # Chain the model with StrOutputParser so each field is a plain string
        summary=lambda x: (model | StrOutputParser()).invoke(f"Summarize this document: {x['text']}")
    ))
    | RunnableAssign(RunnableParallel(
        keywords=lambda x: (model | StrOutputParser()).invoke(f"Extract 5 keywords from this document: {x['text']}")
    ))
    | RunnableAssign(RunnableParallel(
        sentiment=lambda x: (model | StrOutputParser()).invoke(f"What's the sentiment of this text? {x['text']}")
    ))
)
# Process a document
document = {
    "id": "doc123",
    "text": "Climate change is accelerating, with global temperatures rising at an unprecedented rate. Scientists warn that immediate action is necessary to prevent catastrophic consequences.",
    "source": "Environmental Report 2023"
}
enriched_document = enrichment_pipeline.invoke(document)
print(enriched_document)
User Query Enhancement
from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
# Create a query enhancement pipeline
query_enhancer = RunnableAssign(RunnableParallel(
    expanded_query=lambda x: model.invoke(f"Expand this search query with relevant terms: {x['query']}"),
    query_intent=lambda x: model.invoke(f"What is the likely intent behind this search query: {x['query']}"),
    query_categories=lambda x: model.invoke(f"Categorize this query into up to 3 topics: {x['query']}")
))
# Enhance a user query
enhanced_query = query_enhancer.invoke({"query": "climate solutions renewable"})
print(enhanced_query)
Conclusion
RunnableAssign is a powerful tool in the LangChain ecosystem that enables sophisticated data transformation and manipulation within your pipelines. By preserving original data while adding new fields, it allows for incremental enrichment of your data as it flows through your application.
Whether you’re building document processing systems, conversational agents, or data analysis pipelines, mastering RunnableAssign will give you the flexibility to create more dynamic and powerful LangChain applications.
The combination of parallel processing capabilities, seamless integration with other LangChain components, and the full power of the Runnable interface makes RunnableAssign an essential tool for advanced LangChain development.
By incorporating RunnableAssign into your LangChain toolkit, you’ll be able to build more sophisticated, maintainable, and powerful AI applications that effectively transform and manipulate data throughout your processing pipelines.
This post was originally written in my native language and then translated using an LLM. I apologize if there are any grammatical inconsistencies.