Whenever messages are stored in Honcho, background processes kick off to reason about the conversation and generate insights.
Reasoning is an asynchronous process and will not immediately
generate insights for the latest message you’ve sent. This is
by design: we want to reason efficiently over batches of messages
rather than assessing each message in a vacuum. Honcho provides
several utilities to check the status of the queue.
from honcho import Honcho
honcho = Honcho()
status = honcho.queue_status()
Output types
class QueueStatus(BaseModel):
completed_work_units: int
"""Completed work units"""
in_progress_work_units: int
"""Work units currently being processed"""
pending_work_units: int
"""Work units waiting to be processed"""
total_work_units: int
"""Total work units"""
sessions: Optional[Dict[str, Sessions]] = None
"""Per-session status when not filtered by session"""
Whenever a message is sent it will generate several tasks. These could
be tasks such as generating insights, cleaning up a representation, summarizing
a conversation etc. These tasks are defined based on who is sending the
message, what session the message is in, and potentially who is observing the
message. We call the combination of these parameters a work_unit
This has a few different implications.
- tasks within the same work_unit are processed sequentially, but multiple
work_units will be processed in parallel
- If local representations are turned in a Session then a message will
generate an additional work unit for every peer that has
observe_others=True
The queue_status method can take additional
parameters to scope the status to a specific work unit:
def queue_status(
self,
observer_id: str | None = None,
sender_id: str | None = None,
session_id: str | None = None,
) -> QueueStatus:
Additionally, there are queue status methods available on the session objects in each of the SDKs.
Do not wait for the queue to be empty. The queue is a continuous processing system—new messages may arrive at any time, and “completion” is not a meaningful state. Design your application to work without assuming the queue will ever be fully drained. Use queueStatus() for observability and debugging, not for synchronization.
Below are the function signatures for the session level queue status method:
@validate_call
def queue_status(
self,
observer_id: str | None = None,
sender_id: str | None = None,
) -> QueueStatus: