Streaming
Streaming is crucial for enhancing the responsiveness of applications built on LLMs. By displaying output progressively, even before a complete response is ready, streaming significantly improves user experience (UX), particularly when dealing with the latency of LLMs.
Overview
Generating full responses from LLMs often incurs a delay of several seconds, which becomes more noticeable in complex applications with multiple model calls. Fortunately, LLMs generate responses iteratively, allowing for intermediate results to be displayed as they are produced. By streaming these intermediate outputs, LangChain enables smoother UX in LLM-powered apps and offers built-in support for streaming at the core of its design.
In this guide, we'll discuss streaming in LLM applications and explore how LangChain's streaming APIs facilitate real-time output from various components in your application.
What to stream in LLM applications
In applications involving LLMs, several types of data can be streamed to improve user experience by reducing perceived latency and increasing transparency. These include:
1. Streaming LLM outputs
The most common and critical data to stream is the output generated by the LLM itself. LLMs often take time to generate full responses, and by streaming the output in real-time, users can see partial results as they are produced. This provides immediate feedback and helps reduce the wait time for users.
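For example, a chat model's output can be streamed token by token. The sketch below assumes the langchain-openai package and an OpenAI API key in the environment; any chat model that implements the Runnable interface behaves the same way.

# Minimal sketch: stream tokens from a chat model as they are generated.
# Assumes the langchain-openai package; the model name is illustrative.
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

for chunk in model.stream("Write a haiku about streaming"):
    # Each chunk is an AIMessageChunk containing a fragment of the response.
    print(chunk.content, end="", flush=True)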
2. Streaming pipeline or workflow progress
Beyond just streaming LLM output, it's useful to stream progress through more complex workflows or pipelines, giving users a sense of how the application is progressing overall. This could include:
- In LangGraph Workflows: With LangGraph, workflows are composed of nodes and edges that represent various steps. Streaming here involves tracking changes to the graph state as individual nodes request updates. This allows for more granular monitoring of which node in the workflow is currently active, giving real-time updates about the status of the workflow as it progresses through different stages.
- In LCEL Pipelines: Streaming updates from an LCEL pipeline involves capturing progress from individual sub-runnables. For example, as different steps or components of the pipeline execute, you can stream which sub-runnable is currently running, providing real-time insight into the overall pipeline's progress.
Streaming pipeline or workflow progress is essential in providing users with a clear picture of where the application is in the execution process.
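As a concrete illustration, here is a minimal sketch of streaming progress from a LangGraph workflow. The graph, its node names, and the state schema are invented for this example; the point is that stream_mode="updates" yields one update per node as it finishes, so a UI can show which stage of the workflow is currently active.

# Minimal sketch: stream per-node progress from a compiled LangGraph graph.
# The state schema and nodes below are illustrative placeholders.
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    draft: str

def research(state: State) -> dict:
    return {"draft": f"notes about {state['topic']}"}

def write(state: State) -> dict:
    return {"draft": state["draft"] + " -> polished"}

builder = StateGraph(State)
builder.add_node("research", research)
builder.add_node("write", write)
builder.add_edge(START, "research")
builder.add_edge("research", "write")
builder.add_edge("write", END)
graph = builder.compile()

# Each item maps the node that just ran to the state update it produced.
for update in graph.stream({"topic": "streaming", "draft": ""}, stream_mode="updates"):
    print(update)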
3. Streaming custom data
In some cases, you may need to stream custom data that goes beyond the information provided by the pipeline or workflow structure. This custom information is injected within a specific step in the workflow, whether that step is a tool or a LangGraph node. For example, you could stream updates about what a tool is doing in real-time or the progress through a LangGraph node. This granular data, which is emitted directly from within the step, provides more detailed insights into the execution of the workflow and is especially useful in complex processes where more visibility is needed.
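For example, here is a minimal sketch of emitting custom progress data from inside a tool using adispatch_custom_event and reading it back with astream_events. The tool, event name, and payload are illustrative; emitting the event without explicitly passing a config requires Python 3.11+.

# Minimal sketch: emit custom events from inside a tool, consume them
# via astream_events. Tool name, event name, and data are illustrative.
import asyncio
from langchain_core.callbacks.manager import adispatch_custom_event
from langchain_core.tools import tool

@tool
async def index_documents(path: str) -> str:
    """Pretend to index documents, reporting progress as custom events."""
    for pct in (25, 50, 100):
        # Emitted directly from within the step; surfaces as an
        # "on_custom_event" event in astream_events.
        # On Python < 3.11, accept a config argument and pass it here.
        await adispatch_custom_event("indexing_progress", {"percent": pct})
    return f"indexed {path}"

async def main():
    async for event in index_documents.astream_events({"path": "./docs"}, version="v2"):
        if event["event"] == "on_custom_event":
            print(event["name"], event["data"])

asyncio.run(main())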
Streaming APIs
LangChain has two main APIs for streaming output in real time. These APIs are supported by any component that implements the Runnable Interface, including LLMs, compiled LangGraph graphs, and any Runnable generated with LCEL.
- sync stream and async astream: Use these to stream outputs from individual Runnables (e.g., a chat model) as they are generated, or to stream any workflow created with LangGraph.
- The async-only astream_events: Use this API to get access to custom events and intermediate outputs from LLM applications built entirely with LCEL (a short sketch appears below). Note that this API is available, but not needed, when working with LangGraph.
In addition, there is a legacy async astream_log API. This API is not recommended for new projects, as it is more complex and less feature-rich than the other streaming APIs.
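As a sketch of the astream_events API, the chain below pairs a prompt with a chat model and an output parser. The model (from the langchain-openai package) and the prompt are illustrative; each emitted event identifies the sub-runnable that produced it, so you can filter for the token chunks streamed by the model.

# Minimal sketch: astream_events over a small LCEL chain.
# The prompt and model are illustrative placeholders.
import asyncio
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

chain = (
    ChatPromptTemplate.from_template("Tell me a joke about {topic}")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)

async def main():
    async for event in chain.astream_events({"topic": "streaming"}, version="v2"):
        # Only surface the token-by-token chunks from the chat model step.
        if event["event"] == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)

asyncio.run(main())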
stream() and astream()
The stream() method returns an iterator that yields chunks of output synchronously as they are produced. You can use a for loop to process each chunk in real time. For example, when using an LLM, this allows the output to be streamed incrementally as it is generated, reducing the wait time for users.
The type of chunk yielded by the stream() and astream() methods depends on the component being streamed. For example, when streaming from an LLM, each chunk will be an AIMessageChunk; for other components, the chunks may be of a different type.
For example:
for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
    # In extreme cases, this could even result in timeouts (e.g., when LLM outputs are
    # streamed from an API that has a timeout).
print(chunk)
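Chunks streamed from a chat model can also be accumulated into a single message, since AIMessageChunk supports concatenation with +. A minimal sketch, again assuming a chat model from the langchain-openai package:

# Minimal sketch: accumulate streamed chunks into one message.
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini")

full = None
for chunk in model.stream("Hello"):
    # AIMessageChunk supports "+", so partial messages merge into one.
    full = chunk if full is None else full + chunk

print(type(full).__name__)  # AIMessageChunk
print(full.content)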
The asynchronous version, astream(), works similarly but is designed for non-blocking workflows. You can use it in asynchronous code to achieve the same real-time streaming behavior.
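A minimal sketch of the asynchronous counterpart, reusing the illustrative component and some_input placeholders from the loop above:

import asyncio

async def main():
    # Same chunks as the synchronous loop, but consumed without blocking the
    # event loop, so other tasks can run while waiting for the next chunk.
    async for chunk in component.astream(some_input):
        print(chunk)

asyncio.run(main())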