Skip to content

state_manager

state_manager

State management for checkpointing and recovery.

StateManager

StateManager(storage: CheckpointStorage, checkpoint_interval: int = 500)

Manages execution state persistence and recovery.

Follows the Single Responsibility Principle: it only handles state management. Uses the Strategy pattern for pluggable storage backends.

Initialize state manager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `storage` | `CheckpointStorage` | Checkpoint storage backend | *required* |
| `checkpoint_interval` | `int` | Rows between checkpoints | `500` |
Source code in ondine/orchestration/state_manager.py
def __init__(self, storage: CheckpointStorage, checkpoint_interval: int = 500):
    """
    Set up a state manager around a checkpoint storage backend.

    Args:
        storage: Backend used to save, load, check, list and delete checkpoints
        checkpoint_interval: Number of rows that must elapse since the last
            checkpoint before another one is due
    """
    # Row at which the most recent checkpoint was written; nothing has been
    # saved by this manager yet, so the baseline starts at row 0.
    self._last_checkpoint_row = 0
    self.checkpoint_interval = checkpoint_interval
    self.storage = storage

should_checkpoint

should_checkpoint(current_row: int) -> bool

Check if checkpoint should be saved.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `current_row` | `int` | Current row index | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | `True` if a checkpoint is due |

Source code in ondine/orchestration/state_manager.py
def should_checkpoint(self, current_row: int) -> bool:
    """
    Decide whether enough rows have passed to warrant a new checkpoint.

    Args:
        current_row: Index of the row currently being processed

    Returns:
        True once at least ``checkpoint_interval`` rows have elapsed since
        the row of the last saved checkpoint
    """
    rows_since_checkpoint = current_row - self._last_checkpoint_row
    return rows_since_checkpoint >= self.checkpoint_interval

save_checkpoint

save_checkpoint(context: ExecutionContext) -> bool

Save checkpoint for execution context.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `ExecutionContext` | Execution context to save | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | `True` if the checkpoint was saved successfully |

Source code in ondine/orchestration/state_manager.py
def save_checkpoint(self, context: ExecutionContext) -> bool:
    """
    Persist a checkpoint of the given execution context.

    Serializes ``context`` via ``to_checkpoint()`` and stores it under the
    context's ``session_id``. When the backend reports success, the context's
    ``last_processed_row`` becomes the baseline that ``should_checkpoint``
    measures the next interval from.

    Checkpointing is best-effort: any exception is logged together with its
    traceback and reported as ``False`` instead of being propagated.

    Args:
        context: Execution context to save

    Returns:
        The result of ``storage.save`` (True on success), or False if an
        exception was raised
    """
    try:
        checkpoint_data = context.to_checkpoint()
        success = self.storage.save(context.session_id, checkpoint_data)

        if success:
            self._last_checkpoint_row = context.last_processed_row
            logger.info(f"Checkpoint saved at row {context.last_processed_row}")

        return success
    except Exception as e:
        # logger.exception keeps the traceback and exception type; str(e) on
        # its own can be empty (e.g. an exception raised without arguments),
        # which would leave the failure undiagnosable.
        logger.exception(f"Failed to save checkpoint: {e}")
        return False

load_checkpoint

load_checkpoint(session_id: UUID) -> ExecutionContext | None

Load checkpoint for session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `UUID` | Session identifier | *required* |

Returns:

| Type | Description |
| --- | --- |
| `ExecutionContext \| None` | Restored execution context, or `None` if no checkpoint exists or loading fails |

Source code in ondine/orchestration/state_manager.py
def load_checkpoint(self, session_id: UUID) -> ExecutionContext | None:
    """
    Load and restore the checkpoint stored for a session.

    On a successful restore, the restored context's ``last_processed_row``
    becomes the baseline for ``should_checkpoint``. The next checkpoint
    interval is therefore counted from the restored row rather than from 0.

    Loading is best-effort: any exception is logged together with its
    traceback and reported as ``None`` instead of being propagated.

    Args:
        session_id: Session identifier

    Returns:
        Restored execution context, or None if the backend has no checkpoint
        data for the session or loading raised an exception
    """
    try:
        checkpoint_data = self.storage.load(session_id)

        if checkpoint_data is None:
            return None

        context = ExecutionContext.from_checkpoint(checkpoint_data)
        # Mirror save_checkpoint: the restored row is where the last checkpoint
        # was taken. Without this the baseline stays at 0, so the first
        # should_checkpoint() call after resuming is immediately due and
        # rewrites a checkpoint that was just loaded.
        self._last_checkpoint_row = context.last_processed_row
        logger.info(f"Checkpoint loaded from row {context.last_processed_row}")

        return context
    except Exception as e:
        # Keep the traceback; the message alone may be empty or ambiguous.
        logger.exception(f"Failed to load checkpoint: {e}")
        return None

can_resume

can_resume(session_id: UUID) -> bool

Check if session can be resumed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `UUID` | Session identifier | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | `True` if a checkpoint exists for the session |

Source code in ondine/orchestration/state_manager.py
def can_resume(self, session_id: UUID) -> bool:
    """
    Report whether a stored checkpoint exists for the given session.

    Args:
        session_id: Session identifier

    Returns:
        The storage backend's ``exists`` answer for ``session_id``
    """
    backend = self.storage
    has_checkpoint = backend.exists(session_id)
    return has_checkpoint

cleanup_checkpoints

cleanup_checkpoints(session_id: UUID) -> bool

Delete checkpoints for session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `UUID` | Session identifier | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | `True` if the session's checkpoints were deleted |

Source code in ondine/orchestration/state_manager.py
def cleanup_checkpoints(self, session_id: UUID) -> bool:
    """
    Delete the stored checkpoints for a session.

    Cleanup is best-effort: any exception is logged together with its
    traceback and reported as ``False`` instead of being propagated.

    Args:
        session_id: Session identifier

    Returns:
        The result of ``storage.delete`` (True when deleted), or False if an
        exception was raised
    """
    try:
        success = self.storage.delete(session_id)
        if success:
            logger.info(f"Checkpoints cleaned up for session {session_id}")
        return success
    except Exception as e:
        # logger.exception keeps the traceback and exception type; str(e) on
        # its own can be empty, hiding why cleanup failed.
        logger.exception(f"Failed to cleanup checkpoints: {e}")
        return False

list_checkpoints

list_checkpoints() -> list[CheckpointInfo]

List all available checkpoints.

Returns:

| Type | Description |
| --- | --- |
| `list[CheckpointInfo]` | List of checkpoint information |

Source code in ondine/orchestration/state_manager.py
def list_checkpoints(self) -> list[CheckpointInfo]:
    """
    Enumerate the checkpoints known to the storage backend.

    Returns:
        The checkpoint information entries reported by
        ``storage.list_checkpoints()``, passed through unchanged
    """
    backend = self.storage
    available = backend.list_checkpoints()
    return available