Skip to content

Core API Reference

This page provides detailed documentation for the core GUM module.

Main Class

gum.gum

Attributes

Classes

gum(user_name: str, model: str, *observers: Observer, propose_prompt: str | None = None, similar_prompt: str | None = None, revise_prompt: str | None = None, audit_prompt: str | None = None, data_directory: str = '~/.cache/gum', db_name: str = 'gum.db', max_concurrent_updates: int = 4, verbosity: int = logging.INFO, audit_enabled: bool = False, api_base: str | None = None, api_key: str | None = None, session_id: str | None = None)

A class for managing general user models.

This class provides functionality for observing user behavior, generating and managing propositions about user behavior, and maintaining relationships between observations and propositions.

The system uses a unified AI client that supports multiple providers:

- Text completion: OpenAI (default), Azure OpenAI, Vertex AI, or Google AI Studio (Gemini API key via litellm)
- Vision completion: OpenAI (default), OpenRouter, Vertex AI, or Google AI Studio (Gemini API key via litellm)

Parameters:

Name Type Description Default
user_name str

The name of the user being modeled.

required
model str

The model identifier; stored on the instance as `model`.

required
*observers Observer

Variable number of observer instances to track user behavior.

()
propose_prompt str

Custom prompt for proposition generation.

None
similar_prompt str

Custom prompt for similarity analysis.

None
revise_prompt str

Custom prompt for proposition revision.

None
audit_prompt str

Custom prompt for auditing.

None
data_directory str

Directory for storing data. Defaults to "~/.cache/gum".

'~/.cache/gum'
db_name str

Name of the database file. Defaults to "gum.db".

'gum.db'
max_concurrent_updates int

Maximum number of concurrent updates. Defaults to 4.

4
verbosity int

Logging verbosity level. Defaults to logging.INFO.

INFO
audit_enabled bool

Whether to enable auditing. Defaults to False.

False
api_base str

Deprecated, use environment variables instead.

None
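api_key str

Deprecated, use environment variables instead.

None
session_id str

Optional session identifier; stored on the instance as `session_id`. Defaults to None.

None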
Source code in gum/gum.py
def __init__(
    self,
    user_name: str,
    model: str,
    *observers: Observer,
    propose_prompt: str | None = None,
    similar_prompt: str | None = None,
    revise_prompt: str | None = None,
    audit_prompt: str | None = None,
    data_directory: str = "~/.cache/gum",
    db_name: str = "gum.db",
    max_concurrent_updates: int = 4,
    verbosity: int = logging.INFO,
    audit_enabled: bool = False,
    api_base: str | None = None,
    api_key: str | None = None,
    session_id: str | None = None,
):
    """Set up in-memory state, logging, and the on-disk data directory.

    No database connection is opened here: ``engine`` and ``Session`` stay
    ``None`` until ``connect_db`` runs.

    Args:
        user_name: The name of the user being modeled.
        model: Model identifier, stored as ``self.model``.
        *observers: Observer instances to track; copied into a new list.
        propose_prompt: Overrides ``PROPOSE_PROMPT`` when truthy.
        similar_prompt: Overrides ``SIMILAR_PROMPT`` when truthy.
        revise_prompt: Overrides ``REVISE_PROMPT`` when truthy.
        audit_prompt: Overrides ``AUDIT_PROMPT`` when truthy.
        data_directory: Storage directory; ``~`` is expanded and the
            directory is created if missing.
        db_name: Name of the database file.
        max_concurrent_updates: Initial value of the update semaphore.
        verbosity: Level applied to the ``"gum"`` logger.
        audit_enabled: Stored as ``self.audit_enabled``.
        api_base: Accepted for compatibility; not read by this constructor.
        api_key: Accepted for compatibility; not read by this constructor.
        session_id: Stored as ``self.session_id``.
    """
    # Storage location: expand "~" and create the directory up front.
    data_directory = os.path.expanduser(data_directory)
    os.makedirs(data_directory, exist_ok=True)

    # Identity and runtime configuration.
    self.user_name = user_name
    self.model = model
    self.session_id = session_id
    self.audit_enabled = audit_enabled
    self.observers: list[Observer] = list(observers)

    # The shared "gum" logger gets exactly one stream handler, even when
    # several instances are constructed in the same process.
    gum_logger = logging.getLogger("gum")
    gum_logger.setLevel(verbosity)
    if not gum_logger.handlers:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        )
        gum_logger.addHandler(stream_handler)
    # `gum.cli` configures the root logger through `logging.basicConfig`;
    # without this, each record would be emitted by both our handler and
    # the root handler.
    gum_logger.propagate = False
    self.logger = gum_logger

    # LiteLLM installs its own StreamHandler on import and also propagates
    # to the root logger, so its lines would otherwise appear twice in two
    # different formats. Turning off propagation keeps LiteLLM's own
    # handler and drops the duplicate.
    logging.getLogger("LiteLLM").propagate = False

    # Prompts: a truthy caller-supplied value wins, else the module default.
    self.propose_prompt = propose_prompt if propose_prompt else PROPOSE_PROMPT
    self.similar_prompt = similar_prompt if similar_prompt else SIMILAR_PROMPT
    self.revise_prompt = revise_prompt if revise_prompt else REVISE_PROMPT
    self.audit_prompt = audit_prompt if audit_prompt else AUDIT_PROMPT

    # Created lazily: the AI client and the database handles start unset.
    self.ai_client = None
    self.engine = None
    self.Session = None
    self._db_name = db_name
    self._data_directory = data_directory

    # Concurrency bookkeeping for the update loop and spawned work.
    self._update_sem = asyncio.Semaphore(max_concurrent_updates)
    self._tasks: set[asyncio.Task] = set()
    self._background_tasks: set[asyncio.Task] = set()
    self._loop_task: asyncio.Task | None = None
    self.update_handlers: list[Callable[[Observer, Update], None]] = []
Attributes
Session = None instance-attribute
ai_client = None instance-attribute
audit_enabled = audit_enabled instance-attribute
audit_prompt = audit_prompt or AUDIT_PROMPT instance-attribute
engine = None instance-attribute
logger = logging.getLogger('gum') instance-attribute
model = model instance-attribute
observers: list[Observer] = list(observers) instance-attribute
propose_prompt = propose_prompt or PROPOSE_PROMPT instance-attribute
revise_prompt = revise_prompt or REVISE_PROMPT instance-attribute
session_id = session_id instance-attribute
similar_prompt = similar_prompt or SIMILAR_PROMPT instance-attribute
update_handlers: list[Callable[[Observer, Update], None]] = [] instance-attribute
user_name = user_name instance-attribute
Functions
__aenter__() async

Async context manager entry point.

Returns:

Name Type Description
gum

The instance of the gum class.

Source code in gum/gum.py
async def __aenter__(self):
    """Enter the async context: connect the database, then start the loop.

    Returns:
        gum: This same instance, for use as the ``async with ... as`` target.
    """
    # The database is connected before the update loop is started.
    await self.connect_db()
    self.start_update_loop()
    return self
__aexit__(exc_type, exc, tb) async

Async context manager exit point.

Parameters:

Name Type Description Default
exc_type

The type of exception if any.

required
exc

The exception instance if any.

required
tb

The traceback if any.

required
Source code in gum/gum.py
async def __aexit__(self, exc_type, exc, tb):
    """Leave the async context and shut everything down in order.

    The update loop is stopped, in-flight and background tasks are awaited,
    and finally each observer is stopped. The exception arguments are not
    inspected, and nothing is returned, so an exception raised inside the
    ``async with`` body is not suppressed.

    Args:
        exc_type: The type of exception if any.
        exc: The exception instance if any.
        tb: The traceback if any.
    """
    await self.stop_update_loop()

    # Let in-flight handlers finish; their exceptions are collected by
    # gather rather than raised here.
    in_flight = self._tasks
    if in_flight:
        await asyncio.gather(*in_flight, return_exceptions=True)
    await self.drain_background_tasks()

    # Observers are stopped one at a time, in registration order.
    for observer in self.observers:
        await observer.stop()
add_observer(observer: Observer)

Add an observer to track user behavior.

Parameters:

Name Type Description Default
observer Observer

The observer to add.

required
Source code in gum/gum.py
def add_observer(self, observer: Observer):
    """Add an observer to track user behavior.

    If the database is already connected (``self.Session`` is set), the
    observer is also handed the session factory. ``connect_db`` only wires
    the observers that exist when it runs, so without this a late-added
    observer would never receive it. Wiring is best-effort: a failure is
    logged as a warning and the observer stays registered.

    Args:
        observer (Observer): The observer to add.
    """
    self.observers.append(observer)

    # Before connect_db has run there is nothing to wire; connect_db will
    # cover this observer along with the rest.
    session_factory = getattr(self, "Session", None)
    if session_factory is None:
        return

    try:
        observer.set_session_factory(session_factory)
    except Exception as e:
        # Same best-effort policy and message as connect_db.
        observer_name = getattr(observer, "name", observer)
        self.logger.warning(
            f"Could not set session factory on {observer_name}: {e}"
        )
connect_db() async

Initialize the database connection if not already connected.

Source code in gum/gum.py
async def connect_db(self):
    """Open the database on first use and share the session factory.

    Does nothing when ``self.engine`` is already set. Otherwise it calls
    ``init_db``, then hands the session factory to the Gumbo engine and to
    every registered observer. Both hand-offs are best-effort: a failure is
    logged as a warning and does not abort the connection.
    """
    if self.engine is not None:
        # Already connected.
        return

    self.engine, self.Session = await init_db(self._db_name, self._data_directory)

    # Hand the session factory to the Gumbo engine so suggestions persist
    # whether GUM was started from the CLI or from the API controller. The
    # controller repeats this at startup, which is harmless because
    # set_db_session_factory is idempotent.
    try:
        from .services.gumbo_engine import get_gumbo_engine
        engine_service = await get_gumbo_engine()
        engine_service.set_db_session_factory(self.Session)
        self.logger.info("Gumbo engine session factory wired from connect_db")
    except Exception as e:
        self.logger.warning(f"Could not wire Gumbo session factory: {e}")

    # Observers that need direct DB access (e.g. FocusObserver) get the
    # factory here instead of reaching into gum internals.
    for obs in self.observers:
        try:
            obs.set_session_factory(self.Session)
        except Exception as e:
            self.logger.warning(f"Could not set session factory on {obs.name}: {e}")
drain_background_tasks() -> None async

Wait for any background tasks spawned by pipeline processing.

Source code in gum/gum.py
async def drain_background_tasks(self) -> None:
    """Await every task currently held in ``self._background_tasks``.

    Works on a snapshot of the set, so changes to the set while waiting do
    not affect the wait. Task exceptions are collected, not raised.
    """
    snapshot = list(self._background_tasks)
    if not snapshot:
        return
    await asyncio.gather(*snapshot, return_exceptions=True)
process_update(observer: Observer | str, update: Update, *, run_stage: str = 'through-suggestions') -> Observation async

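Process one update through the pipeline, stopping at the boundary selected by `run_stage` (for example, `'obs-only'` returns right after the observation is stored).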

Source code in gum/gum.py
async def process_update(
    self,
    observer: Observer | str,
    update: Update,
    *,
    run_stage: str = "through-suggestions",
) -> Observation:
    """Process one update through the pipeline, stopping at ``run_stage``.

    The update becomes an ``Observation``, is audited, stored, and then
    (unless ``run_stage`` is ``"obs-only"``) matched against candidate
    propositions, which are handled according to the similarity filter.

    Args:
        observer: The source observer, or a plain source name. Anything
            without a ``name`` attribute is converted with ``str()``.
        update: The update to process. This method reads its ``content``,
            ``content_type``, ``image_path``, ``raw_data`` and
            ``event_time`` attributes.
        run_stage: Pipeline boundary. ``"obs-only"`` returns right after
            the observation is stored. ``"through-suggestions"`` passes
            ``trigger_suggestions=True`` to ``_generate_and_search``. Any
            other value runs the proposition steps with
            ``trigger_suggestions=False``; the value is not validated.

    Returns:
        Observation: The observation built from ``update``. When the audit
        blocks it, the object is returned without being added to the
        session.
    """
    # Accept either an observer object or a bare source name.
    observer_name = observer.name if hasattr(observer, "name") else str(observer)
    # Single-line preview of the first 120 characters, used only for the log
    # line below (which always appends "..." regardless of length).
    content_preview = update.content[:120].replace("\n", " ")
    self.logger.info(
        f"[PIPELINE] Step 1 — Observation received from '{observer_name}' "
        f"({len(update.content)} chars): {content_preview}..."
    )

    from datetime import datetime, timezone as _tz

    # Prefer the update's own timestamp; fall back to "now" in UTC when it
    # is missing or is not a valid ISO-8601 string.
    # NOTE(review): fromisoformat yields a naive datetime when the string
    # carries no UTC offset, while the fallback is timezone-aware — confirm
    # the created_at column tolerates both.
    if update.event_time:
        try:
            event_time = datetime.fromisoformat(update.event_time)
        except ValueError:
            event_time = datetime.now(_tz.utc)
    else:
        event_time = datetime.now(_tz.utc)

    async with self._session() as session:
        observation = Observation(
            source=observer_name,
            content=update.content,
            content_type=update.content_type,
            image_path=update.image_path,
            raw_data=update.raw_data,
            created_at=event_time,
        )

        # A truthy audit result blocks the observation before it is ever
        # added to the session.
        if await self._handle_audit(observation):
            self.logger.info("[PIPELINE] Audit blocked this observation — skipping")
            return observation

        session.add(observation)
        # Flush before logging so observation.id is available below.
        await session.flush()
        self.logger.info(f"[PIPELINE] Step 2 — Observation stored (id={observation.id})")

        if run_stage == "obs-only":
            return observation

        # Presumably "Step 3" of the pipeline happens inside this helper —
        # TODO confirm; no Step 3 is logged in this method.
        pool = await self._generate_and_search(
            session,
            update,
            observation,
            trigger_suggestions=(run_stage == "through-suggestions"),
        )

        if pool:
            self.logger.info(
                f"[PIPELINE] Step 4 — Linking observation {observation.id} "
                f"to {len(pool)} candidate propositions"
            )
            for prop in pool:
                await self._attach_obs_if_missing(prop, observation, session)
            await session.flush()

        # Called even when pool is empty/falsy; the helper is expected to
        # return three collections in that case as well.
        identical, similar, different = await self._filter_propositions(pool)
        self.logger.info(
            f"[PIPELINE] Step 5 — Similarity filter: "
            f"{len(identical)} identical, {len(similar)} similar, {len(different)} different"
        )

        # Each bucket has its own handler; they run in this fixed order.
        await self._handle_identical(session, identical, observation)
        await self._handle_similar(session, similar, observation)
        await self._handle_different(session, different, observation)
        self.logger.info(
            f"[PIPELINE] Step 6 — Proposition updates applied. "
            f"Identical linked: {len(identical)}, "
            f"Similar revised into new versions: {len(similar)}, "
            f"Different linked: {len(different)}"
        )
        return observation
query(user_query: str, *, limit: int = 3, mode: str = 'OR', start_time: datetime | None = None, end_time: datetime | None = None) -> list[tuple[Proposition, float]] async

Query the database for propositions matching the user query.

Parameters:

Name Type Description Default
user_query str

The query string to search for.

required
limit int

Maximum number of results to return. Defaults to 3.

3
mode str

Search mode ("OR" or "AND"). Defaults to "OR".

'OR'
start_time datetime

Start time for filtering results. Defaults to None.

None
end_time datetime

End time for filtering results. Defaults to None.

None

Returns:

Type Description
list[tuple[Proposition, float]]

list[tuple[Proposition, float]]: List of tuples containing propositions and their relevance scores.

Source code in gum/gum.py
async def query(
    self,
    user_query: str,
    *,
    limit: int = 3,
    mode: str = "OR",
    start_time: datetime | None = None,
    end_time: datetime | None = None,
) -> list[tuple[Proposition, float]]:
    """Search stored propositions for the given query text.

    Opens a session and delegates to ``search_propositions_bm25``,
    forwarding every option unchanged.

    Args:
        user_query (str): The query string to search for.
        limit (int, optional): Maximum number of results to return. Defaults to 3.
        mode (str, optional): Search mode ("OR" or "AND"). Defaults to "OR".
        start_time (datetime, optional): Start time for filtering results. Defaults to None.
        end_time (datetime, optional): End time for filtering results. Defaults to None.

    Returns:
        list[tuple[Proposition, float]]: Propositions paired with their relevance scores.
    """
    # Collect the keyword options once so the call site stays short.
    search_options = {
        "limit": limit,
        "mode": mode,
        "start_time": start_time,
        "end_time": end_time,
    }
    async with self._session() as session:
        return await search_propositions_bm25(session, user_query, **search_options)
register_update_handler(fn: Callable[[Observer, Update], None])

Register a custom update handler function.

Parameters:

Name Type Description Default
fn Callable[[Observer, Update], None]

The handler function to register.

required
Source code in gum/gum.py
def register_update_handler(self, fn: Callable[[Observer, Update], None]):
    """Append a callback to the list of update handlers.

    No de-duplication is done: registering the same function twice stores
    it twice.

    Args:
        fn (Callable[[Observer, Update], None]): The handler function to register.
    """
    handlers = self.update_handlers
    handlers.append(fn)
remove_observer(observer: Observer)

Remove an observer from tracking.

Parameters:

Name Type Description Default
observer Observer

The observer to remove.

required
Source code in gum/gum.py
def remove_observer(self, observer: Observer):
    """Stop tracking an observer; unknown observers are ignored.

    Only the first matching entry is removed from ``self.observers``.

    Args:
        observer (Observer): The observer to remove.
    """
    tracked = self.observers
    if observer not in tracked:
        return
    tracked.remove(observer)
start_update_loop()

Start the asynchronous update loop for processing observer updates.

Source code in gum/gum.py
def start_update_loop(self):
    """Schedule the update loop as a task, at most once.

    Nothing is scheduled when a loop task already exists or when there are
    no observers. ``asyncio.create_task`` is used, so a running event loop
    is required whenever a task is actually created.
    """
    if self._loop_task is not None or not self.observers:
        return
    self._loop_task = asyncio.create_task(self._update_loop())
stop_update_loop() async

Stop the asynchronous update loop and clean up resources.

Source code in gum/gum.py
async def stop_update_loop(self):
    """Cancel the update-loop task, wait for it to end, and clear it.

    Does nothing when no loop task is set. The ``CancelledError`` raised by
    awaiting the cancelled task is absorbed; any other exception from the
    task propagates and leaves ``_loop_task`` unchanged.
    """
    loop_task = self._loop_task
    if not loop_task:
        return

    loop_task.cancel()
    try:
        await loop_task
    except asyncio.CancelledError:
        # Expected result of the cancel() above.
        pass
    self._loop_task = None

Functions