Поделиться через


Руководство по миграции AutoGen to Microsoft Agent Framework

Полное руководство по миграции из AutoGen в пакет SDK для Python для Microsoft Agent Framework.

Оглавление

Предыстория

AutoGen — это платформа для создания агентов ИИ и многоагентных систем с использованием больших языковых моделей (LLM). Он начал в качестве исследовательского проекта в Microsoft Research и начал несколько концепций в оркестрации с несколькими агентами, таких как GroupChat и среда выполнения агента на основе событий. Проект был плодотворным сотрудничеством сообщества с открытым исходным кодом, и многие важные функции пришли от внешних участников.

Microsoft Agent Framework — это новый многоязычный пакет SDK для создания агентов ИИ и рабочих процессов с помощью LLM. Он представляет собой значительную эволюцию идей, пионеров в AutoGen и включает уроки, извлеченные из реального использования. Она разработана основными командами автогена и семантического ядра в Корпорации Майкрософт и предназначена для создания приложений искусственного интеллекта.

В этом руководстве описывается практический путь миграции: он начинается с покрытия того, что остается неизменным и что изменится на первый взгляд. Затем он охватывает настройку клиента модели, функции одного агента и, наконец, оркестрацию нескольких агентов с конкретным кодом параллельно. По пути ссылки на выполняемые примеры в репозитории Agent Framework помогают проверить каждый шаг.

Основные сходства и различия

Что остается неизменным

Основы знакомы. Агенты по-прежнему создаются вокруг клиента модели, предоставляют инструкции и присоединяют средства. Обе библиотеки поддерживают средства стиля функций, потоковую передачу маркеров, мультимодальное содержимое и асинхронный ввод-вывод.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Основные различия

  1. Стиль оркестрации: AutoGen связывает ядро, управляемое событиями, с высоким уровнем Team. Агент Framework сосредоточен на типизированной графовой основе Workflow , которая направляет данные по краям и активирует исполнителей при готовности входных данных.

  2. Средства: функции автогена упаковываются с FunctionToolпомощью . Agent Framework использует @ai_functionсхемы и добавляет размещенные средства, такие как интерпретатор кода и веб-поиск.

  3. Поведение агента: AssistantAgent однократно, если вы не увеличиваете max_tool_iterations. ChatAgent по умолчанию выполняется многоэтапный вызов и продолжает вызывать средства, пока он не сможет вернуть окончательный ответ.

  4. Среда выполнения: AutoGen предлагает внедренные и экспериментальные распределенные среды выполнения. Агентная платформа ориентирована на однопроцессную композицию сегодня; Планируется распределенное выполнение.

Создание и настройка клиента модели

Обе платформы предоставляют клиентам модели для крупных поставщиков ИИ аналогичные, но не идентичные API.

Функция AutoGen Платформа агента
Клиент OpenAI OpenAIChatCompletionClient OpenAIChatClient
Клиент ответов OpenAI ❌ Недоступно OpenAIResponsesClient
Azure OpenAI AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Ответы Azure OpenAI ❌ Недоступно AzureOpenAIResponsesClient
Azure AI AzureAIChatCompletionClient AzureAIAgentClient
Антропик AnthropicChatCompletionClient 🚧 Плановый
Ollama OllamaChatCompletionClient 🚧 Плановый
Кэширование ChatCompletionCache обертка 🚧 Плановый

Клиенты модели AutoGen

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Агент Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Подробные примеры см. в следующих примерах:

Поддержка API ответов (agent Framework Exclusive)

AzureOpenAIResponsesClient Платформа агента и OpenAIResponsesClient предоставляет специализированную поддержку для моделей причин и структурированных ответов, недоступных в AutoGen:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Примеры API ответов см. в следующих примерах:

Сопоставление функций Single-Agent

Этот раздел сопоставляет функции с одним агентом между AutoGen и Agent Framework. На месте клиента создайте агент, подключите средства и выберите между выполнением потоковой передачи и потоковой передачи.

Базовое создание и выполнение агента

После настройки клиента модели следующий шаг — создание агентов. Обе платформы предоставляют аналогичные абстракции агента, но с различными параметрами поведения по умолчанию и параметрами конфигурации.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Ключевые отличия:

  • Поведение по умолчанию: ChatAgent автоматически выполняет итерацию с помощью вызовов инструментов, в то время как AssistantAgent требуется явный max_tool_iterations параметр
  • Конфигурация среды выполнения: ChatAgent.run() принимает tools и tool_choice параметры для настройки каждого вызова
  • Методы фабрики: Agent Framework предоставляет удобные методы фабрики непосредственно от клиентов чата
  • Управление состоянием: ChatAgent является бессерверным и не поддерживает журнал бесед между вызовами, в отличие от AssistantAgent того, что поддерживает журнал бесед в рамках его состояния

Управление состоянием беседы с помощью AgentThread

Чтобы продолжить беседы, ChatAgentиспользуйте AgentThread для управления журналом бесед:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

По умолчанию без отслеживания состояния: быстрая демонстрация

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Примеры управления потоками см. в следующих примерах:

Эквивалентность агента OpenAI Assistant

Обе платформы обеспечивают интеграцию API Помощника OpenAI:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Примеры Помощника OpenAI см. в следующих примерах:

Поддержка потоковой передачи

Обе платформы используют маркеры потоковой передачи в режиме реального времени (от клиентов и агентов), чтобы обеспечить реагирование пользовательских интерфейсов.

Автоматическая потоковая передача

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Потоковая передача agent Framework

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Совет. В агентной платформе клиенты и агенты дают одну и ту же форму обновления; Вы можете прочитать chunk.text в любом случае.

Типы сообщений и создание

Понимание работы сообщений имеет решающее значение для эффективного взаимодействия с агентом. Обе платформы предоставляют различные подходы к созданию и обработке сообщений с помощью AutoGen с помощью отдельных классов сообщений и агентной платформы с помощью единой системы сообщений.

Типы сообщений AutoGen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Типы сообщений agent Framework

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Ключевые отличия:

  • AutoGen использует отдельные классы сообщений (TextMessage,MultiModalMessage) с полем source
  • Agent Framework использует унифицированный ChatMessage с типизированными объектами содержимого и полем role
  • Сообщения Agent Framework используют Role перечисление (USER, ASSISTANT, SYSTEM, TOOL) вместо строковых источников

Создание и интеграция инструментов

Средства расширяют возможности агента за пределами создания текста. Платформы принимают различные подходы к созданию инструментов с помощью Agent Framework, обеспечивая более автоматизированное создание схемы.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Платформа агента @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Подробные примеры см. в следующих примерах:

Размещенные инструменты (agent Framework Exclusive)

Agent Framework предоставляет размещенные средства, недоступные в AutoGen:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Подробные примеры см. в следующих примерах:

Требования и предостережения:

  • Размещенные средства доступны только в моделях или учетных записях, поддерживающих их. Перед включением этих средств проверьте права и поддержку модели для поставщика.
  • Конфигурация отличается от поставщика; следуйте предварительным требованиям в каждом примере для установки и разрешений.
  • Не каждая модель поддерживает все размещенные средства (например, веб-поиск и интерпретатор кода). Выберите совместимую модель в вашей среде.

Замечание

AutoGen поддерживает локальные средства выполнения кода, но эта функция планируется для будущих версий Agent Framework.

Ключевое различие. Платформа агента автоматически обрабатывает итерацию средства на уровне агента. В отличие от параметра AutoGen max_tool_iterations агенты Agent Framework продолжают выполнение средства до завершения по умолчанию с встроенными механизмами безопасности, чтобы предотвратить бесконечные циклы.

Поддержка сервера MCP

Для расширенной интеграции инструментов обе платформы поддерживают протокол контекста модели (MCP), позволяя агентам взаимодействовать с внешними службами и источниками данных. Agent Framework обеспечивает более полную встроенную поддержку.

Поддержка AutoGen MCP

AutoGen поддерживает базовую поддержку MCP через расширения (конкретные сведения о реализации зависят от версии).

Поддержка MCP agent Framework

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Примеры MCP см. в следующих примерах:

Шаблон агента как инструмента

Одним из мощных шаблонов является использование агентов в качестве инструментов, включение архитектур иерархических агентов. Обе платформы поддерживают этот шаблон с различными реализациями.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

Agent Framework as_tool()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Явное примечание о миграции. В AutoGen установите parallel_tool_calls=False на клиенте модели координатора при оболочке агентов в качестве инструментов, чтобы избежать проблем с параллелизмом при вызове одного экземпляра агента. В Agent Framework as_tool() не требуется отключение параллельных вызовов инструментов, так как агенты по умолчанию являются без отслеживания состояния.

ПО промежуточного слоя (компонент Agent Framework)

Agent Framework предоставляет возможности ПО промежуточного слоя, которые не хватает AutoGen. ПО промежуточного слоя обеспечивает мощные перекрестные проблемы, такие как ведение журнала, безопасность и мониторинг производительности.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Преимущества:

  • Безопасность: проверка входных данных и фильтрация содержимого
  • Наблюдаемость: ведение журнала, метрики и трассировка
  • Производительность: кэширование и ограничение скорости
  • Обработка ошибок: исправление и логика повторных попыток

Подробные примеры по промежуточного слоя см. в следующих примерах:

Настраиваемые агенты

Иногда вы не хотите, чтобы агент, поддерживаемый моделью, вообще не требуется детерминированный агент или агент с поддержкой API с пользовательской логикой. Обе платформы поддерживают создание пользовательских агентов, но шаблоны отличаются.

AutoGen: Subclass BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Реализуйте и возвращайте on_messages(...)Response сообщение чата.
  • При необходимости реализуется для on_reset(...) очистки внутреннего состояния между выполнением.

Agent Framework: Расширение BaseAgent (с поддержкой потоков)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThread поддерживает состояние беседы вне страны; используйте agent.get_new_thread() и передайте его run/run_streamв .
  • Вызовите self._notify_thread_of_new_messages(thread, input_messages, response_messages) так, чтобы поток имел обе стороны обмена.
  • См. полный пример: Пользовательский агент

Далее рассмотрим оркестрацию с несколькими агентами — область, в которой платформы отличаются больше всего.

Сопоставление компонентов с несколькими агентами

Обзор модели программирования

Модели программирования с несколькими агентами представляют собой наиболее значительную разницу между двумя платформами.

Подход к двойной модели AutoGen

AutoGen предоставляет две модели программирования:

  1. autogen-core: низкоуровневое программирование на основе событий и RoutedAgent подписок сообщений
  2. Team абстракция: высокоуровневая, ориентированная на выполнение модель, созданная на основе autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Проблемы:

  • Модель низкого уровня слишком сложна для большинства пользователей
  • Высокоуровневая модель может стать ограничением для сложных поведении
  • Связывание между двумя моделями повышает сложность реализации

Модель единого рабочего процесса Agent Framework

Agent Framework предоставляет единую Workflow абстракцию, которая объединяет лучшие из обоих подходов:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Подробные примеры рабочих процессов см. в следующих примерах:

Преимущества:

  • Единая модель: единая абстракция для всех уровней сложности
  • Безопасность типов: строго типизированные входные и выходные данные
  • Визуализация графа: очистка представления потока данных
  • Гибкая композиция: сочетание агентов, функций и вложенных рабочих процессов

Рабочий процесс и GraphFlow

Абстракция Agent Framework Workflow вдохновляется экспериментальной GraphFlow функцией AutoGen, но представляет собой значительную эволюцию в философии проектирования:

  • GraphFlow: поток управления, на основе которого ребра являются переходами и сообщениями передаются всем агентам; Переходы зависят от содержимого широковещательного сообщения
  • Рабочий процесс: поток данных, основанный на маршрутизации сообщений через определенные края и исполнителей, активируется ребрами с поддержкой параллельного выполнения.

Визуальный обзор

На приведенной ниже схеме сравнивается поток управления GraphFlow автогена (слева) с рабочим процессом потока данных Agent Framework (справа). Агенты GraphFlow моделируют как узлы с условными переходами и широковещательными трансляциями. Исполнители моделей рабочих процессов (агенты, функции или вложенные рабочие процессы), подключенные по типизированным краям; он также поддерживает приостановки запросов и ответов и контрольные точки.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

На практике:

  • GraphFlow использует агенты в качестве узлов и передает сообщения; края представляют условные переходы.
  • Рабочие процессы маршрутизирует типизированные сообщения по краям. Узлы (исполнителя) могут быть агентами, чистыми функциями или вложенными рабочими процессами.
  • Запрос или ответ позволяет рабочему процессу приостановить внешние входные данные; Контрольная точка сохраняет ход выполнения и включает возобновление работы.

Сравнение кода

1) Последовательный + условный
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Вентилятор + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Целевая маршрутизация (без трансляции)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

Что заметить:

  • GraphFlow передает сообщения и использует условные переходы. Поведение соединения настраивается с помощью целевой activation стороны и каждого края activation_group/activation_condition (например, группировать обе границы join_d в ).activation_condition="any"
  • Рабочий процесс направляет данные явным образом; используется target_id для выбора подчиненных исполнителей. Поведение соединения живет в принимающем исполнителях (например, при первом входе и ожидании всех) или с помощью построителей оркестрации или агрегататоров.
  • Исполнителям в рабочем процессе является бесплатная форма: оболочка ChatAgent, функция или вложенный рабочий процесс и их сочетание в одном графе.

Основные различия

В таблице ниже приведены основные различия между рабочим процессом GraphFlow и Agent Framework AutoGen:

Аспект AutoGen GraphFlow Рабочий процесс Agent Framework
Тип потока Поток управления (ребра — переходы) Поток данных (сообщения маршрутизации ребер)
Типы узлов Только агенты Агенты, функции, вложенные рабочие процессы
Активация Широковещательная передача сообщений Активация на основе пограничных вычислений
Безопасность типов Ограничено Строгая типизация на протяжении всего
Возможность создания Ограничено Высоко компонуемые

Вложенные шаблоны

Вложение команды AutoGen

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

Характеристики вложения автогена:

  • Вложенная команда получает все сообщения из внешней команды
  • Вложенные сообщения команды передаются всем внешним участникам команды
  • Контекст общего сообщения на всех уровнях

Вложенный рабочий процесс Agent Framework

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Характеристики вложения агентной платформы:

  • Изолированные входные и выходные данные через WorkflowExecutor
  • Нет трансляции сообщений — данные передаются через определенные подключения
  • Независимое управление состояниями для каждого уровня рабочего процесса

Шаблоны группового чата

Шаблоны группового чата позволяют нескольким агентам совместно работать над сложными задачами. Вот как общие шаблоны преобразуют между платформами.

Шаблон RoundRobinGroupChat

Реализация AutoGen:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Реализация agent Framework:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Подробные примеры оркестрации см. в следующих примерах:

Для параллельных шаблонов выполнения агент Framework также предоставляет следующее:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Примеры параллельного выполнения см. в следующих примерах:

Шаблон MagenticOneGroupChat

Реализация AutoGen:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Реализация agent Framework:

from agent_framework import (
    MagenticBuilder, MagenticCallbackMode, WorkflowOutputEvent,
    MagenticCallbackEvent, MagenticOrchestratorMessageEvent, MagenticAgentDeltaEvent
)

# Assume we have researcher, coder, and coordinator_client from previous examples
async def on_event(event: MagenticCallbackEvent) -> None:
    if isinstance(event, MagenticOrchestratorMessageEvent):
        print(f"[ORCHESTRATOR]: {event.message.text}")
    elif isinstance(event, MagenticAgentDeltaEvent):
        print(f"[{event.agent_id}]: {event.text}", end="")

workflow = (MagenticBuilder()
           .participants(researcher=researcher, coder=coder)
           .on_event(on_event, mode=MagenticCallbackMode.STREAMING)
           .with_standard_manager(
               chat_client=coordinator_client,
               max_round_count=20,
               max_stall_count=3,
               max_reset_count=2
           )
           .build())

# Example usage (would be in async context)
async def magentic_example():
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, WorkflowOutputEvent):
            final_result = event.data

Параметры настройки agent Framework:

Рабочий процесс Magentic предоставляет широкие параметры настройки:

  • Конфигурация диспетчера: пользовательские модели оркестратора и запросы
  • Круглые ограничения: max_round_count, max_stall_countmax_reset_count
  • Обратные вызовы событий: потоковая передача в режиме реального времени с детализацией фильтрации событий
  • Специализация агента: пользовательские инструкции и инструменты для каждого агента
  • Режимы обратного вызова: STREAMING для обновлений в режиме реального времени или BATCH для окончательных результатов
  • Планирование "человек в цикле": пользовательские функции планировщика для интерактивных рабочих процессов
# Advanced customization example with human-in-the-loop
from agent_framework.openai import OpenAIChatClient
from agent_framework import MagenticBuilder, MagenticCallbackMode, MagenticPlannerContext

# Assume we have researcher_agent, coder_agent, analyst_agent, detailed_event_handler
# and get_human_input function defined elsewhere

async def custom_planner(context: MagenticPlannerContext) -> str:
    """Custom planner with human input for critical decisions."""
    if context.round_count > 5:
        # Request human input for complex decisions
        return await get_human_input(f"Next action for: {context.current_state}")
    return "Continue with automated planning"

workflow = (MagenticBuilder()
           .participants(
               researcher=researcher_agent,
               coder=coder_agent,
               analyst=analyst_agent
           )
           .with_standard_manager(
               chat_client=OpenAIChatClient(model_id="gpt-5"),
               max_round_count=15,      # Limit total rounds
               max_stall_count=2,       # Prevent infinite loops
               max_reset_count=1,       # Allow one reset on failure
               orchestrator_prompt="Custom orchestration instructions..."
           )
           .with_planner(custom_planner)  # Human-in-the-loop planning
           .on_event(detailed_event_handler, mode=MagenticCallbackMode.STREAMING)
           .build())

Подробные примеры magentic см. в следующих примерах:

Будущие шаблоны

Стратегия разработки Agent Framework включает несколько шаблонов AutoGen в настоящее время:

  • Шаблон Swarm: координация агента на основе рук
  • SelectorGroupChat: выбор динамиков на основе LLM

"Человек в цикле" с ответом на запрос

Ключевой новой функцией в Agent Framework Workflow является концепция запроса и ответа, которая позволяет рабочим процессам приостановить выполнение и ждать внешних входных данных перед продолжением. Эта возможность отсутствует в абстракции AutoGen Team и включает сложные шаблоны для человека в цикле.

Ограничения автогена

Абстракция AutoGen Team выполняется непрерывно после запуска и не предоставляет встроенные механизмы приостановки выполнения для ввода данных человека. Для всех функциональных возможностей цикла в цикле требуются пользовательские реализации за пределами платформы.

Agent Framework Request-Response API

Agent Framework предоставляет встроенные возможности ответа на запросы, в которых любой исполнитель может отправлять запросы с помощью ctx.request_info() декоратора и обрабатывать ответы.@response_handler

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Выполнение рабочих процессов "человек в цикле"

Agent Framework предоставляет API потоковой передачи для обработки цикла приостановки возобновления:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Примеры рабочих процессов в цикле см. в следующих примерах:

Контрольные точки и возобновление рабочих процессов

Еще одним ключевым преимуществом абстракции Агента Framework WorkflowTeam является встроенная поддержка контрольных точек и возобновления выполнения. Это позволяет приостановить, сохранить и возобновить рабочие процессы позже из любой контрольной точки, обеспечивая отказоустойчивость и включение длительных или асинхронных рабочих процессов.

Ограничения автогена

Абстракция AutoGen Team не предоставляет встроенные возможности контрольных точек. Любые механизмы сохраняемости или восстановления должны быть реализованы внешне, часто требуя сложного управления состоянием и логики сериализации.

Контрольная точка агента Framework

Agent Framework обеспечивает комплексную контрольную точку FileCheckpointStorage и with_checkpointing() метод WorkflowBuilder. Контрольные точки:

  • Состояние исполнителя: локальное состояние для каждого исполнителя с помощью ctx.set_executor_state()
  • Общее состояние: состояние кросс-исполнителя с помощью ctx.set_shared_state()
  • Очереди сообщений: ожидающие сообщения между исполнителями
  • Положение рабочего процесса: текущий ход выполнения и дальнейшие действия
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Возобновление с контрольных точек

Agent Framework предоставляет API для перечисления, проверки и возобновления из определенных контрольных точек:

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Расширенные функции контрольной точки

Контрольная точка с интеграцией "Человек в цикле":

Контрольная точка работает без проблем с рабочими процессами в цикле, что позволяет приостановить рабочие процессы для ввода данных человека и возобновить их позже. При возобновлении с контрольной точки, содержащей ожидающие запросы, эти запросы будут повторно выдаваться в виде событий:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Ключевые преимущества

По сравнению с AutoGen контрольные точки Agent Framework предоставляют:

  • Автоматическая сохраняемость. Управление состоянием вручную не требуется
  • Детализированное восстановление: возобновление из любой границы суперстепа
  • Изоляция состояния: отдельное локальное и общее состояние исполнителя
  • Интеграция с человеком в цикле: простая приостановка возобновления с помощью человеческого ввода
  • Отказоустойчивость: надежное восстановление от сбоев или прерываний

Практические примеры

Полные примеры контрольных точек см. в следующих примерах:


Observability

AutoGen и Agent Framework предоставляют возможности наблюдаемости, но с различными подходами и функциями.

Функция автогена наблюдения

AutoGen имеет встроенную поддержку OpenTelemetry с инструментированием для:

  • Трассировка среды выполнения: SingleThreadedAgentRuntime и GrpcWorkerAgentRuntime
  • Выполнение инструмента: BaseTool с execute_tool диапазонами следующих семантических соглашений GenAI
  • Операции агента: BaseChatAgent с create_agentinvoke_agent и диапазонами
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Наблюдаемость платформы агента

Agent Framework обеспечивает комплексную наблюдаемость с помощью нескольких подходов:

  • Настройка нулевого кода: автоматическая инструментирование с помощью переменных среды
  • Настройка вручную: программная настройка с пользовательскими параметрами
  • Расширенные данные телеметрии: агенты, рабочие процессы и отслеживание выполнения инструментов
  • Выходные данные консоли: встроенное ведение журнала и визуализация консоли
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Ключевые отличия:

  • Сложность установки: Agent Framework предлагает более простые параметры настройки нулевого кода
  • Область действия. Платформа агента обеспечивает более широкий охват, включая наблюдаемость на уровне рабочего процесса
  • Визуализация: Agent Framework включает встроенные выходные данные консоли и пользовательский интерфейс разработки
  • Конфигурация: Agent Framework предлагает более гибкие параметры конфигурации

Подробные примеры наблюдаемости см. в следующих примерах:


Conclusion

Это руководство по миграции обеспечивает комплексное сопоставление между AutoGen и Microsoft Agent Framework, охватывая все, от базового создания агента до сложных рабочих процессов с несколькими агентами. Ключевые выносы для миграции:

  • Миграция с одним агентом проста, с аналогичными API и расширенными возможностями в Agent Framework
  • Для многоагентных шаблонов требуется переосмыслить подход от событий на основе архитектур на основе потока данных, но если вы уже знакомы с GraphFlow, переход будет проще.
  • Agent Framework предлагает дополнительные функции, такие как ПО промежуточного слоя, размещенные инструменты и типизированные рабочие процессы

Дополнительные примеры и подробные рекомендации по реализации см. в каталоге примеров Agent Framework .

Дополнительные примеры категорий

Платформа агента предоставляет примеры в нескольких других важных областях:

Дальнейшие шаги