Skip to content

Feedback System

The Feedback System provides a mechanism for agents and LLMs to emit real-time notifications or streaming data during execution. This is particularly useful for notifying users about the agent's progress or for streaming LLM responses.

Overview

The feedback system is based on an asynchronous push-subscribe pattern. Agents push Feedback objects to the system, and subscribers receive them via callbacks.

Feedback Object

A Feedback object consists of:

  • source: A string identifying the origin of the feedback (e.g., "llm" or an agent ID).
  • payload: The content of the feedback, which can be any data type.

FeedbackSystem Interface

The FeedbackSystem is an abstract base class that defines the protocol for feedback:

class FeedbackSystem(ABC):
    @abstractmethod
    def push(self, feedback: Feedback):
        """Pushes a new feedback event."""
        pass

    @abstractmethod
    def subscribe(self, callback: Callable[[Feedback], None]):
        """Subscribes to feedback events."""
        pass

LocalFeedbackSystem

Agentswarm includes a LocalFeedbackSystem, which is a simple in-memory implementation of the FeedbackSystem. It manages a set of local callbacks and notifies them whenever feedback is pushed.

Usage

In Context

The Context object optionally contains a FeedbackSystem. You can emit feedback directly from the context:

context.emit_feedback(payload="Starting task...", source="my_agent")

LLM Streaming

The LLM.generate method can optionally receive a FeedbackSystem. When provided, the LLM can use it to stream chunks of the generated response:

feedback_system = LocalFeedbackSystem()
feedback_system.subscribe(lambda fb: print(fb.payload, end="", flush=True))

await llm.generate(messages=messages, feedback=feedback_system)

API Reference

agentswarm.datamodels.Feedback

Bases: BaseModel

The Feedback class represents a notification or a piece of feedback emitted during agent execution.

Source code in src/agentswarm/datamodels/feedback.py
 7
 8
 9
10
11
12
13
14
class Feedback(BaseModel):
    """
    The Feedback class represents a notification or a piece of feedback
    emitted during agent execution.
    """

    source: str
    payload: Any

agentswarm.datamodels.FeedbackSystem

Bases: ABC

The FeedbackSystem class defines the interface for pushing and subscribing to feedback events.

Source code in src/agentswarm/datamodels/feedback.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class FeedbackSystem(ABC):
    """
    The FeedbackSystem class defines the interface for pushing and
    subscribing to feedback events.
    """

    @abstractmethod
    def push(self, feedback: Feedback):
        """
        Pushes a new feedback event.
        """
        raise NotImplementedError

    @abstractmethod
    def subscribe(self, callback: Callable[[Feedback], None]):
        """
        Subscribes to feedback events.
        """
        raise NotImplementedError

    @abstractmethod
    def to_dict(self) -> dict:
        """
        Returns the configuration needed to recreate the feedback system.
        WARNING: Never share keys or other secret during this process.
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def recreate(cls, config: dict) -> "FeedbackSystem":
        """
        Recreates a FeedbackSystem instance from a configuration dictionary.
        """
        raise NotImplementedError

push(feedback) abstractmethod

Pushes a new feedback event.

Source code in src/agentswarm/datamodels/feedback.py
23
24
25
26
27
28
@abstractmethod
def push(self, feedback: Feedback):
    """
    Pushes a new feedback event.
    """
    raise NotImplementedError

recreate(config) abstractmethod classmethod

Recreates a FeedbackSystem instance from a configuration dictionary.

Source code in src/agentswarm/datamodels/feedback.py
45
46
47
48
49
50
51
@classmethod
@abstractmethod
def recreate(cls, config: dict) -> "FeedbackSystem":
    """
    Recreates a FeedbackSystem instance from a configuration dictionary.
    """
    raise NotImplementedError

subscribe(callback) abstractmethod

Subscribes to feedback events.

Source code in src/agentswarm/datamodels/feedback.py
30
31
32
33
34
35
@abstractmethod
def subscribe(self, callback: Callable[[Feedback], None]):
    """
    Subscribes to feedback events.
    """
    raise NotImplementedError

to_dict() abstractmethod

Returns the configuration needed to recreate the feedback system. WARNING: Never share keys or other secret during this process.

Source code in src/agentswarm/datamodels/feedback.py
37
38
39
40
41
42
43
@abstractmethod
def to_dict(self) -> dict:
    """
    Returns the configuration needed to recreate the feedback system.
    WARNING: Never share keys or other secret during this process.
    """
    raise NotImplementedError

agentswarm.datamodels.LocalFeedbackSystem

Bases: FeedbackSystem

The LocalFeedbackSystem implements a simple in-memory feedback system using a list of subscription callbacks.

Source code in src/agentswarm/datamodels/local_feedback.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class LocalFeedbackSystem(FeedbackSystem):
    """
    The LocalFeedbackSystem implements a simple in-memory feedback system
    using a list of subscription callbacks.
    """

    def __init__(self):
        self._subscriptions: Set[Callable[[Feedback], None]] = set()

    def push(self, feedback: Feedback):
        """
        Pushes a feedback event to all subscribers.
        """
        for callback in self._subscriptions:
            try:
                callback(feedback)
            except Exception as e:
                logger.error(f"Error in callback: {e}")

    def subscribe(self, callback: Callable[[Feedback], None]):
        """
        Adds a callback to the feedback subscriptions.
        """
        self._subscriptions.add(callback)

    def to_dict(self) -> dict:
        from ..utils.exceptions import RemoteExecutionNotSupportedError

        raise RemoteExecutionNotSupportedError(
            "LocalFeedbackSystem cannot be serialized for remote execution."
        )

    @classmethod
    def recreate(cls, config: dict) -> "LocalFeedbackSystem":
        from ..utils.exceptions import RemoteExecutionNotSupportedError

        raise RemoteExecutionNotSupportedError(
            "LocalFeedbackSystem cannot be recreated from remote configuration."
        )

push(feedback)

Pushes a feedback event to all subscribers.

Source code in src/agentswarm/datamodels/local_feedback.py
17
18
19
20
21
22
23
24
25
def push(self, feedback: Feedback):
    """
    Pushes a feedback event to all subscribers.
    """
    for callback in self._subscriptions:
        try:
            callback(feedback)
        except Exception as e:
            logger.error(f"Error in callback: {e}")

subscribe(callback)

Adds a callback to the feedback subscriptions.

Source code in src/agentswarm/datamodels/local_feedback.py
27
28
29
30
31
def subscribe(self, callback: Callable[[Feedback], None]):
    """
    Adds a callback to the feedback subscriptions.
    """
    self._subscriptions.add(callback)