Source code for ragflow_async_sdk.apis.agent

# Copyright 2026 Oliver
# Licensed under the Apache License, Version 2.0
# See LICENSE file for details.

from typing import Optional, Any, AsyncGenerator

from .mixins import SessionMixin
from ..apis.base import BaseAPI
from ..exceptions import RAGFlowValidationError
from ..exceptions.api import RAGFlowAPIError
from ..models.agent import Agent, AgentCompletionResult
from ..models.session import AgentSession
from ..types import OrderBy
from ..utils.entity_helpers import get_single_or_raise
from ..utils.validators import require_params, resolve_unique_field


[docs] class AgentAPI(SessionMixin[AgentSession], BaseAPI): """API for managing agents and their sessions.""" _parent_type = "agents" _session_model = AgentSession
[docs] async def create_agent( self, title: str, dsl: dict, *, description: Optional[str] = None, ) -> None: """ Create a new agent. The create_agent method returns None because the server does not provide the created object. Use get_agent to retrieve it if needed. Args: title: Agent title. dsl: Agent DSL configuration (graph, components, retrieval, etc.). description: Optional description of the agent. Returns: None. Raises: RAGFlowAPIError: If creation fails. RAGFlowResponseError: If agent cannot be retrieved after creation. """ require_params(title=title, dsl=dsl) payload = { "title": title, "description": description, "dsl": dsl, } payload = self._normalize_request(payload) resp = await self._client.post("/agents", json=payload) resp = self._handle_response(resp) if not resp.get("data"): raise RAGFlowAPIError("Create agent failed") return None
[docs] async def list_agents( self, *, page: int = 1, page_size: int = 30, orderby: OrderBy | str = OrderBy.CREATE_TIME, desc: bool = True, agent_id: Optional[str] = None, title: Optional[str] = None, ) -> tuple[list[Agent], int]: """ List agents with optional filters. Args: page: Page number. page_size: Number of items per page. orderby: Field to sort by. desc: Sort descending if True. agent_id: Optional filter by agent ID. title: Optional filter by agent title. Returns: Tuple containing a list of Agent instances and total count. """ params = { "page": page, "page_size": page_size, "orderby": orderby, "desc": desc, "id": agent_id, "title": title, } params = self._normalize_request(params) resp = await self._client.get("/agents", params=params) resp = self._handle_response(resp) data = resp.get("data", []) agents = [Agent.from_raw(item) for item in data] return agents, len(agents)
[docs] async def get_agent( self, *, agent_id: Optional[str] = None, title: Optional[str] = None, ) -> Optional[Agent]: """ Retrieve a single agent by ID or title. Exactly one of `agent_id` or `title` must be provided. Args: agent_id: Optional agent ID. title: Optional agent title. Returns: Agent instance Raises: RAGFlowValidationError: If both or neither of `agent_id` and `title` are provided. RAGFlowConflictError: If multiple agents match the query. """ param_name, param_value = resolve_unique_field(agent_id=agent_id, title=title) agents, _ = await self.list_agents( page=1, page_size=2, agent_id=agent_id, title=title ) return get_single_or_raise( items=agents, key_name=param_name, key_value=param_value, entity_name="Agent", )
[docs] async def update_agent( self, agent_id: str, *, title: Optional[str] = None, description: Optional[str] = None, dsl: Optional[dict[str, Any]] = None, ) -> None: """ Update an existing agent. Args: agent_id: Agent ID (required). title: Optional new title for the agent. description: Optional new description. dsl: Optional updated canvas DSL object. Raises: ValueError: If no fields are provided for update. """ require_params(agent_id=agent_id) payload = { "title": title, "description": description, "dsl": dsl, } payload = self._normalize_request(payload) if not payload: raise RAGFlowValidationError("No fields provided to update.") url = f"/agents/{agent_id}" resp = await self._client.put(url, json=payload) self._handle_response(resp, require_data=False)
[docs] async def delete_agent(self, agent_id: str) -> bool: """ Delete an agent by ID. Args: agent_id: Agent ID. Returns: True if deletion succeeded, False otherwise. """ require_params(agent_id=agent_id) resp = await self._client.delete(f"/agents/{agent_id}") resp = self._handle_response(resp) return bool(resp.get("data", False))
[docs] async def create_session( self, agent_id: str, *, name: str = "New session", user_id: Optional[str] = None, ) -> AgentSession: """ Create a new session under an agent. Args: agent_id: Agent ID. name: Optional session name. user_id: Optional user ID. Returns: AgentSession instance. """ return await super().create_session(parent_id=agent_id, name=name, user_id=user_id)
[docs] async def list_sessions( self, agent_id: str, *, page: int = 1, page_size: int = 30, orderby: OrderBy | str = OrderBy.CREATE_TIME, desc: bool = True, name: Optional[str] = None, session_id: Optional[str] = None, user_id: Optional[str] = None, ) -> tuple[list[AgentSession], int]: """ List sessions for an agent. Args: agent_id: Agent ID. page: Page number. page_size: Number of items per page. orderby: Field to sort by. desc: Sort descending if True. name: Optional session name filter. session_id: Optional session ID filter. user_id: Optional user ID filter. Returns: Tuple containing a list of AgentSession instances and total count. """ return await super().list_sessions( parent_id=agent_id, page=page, page_size=page_size, orderby=orderby, desc=desc, name=name, session_id=session_id, user_id=user_id, )
[docs] async def get_agent_session( self, agent_id: str, *, session_id: Optional[str] = None, name: Optional[str] = None, ) -> Optional[AgentSession]: return await self.get_session( parent_id=agent_id, session_id=session_id, name=name, )
[docs] async def update_session( self, agent_id: str, session_id: str, *, name: Optional[str] = None, user_id: Optional[str] = None, ) -> None: """ Update an agent session. Args: agent_id: Agent ID. session_id: Session ID. name: Optional new session name. user_id: Optional new user ID. """ await super().update_session(parent_id=agent_id, session_id=session_id, name=name, user_id=user_id)
[docs] async def delete_sessions( self, agent_id: str, session_ids: Optional[str | list[str]] = None, ) -> None: """ Delete one or more agent sessions. Args: agent_id: Agent ID. session_ids: Single or list of session IDs. If None, delete all sessions. """ await super().delete_sessions(parent_id=agent_id, session_ids=session_ids)
[docs] async def ask( self, agent_id: str, session_id: str, prompt: str, *, stream: bool = False, **kwargs, ) -> AgentCompletionResult | AsyncGenerator[AgentCompletionResult, None]: """ Ask a question in an agent session. Args: agent_id: Agent ID. session_id: Session ID. prompt: User question. stream: Whether to return streaming results (default False). **kwargs: Additional parameters such as temperature, top_p, etc. Returns: ``AgentCompletionResult`` instance if ``stream=False``, or an async generator yielding ``AgentCompletionResult`` items if ``stream=True``. Raises: RAGFlowValidationError: If required parameters are missing. RAGFlowAPIError: If API request fails. Examples: :: # Non-streaming result = await client.agents.ask( agent_id="agent_123", session_id="sess_456", prompt="Hello AI" ) print(result.text) # Streaming async for chunk in client.agents.ask( agent_id="agent_123", session_id="sess_456", prompt="Hello AI", stream=True ): print(chunk.text) """ return await super().ask( parent_id=agent_id, session_id=session_id, prompt=prompt, session_type="agents", stream=stream, **kwargs, )