Adding Streaming support to Vercel AI Swarm – An Agentic AI Framework

Martin Barreto
Tech entrepreneur, builder
In this post, I will explain how to implement and enable streaming in Vercel AI Swarm, a Vercel AI experimental framework for building AI agents.
I will assume you are familiar with LLMs, the Vercel AI SDK, AI agents, the Vercel AI Swarm experimental framework, and how web streaming works. If not, please take some time to clearly understand these concepts and tools before continuing.
Chat app using Vercel AI SDK UI and streaming with streamSwarm
Why are agents important?
AI agents are able to handle complex tasks or workflows, preventing large unwieldy prompts that degrade LLM performance. By delegating tasks among multiple agents, each with task-specific prompts and limited tools, we gain a more modular, maintainable, and powerful system that reduces errors and fosters efficient AI applications.
Why is streaming important?
Streaming enables real-time, incremental response generation in AI LLM calls, delivering faster, more interactive user experiences.
Since the Swarm framework runs several LLM prompt calls in sequence, taking more time to complete, streaming becomes a must-have feature. It allows sending the response in chunks, so the end user is not blocked by the total LLM response time. Additionally, we can show LLM step activity like tool call invocations, agent handoffs, and reasoning information, and ultimately stream the final text response.
About the streaming implementation
From the beginning, I aimed to follow a similar approach to the streamText function in the Vercel AI SDK, trying to reuse the SDK types as much as possible and to integrate seamlessly with the AI SDK UI, which provides frontend helpers such as useChat and useCompletion to integrate AI functionality.
My app displays a chatbox interface in Shopify stores, making strong use of the Vercel AI SDK UI, so I ensured the streaming capability was fully compatible with it.
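For context, the frontend side is plain AI SDK UI. Here is a minimal sketch of what the consuming component can look like, assuming a /api/chat route handler that returns the data stream response we build later in this post:
"use client";
import { useChat } from "@ai-sdk/react";

export default function Chat() {
  // useChat consumes the data stream protocol our route handler emits
  const { messages, input, handleInputChange, handleSubmit } = useChat({ api: "/api/chat" });
  return (
    <form onSubmit={handleSubmit}>
      {messages.map((message) => (
        <div key={message.id}>
          {message.role}: {message.content}
        </div>
      ))}
      <input value={input} onChange={handleInputChange} placeholder="Ask something..." />
    </form>
  );
}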
To implement this API, I delved into the Vercel AI SDK's codebase and had to understand some of its internals, apart from taking the runSwarm implementation as a starting point. Check out the feature request on GitHub and its corresponding runSwarm implementation for more details.
Just for clarification, runSwarm is the AI function we aim to enhance with streaming capability. It is an experimental feature in the Vercel AI SDK that replicates OpenAI's Swarm, but it currently does not support streaming.
The streamSwarm API
The new streamSwarm API returns a stream of TextStreamPart<any>, enabling us to send the data to the client as soon as it is generated by the LLM.
function streamSwarm<CONTEXT = any>({
agent: activeAgent,
prompt,
context,
model,
maxSteps = 100,
toolChoice,
abortSignal,
debug = false,
onChunk,
onStepFinish,
onAgentFinish,
onFinish,
dataStream,
}: {...}): AsyncIterableStream<TextStreamPart<any>> {
//..
//.
}
agent: Agent
The initial active agent. Normally a router agent that orchestrates the other agents and does not perform any useful task by itself.
prompt: CoreMessage[] | string
The previous message thread: mainly assistant, user, and tool messages.
context?: CONTEXT
Context values that we can pass to agents; agents can eventually update them.
model: LanguageModel
Sets the LLM model to use (the default model can be overridden in an agent's configuration).
maxSteps?: number
Maximum number of steps to run. We don't want this to run forever.
toolChoice?: CoreToolChoice<{ [k: string]: CoreTool }>
Set up the tool choice configuration for the agents.
abortSignal?: AbortSignal
Signal to abort the LLM requests, for instance when the useChat HTTP request ends prematurely because the end user leaves the page or cancels the request.
debug?: boolean
Enables debug printing. Never mind, we never debug. 🥳
onChunk?: (event: { chunk: SwarmChunk }) => Promise<void> | void;
type SwarmChunk = Extract<
TextStreamPart<any>,
{
type: "text-delta" | "reasoning" | "tool-call" | "tool-call-streaming-start" | "tool-call-delta" | "tool-result";
}
> & { agent: Agent };
Callback that is called for each chunk of the stream; the stream processing will pause until the callback promise is resolved. SwarmChunk provides the agent data in addition to the Vercel AI TextStreamPart fields.
onStepFinish?: (event: SwarmStepResult<any>) => Promise<void> | void;
type SwarmStepResult<TOOLS extends Record<string, CoreTool>> = StepResult<TOOLS> & { agent: Agent };
Callback that is called when a step is finished. It receives the step result and the agent instance that generated the step.
onAgentFinish?: (
event: Omit<StepResult<any>, "stepType" | "isContinued"> & { readonly steps: StepResult<any>[] } & { agent: Agent }
) => Promise<void> | void;
Callback that gets invoked whenever an agent finishes its execution, regardless of the stop reason (handoff, error, response, etc). Agent information is available in the event object.
onFinish?: (events: (Omit<StepResult<any>, "stepType" | "isContinued"> & { readonly steps: StepResult<any>[] } & { agent: Agent })[]) => void;
Callback that gets invoked whenever a swarm finishes its execution. Useful for saving response messages to the database, for instance. The agent that ran last is available in the event object.
dataStream?: DataStreamWriter
A data stream writer to merge additional data with the response stream. This can be used to add overall data or annotations to the streaming messages.
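To make the callback shapes concrete, here is a minimal, hypothetical wiring (routerAgent, the model, and the prompt are placeholders):
import { openai } from "@ai-sdk/openai";

const stream = streamSwarm({
  agent: routerAgent, // any Agent instance
  prompt: "Where is my order #1234?",
  model: openai("gpt-4o"),
  onChunk: ({ chunk }) => {
    // every chunk carries the agent that produced it
    if (chunk.type === "text-delta") {
      console.log(`[${chunk.agent.name}]`, chunk.textDelta);
    }
  },
  onStepFinish: (step) => {
    console.log(`${step.agent.name} finished a step (${step.finishReason})`);
  },
});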
Important Considerations
- While this API can be enhanced, my primary objective was to make it functional for my application rather than creating a reusable and thoroughly tested API.
- As you may have noticed, the result type is a stream of TextStreamPart<any>. TextStreamPart is the type used by the AI SDK to represent LLM streaming chunks. It encompasses more than just the text chunk, but that detail is not crucial at the moment.
- With the Vercel AI SDK, we can effortlessly convert the TextStreamPart stream into a DataStreamString stream. This format is used to send each chunk to the frontend and integrates seamlessly with the AI SDK UI. For those interested, you can explore the format specification here: stream protocol docs.
- I conducted some tests using dataStream to append additional information to the streaming messages. However, I encountered performance issues with stream delivery time compared to not using it.
- I have tested this with OpenAI models. It should be compatible with other providers that support function calls, but I cannot guarantee it.
How to use streamSwarm
Create the initial agent
The main agent can specify other agents as tools and hand over to them.
export const mainAgent = new Agent<ChatContext>({
name: "agent-name",
system: (context) => `
// agent system prompt
`,
tools: {
transferToTaskSpecificAgent1: handoverTool<ChatContext>({
type: "handover",
description: `
// tool description
`,
parameters: z.object({}),
execute: ({ context }) => ({ agent: taskSpecificAgentInstance, context }),
}),
transferToTaskSpecificAgent2: handoverTool<ChatContext>({
type: "handover",
description: `
// tool description
`,
parameters: z.object({}),
execute: ({ context }) => ({ agent: taskSpecificAgentInstance2, context }),
}),
functionCallName: functionTool<ChatContext>({
description: `
// tool description
`,
parameters: z.object({
productId: z.number().describe("bla bla bla...")
}),
})
// ... other tools, likely non-handover tools too.
},
toolChoice: "auto",
});
const taskSpecificAgentInstance = new Agent<ChatContext>({....});
const taskSpecificAgentInstance2 = new Agent<ChatContext>({....});
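For illustration, here is the first of these expanded with hypothetical values. The agent name, prompt, and ChatContext fields are all made up, and I assume functionTool accepts an execute callback that receives the parsed args:
// Hypothetical context shape shared across agents
type ChatContext = {
  storeId: string;
  customerName?: string;
};

const taskSpecificAgentInstance = new Agent<ChatContext>({
  name: "order-status-agent",
  system: (context) => `You answer order status questions for store ${context.storeId}. Be concise.`,
  tools: {
    lookupOrder: functionTool<ChatContext>({
      description: "Look up an order by its id",
      parameters: z.object({ orderId: z.string() }),
      // stubbed result; a real implementation would query the store backend
      execute: async ({ orderId }) => ({ orderId, status: "shipped" }),
    }),
  },
});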
Call streamSwarm
const context: ChatContext = {
// ...
}
const stream: AsyncIterableStream<TextStreamPart<any>> = streamSwarm<ChatContext>({
agent: mainAgent,
prompt: coreMessages,
model: openaiModel,
abortSignal: req.signal,
dataStream,
context: {
...context,
},
maxSteps: 5,
onAgentFinish: async (result) => {
console.log("onAgentFinish", result);
},
onFinish: async (result) => {
// flatten all messages
const responseMessages: ResponseMessage[] = result.flatMap((event) =>
event.steps.flatMap((step) => step.response.messages)
);
// save to DB
const allMessages = [...coreMessages, ...responseMessages];
const sanitizedMessages = sanitizeResponseMessages({ messages: allMessages, reasoning: undefined });
saveToDB(chatSession, sanitizedMessages);
},
});
AsyncIterableStream is a wrapper around ReadableStream that allows us to easily iterate over the stream:
type AsyncIterableStream<T> = AsyncIterable<T> & ReadableStream<T>;
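Because the result is async-iterable, you can also consume it directly without a reader; for example:
// iterate the swarm stream and print text deltas as they arrive
for await (const part of stream) {
  if (part.type === "text-delta") {
    process.stdout.write(part.textDelta);
  }
}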
Transform the TextStreamPart stream into a DataStreamString
const dataStreamSwarm = stream.pipeThrough(
getStreamPartsTransformer({ getErrorMessage: (error) => `An error occurred: ${error.message || "Unknown error"}` })
);
getStreamPartsTransformer converts the TextStreamPart<any> stream into a stream of DataStreamString so we can merge it with the response stream.
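The transformer itself lives in the gist, but conceptually it maps each TextStreamPart onto the data stream protocol. Here is a simplified sketch using the SDK's formatDataStreamPart helper (the real implementation handles more part types):
import { formatDataStreamPart, type TextStreamPart } from "ai";

function getStreamPartsTransformer({ getErrorMessage }: { getErrorMessage: (error: any) => string }) {
  return new TransformStream<TextStreamPart<any>, string>({
    transform(part, controller) {
      switch (part.type) {
        case "text-delta":
          // text parts become `0:"..."` lines in the data stream protocol
          controller.enqueue(formatDataStreamPart("text", part.textDelta));
          break;
        case "error":
          controller.enqueue(formatDataStreamPart("error", getErrorMessage(part.error)));
          break;
        // ... tool-call, tool-result, step, and finish parts are mapped similarly
      }
    },
  });
}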
Return a data stream response from the route handler (Next.js 15+, Vercel AI SDK 4+)
If you don't need to add additional data to the streaming, you can just return the stream directly.
return new Response(dataStreamSwarm, {
status: 200,
statusText: "OK",
headers: {
"Content-Type": "text/plain; charset=utf-8",
"X-Vercel-AI-Data-Stream": "v1",
},
});
That's it on how you can use the streamSwarm API.
Alternatively, you can do all the steps (the invocation and stream transformation) inside the execute callback of createDataStreamResponse, which is useful when we need to append extra stream data to the response stream.
return createDataStreamResponse({
execute: (dataStream: DataStreamWriter) => {
const stream: ReadableStream<any> = streamSwarm<ChatContext>({
// ..
onAgentFinish: async (result) => {
dataStream.writeData("Some data");
},
onFinish: async (result) => {
dataStream.writeMessageAnnotation("Message Annotation");
},
});
const dataStreamSwarm = stream.pipeThrough(
getStreamPartsTransformer({ getErrorMessage: (error) => `An error occurred: ${error.message || "Unknown error"}` })
);
dataStream.merge(dataStreamSwarm);
},
});
Implementation details
In this section I will explain some implementation details of the streamSwarm API. It's hard to get into every technical decision I made, but I will try to cover the most important ones. You can see the complete implementation in a GitHub gist on my account here.
Create the stream that the streamSwarm function returns
streamSwarm returns a stream of TextStreamPart<any>, basically all the LLM call chunks. It is the same type as the native streamText stream.
//..
// create a response stream
const response = new ReadableStream<TextStreamPart<any>>({
async start(controller) {
// ...
// ...
// ...
}
})
return createAsyncIterableStream(response);
We actually return an AsyncIterableStream, which is a wrapper around the ReadableStream, just for convenience and to be able to easily iterate over the stream if needed.
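createAsyncIterableStream is an internal SDK helper; in case you are curious, here is a sketch of how it can be implemented:
function createAsyncIterableStream<T>(source: ReadableStream<T>): AsyncIterableStream<T> {
  // pipe through an identity transform, then attach an async iterator to the result
  const stream = source.pipeThrough(new TransformStream<T, T>()) as AsyncIterableStream<T>;
  stream[Symbol.asyncIterator] = () => {
    const reader = stream.getReader();
    return {
      async next(): Promise<IteratorResult<T>> {
        const { done, value } = await reader.read();
        return done ? { done: true, value: undefined } : { done: false, value };
      },
    };
  };
  return stream;
}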
Invoke the active agent until the finish reason is no longer a handoff to another agent
At any point in time we have an active agent that basically executes the streamText Vercel AI helper, which returns a StreamTextResult<any, any> containing the full stream of chunks for the LLM call.
We enqueue the chunks to the controller variable.
outerLoop: while (responseMessages.length < maxSteps) {
// ...
// ...
// ...
lastResult = streamText({
model: activeAgent.model ?? model,
system: typeof activeAgent.system === "function"
? activeAgent.system(context)
: activeAgent.system,
messages: responseMessages,
// ...
// ...
// ...
const fullStreamReader: ReadableStreamReader<TextStreamPart<any>> = lastResult.fullStream.getReader();
// ...
// ...
// ...
else if (value.type === "finish" && value.finishReason !== "tool-calls") {
// we are done, add finish message to the response messages
controller.enqueue(value);
break outerLoop;
}
// ...
// ...
}
Callbacks implementation
onChunk({ chunk }) {
return onChunk?.({ chunk: { ...chunk, agent: activeAgent } }) ?? Promise.resolve();
},
Very straightforward. We just add the agent information to the chunk.
onStepFinish: (event: StepResult<any>) => {
// add step messages to the response messages
// (responseMessages feeds the next `streamText` invocation, i.e. the next agent's messages;
// this line has nothing to do with the callback implementation itself)
responseMessages.push(event.response.messages);
return onStepFinish?.({ ...event, agent: activeAgent }) ?? Promise.resolve();
},
Again, we just add the agent information to the onStepFinish event.
onFinish.... Here things get wild.
- We have to hand over to another agent if the step finish reason is a tool call.
- We have to stream a tool result for the agent handover tool invocation.
- We not only have to stream it, but also fix the response messages in order to perform the next iteration with valid messages.
- We need to save the intermediate streamText onFinish message events to be able to call the final streamSwarm onFinish callback with the right data.
- We need to call onFinish if the step finish reason is not a tool call.
onFinish: (event: Omit<StepResult<any>, "stepType" | "isContinued"> & { readonly steps: StepResult<any>[] }) => {
// adding messages to the response messages
const lastStep = event.steps.at(-1);
const previousActiveAgent = activeAgent;
if (lastStep?.finishReason === "tool-calls") {
// the generation stopped with an unhandled tool call
const { toolCalls, toolResults } = lastStep;
const unhandledHandoverCalls = getUnhandledHandoverCalls(toolCalls, toolResults, activeAgent);
// take the first handover call (other handover calls are ignored)
let handoverToolResult: ToolResultPart | undefined = undefined;
// process handover calls
if (unhandledHandoverCalls.length > 0) {
const handoverTool = activeAgent.tools?.[unhandledHandoverCalls[0].toolName]! as AgentHandoverTool<CONTEXT, any>;
const result = handoverTool.execute(unhandledHandoverCalls[0].args, {
context: context as any,
abortSignal,
});
activeAgent = result.agent;
context = result.context ?? context;
handoverToolResult = {
type: "tool-result",
toolCallId: unhandledHandoverCalls[0].toolCallId,
toolName: unhandledHandoverCalls[0].toolName,
result: `Handing over to agent ${activeAgent.name}`,
};
// stream tool result of the handover call
controller.enqueue({ ...handoverToolResult, args: unhandledHandoverCalls[0].args });
// update responseMessages to add tool result of the handover call
const toolMessage: CoreToolMessage | undefined =
responseMessages.at(-1)?.at(-1)?.role === "tool" ? (responseMessages.at(-1)?.at(-1) as CoreToolMessage) : undefined;
const assistantMessage: CoreAssistantMessage = responseMessages
.at(-1)
?.at(toolMessage === undefined ? -1 : -2) as CoreAssistantMessage;
// add handover tool result
toolMessage?.content.push(handoverToolResult) ??
responseMessages.at(-1)?.push({ role: "tool", content: [handoverToolResult], id: generateId() });
// clean out unused handover tool calls
if (typeof assistantMessage.content !== "string") {
const unusedHandoverToolCallIds = unhandledHandoverCalls.filter((_, index) => index > 0).map((call) => call.toolCallId);
assistantMessage.content = assistantMessage.content.filter((part) => {
return part.type === "tool-call" ? !unusedHandoverToolCallIds.includes(part.toolCallId) : true;
});
}
}
}
// just fix the return to call onFinish callback
// get all event.steps until last and create the last using responseMessages
const steps = event.steps.slice(0, -1);
const lastStepToFix = event.steps.at(-1)!;
const lastStepResult: StepResult<any> = {
...lastStepToFix,
response: { ...lastStepToFix.response, messages: responseMessages.at(-1)! },
};
// create new event with the fixed last step
const newEvent: Omit<StepResult<any>, "stepType" | "isContinued"> & { readonly steps: StepResult<any>[] } & { agent: Agent } = {
...event,
steps: [...steps, lastStepResult],
agent: previousActiveAgent,
};
allFinishEvents.push(newEvent);
const ret = onAgentFinish?.(newEvent) ?? Promise.resolve();
if (lastStep?.finishReason !== "tool-calls") {
onFinish?.(allFinishEvents);
}
return ret;
}
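getUnhandledHandoverCalls is not shown in the snippets here; conceptually, it selects the handover tool calls that have no matching tool result yet. A plausible sketch, assuming handover tools are tagged with type: "handover" as shown earlier:
function getUnhandledHandoverCalls(
  toolCalls: Array<CoreToolCallUnion<any>>,
  toolResults: Array<CoreToolResultUnion<any>>,
  agent: Agent
) {
  // tool calls that already have a result were executed normally
  const handledIds = new Set(toolResults.map((result) => result.toolCallId));
  // keep only unanswered calls that target a handover tool of the active agent
  return toolCalls.filter(
    (call) =>
      !handledIds.has(call.toolCallId) &&
      (agent.tools?.[call.toolName] as any)?.type === "handover"
  );
}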
Process each streamText result
const fullStreamReader: ReadableStreamReader<TextStreamPart<any>> = lastResult.fullStream.getReader();
We need to store the tool calls and tool results in order to append missing streaming parts like the handover tool result.
// store tool calls and results
let recordedToolCalls: Array<CoreToolCallUnion<any>> = [];
let recordedToolResults: Array<CoreToolResultUnion<any>> = [];
We process the streamText stream as it is generated.
innerLoop: while (true) {
// read the full stream
const { done, value } = await fullStreamReader.read();
if (done) {
break innerLoop;
}
//..
//
}
As the chunks arrive, we record the tool calls and tool results:
// ...
// save tool calls and results
if (value.type === "tool-call") {
recordedToolCalls.push(value);
} else if (value.type === "tool-result") {
recordedToolResults.push(value);
}
//..
//.
Then we check whether the finish reason is a tool call:
// ...
if (value.type === "step-finish" && value.finishReason === "tool-calls") {
//process tool calls
const handoverCalls = getUnhandledHandoverCalls(recordedToolCalls, recordedToolResults, activeAgent);
// take the first handover call (other handover calls are ignored)
let handoverToolResult: ToolResultPart | undefined = undefined;
if (handoverCalls.length > 0) {
const handoverToolResultTextStreamPart: TextStreamPart<any> = {
type: "tool-result",
toolCallId: handoverCalls[0].toolCallId,
toolName: handoverCalls[0].toolName,
result: `Handing over to agent ${handoverCalls[0].toolName}`,
args: handoverCalls[0].args,
};
// add tool response message to the response messages
controller.enqueue({ ...value, isContinued: true });
controller.enqueue(handoverToolResultTextStreamPart);
} else {
controller.enqueue({ ...value });
}
}
// ...
// ..
Notice that we are just streaming the handover tool result ourselves as a regular tool-result part. isContinued: true is used to make useChat work properly; by setting it, the new result part is appended to the last message.
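On the wire, the data stream protocol encodes this as a finish-step part, roughly like this (field values illustrative):
e:{"finishReason":"tool-calls","usage":{"promptTokens":120,"completionTokens":45},"isContinued":true}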
And I will leave it here; these are the main ideas behind the implementation, and you should be able to understand the rest of the code by checking the gist.
Catch You in My Next Post!
I'm excited to share the insights I've gathered while building and scaling AI SaaS solutions using Vercel AI SDK, Next.js, AWS, and the Shopify API. In my upcoming posts, I'll dive into the real challenges I faced—those tricky problems that today's AI hasn't quite mastered yet. No fluff, no recycled content—just genuine insights.
Here's a sneak peek at what's coming up:
- Handy tips for using Prisma ORM and managing database environments in a Next.js SaaS app hosted on Vercel.
- A deep dive into Shopify OAuth implementation, authentication, and API integration.
- How I leverage ReactFlow to provide workflow templates and allow users to configure them, including tips on creating automatic layouts and custom edges, nodes, and actions.
- Strategies for managing free trial subscriptions with Stripe, and how I make it easy for users to upgrade to a paid plan.
- Vite configuration for building Shopify app extensions that let you refresh your e-commerce website and see changes instantly.
- Exploring the Shopify GraphQL API.
- And so much more...
I also offer personalized, short-term technical consulting on AI product development using the Vercel AI SDK and Next.js. Let's make your AI dreams a reality!
You can find me on X @mtnBarreto or connect with me on LinkedIn. I'd love to hear about what you're building!