Other Agents
Agentswarm includes several specialized agents designed for functional composition. These agents are typically used as tools within a ReActAgent loop or composed together in a pipeline.
MapReduce Agent
The MapReduceAgent is a powerful tool for scaling operations. It takes a list of items and an "instructed agent". It applies the instruction to every item in the list in parallel (Map) and then synthesizes the results (Reduce).
Use Case: Summarizing 10 different news articles at once.
agentswarm.agents.MapReduceAgent
Bases: ReActAgent[MapReduceInput, KeyStoreResponse]
Source code in src/agentswarm/agents/map_reduce_agent.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 | class MapReduceAgent(ReActAgent[MapReduceInput, KeyStoreResponse]):
def __init__(self, max_iterations: int = 100, agents: List[BaseAgent] = []):
super().__init__(max_iterations)
self.agents = agents
def get_llm(self, user_id: str) -> LLM:
# TODO: Better LLM consiguration
return GeminiLLM(api_key=os.getenv("GEMINI_API_KEY"))
def id(self) -> str:
return "map-reduce"
def description(self, user_id: str) -> str:
return """USE THIS AGENT FOR:
1. COMPLEX CONTENT GENERATION: Creating multi-part artifacts like software libraries, full books, or extensive reports where each chapter/module needs to be generated in parallel.
2. RECURSIVE TASKS: Exploration of hierarchical structures (directories, org charts) or recursive problem solving.
3. LARGE DATASETS: Processing data that exceeds context limits by splitting (Map) and summarizing (Reduce).
4. PARALLEL EXECUTION: Any task requiring massive parallel execution of sub-tasks.
DO NOT USE FOR: Simple, linear tasks or single-file creation that fit in a single context window.
"""
def prompt(self, user_id: str) -> str:
return """You are a recursive Map-Reduce agent. Your GOAL is to solve the given 'task' completely and return the FINAL RESULT.
ALGORITHM:
1. ANALYZE: Understand the task (e.g., analyze a folder).
2. MAP: Break it down into sub-tasks (e.g., read files in current dir, recursively call 'map-reduce' for sub-directories).
3. EXECUTE: Run these sub-tasks using the available tools. Parallelize where possible.
4. REDUCE: Collect ALL results from the sub-tasks.
5. SYNTHESIZE: Combine them into a single, comprehensive final report.
CRITICAL RULES:
- ANTI-RECURSION: If your assigned task is T, DO NOT call 'map-reduce' with task T again. You must BREAK DOWN task T into smaller sub-tasks (t1, t2...) and delegate THOSE.
- You are the one responsible for executing the atomic actions for T (e.g., gathering data, listing items) before delegating sub-parts.
- Do NOT return "I am working on it". You must work on it UNTIL IT IS DONE.
- Do NOT return partial results unless you have hit a hard limit.
- Your final 'assistant' message MUST contain the complete answer/report.
"""
def available_agents(self, user_id: str) -> List[BaseAgent]:
# IMPORTANT: Return a NEW instance of MapReduceAgent instead of self.
return [MapReduceAgent(max_iterations=self.max_iterations, agents=self.agents)] + self.agents
def generate_messages_context(self, user_id: str, context: Context, input: MapReduceInput = None) -> List[Message]:
msgs = super().generate_messages_context(user_id, context, input)
msgs.append(Message(type="user", content=f"CURRENT TASK: {input.task}\n\nWARNING: Do not call 'map-reduce' with this exact same task '{input.task}' again, as it would cause an infinite loop. decompose it into smaller sub-tasks."))
return msgs
|
Gathering Agent
The GatheringAgent is designed to extract information present in the Context. Unlike other agents that might search externally or generate new content, this agent focuses on reading the message history and extracting specific structured data.
Use Case: Extracting a user's name and email from a long conversation history.
agentswarm.agents.GatheringAgent
Bases: BaseAgent[GatheringAgentInput, StrResponse]
Source code in src/agentswarm/agents/gathering_agent.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | class GatheringAgent(BaseAgent[GatheringAgentInput, StrResponse]):
def id(self) -> str:
return "gathering-agent"
def description(self, user_id: str) -> str:
return """
I'm able to gather an information from the store and add to the current context.
USE THIS AGENT ONLY WHEN YOU NEED TO DISPLAY THE INFORMATION TO THE USER.
Whenever possible, if you need to filter or process the data, use the "transformer-agent".
"""
async def execute(
self, user_id: str, context: Context, input: GatheringAgentInput
) -> StrResponse:
if not context.store.has(input.key):
raise Exception(
f"Information from the store with key {input.key} not found"
)
value = context.store.get(input.key)
return StrResponse(value=value)
|
Merge Agent
The MergeAgent is a simple utility to combine multiple text inputs or results into a single coherent response.
Use Case: Combining the results of three different search queries into a single paragraph.
agentswarm.agents.MergeAgent
Bases: BaseAgent[MergeAgentInput, KeyStoreResponse]
Source code in src/agentswarm/agents/merge_agent.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | class MergeAgent(BaseAgent[MergeAgentInput, KeyStoreResponse]):
def id(self) -> str:
return "merge-agent"
def description(self, user_id: str) -> str:
return "I'm able to merge different informations into a single one."
async def execute(
self, user_id: str, context: Context, input: MergeAgentInput
) -> KeyStoreResponse:
not_found_keys = [key for key in input.keys if not context.store.has(key)]
if not_found_keys:
raise Exception(f"Keys {not_found_keys} not found in the store")
values = [context.store.get(key) for key in input.keys]
value = "\n".join(values)
key = f"merged_{uuid.uuid4()}"
context.store.set(key, value)
return KeyStoreResponse(
key=key, description=f"Merged information from keys {input.keys}"
)
|
The TransformerAgent transforms data from one format to another (e.g., JSON to CSV, or text to structured data) based on natural language instructions.
Use Case: Converting a messy text list into a clean JSON array.
Bases: BaseAgent[TransformerAgentInput, KeyStoreResponse]
Source code in src/agentswarm/agents/transformer_agent.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 | class TransformerAgent(BaseAgent[TransformerAgentInput, KeyStoreResponse]):
def id(self) -> str:
return "transformer-agent"
def description(self, user_id: str) -> str:
return """
I'm able to transform an information from the store into a new information in the store.
I can be used to apply complex llm-based task to the stored data, in order to optimize the general context.
"""
async def execute(
self, user_id: str, context: Context, input: TransformerAgentInput
) -> KeyStoreResponse:
if not context.store.has(input.key):
raise ValueError(f"Key {input.key} not found in store")
value = context.store.get(input.key)
all = [Message(type="user", content=f"{value}")]
all.append(
Message(
type="user",
content=f"Filter the previous data using this command:\n{input.cmd}\n. The ouput should be a new data, not a prompt or a python code. If not specified, you can optimize the output for your internal use.",
)
)
llm = context.default_llm
if llm is None:
raise ValueError("Default LLM not set")
response = await llm.generate(all)
context.add_usage(response.usage)
new_key = f"transformer_{uuid.uuid4()}"
context.store.set(new_key, response.text)
return KeyStoreResponse(
key=new_key,
description=f"Transformed information from key {input.key} with command {input.cmd}",
)
|
Thinking Agent
The ThinkingAgent is unique. It is not usually meant to be called by the user directly. Instead, it is injected into the ReActAgent loop to allow the LLM to "think out loud" before taking actions.
It enables the Chain of Thought prompting technique within the agent execution flow.
agentswarm.agents.ThinkingAgent
Bases: BaseAgent[ThinkingInput, ThoughtResponse]
Source code in src/agentswarm/agents/thinking_agent.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | class ThinkingAgent(BaseAgent[ThinkingInput, ThoughtResponse]):
def id(self) -> str:
return "thinking_tool"
def description(self, user_id: str) -> str:
return (
"Use this tool to plan your actions. "
"You MUST call this tool AND ALL other necessary action tools IN THE SAME TURN. "
"Do not wait for the next turn to act. "
"Plan for parallel execution where possible."
)
async def execute(self, user_id: str, context: Context, input_args: ThinkingInput) -> ThoughtResponse:
part = ""
if input_args.self_correction:
part += f" Self-correction: {input_args.self_correction}"
return ThoughtResponse(thought=f"{input_args.reasoning}.{part}")
|