# Source code for time_stream.infill

"""
Time Series Infill Module

This module provides a flexible framework for filling missing values (infilling) in time series data using
Polars and SciPy. Infill methods are implemented as subclasses of ``InfillMethod`` and can be registered
and instantiated by name, class, or instance.

The infill pipeline handles:

- Padding the time series to ensure consistent timestamps
- Identifying gaps and their sizes
- Applying constraints such as maximum gap size and observation intervals
- Delegating to a specific infill method to fill missing values
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any

import numpy as np
import polars as pl
from scipy.interpolate import Akima1DInterpolator, PchipInterpolator, make_interp_spline

from time_stream import Period
from time_stream.exceptions import InfillError, InfillInsufficientValuesError
from time_stream.operation import Operation
from time_stream.utils import check_columns_in_dataframe, gap_size_count, get_date_filter, pad_time


@dataclass(frozen=True)
class InfillCtx:
    """Read-only bundle of inputs shared with every infill method.

    The dataclass is frozen, so infill methods cannot reassign these fields.
    """

    # DataFrame handed to the infill pipeline (``InfillMethod.apply`` stores the caller's df here, before padding).
    df: pl.DataFrame
    # Name of the time column in ``df``.
    time_name: str
    # Periodicity of the time series.
    periodicity: Period


class InfillMethod(Operation, ABC):
    """Abstract base for all infill methods.

    Concrete subclasses implement ``_fill``; ``apply`` wraps it in the shared infill pipeline
    (padding, gap detection, masking and column tidy-up).
    """

    def _infilled_column_name(self, infill_column: str) -> str:
        """Build the name of the temporary column that holds infilled values.

        Args:
            infill_column: Name of the column being infilled.

        Returns:
            ``"<infill_column>_<method name>"``.
        """
        method_name = self.name
        return f"{infill_column}_{method_name}"

    @abstractmethod
    def _fill(self, df: pl.DataFrame, infill_column: str, ctx: InfillCtx) -> pl.DataFrame:
        """Compute infilled values for ``infill_column``.

        Implementations add a column named ``self._infilled_column_name(infill_column)`` holding the
        candidate values; the pipeline decides which of them are actually used.

        Args:
            df: The (padded) DataFrame to infill.
            infill_column: The column to infill.
            ctx: The infill context.

        Returns:
            pl.DataFrame with infilled values
        """
        pass

    def apply(
        self,
        df: pl.DataFrame,
        time_name: str,
        periodicity: Period,
        infill_column: str,
        observation_interval: datetime | tuple[datetime, datetime | None] | None = None,
        max_gap_size: int | None = None,
    ) -> pl.DataFrame:
        """Infill missing values in ``infill_column`` using this method.

        Args:
            df: The Polars DataFrame containing the time series data to infill.
            time_name: Name of the time column in the dataframe.
            periodicity: Periodicity of the time series.
            infill_column: The column to infill data within.
            observation_interval: Optional datetime, or ``(start, end)`` tuple, limiting where infilling is
                applied (interpreted by the pipeline's date filter).
            max_gap_size: The maximum size of consecutive null gaps that should be filled. Any gap larger
                than this will not be infilled and will remain as null.

        Returns:
            The infilled time series.
        """
        context = InfillCtx(df, time_name, periodicity)
        runner = InfillMethodPipeline(self, context, infill_column, observation_interval, max_gap_size)
        return runner.execute()


class InfillMethodPipeline:
    """Encapsulates the logic for the infill pipeline steps.

    The pipeline pads the time series, measures the size of each null gap, builds a mask of the rows
    that are eligible for infilling, delegates to the infill method's ``_fill`` and finally writes the
    accepted values back into the original column.
    """

    def __init__(
        self,
        infill_method: InfillMethod,
        ctx: InfillCtx,
        column: str,
        observation_interval: datetime | tuple[datetime, datetime | None] | None = None,
        max_gap_size: int | None = None,
    ):
        """Store the configuration for a single infill run.

        Args:
            infill_method: The infill method whose ``_fill`` produces candidate values.
            ctx: Context holding the input DataFrame, the time column name and the periodicity.
            column: Name of the column to infill.
            observation_interval: Optional datetime, or ``(start, end)`` tuple, limiting where infilling is
                applied. It is passed to ``get_date_filter``.
            max_gap_size: Largest run of consecutive nulls that may be infilled. ``None`` means no limit;
                ``0`` means no gap is small enough to be infilled.
        """
        self.infill_method = infill_method
        self.ctx = ctx
        self.column = column
        self.observation_interval = observation_interval
        self.max_gap_size = max_gap_size

    def execute(self) -> pl.DataFrame:
        """Execute the infill pipeline.

        Returns:
            The padded DataFrame with eligible nulls in ``self.column`` replaced by infilled values, or the
            original, unpadded ``ctx.df`` when no row is eligible for infilling.

        Raises:
            InfillError: If the input DataFrame is empty. Missing columns are reported by
                ``check_columns_in_dataframe``.
        """
        self._validate()

        # We need to make sure the data is padded so that missing time steps are filled with nulls
        df = pad_time(self.ctx.df, self.ctx.time_name, self.ctx.periodicity)

        # Calculate sizes of each gap in the time series. The mask below reads, and the tidy-up at the
        # end drops, a "gap_size" column that gap_size_count is expected to add.
        df = gap_size_count(df, self.column)

        # Create a mask determining which values get infilled
        infill_mask = self._infill_mask()

        # Check if there is actually anything to infill; if not, return the caller's data untouched
        if df.filter(infill_mask).is_empty():
            return self.ctx.df

        # Apply the specific infill logic from the child class (writes candidates to a separate column)
        df_infilled = self.infill_method._fill(df, self.column, self.ctx)
        infilled_column = self.infill_method._infilled_column_name(self.column)

        # Limit the infilled data to where the infill mask is True; elsewhere keep the original values
        df_infilled = df_infilled.with_columns(
            pl.when(infill_mask).then(pl.col(infilled_column)).otherwise(pl.col(self.column)).alias(infilled_column)
        )

        # Rename the infilled column back to the original name and drop the temporary processing columns
        df_infilled = df_infilled.with_columns(pl.col(infilled_column).alias(self.column)).drop(
            [infilled_column, "gap_size"], strict=False
        )
        return df_infilled

    def _validate(self) -> None:
        """Carry out validation that the infill method can actually be carried out.

        Raises:
            InfillError: If the input DataFrame is empty.
        """
        if self.ctx.df.is_empty():
            raise InfillError("Cannot perform infilling on an empty DataFrame.")
        check_columns_in_dataframe(self.ctx.df, [self.column, self.ctx.time_name])

    def _infill_mask(self) -> pl.Expr:
        """Create a mask for determining which values in a time series to infill.

        Take into account:
          - Maximum gap size - constraining the infilling to gaps of a maximum size (``0`` disables infilling)
          - Observation interval - constraining the time series to a specific datetime range
          - Start and end gaps - constraining so nulls at the beginning and end of the series remain null.

        Returns:
            Polars expression that can be used to determine which values to infill (True) or not (False)
        """
        # Base assumption is that any gap can be infilled
        filter_expr = pl.col("gap_size") > 0

        # Compare against None explicitly: a truthiness test would treat max_gap_size=0 as "no limit"
        # and infill every gap, whereas 0 < gap <= 0 correctly selects nothing.
        if self.max_gap_size is not None:
            filter_expr = pl.col("gap_size").is_between(0, self.max_gap_size, closed="right")

        # Apply observation interval constraint
        if self.observation_interval is not None:
            filter_expr = filter_expr & get_date_filter(self.ctx.time_name, self.observation_interval)

        # Make a mask to ensure that nulls at the beginning and end of the series remain null: only rows
        # between the first and last non-null value (inclusive) are eligible.
        not_null_mask = pl.col(self.column).is_not_null()
        row_idx = pl.arange(0, pl.len())
        filter_expr = filter_expr & row_idx.is_between(
            row_idx.filter(not_null_mask).min(),  # first non-null row
            row_idx.filter(not_null_mask).max(),  # last non-null row
        )
        return filter_expr
class ScipyInterpolation(InfillMethod, ABC):
    """Shared machinery for infill methods backed by a SciPy 1-D interpolator.

    Subclasses provide the interpolator through ``_create_interpolator`` and declare how many valid
    points it needs through ``min_points_required``.
    """

    def __init__(self, **kwargs):
        """Store extra keyword arguments for the SciPy interpolator.

        Args:
            **kwargs: Extra parameters that subclasses forward to their SciPy interpolator.
        """
        self.scipy_kwargs = kwargs

    @abstractmethod
    def _create_interpolator(self, x_valid: np.ndarray, y_valid: np.ndarray) -> Any:
        """Build the SciPy interpolator object.

        Args:
            x_valid: Row positions (0, 1, 2, ...) of the non-null data points.
                For example, if rows 0, 2, 5 hold valid data, x_valid = [0, 2, 5].
            y_valid: Data values at those row positions.

        Returns:
            A callable SciPy interpolator.

        Raises:
            ValueError: If there is insufficient data for this interpolation method.

        Example:
            Original data [10.5, NaN, 12.3, NaN, NaN, 9.8] gives
            x_valid = [0, 2, 5] and y_valid = [10.5, 12.3, 9.8];
            the interpolator then estimates values at rows 1, 3 and 4.
        """
        pass

    @property
    @abstractmethod
    def min_points_required(self) -> int:
        """Smallest number of valid data points this interpolation method accepts."""
        pass

    def _fill(self, df: pl.DataFrame, infill_column: str, ctx: InfillCtx) -> pl.DataFrame:
        """Interpolate over the whole column with a SciPy interpolator.

        Steps: export the column to NumPy, keep the non-NaN points, check there are enough of them,
        evaluate the interpolator at every row position, replace non-finite results with NaN and attach
        the result as a new column. The pipeline decides which of these values are actually used.

        Args:
            df: The DataFrame to infill.
            infill_column: The column to infill.
            ctx: The infill context (unused here).

        Returns:
            pl.DataFrame with an extra column named by ``_infilled_column_name`` holding interpolated values.

        Raises:
            InfillInsufficientValuesError: If fewer than ``min_points_required`` valid points exist.
        """
        series_values = df[infill_column].to_numpy()
        row_positions = np.arange(len(series_values))

        # Polars nulls arrive as NaN in the NumPy export, so NaN marks a missing point
        is_valid = np.logical_not(np.isnan(series_values))
        valid_count = np.sum(is_valid)

        required = self.min_points_required
        if valid_count < required:
            raise InfillInsufficientValuesError(
                f"Infill method '{self.name}' requires at least {required} data points, "
                f"but only {valid_count} valid points found."
            )

        interpolator = self._create_interpolator(row_positions[is_valid], series_values[is_valid])
        estimates = interpolator(row_positions)

        # Anything non-finite (inf, -inf, NaN) coming out of the interpolator is normalised to NaN
        finite_estimates = np.where(np.isfinite(estimates), estimates, np.nan)

        new_column = pl.Series(self._infilled_column_name(infill_column), finite_estimates)
        return df.with_columns(new_column)
@InfillMethod.register
class BSplineInterpolation(ScipyInterpolation):
    """B-spline interpolation using scipy ``make_interp_spline`` with a configurable order.

    https://docs.scipy.org/doc/scipy-1.16.1/reference/generated/scipy.interpolate.make_interp_spline.html
    """

    name = "bspline"

    def __init__(self, order: int, **kwargs):
        """Configure the B-spline interpolation.

        Args:
            order: Spline degree passed to ``make_interp_spline`` as ``k`` (e.g. 1=linear, 2=quadratic, 3=cubic).
            **kwargs: Additional keyword arguments for ``make_interp_spline``.
        """
        super().__init__(**kwargs)
        self.order = order

    @property
    def min_points_required(self) -> int:
        """A spline of degree ``order`` needs at least ``order + 1`` points."""
        degree = self.order
        return degree + 1

    def _create_interpolator(self, x_valid: np.ndarray, y_valid: np.ndarray) -> Any:
        """Build the SciPy B-spline through the valid points."""
        spline = make_interp_spline(x_valid, y_valid, k=self.order, **self.scipy_kwargs)
        return spline
@InfillMethod.register
class LinearInterpolation(BSplineInterpolation):
    """Linear spline interpolation: a B-spline fixed at order 1.

    https://docs.scipy.org/doc/scipy-1.16.1/reference/generated/scipy.interpolate.make_interp_spline.html
    """

    name = "linear"

    def __init__(self, **kwargs):
        """Set up linear interpolation; extra keyword arguments go to ``make_interp_spline``."""
        super().__init__(order=1, **kwargs)
@InfillMethod.register
class QuadraticInterpolation(BSplineInterpolation):
    """Quadratic spline interpolation: a B-spline fixed at order 2.

    https://docs.scipy.org/doc/scipy-1.16.1/reference/generated/scipy.interpolate.make_interp_spline.html
    """

    name = "quadratic"

    def __init__(self, **kwargs):
        """Set up quadratic interpolation; extra keyword arguments go to ``make_interp_spline``."""
        super().__init__(order=2, **kwargs)
@InfillMethod.register
class CubicInterpolation(BSplineInterpolation):
    """Cubic spline interpolation: a B-spline fixed at order 3.

    https://docs.scipy.org/doc/scipy-1.16.1/reference/generated/scipy.interpolate.make_interp_spline.html
    """

    name = "cubic"

    def __init__(self, **kwargs):
        """Set up cubic interpolation; extra keyword arguments go to ``make_interp_spline``."""
        super().__init__(order=3, **kwargs)
@InfillMethod.register
class AkimaInterpolation(ScipyInterpolation):
    """Akima interpolation using scipy (good for avoiding oscillations).

    https://docs.scipy.org/doc/scipy-1.16.1/reference/generated/scipy.interpolate.Akima1DInterpolator.html
    """

    name = "akima"
    # Plain class attribute overriding the abstract property on the base class.
    min_points_required = 5  # type: ignore[override]

    def _create_interpolator(self, x_valid: np.ndarray, y_valid: np.ndarray) -> Any:
        """Build a SciPy ``Akima1DInterpolator`` through the valid points."""
        akima = Akima1DInterpolator(x_valid, y_valid, **self.scipy_kwargs)
        return akima
@InfillMethod.register
class PchipInterpolation(ScipyInterpolation):
    """PCHIP interpolation using scipy (preserves monotonicity).

    https://docs.scipy.org/doc/scipy-1.16.1/reference/generated/scipy.interpolate.PchipInterpolator.html
    """

    name = "pchip"
    # Plain class attribute overriding the abstract property on the base class.
    min_points_required = 2  # type: ignore[override]

    def _create_interpolator(self, x_valid: np.ndarray, y_valid: np.ndarray) -> Any:
        """Build a SciPy ``PchipInterpolator`` through the valid points."""
        pchip = PchipInterpolator(x_valid, y_valid, **self.scipy_kwargs)
        return pchip
@InfillMethod.register
class AltData(InfillMethod):
    """Infill from an alternative data source, with optional correction factor."""

    name = "alt_data"

    def __init__(self, alt_data_column: str, correction_factor: float = 1.0, alt_df: pl.DataFrame | None = None):
        """Initialize the alternative data infill method.

        Args:
            alt_data_column: The name of the column providing the alternative data.
            correction_factor: Multiplier applied to the alternative data before it is used.
            alt_df: Optional DataFrame containing the alternative data. When ``None``, ``alt_data_column`` is
                read from the DataFrame being infilled; otherwise it is joined in on the time column.
        """
        self.alt_data_column = alt_data_column
        self.correction_factor = correction_factor
        self.alt_df = alt_df

    def _fill(self, df: pl.DataFrame, infill_column: str, ctx: InfillCtx) -> pl.DataFrame:
        """Fill missing values using data from the alternative column.

        Args:
            df: The DataFrame to infill.
            infill_column: The column to infill.
            ctx: The infill context (its ``time_name`` is used to align ``alt_df``).

        Returns:
            pl.DataFrame with an extra column named by ``_infilled_column_name`` where nulls in
            ``infill_column`` are replaced by ``alt data * correction_factor``.

        Raises:
            InfillError: If ``alt_df`` contains duplicate (non-null) timestamps, which would otherwise
                multiply rows in the left join and corrupt the time series.
        """
        if self.alt_df is None:
            check_columns_in_dataframe(df, [self.alt_data_column])
            alt_data_column_name = self.alt_data_column
        else:
            time_column_name = ctx.time_name
            check_columns_in_dataframe(self.alt_df, [time_column_name, self.alt_data_column])

            # A left join against repeated keys duplicates the matching rows of ``df``; refuse rather than
            # silently return a series with repeated timestamps. Null keys never match in the join, so
            # they are ignored here.
            if self.alt_df[time_column_name].drop_nulls().is_duplicated().any():
                raise InfillError(
                    f"Alternative data contains duplicate values in time column '{time_column_name}'; "
                    "cannot align it unambiguously with the data being infilled."
                )

            # Use a prefixed name so the alternative column cannot collide with columns already in df
            alt_data_column_name = f"__ALT_DATA__{self.alt_data_column}"
            alt_df = self.alt_df.select([time_column_name, self.alt_data_column]).rename(
                {self.alt_data_column: alt_data_column_name}
            )
            df = df.join(
                alt_df,
                on=time_column_name,
                how="left",
                suffix="_alt",
            )

        infilled = df.with_columns(
            pl.when(pl.col(infill_column).is_null())
            .then(pl.col(alt_data_column_name) * self.correction_factor)
            .otherwise(pl.col(infill_column))
            .alias(self._infilled_column_name(infill_column))
        )

        # Remove the temporary joined column; a column that came with df itself is left in place
        if self.alt_df is not None:
            infilled = infilled.drop(alt_data_column_name)

        return infilled