Advanced Data Transformation in LangChain: Mastering RunnableAssign for Dynamic Pipeline Construction

In LLM application development, much of the work is shaping data as it moves through a pipeline. LangChain provides a component for exactly this: RunnableAssign. It lets you add computed fields to the data flowing through a pipeline without losing what was already there, which makes it a useful building block for more advanced LangChain applications.

What is RunnableAssign?

RunnableAssign is a specialized runnable in LangChain that takes dictionary inputs, applies transformations through a RunnableParallel instance, and then combines the transformed data with the original input. In essence, it allows you to maintain your original data while adding new key-value pairs based on your specified transformations.
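
The merge described above can be modeled in a few lines of plain Python. This is a conceptual sketch rather than LangChain's implementation: `assign_sketch` and `Step` are hypothetical names, and the mapper is just a dictionary of functions standing in for a RunnableParallel.

```python
from typing import Any, Callable, Dict

Step = Callable[[Dict[str, Any]], Any]

def assign_sketch(data: Dict[str, Any], mapper: Dict[str, Step]) -> Dict[str, Any]:
    """Return the input plus one new key per mapper entry (hypothetical helper)."""
    # Every step receives the full, unmodified input dictionary
    computed = {key: step(data) for key, step in mapper.items()}
    # Original keys come first; computed keys are layered on top
    return {**data, **computed}

record = {"name": "Alice", "age": 30}
enriched = assign_sketch(record, {"greeting": lambda d: f"Hello, {d['name']}!"})
print(enriched)
```

The input dictionary is never modified; the caller gets a new dictionary back.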

This class inherits from RunnableSerializable and implements the standard Runnable interface, so it supports the usual runnable methods such as with_config, with_retry, and bind. In practice it is often created through the RunnablePassthrough.assign(...) shorthand, which returns a RunnableAssign.

Why Use RunnableAssign?

Before diving into implementation details, let’s understand why RunnableAssign is so valuable:

  1. Data Preservation: It maintains the original input data while adding new fields
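  2. Parallel Processing: The fields defined in one mapper are computed concurrently rather than one after another, which helps when each one waits on a model or network call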
  3. Pipeline Integration: Seamlessly fits into LangChain’s composable pipeline architecture
  4. Flexibility: Works with any transformation that can be expressed as a runnable
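
The parallel execution in point 2 can be pictured with a thread pool. The following is a plain-Python sketch under that assumption, not LangChain code; `run_steps_concurrently` and the two step functions are hypothetical. A barrier is used so the example only completes if both steps are really running at the same time.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def run_steps_concurrently(steps, data):
    """Run every step on the same input at once and collect results by key."""
    with ThreadPoolExecutor(max_workers=len(steps)) as pool:
        futures = {key: pool.submit(fn, data) for key, fn in steps.items()}
        return {key: future.result() for key, future in futures.items()}

# The barrier releases only when two threads are waiting on it together;
# sequential execution would hit the timeout and raise instead
barrier = threading.Barrier(2)

def upper(data):
    barrier.wait(timeout=5)
    return data["name"].upper()

def length(data):
    barrier.wait(timeout=5)
    return len(data["name"])

record = {"name": "Alice"}
computed = run_steps_concurrently({"upper": upper, "length": length}, record)
enriched = {**record, **computed}
print(enriched)
```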

Basic Usage

Let’s start with a simple example to demonstrate how RunnableAssign works:

from langchain_core.runnables import RunnableAssign, RunnableParallel

# Create a simple transformation that adds a greeting
def create_greeting(data):
    return f"Hello, {data['name']}!"

# Create a RunnableParallel that will add a greeting field
greeting_mapper = RunnableParallel(greeting=create_greeting)

# Create a RunnableAssign that will add the greeting to the input
greeting_assigner = RunnableAssign(greeting_mapper)

# Use the RunnableAssign
result = greeting_assigner.invoke({"name": "Alice", "age": 30})
print(result)
# Output: {'name': 'Alice', 'age': 30, 'greeting': 'Hello, Alice!'}

In this example, the original input dictionary with name and age is preserved, and a new greeting field is added based on the transformation.
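
One detail worth checking is what happens when a new field has the same name as an existing one. The plain-Python sketch below assumes the merge behaves like a dictionary-unpacking merge with computed values applied last, so a computed key replaces the input's value while the caller's dictionary is left unchanged. `merge_assigned` is a hypothetical helper, not a LangChain API, so verify the behavior against the version you use.

```python
def merge_assigned(data, computed):
    """Model of the merge step: computed keys win over input keys (assumption)."""
    return {**data, **computed}

original = {"name": "alice", "age": 30}
computed = {
    # "name" already exists in the input, so this value takes its place
    "name": original["name"].title(),
    "is_adult": original["age"] >= 18,
}
result = merge_assigned(original, computed)
print(result)
print(original)
```

If overwriting is not what you want, picking a distinct key name for the computed field avoids the collision.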

Advanced Transformations

RunnableAssign becomes even more powerful when combined with other LangChain components. Here’s an example using a language model to generate summaries:

from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI

# Create a chat model
model = ChatOpenAI()

# Create a summarizer function
def summarize_text(data):
    prompt = f"Summarize the following text in one sentence: {data['text']}"
    return model.invoke(prompt)

# Create a RunnableParallel that will add a summary field
summary_mapper = RunnableParallel(summary=summarize_text)

# Create a RunnableAssign that will add the summary to the input
summary_assigner = RunnableAssign(summary_mapper)

# Use the RunnableAssign
result = summary_assigner.invoke({
    "text": "Machine learning is a field of study that gives computers the ability to learn without being explicitly programmed. It focuses on developing algorithms that can access data and use it to learn for themselves.",
    "source": "Introduction to ML"
})
print(result)
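# Output includes the original "text" and "source" keys plus a "summary" key
# (the value is the message object returned by model.invoke, not a plain string)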

Chaining Multiple Transformations

One of the strengths of LangChain’s pipeline architecture is the ability to chain operations. You can chain multiple RunnableAssign instances to perform sequential transformations:

from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

# First transformation: Add a summary
summary_mapper = RunnableParallel(summary=lambda x: model.invoke(f"Summarize: {x['content']}"))
summary_assigner = RunnableAssign(summary_mapper)

# Second transformation: Add sentiment analysis
sentiment_mapper = RunnableParallel(sentiment=lambda x: model.invoke(f"What's the sentiment of this text? {x['content']}"))
sentiment_assigner = RunnableAssign(sentiment_mapper)

# Chain the transformations
pipeline = summary_assigner | sentiment_assigner

# Use the pipeline
result = pipeline.invoke({"content": "I absolutely loved the new movie. The plot was engaging and the characters were well-developed."})
print(result)
# Output includes original content plus summary and sentiment fields
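
Chaining matters most when a later step depends on a field added by an earlier one; independent fields, like the summary and sentiment above, could also sit side by side in a single mapper. A plain-Python sketch of that dependency, using hypothetical step functions in place of model calls:

```python
from functools import reduce

def add_word_count(data):
    return {**data, "word_count": len(data["content"].split())}

def add_length_label(data):
    # Reads "word_count", which only exists because the previous step ran
    label = "short" if data["word_count"] < 10 else "long"
    return {**data, "length_label": label}

def run_pipeline(steps, data):
    """Feed the output of each step into the next one, left to right."""
    return reduce(lambda acc, step: step(acc), steps, data)

result = run_pipeline(
    [add_word_count, add_length_label],
    {"content": "I absolutely loved the new movie."},
)
print(result)
```

Swapping the order of the two steps would fail with a KeyError, which is the practical reason to chain rather than run them in parallel.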

Integrating with LLM Chains

RunnableAssign truly shines when integrated into larger LLM chains. Here’s an example of a document processing pipeline:

from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

# Create components
model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template(
    "Extract the main topics from this document: {document}"
)

# Create a chain for topic extraction
topic_chain = prompt | model | StrOutputParser()

# Create a mapper for adding topics
topic_mapper = RunnableParallel(topics=lambda x: topic_chain.invoke({"document": x["document"]}))
topic_assigner = RunnableAssign(topic_mapper)

# Create a mapper for adding metadata
metadata_mapper = RunnableParallel(
    metadata=lambda x: {
        "word_count": len(x["document"].split()),
        "processed_date": "2023-11-15"
    }
)
metadata_assigner = RunnableAssign(metadata_mapper)

# Combine into a document processing pipeline
document_pipeline = topic_assigner | metadata_assigner

# Process a document
result = document_pipeline.invoke({
    "document": "Artificial intelligence is transforming industries worldwide. From healthcare to finance, AI applications are creating new opportunities and challenges.",
    "source": "AI Trends Report"
})
print(result)
# Output includes original document, extracted topics, and metadata

Advanced Features of RunnableAssign

Beyond the basic functionality, RunnableAssign inherits all the powerful features of the Runnable interface:

Asynchronous Execution

For high-performance applications, you can use the async methods:

import asyncio
from langchain_core.runnables import RunnableAssign, RunnableParallel

# Define an async transformation
async def async_transform(data):
    await asyncio.sleep(1)  # Simulate async work
    return f"Processed {data['field']}"

# Create an async-compatible RunnableParallel
async_mapper = RunnableParallel(result=async_transform)
async_assigner = RunnableAssign(async_mapper)

# Use the async method
async def main():
    result = await async_assigner.ainvoke({"field": "test data"})
    print(result)

asyncio.run(main())
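
To show why the async path helps when a mapper has several fields, here is a plain-asyncio sketch, assuming the fields are awaited together in the manner of asyncio.gather. `assign_async` and both step coroutines are hypothetical names. The first step waits for a signal that only the second step sends, so the example finishes only if the two run concurrently.

```python
import asyncio

async def assign_async(data, **steps):
    """Await all steps together and merge their results into the input."""
    keys = list(steps)
    values = await asyncio.gather(*(steps[key](data) for key in keys))
    return {**data, **dict(zip(keys, values))}

async def main():
    ready = asyncio.Event()

    async def waits_for_signal(data):
        # Would time out if the other step were not running alongside it
        await asyncio.wait_for(ready.wait(), timeout=5)
        return f"Processed {data['field']}"

    async def sends_signal(data):
        ready.set()
        return len(data["field"])

    return await assign_async(
        {"field": "test data"}, result=waits_for_signal, size=sends_signal
    )

output = asyncio.run(main())
print(output)
```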

Streaming Support

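RunnableAssign also supports streaming: stream() yields partial dictionaries that together make up the final output. Note that because generate_content below calls model.invoke, the content value arrives as a single chunk rather than token by token: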

from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI(streaming=True)

def generate_content(data):
    return model.invoke(f"Write a short story about {data['topic']}")

content_mapper = RunnableParallel(content=generate_content)
content_assigner = RunnableAssign(content_mapper)

# Stream the results
for chunk in content_assigner.stream({"topic": "space exploration"}):
    print(chunk)
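
The shape of the streamed chunks can be modeled with a generator. This plain-Python sketch assumes the stream emits partial dictionaries, with the passed-through input keys and each computed key arriving as separate chunks; that ordering is an assumption here, and `stream_sketch` is a hypothetical stand-in rather than a LangChain function.

```python
def stream_sketch(data, mapper):
    """Yield partial dictionaries: the input first, then each computed key."""
    yield dict(data)
    for key, step in mapper.items():
        yield {key: step(data)}

chunks = list(
    stream_sketch(
        {"topic": "space exploration"},
        {"content": lambda d: f"A story about {d['topic']}"},
    )
)

# A consumer rebuilds the full result by merging chunks as they arrive
final = {}
for chunk in chunks:
    final.update(chunk)

print(chunks)
print(final)
```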

Error Handling and Retries

You can add robust error handling using the with_retry method:

import random

from langchain_core.runnables import RunnableAssign, RunnableParallel

def flaky_operation(data):
    # Simulate an operation that fails about half the time
    if random.random() < 0.5:
        raise Exception("Operation failed")
    return f"Successfully processed {data['input']}"

mapper = RunnableParallel(result=flaky_operation)
assigner = RunnableAssign(mapper)

# Add retry logic
robust_assigner = assigner.with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=True
)

# stop_after_attempt=3 allows up to 3 attempts in total (the first call plus
# 2 retries); if every attempt fails, the last exception is raised
result = robust_assigner.invoke({"input": "test data"})
print(result)
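
The retry policy itself can be sketched without any library. The helper below is a hypothetical plain-Python approximation of "stop after N attempts, wait with exponential backoff and jitter"; it is not how with_retry is implemented, and the delay values are chosen only to keep the example fast.

```python
import random
import time

def call_with_retry(fn, max_attempts=3, base_delay=0.01):
    """Call fn until it succeeds or max_attempts calls have been made."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff plus a little random jitter
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay)
            time.sleep(delay)

calls = {"count": 0}

def fails_twice():
    """Deterministic stand-in for a flaky operation: fails on calls 1 and 2."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("Operation failed")
    return "Successfully processed test data"

outcome = call_with_retry(fails_twice)
print(outcome, calls["count"])
```

With three attempts allowed, two failures are absorbed; a third consecutive failure would propagate to the caller.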

Practical Use Cases

Let’s explore some practical scenarios where RunnableAssign shines:

Document Processing Pipeline

from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI()

# Build one reusable model-to-string chain. Piping the message returned by
# model.invoke(...) into StrOutputParser() fails, because a message is not a runnable
to_text = model | StrOutputParser()

# Create a document enrichment pipeline
enrichment_pipeline = (
    RunnableAssign(RunnableParallel(
        summary=lambda x: to_text.invoke(f"Summarize this document: {x['text']}")
    ))
    | RunnableAssign(RunnableParallel(
        keywords=lambda x: to_text.invoke(f"Extract 5 keywords from this document: {x['text']}")
    ))
    | RunnableAssign(RunnableParallel(
        sentiment=lambda x: to_text.invoke(f"What's the sentiment of this text? {x['text']}")
    ))
)

# Process a document
document = {
    "id": "doc123",
    "text": "Climate change is accelerating, with global temperatures rising at an unprecedented rate. Scientists warn that immediate action is necessary to prevent catastrophic consequences.",
    "source": "Environmental Report 2023"
}

enriched_document = enrichment_pipeline.invoke(document)
print(enriched_document)

User Query Enhancement

from langchain_core.runnables import RunnableAssign, RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

# Create a query enhancement pipeline
query_enhancer = RunnableAssign(RunnableParallel(
    expanded_query=lambda x: model.invoke(f"Expand this search query with relevant terms: {x['query']}"),
    query_intent=lambda x: model.invoke(f"What is the likely intent behind this search query: {x['query']}"),
    query_categories=lambda x: model.invoke(f"Categorize this query into up to 3 topics: {x['query']}")
))

# Enhance a user query
enhanced_query = query_enhancer.invoke({"query": "climate solutions renewable"})
print(enhanced_query)

Conclusion

RunnableAssign enables incremental enrichment of the data flowing through a LangChain pipeline: each step preserves the original input and adds new fields computed from it. Because it implements the full Runnable interface, it composes with other LangChain components and picks up parallel execution, async support, streaming, and retries along the way.

Whether you’re building document processing systems, conversational agents, or data analysis pipelines, mastering RunnableAssign will help you build more maintainable and flexible LangChain applications.

This post was originally written in my native language and then translated using an LLM. I apologize if there are any grammatical inconsistencies.
