In this tutorial, we’ll walk through how to ingest your Gmail emails into Honcho. By the end, each email thread will be a Honcho session and each participant will be a peer — giving your agents memory of who said what across your email history. This guide includes a ready-to-run Python script that handles everything: Gmail OAuth, thread fetching, participant extraction, and Honcho ingestion. You can run it as-is or use the full tutorial below to understand each piece as you go.
The full script is available on GitHub. This is a developer-focused tutorial — it requires creating a Google Cloud project and OAuth credentials.

TL;DR

If you just want to get your emails into Honcho, here’s everything you need.

1. Set Up Google Cloud Credentials

Follow Google’s official Gmail API Python Quickstart to:
  1. Create a Google Cloud project and enable the Gmail API
  2. Configure the OAuth consent screen
  3. Create OAuth credentials (select Desktop app as the application type)
  4. Download the credentials JSON into the same directory as the script
The script auto-detects Google’s default client_secret_*.json filename, so there’s no need to rename it. The script requests only the gmail.readonly scope.

2. Install Dependencies

uv pip install google-api-python-client google-auth-oauthlib honcho-ai

3. Preview with a Dry Run

uv run honcho_gmail.py --dry-run --max-threads 5
On first run, a browser window opens for OAuth consent. After authorizing, a token.json file is created — future runs skip this step.

4. Load into Honcho

export HONCHO_API_KEY=your_api_key
uv run honcho_gmail.py --workspace gmail-inbox --max-threads 20
You can filter threads with Gmail search syntax:
uv run honcho_gmail.py --query "from:alice@example.com"
uv run honcho_gmail.py --label INBOX
uv run honcho_gmail.py --query "after:2024/01/01 has:attachment" --max-threads 50
That’s it — your emails are now queryable in Honcho. Read on if you want to understand how the script works and the design decisions behind it.

Full Tutorial

How Gmail Maps to Honcho

The core idea is straightforward: each Gmail thread becomes a Honcho session, and each email participant becomes a peer. Here’s the full mapping:
| Gmail Concept | Honcho Concept | Details |
| --- | --- | --- |
| Your Gmail account | Workspace (gmail) | One workspace for all email data |
| Email participant | Peer | Email address as ID for deduplication |
| Email thread | Session (gmail-thread-{id}) | One session per thread, all participants attached |
| Individual email | Message | Attributed to the sender with original timestamp |

Email as Peer ID

The script normalizes email addresses into URL-safe peer IDs — alice@example.com becomes alice-example-com. This means the same person is automatically deduplicated across threads. If Alice emails you in 10 different threads, all of those conversations accumulate under a single peer.
def peer_id_from_email(email: str) -> str:
    """Convert email to a valid Honcho peer ID."""
    return email.replace("@", "-").replace(".", "-")
This also means peers are consistent across data sources. If you import Granola meetings and Gmail threads for the same person, they merge under the same peer ID.
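A quick check of the dedup behavior, using the stricter regex-based normalization from the full script below (the snippet above is a simplified excerpt):

```python
import re

def peer_id_from_email(email: str) -> str:
    """Convert email to a valid Honcho peer ID (full-script version)."""
    peer_id = re.sub(r"[^A-Za-z0-9_-]+", "-", email).strip("-").lower()
    peer_id = re.sub(r"-{2,}", "-", peer_id)
    return peer_id or "unknown-peer"

# The same address always maps to the same ID, so Alice's 10 threads
# all accumulate under one peer.
print(peer_id_from_email("alice@example.com"))   # alice-example-com
print(peer_id_from_email("Alice@Example.com"))   # alice-example-com (case-folded)
```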

Extracting Participants

Every email has a sender, recipients, and optionally CC/BCC addresses. The script extracts all of these to build a complete picture of who’s involved in each thread:
for m in msgs:
    register_peer(m["from"])
    for addr in parse_address_list(m["to"]):
        register_peer(addr)
    for addr in parse_address_list(m["cc"]):
        register_peer(addr)
    for addr in parse_address_list(m["bcc"]):
        register_peer(addr)
Display names are extracted when available (e.g., Alice Smith <alice@example.com> → name: “Alice Smith”). When only an email is present, the script generates a name from the local part.
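The standard library’s email.utils does the heavy lifting here. A condensed sketch of the script’s name-resolution logic, covering both cases:

```python
from email.utils import parseaddr

def extract_name(from_header: str) -> str:
    """Display name if present, else derive one from the local part."""
    name, addr = parseaddr(from_header)
    if name:
        return name
    # No display name: generate one from the local part of the address
    return addr.split("@")[0].replace(".", " ").title()

print(extract_name("Alice Smith <alice@example.com>"))  # Alice Smith
print(extract_name("bob.jones@example.com"))            # Bob Jones
```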

Message Attribution and Timestamps

Each email becomes a message attributed to its sender via peer.message(). The original email timestamp is preserved using created_at, so Honcho sees the conversation in chronological order — not the order you imported it.
honcho_msgs.append(peer.message(
    content,
    metadata={
        "gmail_id": m["id"],
        "subject": m["subject"],
        "from": m["from"],
        "to": m["to"],
        "labels": m["labels"],
    },
    created_at=m["timestamp"],
))
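Gmail’s internalDate field is a string of epoch milliseconds; the script converts it to a timezone-aware datetime before passing it as created_at. A minimal sketch of that conversion:

```python
from datetime import datetime, timezone

# internalDate comes back from the Gmail API as epoch milliseconds
internal_date = "1704067200000"  # midnight UTC, Jan 1 2024
ts = int(internal_date) / 1000
created_at = datetime.fromtimestamp(ts, tz=timezone.utc)
print(created_at.isoformat())  # 2024-01-01T00:00:00+00:00
```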

Multi-Peer Sessions

Each thread’s session is linked to all participants using session.add_peers(). This means when you query Honcho about a peer, it has context not just from their messages but from the full conversations they participated in.
session = honcho.session(session_id, metadata={
    "gmail_thread_id": tid,
    "subject": subject,
    "source": "gmail",
    "message_count": len(msgs),
})
session.add_peers(thread_peers)

Stripping Quoted Replies

Email threads are full of quoted replies — each message repeats everything above it. The script strips these out so only the new content is stored per message, avoiding duplication in Honcho’s memory:
def strip_quoted_replies(text: str) -> str:
    """Strip quoted reply text, keeping only the new content."""
    lines = text.split("\n")
    clean_lines = []
    for line in lines:
        stripped = line.strip()
        if re.match(r"^On .+wrote:\s*$", stripped):
            break
        if stripped.startswith(">"):
            break
        # ... other reply markers
        clean_lines.append(line)
    return "\n".join(clean_lines).rstrip()

Querying After Import

Once your emails are in Honcho, you can query any peer:
import os
from honcho import Honcho

honcho = Honcho(workspace_id="gmail-inbox", api_key=os.environ["HONCHO_API_KEY"])

alice = honcho.peer("alice-example-com")
print(alice.chat("What has Alice been discussing with me?"))
print(alice.chat("What action items has Alice mentioned?"))

CLI Reference

usage: honcho_gmail.py [-h] [--workspace WORKSPACE] [--query QUERY]
                          [--label LABEL] [--max-threads N] [--dry-run]
                          [--credentials PATH] [--token PATH]

options:
  --workspace, -w    Honcho workspace ID (default: gmail)
  --query, -q        Gmail search query (e.g., 'from:alice@example.com')
  --label, -l        Gmail label to filter by (e.g., INBOX)
  --max-threads, -n  Max threads to fetch (default: 10)
  --dry-run          Preview without writing to Honcho
  --credentials, -c  Path to OAuth credentials JSON (auto-detects client_secret*.json)
  --token, -t        Path to store access token (default: token.json)

Troubleshooting

“No client_secret*.json file found”

Download OAuth credentials from Google Cloud Console and place the client_secret_*.json file in the same directory as the script.

“Access blocked: This app’s request is invalid”

Your OAuth consent screen may not be configured correctly. Ensure you’ve added the gmail.readonly scope.

“Token has been expired or revoked”

Delete token.json and run the script again to re-authenticate.

Rate Limits

The script includes a small delay when creating peers to avoid hitting Honcho’s rate limits. For large imports (100+ threads), consider running in batches.
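One way to run a large import in batches (an illustrative sketch, not part of the script) is to window the Gmail query by date and invoke the script once per window, using Gmail’s after:/before: search operators:

```python
from datetime import date, timedelta

def date_windows(start: date, end: date, days: int = 30):
    """Yield Gmail after:/before: query strings covering [start, end)."""
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=days), end)
        yield f"after:{cur:%Y/%m/%d} before:{nxt:%Y/%m/%d}"
        cur = nxt

for q in date_windows(date(2024, 1, 1), date(2024, 3, 1)):
    # Each query can be passed to: uv run honcho_gmail.py --query "<q>"
    print(q)
```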

Unique Messages

Use an AI assistant in your inbox? Want to handle its messages differently? Feel free to modify the script’s structure to fit your bespoke email setup — it was written with agents in mind, so it’s easy to update with your coding assistant.

Full Script

#!/usr/bin/env python3
"""Load Gmail messages into Honcho.

Uses the Gmail API directly (with OAuth) to fetch emails and the Honcho Python SDK to store them.
Each Gmail thread becomes a Honcho session, each sender becomes a peer.

Prerequisites:
1. Create a Google Cloud project and enable the Gmail API
2. Create OAuth 2.0 credentials (Desktop app type)
3. Download the credentials JSON (client_secret_*.json) into this directory
4. Install dependencies:
   pip install google-api-python-client google-auth-oauthlib honcho-ai

On first run, a browser window will open for OAuth consent. After authorizing,
a 'token.json' file will be created to store your credentials for future runs.
"""

import argparse
import base64
import glob
import os
import re
import time
from datetime import datetime, timezone
from email.header import decode_header, make_header
from email.utils import getaddresses, parseaddr

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
PEER_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]+$")


def find_credentials() -> str:
    """Find a Google OAuth credentials file in the current directory."""
    matches = glob.glob("client_secret*.json")
    if matches:
        return matches[0]
    raise FileNotFoundError(
        "No client_secret*.json file found.\n"
        "Download OAuth credentials from Google Cloud Console:\n"
        "1. Go to console.cloud.google.com\n"
        "2. Create/select a project and enable Gmail API\n"
        "3. Create OAuth 2.0 credentials (Desktop app)\n"
        "4. Download the JSON into this directory"
    )


def get_gmail_service(credentials_file: str | None = None, token_file: str = "token.json"):
    """Authenticate and return a Gmail API service instance."""
    creds = None

    if os.path.exists(token_file):
        creds = Credentials.from_authorized_user_file(token_file, SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            print("Refreshing expired credentials...")
            creds.refresh(Request())
        else:
            if credentials_file is None:
                credentials_file = find_credentials()
            print(f"Using credentials: {credentials_file}")
            print("Opening browser for OAuth consent...")
            flow = InstalledAppFlow.from_client_secrets_file(credentials_file, SCOPES)
            creds = flow.run_local_server(port=0)

        with open(token_file, "w") as token:
            token.write(creds.to_json())
        print(f"Credentials saved to {token_file}")

    return build("gmail", "v1", credentials=creds)


def list_threads(service, query: str | None = None, label_ids: list[str] | None = None, max_results: int = 10) -> list[dict]:
    """List Gmail threads with pagination support."""
    all_threads = []
    page_token = None

    while len(all_threads) < max_results:
        try:
            params = {
                "userId": "me",
                "maxResults": min(100, max_results - len(all_threads)),
            }
            if query:
                params["q"] = query
            if label_ids:
                params["labelIds"] = label_ids
            if page_token:
                params["pageToken"] = page_token

            response = service.users().threads().list(**params).execute()
            threads = response.get("threads", [])
            all_threads.extend(threads)

            page_token = response.get("nextPageToken")
            if not page_token:
                break

        except HttpError as e:
            print(f"Error listing threads: {e}")
            break

    return all_threads[:max_results]


def get_thread(service, thread_id: str) -> dict:
    """Fetch a complete Gmail thread with all messages."""
    try:
        return service.users().threads().get(
            userId="me",
            id=thread_id,
            format="full"
        ).execute()
    except HttpError as e:
        print(f"Error fetching thread {thread_id}: {e}")
        return {}


def _decode_header_str(header: str) -> str:
    """Decode an RFC 2047 encoded header string to plain Unicode."""
    return str(make_header(decode_header(header)))


def extract_email(from_header: str) -> str:
    """Extract bare email from an RFC 5322 header value."""
    _, addr = parseaddr(_decode_header_str(from_header))
    return addr.lower().strip()


def extract_name(from_header: str) -> str:
    """Extract display name from an RFC 5322 header value."""
    name, _ = parseaddr(_decode_header_str(from_header))
    return name.strip() or from_header.strip()


def decode_body(payload: dict) -> str:
    """Recursively extract plain text from a Gmail message payload."""
    if payload.get("mimeType") == "text/plain":
        data = payload.get("body", {}).get("data", "")
        if data:
            return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")

    parts = payload.get("parts", [])
    for part in parts:
        text = decode_body(part)
        if text:
            return text
    return ""


def strip_quoted_replies(text: str) -> str:
    """Strip quoted reply text from an email body, keeping only the new content."""
    lines = text.split("\n")
    clean_lines = []
    for line in lines:
        stripped = line.strip()
        if re.match(r"^On .+wrote:\s*$", stripped):
            break
        if stripped.startswith("---------- Forwarded message"):
            break
        if stripped.startswith(">"):
            break
        if re.match(r"^[-_]{10,}$", stripped):
            break
        clean_lines.append(line)
    return "\n".join(clean_lines).rstrip()


def parse_address_list(header: str) -> list[str]:
    """Parse a comma-separated email header into individual addresses."""
    if not header.strip():
        return []
    decoded = _decode_header_str(header)
    return [
        f"{name} <{addr}>" if name else addr
        for name, addr in getaddresses([decoded])
        if addr
    ]


def peer_id_from_email(email: str) -> str:
    """Convert email to a valid Honcho peer ID."""
    peer_id = re.sub(r"[^A-Za-z0-9_-]+", "-", email).strip("-").lower()
    peer_id = re.sub(r"-{2,}", "-", peer_id)
    if not peer_id:
        peer_id = "unknown-peer"

    if not PEER_ID_PATTERN.fullmatch(peer_id):
        raise ValueError(f"Generated peer ID is invalid: {peer_id!r}")
    return peer_id


def fetch_thread_messages(service, thread_id: str) -> list[dict]:
    """Fetch all messages in a Gmail thread with full content."""
    data = get_thread(service, thread_id)
    messages = []

    for msg in data.get("messages", []):
        headers = {h["name"]: h["value"] for h in msg.get("payload", {}).get("headers", [])}
        body = strip_quoted_replies(decode_body(msg.get("payload", {})))
        ts = int(msg.get("internalDate", "0")) / 1000

        messages.append({
            "id": msg["id"],
            "thread_id": msg["threadId"],
            "from": headers.get("From", ""),
            "to": headers.get("To", ""),
            "cc": headers.get("Cc", ""),
            "bcc": headers.get("Bcc", ""),
            "subject": headers.get("Subject", ""),
            "date": headers.get("Date", ""),
            "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc),
            "body": body.strip(),
            "labels": msg.get("labelIds", []),
            "snippet": msg.get("snippet", ""),
        })

    return messages


def main():
    parser = argparse.ArgumentParser(description="Load Gmail messages into Honcho")
    parser.add_argument("--workspace", "-w", default="gmail", help="Honcho workspace ID (default: gmail)")
    parser.add_argument("--query", "-q", default=None, help="Gmail search query (e.g. 'from:alice@example.com')")
    parser.add_argument("--label", "-l", default=None, help="Gmail label to filter by (e.g. INBOX)")
    parser.add_argument("--max-threads", "-n", type=int, default=10, help="Max threads to fetch (default: 10)")
    parser.add_argument("--dry-run", action="store_true", help="Print what would be loaded without writing to Honcho")
    parser.add_argument("--credentials", "-c", default=None, help="Path to OAuth credentials JSON (auto-detects client_secret*.json)")
    parser.add_argument("--token", "-t", default="token.json", help="Path to store/load access token")
    args = parser.parse_args()

    # Authenticate
    print("Authenticating with Gmail API...")
    service = get_gmail_service(args.credentials, args.token)
    print("  Authenticated successfully!")

    label_ids = [args.label] if args.label else None

    # List threads
    print(f"\nFetching up to {args.max_threads} threads from Gmail...")
    threads = list_threads(service, query=args.query, label_ids=label_ids, max_results=args.max_threads)
    print(f"  Found {len(threads)} threads")

    if not threads:
        print("No threads found. Try adjusting --query or --label.")
        return

    # Fetch full messages for each thread
    all_thread_messages = {}
    seen_peers = {}

    def register_peer(addr: str):
        email = extract_email(addr)
        if email and email not in seen_peers:
            name = extract_name(addr)
            if name.lower().strip() == email or "@" in name:
                name = email.split("@")[0].replace(".", " ").title()
            seen_peers[email] = {
                "name": name,
                "peer_id": peer_id_from_email(email),
                "email": email,
            }

    for i, t in enumerate(threads):
        tid = t["id"]
        print(f"  Fetching thread {i+1}/{len(threads)}: {tid}")
        msgs = fetch_thread_messages(service, tid)
        all_thread_messages[tid] = msgs
        for m in msgs:
            register_peer(m["from"])
            for addr in parse_address_list(m["to"]):
                register_peer(addr)
            for addr in parse_address_list(m["cc"]):
                register_peer(addr)
            for addr in parse_address_list(m["bcc"]):
                register_peer(addr)

    # Summary
    total_msgs = sum(len(v) for v in all_thread_messages.values())
    print("\nSummary:")
    print(f"  Threads: {len(all_thread_messages)}")
    print(f"  Messages: {total_msgs}")
    print(f"  Unique participants: {len(seen_peers)}")
    for email, info in seen_peers.items():
        print(f"    {info['peer_id']} ({info['name']} <{email}>)")

    if args.dry_run:
        print("\n[DRY RUN] Would create the above in Honcho. Showing first message per thread:")
        for tid, msgs in all_thread_messages.items():
            m = msgs[0]
            body_preview = m["body"][:120].replace("\n", " ") if m["body"] else m["snippet"][:120]
            print(f"  Thread {tid}: {m['subject']}")
            print(f"    {m['from']} @ {m['date']}")
            print(f"    {body_preview}...")
        return

    # Load into Honcho
    from honcho import Honcho

    print(f"\nLoading into Honcho workspace '{args.workspace}'...")
    honcho = Honcho(workspace_id=args.workspace)

    # Create peers
    peers = {}
    for i, (email, info) in enumerate(seen_peers.items()):
        if i > 0 and i % 4 == 0:
            time.sleep(1)
        peers[email] = honcho.peer(info["peer_id"], metadata={
            "email": email,
            "name": info["name"],
            "source": "gmail",
        })
        print(f"  Peer: {info['peer_id']}")

    # Create sessions and messages per thread
    for tid, msgs in all_thread_messages.items():
        subject = msgs[0]["subject"] if msgs else "No subject"
        session_id = f"gmail-thread-{tid}"

        thread_peer_emails = set()
        for m in msgs:
            thread_peer_emails.add(extract_email(m["from"]))
            for addr in parse_address_list(m["to"]):
                thread_peer_emails.add(extract_email(addr))
            for addr in parse_address_list(m["cc"]):
                thread_peer_emails.add(extract_email(addr))
            for addr in parse_address_list(m["bcc"]):
                thread_peer_emails.add(extract_email(addr))
        thread_peers = [peers[e] for e in thread_peer_emails if e in peers]

        session = honcho.session(session_id, metadata={
            "gmail_thread_id": tid,
            "subject": subject,
            "source": "gmail",
            "message_count": len(msgs),
        })
        session.add_peers(thread_peers)

        honcho_msgs = []
        for m in msgs:
            email = extract_email(m["from"])
            peer = peers.get(email)
            if not peer:
                continue
            content = m["body"] if m["body"] else m["snippet"]
            if not content:
                continue
            honcho_msgs.append(peer.message(
                content,
                metadata={
                    "gmail_id": m["id"],
                    "subject": m["subject"],
                    "from": m["from"],
                    "to": m["to"],
                    "labels": m["labels"],
                },
                created_at=m["timestamp"],
            ))

        if honcho_msgs:
            session.add_messages(honcho_msgs)
            print(f"  Session {session_id}: {len(honcho_msgs)} messages — {subject[:60]}")

    print(f"\nDone! Loaded {total_msgs} messages into workspace '{args.workspace}'.")


if __name__ == "__main__":
    main()

Next Steps

Design Patterns

See how the Granola integration maps to common Honcho patterns.

GitHub Repository

Source code and example script.