CopilotKit

Sub-Agents

Decompose work across multiple specialized agents with a visible delegation log.


"""AG2 agent for the Sub-Agents demo.Demonstrates multi-agent delegation with a visible delegation log.A top-level "supervisor" ConversableAgent orchestrates three specializedsub-agents — each itself a ConversableAgent — exposed as supervisor tools:  - `research_agent`  — gathers facts  - `writing_agent`   — drafts prose  - `critique_agent`  — reviews draftsEvery delegation appends an entry to the `delegations` slot in sharedagent state (via AG2's ContextVariables + ReplyResult), so the UI canrender a live "delegation log" as the supervisor fans work out andcollects results. This is the canonical AG2 sub-agents-as-tools pattern,adapted to surface delegation events to the frontend via AG-UI'sshared-state channel."""import asyncioimport loggingimport uuidfrom textwrap import dedentfrom typing import List, Literalfrom autogen import ConversableAgent, LLMConfigfrom autogen.ag_ui import AGUIStreamfrom autogen.agentchat import ContextVariables, ReplyResultfrom autogen.tools import toolfrom fastapi import FastAPIfrom pydantic import BaseModel, Fieldlogger = logging.getLogger(__name__)SubAgentName = Literal["research_agent", "writing_agent", "critique_agent"]DelegationStatus = Literal["running", "completed", "failed"]class Delegation(BaseModel):    """One entry in the delegation log shown by the UI."""    id: str    sub_agent: SubAgentName    task: str    status: DelegationStatus = "completed"    result: str = ""class SubagentsSnapshot(BaseModel):    """Shape of the shared `delegations` state slot rendered by the UI."""    delegations: List[Delegation] = Field(default_factory=list)# ---------------------------------------------------------------------------# Sub-agents (real ConversableAgents under the hood)# ---------------------------------------------------------------------------## Each sub-agent is its own LLM ConversableAgent with a focused system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees what each sub-agent's final reply produces._SUB_LLM_CONFIG = LLMConfig({"model": "gpt-4o-mini", "stream": False})_research_agent = ConversableAgent(    name="research_sub_agent",    system_message=dedent(        """        You are a research sub-agent. Given a topic, produce a concise        bulleted list of 3-5 key facts. No preamble, no closing.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)_writing_agent = ConversableAgent(    name="writing_sub_agent",    system_message=dedent(        """        You are a writing sub-agent. Given a brief and optional source        facts, produce a polished 1-paragraph draft. Be clear and        concrete. No preamble.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)_critique_agent = ConversableAgent(    name="critique_sub_agent",    system_message=dedent(        """        You are an editorial critique sub-agent. Given a draft, produce        2-3 crisp, actionable critiques. No preamble.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)async def _invoke_sub_agent(sub_agent: ConversableAgent, task: str) -> str:    """Run a sub-agent on `task` and return its final reply text.    `generate_reply` produces a single LLM completion against a one-shot    user message. AG2's ``generate_reply`` is synchronous and performs a    blocking LLM round-trip, so we offload it to a worker thread to keep    the asyncio event loop responsive while the call is in flight.    """    reply = await asyncio.to_thread(        sub_agent.generate_reply,        messages=[{"role": "user", "content": task}],    )    if reply is None:        return ""    if isinstance(reply, dict):        # ConversableAgent.generate_reply may return {"content": "..."}.        return str(reply.get("content") or "")    return str(reply)def _load_snapshot(context_variables: ContextVariables) -> SubagentsSnapshot:    """Best-effort load of the SubagentsSnapshot from context variables.    Logs at WARNING when state fails validation so silent corruption is    visible in server logs instead of degrading to an empty snapshot    without a trace.    """    data = context_variables.data or {}    try:        return SubagentsSnapshot.model_validate(data)    except Exception as exc:        logger.warning(            "subagents: failed to validate SubagentsSnapshot from context "            "variables (%s: %s); falling back to empty snapshot",            exc.__class__.__name__,            exc,        )        return SubagentsSnapshot()def _record_delegation(    context_variables: ContextVariables,    sub_agent: SubAgentName,    task: str,    result: str,    status: DelegationStatus = "completed",) -> ReplyResult:    """Append a delegation entry to shared state and return ReplyResult."""    snapshot = _load_snapshot(context_variables)    snapshot.delegations.append(        Delegation(            id=str(uuid.uuid4()),            sub_agent=sub_agent,            task=task,            status=status,            result=result,        )    )    context_variables.update(snapshot.model_dump())    return ReplyResult(        message=result,        context_variables=context_variables,    )async def _run_delegation(    context_variables: ContextVariables,    sub_agent_name: SubAgentName,    sub_agent: ConversableAgent,    task: str,) -> ReplyResult:    """Invoke a sub-agent and record the outcome (completed or failed).    If the underlying ``generate_reply`` raises (transport error, quota,    SDK bug, ...), we record the delegation with ``status='failed'`` and    return a sane ReplyResult so the supervisor can recover instead of    crashing the turn. The full traceback is logged server-side; the    user-facing ``result`` text only mentions the exception class to    avoid leaking internals.    """    try:        result = await _invoke_sub_agent(sub_agent, task)    except Exception as exc:        logger.exception(            "subagents: sub-agent %s failed while handling task", sub_agent_name        )        failure_message = (            f"sub-agent call failed: {exc.__class__.__name__} (see server logs)"        )        return _record_delegation(            context_variables,            sub_agent_name,            task,            failure_message,            status="failed",        )    return _record_delegation(        context_variables,        sub_agent_name,        task,        result,        status="completed",    )# ---------------------------------------------------------------------------# Supervisor tools (each tool delegates to one sub-agent)# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call asynchronously runs the# matching sub-agent, records the delegation into shared state via# ContextVariables, and returns a ReplyResult the supervisor reads as# its tool output on the next step.@tool()async def research_agent(    context_variables: ContextVariables,    task: str,) -> ReplyResult:    """Delegate a research task to the research sub-agent.    Use for: gathering facts, background, definitions, statistics. Returns    a bulleted list of key facts.    Args:        task: The specific research question or topic to investigate.    """    return await _run_delegation(        context_variables, "research_agent", _research_agent, task    )@tool()async def writing_agent(    context_variables: ContextVariables,    task: str,) -> ReplyResult:    """Delegate a drafting task to the writing sub-agent.    Use for: producing a polished paragraph, draft, or summary. Pass    relevant facts from prior research inside ``task``.    Args:        task: The brief plus any relevant facts the writer should use.    """    return await _run_delegation(        context_variables, "writing_agent", _writing_agent, task    )@tool()async def critique_agent(    context_variables: ContextVariables,    task: str,) -> ReplyResult:    """Delegate a critique task to the critique sub-agent.    Use for: reviewing a draft and suggesting concrete improvements.    Args:        task: The draft to critique (paste it directly into ``task``).    """    return await _run_delegation(        context_variables, "critique_agent", _critique_agent, task    )# ---------------------------------------------------------------------------# Supervisor (the agent we export)# ---------------------------------------------------------------------------supervisor = ConversableAgent(    name="supervisor",    system_message=dedent(        """        You are a supervisor agent that coordinates three specialized        sub-agents to produce high-quality deliverables.        Available sub-agents (call them as tools):          - research_agent: gathers facts on a topic.          - writing_agent: turns facts + a brief into a polished draft.          - critique_agent: reviews a draft and suggests improvements.        For most non-trivial user requests, delegate in sequence:        research -> write -> critique. Pass the relevant facts/draft        through the `task` argument of each tool. Keep your own messages        short — explain the plan once, delegate, then return a concise        summary once done. The UI shows the user a live log of every        sub-agent delegation, so don't repeat sub-agent output verbatim        in your final reply — just summarize.        """    ).strip(),    llm_config=LLMConfig({"model": "gpt-4o-mini", "stream": True}),    human_input_mode="NEVER",    # Limit supervisor steps to bound delegation fan-out.    max_consecutive_auto_reply=8,    functions=[research_agent, writing_agent, critique_agent],)stream = AGUIStream(supervisor)subagents_app = FastAPI()subagents_app.mount("", stream.build_asgi())

What is this?#

Sub-agents are the canonical multi-agent pattern: a top-level supervisor LLM orchestrates one or more specialized sub-agents by exposing each of them as a tool. The supervisor decides what to delegate, the sub-agents do their narrow job, and their results flow back up to the supervisor's next step.

This is fundamentally the same shape as tool-calling, but each "tool" is itself a full-blown agent with its own system prompt and (often) its own tools, memory, and model.

When should I use this?#

Reach for sub-agents when a task has distinct specialized sub-tasks that each benefit from their own focus:

  • Research → Write → Critique pipelines, where each stage needs a different system prompt and temperature.
  • Router + specialists, where one agent classifies the request and dispatches to the right expert.
  • Divide-and-conquer — any problem that fits cleanly into parallel or sequential sub-problems.

The example below uses the Research → Write → Critique shape as the canonical example.

Setting up sub-agents#

Each sub-agent is a full create_agent(...) call with its own model, its own system prompt, and (optionally) its own tools. They don't share memory or tools with the supervisor; the supervisor only ever sees what the sub-agent returns.

subagents.py
import asyncioimport loggingimport uuidfrom textwrap import dedentfrom typing import List, Literalfrom autogen import ConversableAgent, LLMConfigfrom autogen.ag_ui import AGUIStreamfrom autogen.agentchat import ContextVariables, ReplyResultfrom autogen.tools import toolfrom fastapi import FastAPIfrom pydantic import BaseModel, Fieldlogger = logging.getLogger(__name__)SubAgentName = Literal["research_agent", "writing_agent", "critique_agent"]DelegationStatus = Literal["running", "completed", "failed"]class Delegation(BaseModel):    """One entry in the delegation log shown by the UI."""    id: str    sub_agent: SubAgentName    task: str    status: DelegationStatus = "completed"    result: str = ""class SubagentsSnapshot(BaseModel):    """Shape of the shared `delegations` state slot rendered by the UI."""    delegations: List[Delegation] = Field(default_factory=list)# ---------------------------------------------------------------------------# Sub-agents (real ConversableAgents under the hood)# ---------------------------------------------------------------------------## Each sub-agent is its own LLM ConversableAgent with a focused system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees what each sub-agent's final reply produces._SUB_LLM_CONFIG = LLMConfig({"model": "gpt-4o-mini", "stream": False})_research_agent = ConversableAgent(    name="research_sub_agent",    system_message=dedent(        """        You are a research sub-agent. Given a topic, produce a concise        bulleted list of 3-5 key facts. No preamble, no closing.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)_writing_agent = ConversableAgent(    name="writing_sub_agent",    system_message=dedent(        """        You are a writing sub-agent. Given a brief and optional source        facts, produce a polished 1-paragraph draft. Be clear and        concrete. No preamble.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)_critique_agent = ConversableAgent(    name="critique_sub_agent",    system_message=dedent(        """        You are an editorial critique sub-agent. Given a draft, produce        2-3 crisp, actionable critiques. No preamble.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)

Keep sub-agent system prompts narrow and focused. The point of this pattern is that each one does one thing well. If a sub-agent needs to know the whole user context to do its job, that's a signal the boundary is wrong.

Exposing sub-agents as tools#

The supervisor delegates by calling tools. Each tool is a thin wrapper around sub_agent.invoke(...) that:

  1. Runs the sub-agent synchronously on the supplied task string.
  2. Records the delegation into a delegations slot in shared agent state (so the UI can render a live log).
  3. Returns the sub-agent's final message as a ToolMessage, which the supervisor sees as a normal tool result on its next turn.
subagents.py
import asyncioimport loggingimport uuidfrom textwrap import dedentfrom typing import List, Literalfrom autogen import ConversableAgent, LLMConfigfrom autogen.ag_ui import AGUIStreamfrom autogen.agentchat import ContextVariables, ReplyResultfrom autogen.tools import toolfrom fastapi import FastAPIfrom pydantic import BaseModel, Fieldlogger = logging.getLogger(__name__)SubAgentName = Literal["research_agent", "writing_agent", "critique_agent"]DelegationStatus = Literal["running", "completed", "failed"]class Delegation(BaseModel):    """One entry in the delegation log shown by the UI."""    id: str    sub_agent: SubAgentName    task: str    status: DelegationStatus = "completed"    result: str = ""class SubagentsSnapshot(BaseModel):    """Shape of the shared `delegations` state slot rendered by the UI."""    delegations: List[Delegation] = Field(default_factory=list)# ---------------------------------------------------------------------------# Sub-agents (real ConversableAgents under the hood)# ---------------------------------------------------------------------------## Each sub-agent is its own LLM ConversableAgent with a focused system# prompt. They don't share memory or tools with the supervisor — the# supervisor only sees what each sub-agent's final reply produces._SUB_LLM_CONFIG = LLMConfig({"model": "gpt-4o-mini", "stream": False})_research_agent = ConversableAgent(    name="research_sub_agent",    system_message=dedent(        """        You are a research sub-agent. Given a topic, produce a concise        bulleted list of 3-5 key facts. No preamble, no closing.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)_writing_agent = ConversableAgent(    name="writing_sub_agent",    system_message=dedent(        """        You are a writing sub-agent. Given a brief and optional source        facts, produce a polished 1-paragraph draft. Be clear and        concrete. No preamble.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)_critique_agent = ConversableAgent(    name="critique_sub_agent",    system_message=dedent(        """        You are an editorial critique sub-agent. Given a draft, produce        2-3 crisp, actionable critiques. No preamble.        """    ).strip(),    llm_config=_SUB_LLM_CONFIG,    human_input_mode="NEVER",    max_consecutive_auto_reply=1,)async def _invoke_sub_agent(sub_agent: ConversableAgent, task: str) -> str:    """Run a sub-agent on `task` and return its final reply text.    `generate_reply` produces a single LLM completion against a one-shot    user message. AG2's ``generate_reply`` is synchronous and performs a    blocking LLM round-trip, so we offload it to a worker thread to keep    the asyncio event loop responsive while the call is in flight.    """    reply = await asyncio.to_thread(        sub_agent.generate_reply,        messages=[{"role": "user", "content": task}],    )    if reply is None:        return ""    if isinstance(reply, dict):        # ConversableAgent.generate_reply may return {"content": "..."}.        return str(reply.get("content") or "")    return str(reply)def _load_snapshot(context_variables: ContextVariables) -> SubagentsSnapshot:    """Best-effort load of the SubagentsSnapshot from context variables.    Logs at WARNING when state fails validation so silent corruption is    visible in server logs instead of degrading to an empty snapshot    without a trace.    """    data = context_variables.data or {}    try:        return SubagentsSnapshot.model_validate(data)    except Exception as exc:        logger.warning(            "subagents: failed to validate SubagentsSnapshot from context "            "variables (%s: %s); falling back to empty snapshot",            exc.__class__.__name__,            exc,        )        return SubagentsSnapshot()def _record_delegation(    context_variables: ContextVariables,    sub_agent: SubAgentName,    task: str,    result: str,    status: DelegationStatus = "completed",) -> ReplyResult:    """Append a delegation entry to shared state and return ReplyResult."""    snapshot = _load_snapshot(context_variables)    snapshot.delegations.append(        Delegation(            id=str(uuid.uuid4()),            sub_agent=sub_agent,            task=task,            status=status,            result=result,        )    )    context_variables.update(snapshot.model_dump())    return ReplyResult(        message=result,        context_variables=context_variables,    )async def _run_delegation(    context_variables: ContextVariables,    sub_agent_name: SubAgentName,    sub_agent: ConversableAgent,    task: str,) -> ReplyResult:    """Invoke a sub-agent and record the outcome (completed or failed).    If the underlying ``generate_reply`` raises (transport error, quota,    SDK bug, ...), we record the delegation with ``status='failed'`` and    return a sane ReplyResult so the supervisor can recover instead of    crashing the turn. The full traceback is logged server-side; the    user-facing ``result`` text only mentions the exception class to    avoid leaking internals.    """    try:        result = await _invoke_sub_agent(sub_agent, task)    except Exception as exc:        logger.exception(            "subagents: sub-agent %s failed while handling task", sub_agent_name        )        failure_message = (            f"sub-agent call failed: {exc.__class__.__name__} (see server logs)"        )        return _record_delegation(            context_variables,            sub_agent_name,            task,            failure_message,            status="failed",        )    return _record_delegation(        context_variables,        sub_agent_name,        task,        result,        status="completed",    )# ---------------------------------------------------------------------------# Supervisor tools (each tool delegates to one sub-agent)# ---------------------------------------------------------------------------# Each @tool wraps a sub-agent invocation. The supervisor LLM "calls"# these tools to delegate work; each call asynchronously runs the# matching sub-agent, records the delegation into shared state via# ContextVariables, and returns a ReplyResult the supervisor reads as# its tool output on the next step.@tool()async def research_agent(    context_variables: ContextVariables,    task: str,) -> ReplyResult:    """Delegate a research task to the research sub-agent.    Use for: gathering facts, background, definitions, statistics. Returns    a bulleted list of key facts.    Args:        task: The specific research question or topic to investigate.    """    return await _run_delegation(        context_variables, "research_agent", _research_agent, task    )@tool()async def writing_agent(    context_variables: ContextVariables,    task: str,) -> ReplyResult:    """Delegate a drafting task to the writing sub-agent.    Use for: producing a polished paragraph, draft, or summary. Pass    relevant facts from prior research inside ``task``.    Args:        task: The brief plus any relevant facts the writer should use.    """    return await _run_delegation(        context_variables, "writing_agent", _writing_agent, task    )@tool()async def critique_agent(    context_variables: ContextVariables,    task: str,) -> ReplyResult:    """Delegate a critique task to the critique sub-agent.    Use for: reviewing a draft and suggesting concrete improvements.    Args:        task: The draft to critique (paste it directly into ``task``).    """    return await _run_delegation(        context_variables, "critique_agent", _critique_agent, task    )

This is where CopilotKit's shared-state channel earns its keep: the supervisor's tool calls mutate delegations as they happen, and the frontend renders every new entry live.

Rendering a live delegation log#

On the frontend, the delegation log is just a reactive render of the delegations slot. Subscribe with useAgent({ updates: [UseAgentUpdate.OnStateChanged, UseAgentUpdate.OnRunStatusChanged] }), read agent.state.delegations, and render one card per entry.

delegation-log.tsx
/** * Live delegation log — renders the `delegations` slot of agent state. * * Each entry corresponds to one invocation of an AG2 sub-agent. The list * grows in real time as the supervisor fans work out to its children; * each delegation is appended via the supervisor's tool returning a * ReplyResult with updated ContextVariables, which AG-UI surfaces to * the UI through agent state. */export function DelegationLog({ delegations, isRunning }: DelegationLogProps) {  return (    <div      data-testid="delegation-log"      className="w-full h-full flex flex-col bg-white rounded-2xl shadow-sm border border-[#DBDBE5] overflow-hidden"    >      <div className="flex items-center justify-between px-6 py-3 border-b border-[#E9E9EF] bg-[#FAFAFC]">        <div className="flex items-center gap-3">          <span className="text-lg font-semibold text-[#010507]">            Sub-agent delegations          </span>          {isRunning && (            <span              data-testid="supervisor-running"              className="inline-flex items-center gap-1.5 px-2 py-0.5 rounded-full border border-[#BEC2FF] bg-[#BEC2FF1A] text-[#010507] text-[10px] font-semibold uppercase tracking-[0.12em]"            >              <span className="w-1.5 h-1.5 rounded-full bg-[#010507] animate-pulse" />              Supervisor running            </span>          )}        </div>        <span          data-testid="delegation-count"          className="text-xs font-mono text-[#838389]"        >          {delegations.length} calls        </span>      </div>      <div className="flex-1 overflow-y-auto p-4 space-y-3">        {delegations.length === 0 ? (          <p className="text-[#838389] italic text-sm">            Ask the supervisor to complete a task. Every sub-agent it calls will            appear here.          </p>        ) : (          delegations.map((d, idx) => {            const style = SUB_AGENT_STYLE[d.sub_agent];            return (              <div                key={d.id}                data-testid="delegation-entry"                className="border border-[#E9E9EF] rounded-xl p-3 bg-[#FAFAFC]"              >                <div className="flex items-center justify-between mb-2">                  <div className="flex items-center gap-2">                    <span className="text-xs font-mono text-[#AFAFB7]">                      #{idx + 1}                    </span>                    <span                      className={`inline-flex items-center gap-1 px-2 py-0.5 rounded-full text-[10px] font-semibold uppercase tracking-[0.1em] border ${style.color}`}                    >                      <span>{style.emoji}</span>                      <span>{style.label}</span>                    </span>                  </div>                  <span                    className={`text-[10px] uppercase tracking-[0.12em] font-semibold ${STATUS_BADGE[d.status]}`}                  >                    {d.status}                  </span>                </div>                <div className="text-xs text-[#57575B] mb-2">                  <span className="font-semibold text-[#010507]">Task: </span>                  {d.task}                </div>                <div className="text-sm text-[#010507] whitespace-pre-wrap bg-white rounded-lg p-2.5 border border-[#E9E9EF]">                  {d.result}                </div>              </div>            );          })        )}      </div>    </div>  );}

The result: as the supervisor fans work out to its sub-agents, the log grows in real time, giving the user visibility into a process that would otherwise be a long opaque spinner.

  • Shared State — the channel that makes the delegation log live.
  • State streaming — stream individual sub-agent outputs token-by-token inside each log entry.