
{
“type”: “function”,
“function”: {
“name”: “task_update”,
“description”: “Update the status of a task. Use ‘in_progress’ when starting, ‘completed’ when done.”,
“parameters”: {
“type”: “object”,
“properties”: {
“task_id”: {“type”: “string”, “description”: “The task ID”},
“status”: {“type”: “string”, “enum”: [“in_progress”, “completed”, “failed”]},
“result”: {“type”: “string”, “description”: “Result or output of the task”},
},
“required”: [“task_id”, “status”],
},
},
},
{
“type”: “function”,
“function”: {
“name”: “inbox_send”,
“description”: “Send a message to another agent (e.g., ‘leader’ or a worker name).”,
“parameters”: {
“type”: “object”,
“properties”: {
“to”: {“type”: “string”, “description”: “Recipient agent name”},
“message”: {“type”: “string”, “description”: “Message content”},
},
“required”: [“to”, “message”],
},
},
},
{
“type”: “function”,
“function”: {
“name”: “inbox_receive”,
“description”: “Check and consume all messages in your inbox.”,
“parameters”: {
“type”: “object”,
“properties”: {},
},
},
},
{
“type”: “function”,
“function”: {
“name”: “task_list”,
“description”: “List tasks assigned to you or all team tasks.”,
“parameters”: {
“type”: “object”,
“properties”: {
“owner”: {“type”: “string”, “description”: “Filter by owner name (optional)”},
},
},
},
},
]
class SwarmAgent:
def __init__(
self,
name: str,
role: str,
system_prompt: str,
task_board: TaskBoard,
inbox: InboxSystem,
registry: TeamRegistry,
):
self.name = name
self.role = role
self.system_prompt = system_prompt
self.task_board = task_board
self.inbox = inbox
self.registry = registry
self.conversation_history: list[dict] = []
self.inbox.register(name)
self.registry.register(name, role)
def _build_system_prompt(self) -> str:
coord_protocol = f”””
## Coordination Protocol (auto-injected — you are agent ‘{self.name}’)
You are part of an AI agent swarm. Your role: {self.role}
Your name: {self.name}
Available tools (equivalent to ClawTeam CLI):
– task_list: Check your assigned tasks (like `clawteam task list`)
– task_update: Update task status to in_progress/completed/failed (like `clawteam task update`)
– inbox_send: Send messages to other agents (like `clawteam inbox send`)
– inbox_receive: Check your inbox for messages (like `clawteam inbox receive`)
WORKFLOW:
1. Check your tasks with task_list
2. Mark a task as in_progress when you start
3. Do the work (think, analyze, produce output)
4. Mark the task as completed with your result
5. Send a summary message to ‘leader’ when done
“””
return self.system_prompt + “\n” + coord_protocol
def _handle_tool_call(self, tool_name: str, args: dict) -> str:
if tool_name == “task_update”:
status = TaskStatus(args[“status”])
result = args.get(“result”, “”)
self.task_board.update_status(args[“task_id”], status, result)
if status == TaskStatus.COMPLETED:
self.registry.increment_completed(self.name)
return json.dumps({“ok”: True, “task_id”: args[“task_id”], “new_status”: args[“status”]})
elif tool_name == “inbox_send”:
self.inbox.send(self.name, args[“to”], args[“message”])
return json.dumps({“ok”: True, “sent_to”: args[“to”]})
elif tool_name == “inbox_receive”:
msgs = self.inbox.receive(self.name)
if not msgs:
return json.dumps({“messages”: [], “note”: “No new messages”})
return json.dumps({
“messages”: [
{“from”: m.sender, “content”: m.content, “time”: m.timestamp}
for m in msgs
]
})
elif tool_name == “task_list”:
owner = args.get(“owner”, self.name)
tasks = self.task_board.get_tasks(owner=owner)
return json.dumps({“tasks”: [t.to_dict() for t in tasks]})
return json.dumps({“error”: f”Unknown tool: {tool_name}”})
def run(self, user_message: str, max_iterations: int = 6) -> str:
self.conversation_history.append({“role”: “user”, “content”: user_message})
for iteration in range(max_iterations):
try:
response = client.chat.completions.create(
model=MODEL,
messages=[
{“role”: “system”, “content”: self._build_system_prompt()},
*self.conversation_history,
],
tools=SWARM_TOOLS,
tool_choice=”auto”,
temperature=0.4,
)
except Exception as e:
return f”[API Error] {e}”
choice = response.choices[0]
msg = choice.message
assistant_msg = {“role”: “assistant”, “content”: msg.content or “”}
if msg.tool_calls:
assistant_msg[“tool_calls”] = [
{
“id”: tc.id,
“type”: “function”,
“function”: {“name”: tc.function.name, “arguments”: tc.function.arguments},
}
for tc in msg.tool_calls
]
self.conversation_history.append(assistant_msg)
if not msg.tool_calls:
return msg.content or “(No response)”
for tc in msg.tool_calls:
fn_name = tc.function.name
fn_args = json.loads(tc.function.arguments)
result = self._handle_tool_call(fn_name, fn_args)
self.conversation_history.append({
“role”: “tool”,
“tool_call_id”: tc.id,
“content”: result,
})
return “(Agent reached max iterations)”
