data_io

Data I/O adapters for reading and writing tabular data.

Provides a unified interface for multiple data formats, following the Adapter pattern.
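
For example, a CSV file can be read and re-written as Parquet through the same small interface. A minimal sketch (the import path and file names are assumptions, and attribute access on WriteConfirmation assumes it is a simple record type):

from pathlib import Path

from ondine.adapters.data_io import CSVReader, ParquetWriter  # assumed import path

reader = CSVReader(Path("input.csv"))
df = reader.read()  # entire file as a pandas DataFrame

writer = ParquetWriter()
confirmation = writer.atomic_write(df, Path("output.parquet"))
print(confirmation.rows_written, confirmation.success)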

DataReader

Bases: ABC

Abstract base class for data readers.

Follows the Open/Closed principle: open for extension via new readers, closed for modification.

read abstractmethod

read() -> pd.DataFrame

Read entire dataset.

Returns:

    DataFrame: DataFrame with all data.

Source code in ondine/adapters/data_io.py
@abstractmethod
def read(self) -> pd.DataFrame:
    """
    Read entire dataset.

    Returns:
        DataFrame with all data
    """
    pass

read_chunked abstractmethod

read_chunked(chunk_size: int) -> Iterator[pd.DataFrame]

Read data in chunks for memory efficiency.

Parameters:

    chunk_size (int): Number of rows per chunk. Required.

Yields:

    DataFrame: DataFrame chunks.

Source code in ondine/adapters/data_io.py
@abstractmethod
def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
    """
    Read data in chunks for memory efficiency.

    Args:
        chunk_size: Number of rows per chunk

    Yields:
        DataFrame chunks
    """
    pass
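
Because the contract is just these two methods, support for a new format can be added without modifying existing readers. A hypothetical JSON Lines reader, shown only as a sketch of this extension point (JSONLinesReader is not part of the module; the import path is assumed):

from collections.abc import Iterator
from pathlib import Path

import pandas as pd

from ondine.adapters.data_io import DataReader  # assumed import path


class JSONLinesReader(DataReader):
    """Hypothetical JSON Lines reader illustrating the extension point."""

    def __init__(self, file_path: Path):
        self.file_path = file_path

    def read(self) -> pd.DataFrame:
        # lines=True parses one JSON object per line
        return pd.read_json(self.file_path, lines=True)

    def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
        # With lines=True and chunksize, pandas yields DataFrame chunks lazily
        yield from pd.read_json(self.file_path, lines=True, chunksize=chunk_size)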

CSVReader

CSVReader(file_path: Path, delimiter: str = ',', encoding: str = 'utf-8')

Bases: DataReader

CSV file reader implementation.

Initialize CSV reader.

Parameters:

    file_path (Path): Path to CSV file. Required.
    delimiter (str): Column delimiter. Default: ','.
    encoding (str): File encoding. Default: 'utf-8'.

Source code in ondine/adapters/data_io.py
def __init__(
    self,
    file_path: Path,
    delimiter: str = ",",
    encoding: str = "utf-8",
):
    """
    Initialize CSV reader.

    Args:
        file_path: Path to CSV file
        delimiter: Column delimiter
        encoding: File encoding
    """
    self.file_path = file_path
    self.delimiter = delimiter
    self.encoding = encoding

read

read() -> pd.DataFrame

Read entire CSV file.

Source code in ondine/adapters/data_io.py
def read(self) -> pd.DataFrame:
    """Read entire CSV file."""
    return pd.read_csv(
        self.file_path,
        delimiter=self.delimiter,
        encoding=self.encoding,
    )

read_chunked

read_chunked(chunk_size: int) -> Iterator[pd.DataFrame]

Read CSV in chunks.

Source code in ondine/adapters/data_io.py
def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
    """Read CSV in chunks."""
    yield from pd.read_csv(
        self.file_path,
        delimiter=self.delimiter,
        encoding=self.encoding,
        chunksize=chunk_size,
    )
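
With read_chunked, a large CSV can be processed without holding the whole file in memory. A short sketch (the file name and chunk size are illustrative):

from pathlib import Path

reader = CSVReader(Path("large.csv"), delimiter=",", encoding="utf-8")

total_rows = 0
for chunk in reader.read_chunked(chunk_size=10_000):
    # Each chunk is a pandas DataFrame of at most 10,000 rows.
    total_rows += len(chunk)

print(f"Processed {total_rows} rows")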

ExcelReader

ExcelReader(file_path: Path, sheet_name: str | int = 0)

Bases: DataReader

Excel file reader implementation.

Initialize Excel reader.

Parameters:

    file_path (Path): Path to Excel file. Required.
    sheet_name (str | int): Sheet name or index. Default: 0.

Source code in ondine/adapters/data_io.py
def __init__(self, file_path: Path, sheet_name: str | int = 0):
    """
    Initialize Excel reader.

    Args:
        file_path: Path to Excel file
        sheet_name: Sheet name or index
    """
    self.file_path = file_path
    self.sheet_name = sheet_name

read

read() -> pd.DataFrame

Read entire Excel file.

Source code in ondine/adapters/data_io.py
def read(self) -> pd.DataFrame:
    """Read entire Excel file."""
    return pd.read_excel(self.file_path, sheet_name=self.sheet_name)

read_chunked

read_chunked(chunk_size: int) -> Iterator[pd.DataFrame]

Read Excel in chunks.

Note: Excel doesn't support native chunking, so we load all and yield chunks.

Source code in ondine/adapters/data_io.py
def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
    """
    Read Excel in chunks.

    Note: Excel doesn't support native chunking, so we load all
    and yield chunks.
    """
    df = self.read()
    for i in range(0, len(df), chunk_size):
        yield df.iloc[i : i + chunk_size]

ParquetReader

ParquetReader(file_path: Path)

Bases: DataReader

Parquet file reader implementation.

Initialize Parquet reader.

Parameters:

    file_path (Path): Path to Parquet file. Required.

Source code in ondine/adapters/data_io.py
def __init__(self, file_path: Path):
    """
    Initialize Parquet reader.

    Args:
        file_path: Path to Parquet file
    """
    self.file_path = file_path

read

read() -> pd.DataFrame

Read entire Parquet file.

Source code in ondine/adapters/data_io.py
def read(self) -> pd.DataFrame:
    """Read entire Parquet file."""
    return pd.read_parquet(self.file_path)

read_chunked

read_chunked(chunk_size: int) -> Iterator[pd.DataFrame]

Read Parquet in chunks using Polars for efficiency.

Source code in ondine/adapters/data_io.py
def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
    """
    Read Parquet in chunks using Polars for efficiency.
    """
    # Use Polars for efficient chunked reading
    lf = pl.scan_parquet(self.file_path)

    # Read in batches
    total_rows = lf.select(pl.len()).collect().item()

    for i in range(0, total_rows, chunk_size):
        chunk = lf.slice(i, chunk_size).collect().to_pandas()
        yield chunk

DataFrameReader

DataFrameReader(dataframe: DataFrame)

Bases: DataReader

In-memory DataFrame reader (pass-through).

Initialize DataFrame reader.

Parameters:

    dataframe (DataFrame): Pandas DataFrame. Required.

Source code in ondine/adapters/data_io.py
def __init__(self, dataframe: pd.DataFrame):
    """
    Initialize DataFrame reader.

    Args:
        dataframe: Pandas DataFrame
    """
    self.dataframe = dataframe.copy()

read

read() -> pd.DataFrame

Return DataFrame copy.

Source code in ondine/adapters/data_io.py
def read(self) -> pd.DataFrame:
    """Return DataFrame copy."""
    return self.dataframe.copy()

read_chunked

read_chunked(chunk_size: int) -> Iterator[pd.DataFrame]

Yield DataFrame chunks.

Source code in ondine/adapters/data_io.py
def read_chunked(self, chunk_size: int) -> Iterator[pd.DataFrame]:
    """Yield DataFrame chunks."""
    for i in range(0, len(self.dataframe), chunk_size):
        yield self.dataframe.iloc[i : i + chunk_size].copy()

DataWriter

Bases: ABC

Abstract base class for data writers.

Follows the Single Responsibility principle: handles only data persistence.

write abstractmethod

write(data: DataFrame, path: Path) -> WriteConfirmation

Write data to destination.

Parameters:

    data (DataFrame): DataFrame to write. Required.
    path (Path): Destination path. Required.

Returns:

    WriteConfirmation: WriteConfirmation with details.

Source code in ondine/adapters/data_io.py
@abstractmethod
def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """
    Write data to destination.

    Args:
        data: DataFrame to write
        path: Destination path

    Returns:
        WriteConfirmation with details
    """
    pass

atomic_write abstractmethod

atomic_write(data: DataFrame, path: Path) -> WriteConfirmation

Write data atomically (with rollback on failure).

Parameters:

    data (DataFrame): DataFrame to write. Required.
    path (Path): Destination path. Required.

Returns:

    WriteConfirmation: WriteConfirmation with details.

Source code in ondine/adapters/data_io.py
@abstractmethod
def atomic_write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """
    Write data atomically (with rollback on failure).

    Args:
        data: DataFrame to write
        path: Destination path

    Returns:
        WriteConfirmation with details
    """
    pass

CSVWriter

CSVWriter(delimiter: str = ',', encoding: str = 'utf-8')

Bases: DataWriter

CSV file writer implementation.

Initialize CSV writer.

Parameters:

    delimiter (str): Column delimiter. Default: ','.
    encoding (str): File encoding. Default: 'utf-8'.

Source code in ondine/adapters/data_io.py
def __init__(self, delimiter: str = ",", encoding: str = "utf-8"):
    """
    Initialize CSV writer.

    Args:
        delimiter: Column delimiter
        encoding: File encoding
    """
    self.delimiter = delimiter
    self.encoding = encoding

write

write(data: DataFrame, path: Path) -> WriteConfirmation

Write to CSV file.

Source code in ondine/adapters/data_io.py
def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """Write to CSV file."""
    data.to_csv(
        path,
        sep=self.delimiter,
        encoding=self.encoding,
        index=False,
    )

    return WriteConfirmation(
        path=str(path),
        rows_written=len(data),
        success=True,
    )

atomic_write

atomic_write(data: DataFrame, path: Path) -> WriteConfirmation

Write to CSV atomically.

Source code in ondine/adapters/data_io.py
def atomic_write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """Write to CSV atomically."""
    temp_path = path.with_suffix(".tmp")

    try:
        # Write to temp file
        data.to_csv(
            temp_path,
            sep=self.delimiter,
            encoding=self.encoding,
            index=False,
        )

        # Atomic rename
        temp_path.replace(path)

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )
    except Exception as e:
        # Cleanup on failure
        if temp_path.exists():
            temp_path.unlink()
        raise e
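
The data is first written to a temporary .tmp file and only renamed onto the target path once the write completes, so a failure cannot leave a partially written file at the destination. A usage sketch (the DataFrame and path are illustrative; attribute access on WriteConfirmation assumes it is a simple record type):

from pathlib import Path

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "label": ["a", "b"]})

writer = CSVWriter(delimiter=";", encoding="utf-8")
confirmation = writer.atomic_write(df, Path("results.csv"))

if confirmation.success:
    print(f"Wrote {confirmation.rows_written} rows to {confirmation.path}")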

ExcelWriter

Bases: DataWriter

Excel file writer implementation.

write

write(data: DataFrame, path: Path) -> WriteConfirmation

Write to Excel file.

Source code in ondine/adapters/data_io.py
def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """Write to Excel file."""
    data.to_excel(path, index=False)

    return WriteConfirmation(
        path=str(path),
        rows_written=len(data),
        success=True,
    )

atomic_write

atomic_write(data: DataFrame, path: Path) -> WriteConfirmation

Write to Excel atomically.

Source code in ondine/adapters/data_io.py
def atomic_write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """Write to Excel atomically."""
    temp_path = path.with_suffix(".tmp")

    try:
        data.to_excel(temp_path, index=False)
        temp_path.replace(path)

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )
    except Exception as e:
        if temp_path.exists():
            temp_path.unlink()
        raise e

ParquetWriter

Bases: DataWriter

Parquet file writer implementation.

write

write(data: DataFrame, path: Path) -> WriteConfirmation

Write to Parquet file.

Source code in ondine/adapters/data_io.py
def write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """Write to Parquet file."""
    data.to_parquet(path, index=False)

    return WriteConfirmation(
        path=str(path),
        rows_written=len(data),
        success=True,
    )

atomic_write

atomic_write(data: DataFrame, path: Path) -> WriteConfirmation

Write to Parquet atomically.

Source code in ondine/adapters/data_io.py
def atomic_write(self, data: pd.DataFrame, path: Path) -> WriteConfirmation:
    """Write to Parquet atomically."""
    temp_path = path.with_suffix(".tmp")

    try:
        data.to_parquet(temp_path, index=False)
        temp_path.replace(path)

        return WriteConfirmation(
            path=str(path),
            rows_written=len(data),
            success=True,
        )
    except Exception as e:
        if temp_path.exists():
            temp_path.unlink()
        raise e

create_data_reader

create_data_reader(source_type: DataSourceType, source_path: Path | None = None, dataframe: DataFrame | None = None, **kwargs: any) -> DataReader

Factory function to create appropriate data reader.

Parameters:

    source_type (DataSourceType): Type of data source. Required.
    source_path (Path | None): Path to file (for file sources). Default: None.
    dataframe (DataFrame | None): DataFrame (for DataFrame source). Default: None.
    **kwargs (any): Additional reader-specific parameters. Default: {}.

Returns:

    DataReader: Configured DataReader.

Raises:

    ValueError: If source type is not supported or parameters are invalid.

Source code in ondine/adapters/data_io.py
def create_data_reader(
    source_type: DataSourceType,
    source_path: Path | None = None,
    dataframe: pd.DataFrame | None = None,
    **kwargs: any,
) -> DataReader:
    """
    Factory function to create appropriate data reader.

    Args:
        source_type: Type of data source
        source_path: Path to file (for file sources)
        dataframe: DataFrame (for DataFrame source)
        **kwargs: Additional reader-specific parameters

    Returns:
        Configured DataReader

    Raises:
        ValueError: If source type not supported or parameters invalid
    """
    if source_type == DataSourceType.CSV:
        if not source_path:
            raise ValueError("source_path required for CSV")
        return CSVReader(
            source_path,
            delimiter=kwargs.get("delimiter", ","),
            encoding=kwargs.get("encoding", "utf-8"),
        )
    if source_type == DataSourceType.EXCEL:
        if not source_path:
            raise ValueError("source_path required for Excel")
        return ExcelReader(source_path, sheet_name=kwargs.get("sheet_name", 0))
    if source_type == DataSourceType.PARQUET:
        if not source_path:
            raise ValueError("source_path required for Parquet")
        return ParquetReader(source_path)
    if source_type == DataSourceType.DATAFRAME:
        if dataframe is None:
            raise ValueError("dataframe required for DataFrame source")
        return DataFrameReader(dataframe)
    raise ValueError(f"Unsupported source type: {source_type}")
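
A usage sketch of the factory (the import path of DataSourceType is an assumption; its CSV, EXCEL, PARQUET, and DATAFRAME members are the ones dispatched on above):

from pathlib import Path

reader = create_data_reader(
    DataSourceType.CSV,
    source_path=Path("input.csv"),
    delimiter=";",       # forwarded to CSVReader via **kwargs
    encoding="latin-1",  # forwarded to CSVReader via **kwargs
)
df = reader.read()

# Omitting source_path for a file-based source raises ValueError.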

create_data_writer

create_data_writer(destination_type: DataSourceType) -> DataWriter

Factory function to create appropriate data writer.

Parameters:

    destination_type (DataSourceType): Type of destination. Required.

Returns:

    DataWriter: Configured DataWriter.

Raises:

    ValueError: If destination type is not supported.

Source code in ondine/adapters/data_io.py
def create_data_writer(destination_type: DataSourceType) -> DataWriter:
    """
    Factory function to create appropriate data writer.

    Args:
        destination_type: Type of destination

    Returns:
        Configured DataWriter

    Raises:
        ValueError: If destination type not supported
    """
    if destination_type == DataSourceType.CSV:
        return CSVWriter()
    if destination_type == DataSourceType.EXCEL:
        return ExcelWriter()
    if destination_type == DataSourceType.PARQUET:
        return ParquetWriter()
    raise ValueError(f"Unsupported destination: {destination_type}")
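
And the matching writer side, as a minimal sketch (writing Parquet via pandas assumes a Parquet engine such as pyarrow is installed; the DataFrame and path are illustrative):

from pathlib import Path

import pandas as pd

df = pd.DataFrame({"value": [1, 2, 3]})

writer = create_data_writer(DataSourceType.PARQUET)
confirmation = writer.atomic_write(df, Path("output.parquet"))
print(confirmation.success)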