Skip to content

checkpoint_storage

checkpoint_storage

Checkpoint storage for fault tolerance.

Provides persistent storage of execution state so that execution can resume after a failure.

CheckpointStorage

Bases: ABC

Abstract base for checkpoint storage implementations.

Follows the Strategy pattern so that storage backends are pluggable.

save abstractmethod

save(session_id: UUID, data: dict[str, Any]) -> bool

Save checkpoint data.

Parameters:

Name Type Description Default
session_id UUID

Unique session identifier

required
data dict[str, Any]

Checkpoint data to save

required

Returns:

Type Description
bool

True if successful

Source code in ondine/adapters/checkpoint_storage.py
@abstractmethod
def save(self, session_id: UUID, data: dict[str, Any]) -> bool:
    """
    Persist checkpoint data for a session.

    Concrete backends decide where and how the payload is stored.

    Args:
        session_id: Unique session identifier
        data: Checkpoint data to save

    Returns:
        True if the checkpoint was saved successfully
    """
    ...

load abstractmethod

load(session_id: UUID) -> dict[str, Any] | None

Load latest checkpoint data.

Parameters:

Name Type Description Default
session_id UUID

Session identifier

required

Returns:

Type Description
dict[str, Any] | None

Checkpoint data or None if not found

Source code in ondine/adapters/checkpoint_storage.py
@abstractmethod
def load(self, session_id: UUID) -> dict[str, Any] | None:
    """
    Load latest checkpoint data.

    Args:
        session_id: Session identifier

    Returns:
        Checkpoint data or None if not found
    """
    pass

list_checkpoints abstractmethod

list_checkpoints() -> list[CheckpointInfo]

List all available checkpoints.

Returns:

Type Description
list[CheckpointInfo]

List of checkpoint information

Source code in ondine/adapters/checkpoint_storage.py
@abstractmethod
def list_checkpoints(self) -> list[CheckpointInfo]:
    """
    Enumerate every checkpoint the backend currently holds.

    Returns:
        One CheckpointInfo entry per available checkpoint
    """
    ...

delete abstractmethod

delete(session_id: UUID) -> bool

Delete checkpoint for session.

Parameters:

Name Type Description Default
session_id UUID

Session identifier

required

Returns:

Type Description
bool

True if deleted

Source code in ondine/adapters/checkpoint_storage.py
@abstractmethod
def delete(self, session_id: UUID) -> bool:
    """
    Remove the stored checkpoint of a session.

    Args:
        session_id: Session identifier

    Returns:
        True if a checkpoint was deleted
    """
    ...

exists abstractmethod

exists(session_id: UUID) -> bool

Check if checkpoint exists.

Parameters:

Name Type Description Default
session_id UUID

Session identifier

required

Returns:

Type Description
bool

True if a checkpoint exists for the session

Source code in ondine/adapters/checkpoint_storage.py
@abstractmethod
def exists(self, session_id: UUID) -> bool:
    """
    Report whether a checkpoint is stored for a session.

    Args:
        session_id: Session identifier

    Returns:
        True if a checkpoint exists for the session
    """
    ...

LocalFileCheckpointStorage

LocalFileCheckpointStorage(checkpoint_dir: Path = Path('.checkpoints'), use_json: bool = True)

Bases: CheckpointStorage

Local filesystem checkpoint storage implementation.

Stores checkpoints as JSON files by default (human-readable and easy to debug), or as pickle files when use_json=False.

Initialize local file checkpoint storage.

Parameters:

Name Type Description Default
checkpoint_dir Path

Directory for checkpoints

Path('.checkpoints')
use_json bool

Use JSON format (True) or pickle (False)

True
Source code in ondine/adapters/checkpoint_storage.py
def __init__(
    self,
    checkpoint_dir: Path = Path(".checkpoints"),
    use_json: bool = True,
):
    """
    Initialize local file checkpoint storage.

    Args:
        checkpoint_dir: Directory for checkpoints
        use_json: Use JSON format (True) or pickle (False)
    """
    self.checkpoint_dir = checkpoint_dir
    self.use_json = use_json

    # Create directory if doesn't exist
    self.checkpoint_dir.mkdir(parents=True, exist_ok=True)

save

save(session_id: UUID, data: dict[str, Any]) -> bool

Save checkpoint to local file.

Source code in ondine/adapters/checkpoint_storage.py
def save(self, session_id: UUID, data: dict[str, Any]) -> bool:
    """
    Save checkpoint to local file.

    The payload is wrapped in an envelope (format version, session id,
    save timestamp) and written atomically. It first goes to a sibling
    ``<name>.tmp`` file, which is flushed and fsynced. That file is then
    moved over the real checkpoint path with ``os.replace``. A crash or a
    serialization error mid-write therefore leaves any previously saved
    checkpoint intact, instead of a truncated, unreadable file.

    Args:
        session_id: Unique session identifier
        data: Checkpoint data to save. In JSON mode, values that are not
            JSON-serializable are stored as their ``str()`` form.

    Returns:
        True if the checkpoint was written, False if writing failed.
    """
    import os  # local import keeps this block self-contained

    checkpoint_path = Path(self._get_checkpoint_path(session_id))
    # Same directory as the target so os.replace stays on one filesystem
    # (a requirement for an atomic rename). The ".tmp" suffix keeps it out
    # of the "*.json" / "*.pkl" globs used for listing and cleanup.
    tmp_path = checkpoint_path.with_name(checkpoint_path.name + ".tmp")

    # Add metadata
    checkpoint_data = {
        "version": "1.0",
        "session_id": str(session_id),
        "timestamp": datetime.now().isoformat(),
        "data": data,
    }

    mode = "w" if self.use_json else "wb"
    encoding = "utf-8" if self.use_json else None

    try:
        with open(tmp_path, mode, encoding=encoding) as f:
            if self.use_json:
                json.dump(
                    checkpoint_data,
                    f,
                    indent=2,
                    default=str,  # Handle non-serializable types
                )
            else:
                pickle.dump(checkpoint_data, f)
            # Make sure the bytes are on disk before the rename publishes them.
            f.flush()
            os.fsync(f.fileno())

        os.replace(tmp_path, checkpoint_path)
        return True
    except Exception:
        # Best-effort contract: report failure via the return value, but
        # don't leave a half-written temp file behind.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        return False

load

load(session_id: UUID) -> dict[str, Any] | None

Load checkpoint from local file.

Source code in ondine/adapters/checkpoint_storage.py
def load(self, session_id: UUID) -> dict[str, Any] | None:
    """Load checkpoint from local file."""
    checkpoint_path = self._get_checkpoint_path(session_id)

    if not checkpoint_path.exists():
        return None

    try:
        if self.use_json:
            with open(checkpoint_path) as f:
                checkpoint_data = json.load(f)
        else:
            with open(checkpoint_path, "rb") as f:
                checkpoint_data = pickle.load(f)

        return checkpoint_data.get("data")
    except Exception:
        return None

list_checkpoints

list_checkpoints() -> list[CheckpointInfo]

List all checkpoints in the checkpoint directory.

Source code in ondine/adapters/checkpoint_storage.py
def list_checkpoints(self) -> list[CheckpointInfo]:
    """
    Describe every checkpoint file found in ``checkpoint_dir``.

    Only files of the active format are considered: ``*.json`` when
    ``use_json`` is True, otherwise ``*.pkl``. The session id is parsed
    from the file stem after removing the text ``checkpoint_``. Files
    whose name is not a valid UUID, or that fail while being inspected,
    are skipped.

    Returns:
        Checkpoint information, most recently modified first.
    """
    glob_pattern = "*.json" if self.use_json else "*.pkl"
    found = []

    for path in self.checkpoint_dir.glob(glob_pattern):
        try:
            sid = UUID(path.stem.replace("checkpoint_", ""))
            file_stat = path.stat()

            # Progress counters come from the stored payload; when load()
            # yields nothing, both default to 0.
            payload = self.load(sid) or {}

            found.append(
                CheckpointInfo(
                    session_id=sid,
                    checkpoint_path=str(path),
                    row_index=payload.get("last_processed_row", 0),
                    stage_index=payload.get("current_stage_index", 0),
                    timestamp=datetime.fromtimestamp(file_stat.st_mtime),
                    size_bytes=file_stat.st_size,
                )
            )
        except Exception:  # nosec B112
            # Not a usable checkpoint file -- ignore it.
            continue

    found.sort(key=lambda info: info.timestamp, reverse=True)
    return found

delete

delete(session_id: UUID) -> bool

Delete checkpoint file.

Source code in ondine/adapters/checkpoint_storage.py
def delete(self, session_id: UUID) -> bool:
    """
    Remove the checkpoint file of a session.

    Args:
        session_id: Session identifier

    Returns:
        True if the file existed and was removed. False if there was no
        file or removing it raised an error.
    """
    target = self._get_checkpoint_path(session_id)
    if not target.exists():
        return False

    try:
        target.unlink()
    except Exception:
        return False
    return True

exists

exists(session_id: UUID) -> bool

Check if checkpoint exists.

Source code in ondine/adapters/checkpoint_storage.py
def exists(self, session_id: UUID) -> bool:
    """
    Tell whether a checkpoint file is present for a session.

    Args:
        session_id: Session identifier

    Returns:
        True if the session's checkpoint path exists on disk
    """
    target = self._get_checkpoint_path(session_id)
    return target.exists()

cleanup_old_checkpoints

cleanup_old_checkpoints(days: int = 7) -> int

Delete checkpoints older than the specified number of days.

Parameters:

Name Type Description Default
days int

Age threshold in days

7

Returns:

Type Description
int

Number of checkpoints deleted

Source code in ondine/adapters/checkpoint_storage.py
def cleanup_old_checkpoints(self, days: int = 7) -> int:
    """
    Delete checkpoints older than the specified number of days.

    Only files of the active format are considered: ``*.json`` when
    ``use_json`` is True, otherwise ``*.pkl``. Age is judged by file
    modification time. Files that disappear, or cannot be inspected or
    removed during the scan, are skipped rather than aborting the cleanup.

    Args:
        days: Age threshold in days

    Returns:
        Number of checkpoints deleted
    """
    cutoff = datetime.now().timestamp() - days * 86400  # 86400 s per day
    pattern = "*.json" if self.use_json else "*.pkl"

    deleted = 0
    for checkpoint_file in self.checkpoint_dir.glob(pattern):
        try:
            # stat() must be inside the try: a file removed between glob()
            # and here (or a dangling symlink) raises FileNotFoundError,
            # which previously aborted the whole sweep.
            if checkpoint_file.stat().st_mtime >= cutoff:
                continue
            checkpoint_file.unlink()
        except OSError:  # nosec B112
            continue
        deleted += 1

    return deleted