Sub-Agents
Decompose work across multiple specialized agents with a visible delegation log.
"""AG2 agent for the Sub-Agents demo.Demonstrates multi-agent delegation with a visible delegation log.A top-level "supervisor" ConversableAgent orchestrates three specializedsub-agents — each itself a ConversableAgent — exposed as supervisor tools: - `research_agent` — gathers facts - `writing_agent` — drafts prose - `critique_agent` — reviews draftsEvery delegation appends an entry to the `delegations` slot in sharedagent state (via AG2's ContextVariables + ReplyResult), so the UI canrender a live "delegation log" as the supervisor fans work out andcollects results. This is the canonical AG2 sub-agents-as-tools pattern,adapted to surface delegation events to the frontend via AG-UI'sshared-state channel."""import asyncioimport loggingimport uuidfrom textwrap import dedentfrom typing import List, Literalfrom autogen import ConversableAgent, LLMConfigfrom autogen.ag_ui import AGUIStreamfrom autogen.agentchat import ContextVariables, ReplyResultfrom autogen.tools import toolfrom fastapi import FastAPIfrom pydantic import BaseModel, Fieldlogger = logging.getLogger(__name__)SubAgentName = Literal["research_agent", "writing_agent", "critique_agent"]DelegationStatus = Literal["running", "completed", "failed"]class Delegation(BaseModel): """One entry in the delegation log shown by the UI.""" id: str sub_agent: SubAgentName task: str status: DelegationStatus = "completed" result: str = ""class SubagentsSnapshot(BaseModel): """Shape of the shared `delegations` state slot rendered by the UI.""" delegations: List[Delegation] = Field(default_factory=list)# ---------------------------------------------------------------------------# Sub-agents (real ConversableAgents under the hood)# ---------------------------------------------------------------------------## Each sub-agent is its own LLM ConversableAgent with a focused system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees what each sub-agent's final reply produces._SUB_LLM_CONFIG = LLMConfig({"model": "gpt-4o-mini", "stream": False})_research_agent = ConversableAgent( name="research_sub_agent", system_message=dedent( """ You are a research sub-agent. Given a topic, produce a concise bulleted list of 3-5 key facts. No preamble, no closing. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)_writing_agent = ConversableAgent( name="writing_sub_agent", system_message=dedent( """ You are a writing sub-agent. Given a brief and optional source facts, produce a polished 1-paragraph draft. Be clear and concrete. No preamble. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)_critique_agent = ConversableAgent( name="critique_sub_agent", system_message=dedent( """ You are an editorial critique sub-agent. Given a draft, produce 2-3 crisp, actionable critiques. No preamble. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)async def _invoke_sub_agent(sub_agent: ConversableAgent, task: str) -> str: """Run a sub-agent on `task` and return its final reply text. `generate_reply` produces a single LLM completion against a one-shot user message. AG2's ``generate_reply`` is synchronous and performs a blocking LLM round-trip, so we offload it to a worker thread to keep the asyncio event loop responsive while the call is in flight. """ reply = await asyncio.to_thread( sub_agent.generate_reply, messages=[{"role": "user", "content": task}], ) if reply is None: return "" if isinstance(reply, dict): # ConversableAgent.generate_reply may return {"content": "..."}. return str(reply.get("content") or "") return str(reply)def _load_snapshot(context_variables: ContextVariables) -> SubagentsSnapshot: """Best-effort load of the SubagentsSnapshot from context variables. Logs at WARNING when state fails validation so silent corruption is visible in server logs instead of degrading to an empty snapshot without a trace. """ data = context_variables.data or {} try: return SubagentsSnapshot.model_validate(data) except Exception as exc: logger.warning( "subagents: failed to validate SubagentsSnapshot from context " "variables (%s: %s); falling back to empty snapshot", exc.__class__.__name__, exc, ) return SubagentsSnapshot()def _record_delegation( context_variables: ContextVariables, sub_agent: SubAgentName, task: str, result: str, status: DelegationStatus = "completed",) -> ReplyResult: """Append a delegation entry to shared state and return ReplyResult.""" snapshot = _load_snapshot(context_variables) snapshot.delegations.append( Delegation( id=str(uuid.uuid4()), sub_agent=sub_agent, task=task, status=status, result=result, ) ) context_variables.update(snapshot.model_dump()) return ReplyResult( message=result, context_variables=context_variables, )async def _run_delegation( context_variables: ContextVariables, sub_agent_name: SubAgentName, sub_agent: ConversableAgent, task: str,) -> ReplyResult: """Invoke a sub-agent and record the outcome (completed or failed). If the underlying ``generate_reply`` raises (transport error, quota, SDK bug, ...), we record the delegation with ``status='failed'`` and return a sane ReplyResult so the supervisor can recover instead of crashing the turn. The full traceback is logged server-side; the user-facing ``result`` text only mentions the exception class to avoid leaking internals. """ try: result = await _invoke_sub_agent(sub_agent, task) except Exception as exc: logger.exception( "subagents: sub-agent %s failed while handling task", sub_agent_name ) failure_message = ( f"sub-agent call failed: {exc.__class__.__name__} (see server logs)" ) return _record_delegation( context_variables, sub_agent_name, task, failure_message, status="failed", ) return _record_delegation( context_variables, sub_agent_name, task, result, status="completed", )# ---------------------------------------------------------------------------# Supervisor tools (each tool delegates to one sub-agent)# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call asynchronously runs the# matching sub-agent, records the delegation into shared state via# ContextVariables, and returns a ReplyResult the supervisor reads as# its tool output on the next step.@tool()async def research_agent( context_variables: ContextVariables, task: str,) -> ReplyResult: """Delegate a research task to the research sub-agent. Use for: gathering facts, background, definitions, statistics. Returns a bulleted list of key facts. Args: task: The specific research question or topic to investigate. """ return await _run_delegation( context_variables, "research_agent", _research_agent, task )@tool()async def writing_agent( context_variables: ContextVariables, task: str,) -> ReplyResult: """Delegate a drafting task to the writing sub-agent. Use for: producing a polished paragraph, draft, or summary. Pass relevant facts from prior research inside ``task``. Args: task: The brief plus any relevant facts the writer should use. """ return await _run_delegation( context_variables, "writing_agent", _writing_agent, task )@tool()async def critique_agent( context_variables: ContextVariables, task: str,) -> ReplyResult: """Delegate a critique task to the critique sub-agent. Use for: reviewing a draft and suggesting concrete improvements. Args: task: The draft to critique (paste it directly into ``task``). """ return await _run_delegation( context_variables, "critique_agent", _critique_agent, task )# ---------------------------------------------------------------------------# Supervisor (the agent we export)# ---------------------------------------------------------------------------supervisor = ConversableAgent( name="supervisor", system_message=dedent( """ You are a supervisor agent that coordinates three specialized sub-agents to produce high-quality deliverables. Available sub-agents (call them as tools): - research_agent: gathers facts on a topic. - writing_agent: turns facts + a brief into a polished draft. - critique_agent: reviews a draft and suggests improvements. For most non-trivial user requests, delegate in sequence: research -> write -> critique. Pass the relevant facts/draft through the `task` argument of each tool. Keep your own messages short — explain the plan once, delegate, then return a concise summary once done. The UI shows the user a live log of every sub-agent delegation, so don't repeat sub-agent output verbatim in your final reply — just summarize. """ ).strip(), llm_config=LLMConfig({"model": "gpt-4o-mini", "stream": True}), human_input_mode="NEVER", # Limit supervisor steps to bound delegation fan-out. max_consecutive_auto_reply=8, functions=[research_agent, writing_agent, critique_agent],)stream = AGUIStream(supervisor)subagents_app = FastAPI()subagents_app.mount("", stream.build_asgi())What is this?#
Sub-agents are the canonical multi-agent pattern: a top-level supervisor LLM orchestrates one or more specialized sub-agents by exposing each of them as a tool. The supervisor decides what to delegate, the sub-agents do their narrow job, and their results flow back up to the supervisor's next step.
This is fundamentally the same shape as tool-calling, but each "tool" is itself a full-blown agent with its own system prompt and (often) its own tools, memory, and model.
When should I use this?#
Reach for sub-agents when a task has distinct specialized sub-tasks that each benefit from their own focus:
- Research → Write → Critique pipelines, where each stage needs a different system prompt and temperature.
- Router + specialists, where one agent classifies the request and dispatches to the right expert.
- Divide-and-conquer — any problem that fits cleanly into parallel or sequential sub-problems.
The example below uses the Research → Write → Critique shape as the canonical example.
Setting up sub-agents#
Each sub-agent is a full create_agent(...) call with its own model,
its own system prompt, and (optionally) its own tools. They don't share
memory or tools with the supervisor; the supervisor only ever sees
what the sub-agent returns.
import asyncioimport loggingimport uuidfrom textwrap import dedentfrom typing import List, Literalfrom autogen import ConversableAgent, LLMConfigfrom autogen.ag_ui import AGUIStreamfrom autogen.agentchat import ContextVariables, ReplyResultfrom autogen.tools import toolfrom fastapi import FastAPIfrom pydantic import BaseModel, Fieldlogger = logging.getLogger(__name__)SubAgentName = Literal["research_agent", "writing_agent", "critique_agent"]DelegationStatus = Literal["running", "completed", "failed"]class Delegation(BaseModel): """One entry in the delegation log shown by the UI.""" id: str sub_agent: SubAgentName task: str status: DelegationStatus = "completed" result: str = ""class SubagentsSnapshot(BaseModel): """Shape of the shared `delegations` state slot rendered by the UI.""" delegations: List[Delegation] = Field(default_factory=list)# ---------------------------------------------------------------------------# Sub-agents (real ConversableAgents under the hood)# ---------------------------------------------------------------------------## Each sub-agent is its own LLM ConversableAgent with a focused system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees what each sub-agent's final reply produces._SUB_LLM_CONFIG = LLMConfig({"model": "gpt-4o-mini", "stream": False})_research_agent = ConversableAgent( name="research_sub_agent", system_message=dedent( """ You are a research sub-agent. Given a topic, produce a concise bulleted list of 3-5 key facts. No preamble, no closing. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)_writing_agent = ConversableAgent( name="writing_sub_agent", system_message=dedent( """ You are a writing sub-agent. Given a brief and optional source facts, produce a polished 1-paragraph draft. Be clear and concrete. No preamble. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)_critique_agent = ConversableAgent( name="critique_sub_agent", system_message=dedent( """ You are an editorial critique sub-agent. Given a draft, produce 2-3 crisp, actionable critiques. No preamble. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)Keep sub-agent system prompts narrow and focused. The point of this pattern is that each one does one thing well. If a sub-agent needs to know the whole user context to do its job, that's a signal the boundary is wrong.
Exposing sub-agents as tools#
The supervisor delegates by calling tools. Each tool is a thin wrapper
around sub_agent.invoke(...) that:
- Runs the sub-agent synchronously on the supplied
taskstring. - Records the delegation into a
delegationsslot in shared agent state (so the UI can render a live log). - Returns the sub-agent's final message as a
ToolMessage, which the supervisor sees as a normal tool result on its next turn.
import asyncioimport loggingimport uuidfrom textwrap import dedentfrom typing import List, Literalfrom autogen import ConversableAgent, LLMConfigfrom autogen.ag_ui import AGUIStreamfrom autogen.agentchat import ContextVariables, ReplyResultfrom autogen.tools import toolfrom fastapi import FastAPIfrom pydantic import BaseModel, Fieldlogger = logging.getLogger(__name__)SubAgentName = Literal["research_agent", "writing_agent", "critique_agent"]DelegationStatus = Literal["running", "completed", "failed"]class Delegation(BaseModel): """One entry in the delegation log shown by the UI.""" id: str sub_agent: SubAgentName task: str status: DelegationStatus = "completed" result: str = ""class SubagentsSnapshot(BaseModel): """Shape of the shared `delegations` state slot rendered by the UI.""" delegations: List[Delegation] = Field(default_factory=list)# ---------------------------------------------------------------------------# Sub-agents (real ConversableAgents under the hood)# ---------------------------------------------------------------------------## Each sub-agent is its own LLM ConversableAgent with a focused system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees what each sub-agent's final reply produces._SUB_LLM_CONFIG = LLMConfig({"model": "gpt-4o-mini", "stream": False})_research_agent = ConversableAgent( name="research_sub_agent", system_message=dedent( """ You are a research sub-agent. Given a topic, produce a concise bulleted list of 3-5 key facts. No preamble, no closing. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)_writing_agent = ConversableAgent( name="writing_sub_agent", system_message=dedent( """ You are a writing sub-agent. Given a brief and optional source facts, produce a polished 1-paragraph draft. Be clear and concrete. No preamble. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)_critique_agent = ConversableAgent( name="critique_sub_agent", system_message=dedent( """ You are an editorial critique sub-agent. Given a draft, produce 2-3 crisp, actionable critiques. No preamble. """ ).strip(), llm_config=_SUB_LLM_CONFIG, human_input_mode="NEVER", max_consecutive_auto_reply=1,)async def _invoke_sub_agent(sub_agent: ConversableAgent, task: str) -> str: """Run a sub-agent on `task` and return its final reply text. `generate_reply` produces a single LLM completion against a one-shot user message. AG2's ``generate_reply`` is synchronous and performs a blocking LLM round-trip, so we offload it to a worker thread to keep the asyncio event loop responsive while the call is in flight. """ reply = await asyncio.to_thread( sub_agent.generate_reply, messages=[{"role": "user", "content": task}], ) if reply is None: return "" if isinstance(reply, dict): # ConversableAgent.generate_reply may return {"content": "..."}. return str(reply.get("content") or "") return str(reply)def _load_snapshot(context_variables: ContextVariables) -> SubagentsSnapshot: """Best-effort load of the SubagentsSnapshot from context variables. Logs at WARNING when state fails validation so silent corruption is visible in server logs instead of degrading to an empty snapshot without a trace. """ data = context_variables.data or {} try: return SubagentsSnapshot.model_validate(data) except Exception as exc: logger.warning( "subagents: failed to validate SubagentsSnapshot from context " "variables (%s: %s); falling back to empty snapshot", exc.__class__.__name__, exc, ) return SubagentsSnapshot()def _record_delegation( context_variables: ContextVariables, sub_agent: SubAgentName, task: str, result: str, status: DelegationStatus = "completed",) -> ReplyResult: """Append a delegation entry to shared state and return ReplyResult.""" snapshot = _load_snapshot(context_variables) snapshot.delegations.append( Delegation( id=str(uuid.uuid4()), sub_agent=sub_agent, task=task, status=status, result=result, ) ) context_variables.update(snapshot.model_dump()) return ReplyResult( message=result, context_variables=context_variables, )async def _run_delegation( context_variables: ContextVariables, sub_agent_name: SubAgentName, sub_agent: ConversableAgent, task: str,) -> ReplyResult: """Invoke a sub-agent and record the outcome (completed or failed). If the underlying ``generate_reply`` raises (transport error, quota, SDK bug, ...), we record the delegation with ``status='failed'`` and return a sane ReplyResult so the supervisor can recover instead of crashing the turn. The full traceback is logged server-side; the user-facing ``result`` text only mentions the exception class to avoid leaking internals. """ try: result = await _invoke_sub_agent(sub_agent, task) except Exception as exc: logger.exception( "subagents: sub-agent %s failed while handling task", sub_agent_name ) failure_message = ( f"sub-agent call failed: {exc.__class__.__name__} (see server logs)" ) return _record_delegation( context_variables, sub_agent_name, task, failure_message, status="failed", ) return _record_delegation( context_variables, sub_agent_name, task, result, status="completed", )# ---------------------------------------------------------------------------# Supervisor tools (each tool delegates to one sub-agent)# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call asynchronously runs the# matching sub-agent, records the delegation into shared state via# ContextVariables, and returns a ReplyResult the supervisor reads as# its tool output on the next step.@tool()async def research_agent( context_variables: ContextVariables, task: str,) -> ReplyResult: """Delegate a research task to the research sub-agent. Use for: gathering facts, background, definitions, statistics. Returns a bulleted list of key facts. Args: task: The specific research question or topic to investigate. """ return await _run_delegation( context_variables, "research_agent", _research_agent, task )@tool()async def writing_agent( context_variables: ContextVariables, task: str,) -> ReplyResult: """Delegate a drafting task to the writing sub-agent. Use for: producing a polished paragraph, draft, or summary. Pass relevant facts from prior research inside ``task``. Args: task: The brief plus any relevant facts the writer should use. """ return await _run_delegation( context_variables, "writing_agent", _writing_agent, task )@tool()async def critique_agent( context_variables: ContextVariables, task: str,) -> ReplyResult: """Delegate a critique task to the critique sub-agent. Use for: reviewing a draft and suggesting concrete improvements. Args: task: The draft to critique (paste it directly into ``task``). """ return await _run_delegation( context_variables, "critique_agent", _critique_agent, task )This is where CopilotKit's shared-state channel earns its keep: the
supervisor's tool calls mutate delegations as they happen, and the
frontend renders every new entry live.
Rendering a live delegation log#
On the frontend, the delegation log is just a reactive render of the
delegations slot. Subscribe with useAgent({ updates: [UseAgentUpdate.OnStateChanged, UseAgentUpdate.OnRunStatusChanged] }),
read agent.state.delegations, and render one card per entry.
/** * Live delegation log — renders the `delegations` slot of agent state. * * Each entry corresponds to one invocation of an AG2 sub-agent. The list * grows in real time as the supervisor fans work out to its children; * each delegation is appended via the supervisor's tool returning a * ReplyResult with updated ContextVariables, which AG-UI surfaces to * the UI through agent state. */export function DelegationLog({ delegations, isRunning }: DelegationLogProps) { return ( <div data-testid="delegation-log" className="w-full h-full flex flex-col bg-white rounded-2xl shadow-sm border border-[#DBDBE5] overflow-hidden" > <div className="flex items-center justify-between px-6 py-3 border-b border-[#E9E9EF] bg-[#FAFAFC]"> <div className="flex items-center gap-3"> <span className="text-lg font-semibold text-[#010507]"> Sub-agent delegations </span> {isRunning && ( <span data-testid="supervisor-running" className="inline-flex items-center gap-1.5 px-2 py-0.5 rounded-full border border-[#BEC2FF] bg-[#BEC2FF1A] text-[#010507] text-[10px] font-semibold uppercase tracking-[0.12em]" > <span className="w-1.5 h-1.5 rounded-full bg-[#010507] animate-pulse" /> Supervisor running </span> )} </div> <span data-testid="delegation-count" className="text-xs font-mono text-[#838389]" > {delegations.length} calls </span> </div> <div className="flex-1 overflow-y-auto p-4 space-y-3"> {delegations.length === 0 ? ( <p className="text-[#838389] italic text-sm"> Ask the supervisor to complete a task. Every sub-agent it calls will appear here. </p> ) : ( delegations.map((d, idx) => { const style = SUB_AGENT_STYLE[d.sub_agent]; return ( <div key={d.id} data-testid="delegation-entry" className="border border-[#E9E9EF] rounded-xl p-3 bg-[#FAFAFC]" > <div className="flex items-center justify-between mb-2"> <div className="flex items-center gap-2"> <span className="text-xs font-mono text-[#AFAFB7]"> #{idx + 1} </span> <span className={`inline-flex items-center gap-1 px-2 py-0.5 rounded-full text-[10px] font-semibold uppercase tracking-[0.1em] border ${style.color}`} > <span>{style.emoji}</span> <span>{style.label}</span> </span> </div> <span className={`text-[10px] uppercase tracking-[0.12em] font-semibold ${STATUS_BADGE[d.status]}`} > {d.status} </span> </div> <div className="text-xs text-[#57575B] mb-2"> <span className="font-semibold text-[#010507]">Task: </span> {d.task} </div> <div className="text-sm text-[#010507] whitespace-pre-wrap bg-white rounded-lg p-2.5 border border-[#E9E9EF]"> {d.result} </div> </div> ); }) )} </div> </div> );}The result: as the supervisor fans work out to its sub-agents, the log grows in real time, giving the user visibility into a process that would otherwise be a long opaque spinner.
Related#
- Shared State — the channel that makes the delegation log live.
- State streaming — stream individual sub-agent outputs token-by-token inside each log entry.
