Skip to main content
Whenever messages are stored in Honcho, background processes kick off to reason about the conversation and generate insights. Reasoning is an asynchronous process and will not immediately generate insights for the latest message you’ve sent. This is by design: we want to reason efficiently over batches of messages rather than assessing each message in a vacuum. Honcho provides several utilities to check the status of the queue.
from honcho import Honcho
honcho = Honcho()

status = honcho.queue_status()
Output types
class QueueStatus(BaseModel):
    completed_work_units: int
    """Completed work units"""

    in_progress_work_units: int
    """Work units currently being processed"""

    pending_work_units: int
    """Work units waiting to be processed"""

    total_work_units: int
    """Total work units"""

    sessions: Optional[Dict[str, Sessions]] = None
    """Per-session status when not filtered by session"""
Whenever a message is sent it will generate several tasks. These could be tasks such as generating insights, cleaning up a representation, summarizing a conversation etc. These tasks are defined based on who is sending the message, what session the message is in, and potentially who is observing the message. We call the combination of these parameters a work_unit This has a few different implications.
  • tasks within the same work_unit are processed sequentially, but multiple work_units will be processed in parallel
  • If local representations are turned in a Session then a message will generate an additional work unit for every peer that has observe_others=True
The queue_status method can take additional parameters to scope the status to a specific work unit:
def queue_status(
        self,
        observer_id: str | None = None,
        sender_id: str | None = None,
        session_id: str | None = None,
    ) -> QueueStatus:
Additionally, there are queue status methods available on the session objects in each of the SDKs.
Do not wait for the queue to be empty. The queue is a continuous processing system—new messages may arrive at any time, and “completion” is not a meaningful state. Design your application to work without assuming the queue will ever be fully drained. Use queueStatus() for observability and debugging, not for synchronization.
Below are the function signatures for the session level queue status method:
@validate_call
    def queue_status(
        self,
        observer_id: str | None = None,
        sender_id: str | None = None,
    ) -> QueueStatus: