Skip to content

ReAct Agent

The ReActAgent is the most powerful agent type in the framework, implementing the Reasoning and Acting loop.

How it Works

The ReAct agent operates in a loop:

  1. Context Construction: It gathers the conversation history and a special System Prompt that defines its operational rules (parallel execution, thinking first, etc.).

  2. Tool Discovery: It looks at the list of available_agents (tools) and converts them into function definitions for the LLM.

  3. Generation: It sends the context to the LLM.

  4. Execution:

    • If the LLM calls the Thinking Tool (parallel to others), the thought is recorded.
    • If the LLM calls other tools, they are executed in parallel (up to a concurrency limit).
  5. Recursion: The results are added back to the context, and the loop repeats until the LLM produces a final answer or the max iterations are reached.

Implementation Details

When you subclass ReActAgent, you typically only need to define: - get_llm(): Which LLM to use. - available_agents(): Which tools (other BaseAgents) this agent can use.

class MyOrchestrator(ReActAgent):
    def get_llm(self, user_id: str):
        return GeminiLLM()

    def available_agents(self, user_id: str):
        return [
            SearchAgent(),
            DatabaseAgent(),
            # ...
        ]

By default, the ReActAgent will use the ThinkingAgent as the thinking tool. Its implementation is streightforward, and is necessary to improve the performance of the reasoning loop. It is possible to override its implementation by subclassing it and providing a custom implementation of the get_thinking_agent() method. For example:

class ResponsiveThinkingAgent(ThinkingAgent):
    async def execute(
        self, user_id: str, context: Context, input_args: ThinkingInput
    ) -> ThoughtResponse:
        context.emit_feedback(input_args.reasoning)
        return await super().execute(user_id, context, input_args)

And then use it in your ReActAgent:

class MyOrchestrator(ReActAgent):
    def get_thinking_agent(self):
        return ResponsiveThinkingAgent()

In this example, the ResponsiveThinkingAgent emits a feedback message to the user, showing the reasoning process in real-time.

API Reference

agentswarm.agents.ReActAgent

Bases: BaseAgent[InputType, OutputType]

Source code in src/agentswarm/agents/react_agent.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class ReActAgent(BaseAgent[InputType, OutputType]):

    def __init__(self, max_iterations: int = 100, max_concurrent_agents: int = 5):
        self.max_iterations = max_iterations
        self.max_concurrent_agents = max_concurrent_agents

    @abstractmethod
    def get_llm(self, user_id: str) -> LLM:
        """
        Returns the LLM to be used by the agent.
        """
        pass

    @abstractmethod
    def prompt(self, user_id: str) -> str:
        """
        Returns the prompt to be used by the agent.
        """
        pass

    def get_thinking_agent(self):
        """
        Returns the thinking agent to be used by the agent.
        """
        return ThinkingAgent()

    def get_default_agents(self) -> List[BaseAgent]:
        """
        Returns the default agents for implementing the ReActAgent loop.
        """
        return [
            self.get_thinking_agent(),
            GatheringAgent(),
            TransformerAgent(),
            MergeAgent(),
        ]

    @abstractmethod
    def available_agents(self, user_id: str) -> List[BaseAgent]:
        """
        Returns the available agents to be used by the agent.
        """
        pass

    def generate_function_calls(self, user_id: str) -> List[LLMFunction]:
        functions = []
        for agent in self.available_agents(user_id):
            try:
                functions.append(
                    LLMFunction(
                        name=agent.id(),
                        description=agent.description(user_id),
                        parameters=agent.input_parameters(),
                    )
                )
            except Exception as e:
                print(f"Error generating function call for agent {agent.id()}: {e}")
                print(traceback.format_exc())
        return functions

    async def agent_execution(
        self, user_id: str, context: Context, function: LLMFunction
    ):
        agent = next(
            (
                agent
                for agent in self.available_agents(user_id)
                if agent.id() == function.name
            ),
            None,
        )
        if agent is None:
            raise Exception(f"Agent {function.name} not found")

        input_type = agent._get_generic_type(0)
        if input_type and isinstance(function.arguments, dict):
            validated_input = input_type(**function.arguments)

            # Create a new context for the agent to support tracing hierarchy
            new_context = context.copy_for_execution()

            # Trace the agent execution
            context.tracing.trace_agent(new_context, agent.id(), function.arguments)

            try:
                result = await agent.execute(user_id, new_context, validated_input)
                context.tracing.trace_agent_result(new_context, agent.id(), result)
                return result
            except Exception as e:
                context.tracing.trace_agent_error(new_context, agent.id(), e)
                raise e
        raise Exception(f"Invalid arguments for agent {function.name}")

    def generate_messages_context(
        self, user_id: str, context: Context, input: InputType = None
    ) -> List[Message]:
        all: List[Message] = []
        all.append(Message(type="system", content=REACT_SYS_PROMPT))
        all.append(
            Message(
                type="system",
                content=f"Use the tool '{self.get_thinking_agent().id()}' IN PARALLEL to the other tools to explain your plan.",
            )
        )
        all.append(Message(type="system", content=self.prompt(user_id)))
        all.extend(context.messages)
        return all

    async def gather_with_concurrency(self, n, *tasks):
        """
        Runs tasks with a concurrency limit of n.
        """
        semaphore = asyncio.Semaphore(n)

        async def sem_task(task):
            async with semaphore:
                return await task

        return await asyncio.gather(*(sem_task(task) for task in tasks))

    async def execute(
        self, user_id: str, context: Context, input: InputType = None
    ) -> OutputType:

        current_context = self.generate_messages_context(user_id, context, input)
        iteration = 0
        from_prev_iteration = False

        while iteration < self.max_iterations:

            # Create an iteration step ID
            iteration_step_id = f"{context.step_id}_iter_{iteration}"

            # Trace the iteration start
            iter_context = context.copy_for_iteration(
                iteration_step_id, current_context
            )

            context.tracing.trace_loop_step(iter_context, f"Iteration {iteration}")

            tmp_context = current_context
            if from_prev_iteration:
                tmp_context = tmp_context + [
                    Message(type="user", content="Elaborate the results of the agents")
                ]

            response = await self.get_llm(user_id).generate(
                tmp_context,
                functions=self.generate_function_calls(user_id),
                feedback=iter_context.feedback,
            )
            iter_context.add_usage(response.usage)

            if response.function_calls is None or len(response.function_calls) == 0:
                return [Message(type="assistant", content=response.text)]

            has_execution_tool = False
            output = []

            # Prepare tasks for parallel execution
            tasks = []

            for function_call in response.function_calls:
                if function_call.name != self.get_thinking_agent().id():
                    has_execution_tool = True

                # We wrap the execution in a task, capturing the necessary context
                task = self.execute_and_handle_result(
                    user_id, iter_context, function_call, context
                )
                tasks.append(task)

            # Execute all tasks in parallel with concurrency limit
            results = await self.gather_with_concurrency(
                self.max_concurrent_agents, *tasks
            )

            # Flatten results into output list
            for res in results:
                if res:
                    if res.type == "completion":
                        return [res]
                    output.append(res)

            if not has_execution_tool and len(response.text) > 0:
                output.append(Message(type="assistant", content=response.text))
                return output

            current_context = current_context + output
            iteration += 1
            from_prev_iteration = True

        raise Exception("Max iterations reached")

    async def execute_and_handle_result(
        self,
        user_id: str,
        iter_context: Context,
        function_call: LLMFunction,
        context: Context,
    ) -> Message:
        try:
            result = await self.agent_execution(user_id, iter_context, function_call)

            if isinstance(result, Message):
                return result
            elif isinstance(result, ThoughtResponse):
                context.thoughts.append(result.thought)
                return Message(type="assistant", content=f"Thought: {result.thought}")
            elif isinstance(result, VoidResponse) or result is None:
                return Message(
                    type="user",
                    content=f"Agent {function_call.name} executed successfully.",
                )
            elif isinstance(result, KeyStoreResponse):
                return Message(
                    type="user",
                    content=f"Agent {function_call.name} executed and stored {result.description} in the store with key {result.key}.",
                )
            elif isinstance(result, StrResponse):
                return Message(
                    type="user",
                    content=f"Agent {function_call.name} executed and returned {result.value}.",
                )
            elif isinstance(result, CompletionResponse):
                return Message(type="completion", content=result.value)
            else:
                return Message(
                    type="user",
                    content=f"Result of agent {function_call.name} execution: {result}",
                )
        except Exception as e:
            return Message(
                type="user", content=f"Error executing agent {function_call.name}: {e}"
            )

available_agents(user_id) abstractmethod

Returns the available agents to be used by the agent.

Source code in src/agentswarm/agents/react_agent.py
64
65
66
67
68
69
@abstractmethod
def available_agents(self, user_id: str) -> List[BaseAgent]:
    """
    Returns the available agents to be used by the agent.
    """
    pass

gather_with_concurrency(n, *tasks) async

Runs tasks with a concurrency limit of n.

Source code in src/agentswarm/agents/react_agent.py
135
136
137
138
139
140
141
142
143
144
145
async def gather_with_concurrency(self, n, *tasks):
    """
    Runs tasks with a concurrency limit of n.
    """
    semaphore = asyncio.Semaphore(n)

    async def sem_task(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(sem_task(task) for task in tasks))

get_default_agents()

Returns the default agents for implementing the ReActAgent loop.

Source code in src/agentswarm/agents/react_agent.py
53
54
55
56
57
58
59
60
61
62
def get_default_agents(self) -> List[BaseAgent]:
    """
    Returns the default agents for implementing the ReActAgent loop.
    """
    return [
        self.get_thinking_agent(),
        GatheringAgent(),
        TransformerAgent(),
        MergeAgent(),
    ]

get_llm(user_id) abstractmethod

Returns the LLM to be used by the agent.

Source code in src/agentswarm/agents/react_agent.py
33
34
35
36
37
38
@abstractmethod
def get_llm(self, user_id: str) -> LLM:
    """
    Returns the LLM to be used by the agent.
    """
    pass

get_thinking_agent()

Returns the thinking agent to be used by the agent.

Source code in src/agentswarm/agents/react_agent.py
47
48
49
50
51
def get_thinking_agent(self):
    """
    Returns the thinking agent to be used by the agent.
    """
    return ThinkingAgent()

prompt(user_id) abstractmethod

Returns the prompt to be used by the agent.

Source code in src/agentswarm/agents/react_agent.py
40
41
42
43
44
45
@abstractmethod
def prompt(self, user_id: str) -> str:
    """
    Returns the prompt to be used by the agent.
    """
    pass