"""
Core TimeFrame Data Model.

This module defines the `TimeFrame` class, the central abstraction of the time_stream package.
A `TimeFrame` wraps a Polars DataFrame and adds functionality for handling temporal data, quality flags, metadata, and
derived operations.
Main responsibilities
---------------------

1. **Time management** (via `TimeManager`):

   - Validates that the time column exists and has a temporal dtype.
   - Enforces resolution and periodicity of the time series.
   - Uses time anchoring to define the period over which values are valid (`POINT`, `START`, `END`).
   - Detects duplicates and resolves them according to a strategy (`ERROR`, `KEEP_FIRST`, `KEEP_LAST`, `DROP`, `MERGE`).
   - Prevents mutation of time values between operations.

2. **Metadata management**:

   - TimeFrame-level metadata (`.metadata`).
   - Per-column metadata (`.column_metadata`) that stays in sync with the DataFrame.

3. **Flag management** (via `FlagManager`):

   - Registers reusable flag systems (based on `BitwiseFlag` enums).
   - Initialises flag columns linked to data columns.
   - Adds/removes flags with Polars expressions.

4. **Data operations**:

   - Aggregation: runs `AggregationFunction` pipelines with support for missing-data criteria and time anchoring.
   - Quality control: applies `QCCheck` classes to detect anomalies or enforce validation rules.
   - Infilling: applies `InfillMethod` classes to fill missing values according to defined strategies.
   - Column selection: returns a reduced TimeFrame with metadata and flags kept consistent.
"""
from copy import deepcopy
from datetime import datetime
from typing import Any, Self, Sequence, Type
import polars as pl
from time_stream.aggregation import AggregationFunction
from time_stream.bitwise import BitwiseFlag
from time_stream.enums import DuplicateOption, TimeAnchor
from time_stream.exceptions import ColumnNotFoundError, MetadataError
from time_stream.flag_manager import FlagColumn, FlagManager, FlagSystemType
from time_stream.infill import InfillMethod
from time_stream.metadata import ColumnMetadataDict
from time_stream.period import Period
from time_stream.qc import QCCheck
from time_stream.time_manager import TimeManager
from time_stream.utils import check_columns_in_dataframe, configure_period_object, pad_time
class TimeFrame:
    """A class representing a time series data model, with data held in a Polars DataFrame.

    Args:
        df: The ``Polars`` DataFrame containing the time series data.
        time_name: The name of the time column in the DataFrame.
        resolution: The resolution of the time series.
        periodicity: The periodicity of the time series.
        time_anchor: The time anchor to which the date/times of the time series conform. Defaults to START.
        on_duplicates: What to do if duplicate rows are found in the data. Defaults to ERROR.
    """

    _df: pl.DataFrame  # The underlying time series data.
    _time_manager: TimeManager  # Holds and validates the time column settings (name, resolution, periodicity, anchor).
    _flag_manager: FlagManager  # Registry of flag systems and flag columns.
    _metadata: dict[str, Any]  # TimeFrame-level metadata.
    _column_metadata: ColumnMetadataDict  # Per-column metadata, kept in sync with the DataFrame columns.

    def __init__(
        self,
        df: pl.DataFrame,
        time_name: str,
        resolution: Period | str | None = None,
        periodicity: Period | str | None = None,
        time_anchor: TimeAnchor | str = TimeAnchor.START,
        on_duplicates: DuplicateOption | str = DuplicateOption.ERROR,
    ) -> None:
        self._time_manager = TimeManager(
            time_name=time_name,
            resolution=resolution,
            periodicity=periodicity,
            on_duplicates=on_duplicates,
            time_anchor=time_anchor,
        )
        # Resolve duplicates (according to ``on_duplicates``) before validating and sorting the data.
        self._df = self._time_manager._handle_time_duplicates(df)
        self._time_manager.validate(self.df)
        self.sort_time()

        self._metadata = {}
        # The callable gives the metadata container access to the current DataFrame columns; ``sync()`` is called
        # whenever the DataFrame's columns change.
        self._column_metadata = ColumnMetadataDict(lambda: self.df.columns)
        self._flag_manager = FlagManager()

    def copy(self, share_df: bool = True) -> Self:
        """Return a copy of this ``TimeFrame``, either sharing or cloning the underlying DataFrame.

        TimeFrame-level and column-level metadata are deep-copied and the flag manager is copied, so editing the
        metadata of the copy does not affect this TimeFrame.

        Args:
            share_df: If True, the copy is built from this TimeFrame's DataFrame without cloning it first.
                If False, a cloned DataFrame is used.

        Returns:
            A copy of this TimeFrame.
        """
        df = self.df if share_df else self.df.clone()
        # ``on_duplicates`` is not forwarded, so the constructor default applies to the already-processed data.
        out = TimeFrame(
            df,
            time_name=self.time_name,
            resolution=self.resolution,
            periodicity=self.periodicity,
            time_anchor=self.time_anchor,
        )
        out.metadata = deepcopy(self._metadata)
        out.column_metadata.update(deepcopy(self._column_metadata))
        out._flag_manager = self._flag_manager.copy()
        return out

    def with_df(self, new_df: pl.DataFrame) -> Self:
        """Return a new TimeFrame with a new DataFrame, checking the integrity of the time values hasn't
        been compromised between the old and new TimeFrame.

        Metadata and flag registrations are carried over from this TimeFrame; column metadata is then synced to
        the columns of ``new_df``.

        Args:
            new_df: The new Polars DataFrame to set as the new time series data.

        Returns:
            A new TimeFrame wrapping ``new_df``.
        """
        old_df = self._df.clone()
        self._time_manager._check_time_integrity(old_df, new_df)
        tf = self.copy()
        # Assigned directly (bypassing the constructor's duplicate handling / validation / sorting); the integrity
        # check above is what guards the time values.
        tf._df = new_df
        tf._column_metadata.sync()
        return tf

    def with_flag_system(self, name: str, flag_system: FlagSystemType) -> Self:
        """Return a new TimeFrame, with a flag system registered.

        Args:
            name: Short name for the flag system
            flag_system: The flag system to register

        Returns:
            A new TimeFrame with flag system set.
        """
        tf = self.copy()
        tf.register_flag_system(name, flag_system)
        return tf

    @property
    def metadata(self) -> dict[str, Any]:
        """TimeFrame-level metadata."""
        return self._metadata

    @metadata.setter
    def metadata(self, value: dict[str, Any] | None) -> None:
        """Set the TimeFrame-level metadata.

        This method checks type of object being set to ensure we continue to work with expected dicts.

        Args:
            value: The new metadata to set. ``None`` resets the metadata to an empty dict.

        Raises:
            MetadataError: If ``value`` is neither ``None`` nor a dict.
        """
        if value is None:
            self._metadata = {}
        elif isinstance(value, dict):
            self._metadata = value
        else:
            raise MetadataError(f"TimeFrame-level metadata must be a dict object. Got: '{type(value)}'")

    @metadata.deleter
    def metadata(self) -> None:
        """Clear TimeFrame-level metadata."""
        self._metadata.clear()

    @property
    def column_metadata(self) -> dict[str, dict[str, Any]]:
        """Per-column metadata."""
        return self._column_metadata

    @column_metadata.setter
    def column_metadata(self, value: dict[str, dict[str, Any]] | None) -> None:
        """Set the column-level metadata.

        This method checks type of object being set to ensure we continue to work with expected dicts. A dict value
        is merged into the existing column metadata (via ``update``) rather than replacing it wholesale.

        Args:
            value: The new metadata to set. ``None`` resets all column metadata.

        Raises:
            MetadataError: If ``value`` is neither ``None`` nor a dict.
        """
        if value is None:
            # Reset all the columns metadata to empty dicts
            self._column_metadata = ColumnMetadataDict(lambda: self.df.columns)
        elif isinstance(value, dict):
            self.column_metadata.update(value)
        else:
            raise MetadataError(f"Column-level metadata must be a dict object. Got: '{type(value)}'")

    @column_metadata.deleter
    def column_metadata(self) -> None:
        """Clear all per-column metadata."""
        self._column_metadata.clear()

    @property
    def time_name(self) -> str:
        """The name of the primary datetime column in the underlying TimeFrame DataFrame."""
        return self._time_manager.time_name

    @property
    def resolution(self) -> Period:
        """The resolution of the timeseries data within the TimeFrame"""
        return self._time_manager.resolution

    @property
    def periodicity(self) -> Period:
        """The periodicity of the timeseries data within the TimeFrame"""
        return self._time_manager.periodicity

    @property
    def time_anchor(self) -> TimeAnchor:
        """The time anchor of the timeseries data within the TimeFrame"""
        return self._time_manager.time_anchor

    @property
    def df(self) -> pl.DataFrame:
        """The underlying ``Polars`` DataFrame containing the timeseries data."""
        return self._df

    @property
    def columns(self) -> list[str]:
        """All column labels of the DataFrame within the TimeFrame, excluding the time column."""
        return [c for c in self.df.columns if c != self.time_name]

    @property
    def flag_columns(self) -> list[str]:
        """Only the labels for any flag columns within the TimeFrame."""
        return list(self._flag_manager.flag_columns.keys())

    @property
    def data_columns(self) -> list[str]:
        """Only the labels for the data columns (non-time, non-flag) within the TimeFrame."""
        return [c for c in self.columns if c not in self.flag_columns]

    def sort_time(self) -> None:
        """Sort the TimeFrame DataFrame by the time column (in place)."""
        self._df = self.df.sort(self.time_name)

    def pad(self) -> None:
        """Pad the time series with missing datetime rows, filling in NULLs for missing values (in place)."""
        self._df = pad_time(self.df, self.time_name, self.periodicity, self.time_anchor)
        self.sort_time()

    def register_flag_system(self, name: str, flag_system: FlagSystemType) -> None:
        """Register a named flag system with the internal flag manager.

        Args:
            name: Short name for the flag system
            flag_system: The flag system to register, provided either as:
                - a dict mapping of flag names to single-bit integer values, or;
                - a ``BitwiseFlag`` enum class, whose members are single-bit integers.
        """
        self._flag_manager.register_flag_system(name, flag_system)

    def get_flag_system(self, name: str) -> type[BitwiseFlag]:
        """Return a registered flag system.

        Args:
            name: The registered flag system name

        Returns:
            The ``BitwiseFlag`` enum that defines the flag system.
        """
        return self._flag_manager.get_flag_system(name)

    def register_flag_column(self, column_name: str, base: str, flag_system: str) -> None:
        """Mark the specified existing column as a flag column.

        This does not modify the DataFrame; it only records that ``column_name`` is a flag column associated with
        the value column ``base``, with values handled by the flag system ``flag_system``.

        Args:
            column_name: A column name to mark as a flag column.
            base: Name of the value/data column this flag column refers to.
            flag_system: The name of the flag system.
        """
        check_columns_in_dataframe(self.df, [column_name, base])
        self._flag_manager.register_flag_column(column_name, base, flag_system)

    def init_flag_column(
        self, base: str, flag_system: str, column_name: str | None = None, data: int | Sequence[int] = 0
    ) -> None:
        """Add a new column to the TimeFrame DataFrame, setting it as a Flag Column.

        The new column is only stored on the TimeFrame once it has been registered successfully, so a failed
        registration leaves the DataFrame unchanged.

        Args:
            base: Name of the value/data column this flag column will refer to.
            flag_system: The name of the flag system.
            column_name: Optional name for the new flag column. If omitted, a name of the
                form "{base}__flag__{flag_system}" is used.
            data: The default value to populate the flag column with. Can be a scalar or list-like. Defaults to 0.
        """
        # Pass a list, consistent with every other call to this helper.
        check_columns_in_dataframe(self.df, [base])

        # Validate that the flag system exists
        self.get_flag_system(flag_system)

        # Set up data that will go into the new column
        if isinstance(data, int):
            # A literal is broadcast to every row by ``with_columns``
            data = pl.lit(data, dtype=pl.Int64)
        else:
            data = pl.Series(data, dtype=pl.Int64)

        # Determine name of flag column
        if not column_name:
            column_name = f"{base}__flag__{flag_system}"

        new_df = self.df.with_columns(data.alias(column_name))
        # Register before storing the new DataFrame so a failed registration cannot leave an unregistered column
        # behind. ``base`` was checked above and ``column_name`` exists in ``new_df`` by construction.
        self._flag_manager.register_flag_column(column_name, base, flag_system)
        self._df = new_df
        self._column_metadata.sync()

    def get_flag_column(self, flag_column_name: str) -> FlagColumn:
        """Look up a registered flag column by name.

        Args:
            flag_column_name: Flag column name.

        Returns:
            The corresponding ``FlagColumn`` object.
        """
        return self._flag_manager.get_flag_column(flag_column_name)

    def add_flag(self, flag_column_name: str, flag_value: int | str, expr: pl.Expr = pl.lit(True)) -> None:
        """Add flag value (if not there) to flag column, where expression is True.

        Args:
            flag_column_name: The name of the flag column
            flag_value: The flag value to add
            expr: Polars expression for which rows to add flag to. Defaults to all rows.
        """
        flag_column = self.get_flag_column(flag_column_name)
        self._df = flag_column.add_flag(self.df, flag_value, expr)

    def remove_flag(self, column_name: str, flag_value: int | str, expr: pl.Expr = pl.lit(True)) -> None:
        """Remove flag value (if there) from flag column, where expression is True.

        Args:
            column_name: The name of the flag column
            flag_value: The flag value to remove
            expr: Polars expression for which rows to remove flag from. Defaults to all rows.
        """
        flag_column = self.get_flag_column(column_name)
        self._df = flag_column.remove_flag(self.df, flag_value, expr)

    def aggregate(
        self,
        aggregation_period: Period | str,
        aggregation_function: str | Type[AggregationFunction] | AggregationFunction,
        columns: str | list[str] | None = None,
        missing_criteria: tuple[str, float | int] | None = None,
        aggregation_time_anchor: TimeAnchor | str | None = None,
    ) -> Self:
        """Apply an aggregation function to one or more columns in this TimeFrame, check the aggregation satisfies
        user requirements and return a new derived TimeFrame containing the aggregated data.

        Args:
            aggregation_period: The period over which to aggregate the data
            aggregation_function: The aggregation function to apply
            columns: The column(s) containing the data to be aggregated. If omitted (or empty), all data columns
                are used.
            missing_criteria: How the aggregation handles missing data
            aggregation_time_anchor: The time anchor for the aggregation result. Defaults to this TimeFrame's
                time anchor.

        Returns:
            A new TimeFrame containing the aggregated data, with resolution and periodicity set to
            ``aggregation_period``. TimeFrame-level metadata is deep-copied; column metadata and flag
            registrations are not carried over.
        """
        # Get the aggregation function instance and run the apply method
        agg_func = AggregationFunction.get(aggregation_function)
        aggregation_period = configure_period_object(aggregation_period)
        aggregation_time_anchor = TimeAnchor(aggregation_time_anchor) if aggregation_time_anchor else self.time_anchor

        if not columns:
            columns = self.data_columns

        agg_df = agg_func.apply(
            self.df,
            self.time_name,
            self.time_anchor,
            self.periodicity,
            aggregation_period,
            columns,
            missing_criteria=missing_criteria,
            aggregation_time_anchor=aggregation_time_anchor,
        )

        tf = TimeFrame(
            df=agg_df,
            time_name=self.time_name,
            resolution=aggregation_period,
            periodicity=aggregation_period,
            time_anchor=aggregation_time_anchor,
        )
        tf.metadata = deepcopy(self.metadata)
        return tf

    def qc_check(
        self,
        check: str | Type[QCCheck] | QCCheck,
        column_name: str,
        observation_interval: tuple[datetime, datetime | None] | None = None,
        into: str | bool = False,
        **kwargs,
    ) -> "TimeFrame | pl.Series":
        """Apply a quality control check to the TimeFrame.

        Args:
            check: The QC check to apply.
            column_name: The column to perform the check on.
            observation_interval: Optional time interval to limit the check to.
            into: Whether to add the result of the QC to the TimeFrame dataframe (True | string name of column to add),
                or just return a boolean Series of the QC result (False). If the target column name already exists,
                a numeric suffix (``__1``, ``__2``, ...) is appended instead of overwriting it.
            **kwargs: Parameters specific to the check type.

        Returns:
            The boolean Series result of the check when ``into`` is falsy; otherwise a new TimeFrame with the result
            added as a column.

        Examples:
            # Threshold check, returning a boolean Series
            low_voltage = tf.qc_check("comparison", "battery_voltage", compare_to=12.0, operator="<")

            # Range check, adding the result to a new TimeFrame under an auto-generated column name
            tf_checked = tf.qc_check("range", "temperature", min_value=-50, max_value=50, into=True)

            # Spike check, adding the result under a chosen column name
            tf_checked = tf.qc_check("spike", "wind_speed", threshold=5.0, into="wind_speed_spikes")

            # Value check for error codes
            error_codes = tf.qc_check("comparison", "status_code", compare_to=[99, -999, "ERROR"])
        """
        # Get the QC check instance and run the apply method
        check_instance = QCCheck.get(check, **kwargs)
        qc_result = check_instance.apply(self.df, self.time_name, column_name, observation_interval)

        # Return the boolean series, if requested
        if not into:
            return qc_result

        # Determine the name of the column for the QC result
        if isinstance(into, str):
            qc_result_col_name = into
        else:
            qc_result_col_name = f"__qc__{column_name}__{check_instance.name}"

        if qc_result_col_name in self.df.columns:
            # Auto-suffix the column name to avoid accidental overwrite
            col_suffix = 1
            while f"{qc_result_col_name}__{col_suffix}" in self.df.columns:
                col_suffix += 1
            qc_result_col_name = f"{qc_result_col_name}__{col_suffix}"

        # Create a copy of the current TimeFrame, and update the dataframe with the QC result
        new_df = self.df.with_columns(pl.Series(qc_result_col_name, qc_result))
        return self.with_df(new_df)

    def infill(
        self,
        infill_method: str | Type[InfillMethod] | InfillMethod,
        column_name: str,
        observation_interval: tuple[datetime, datetime | None] | None = None,
        max_gap_size: int | None = None,
        **kwargs,
    ) -> Self:
        """Apply an infilling method to a column in the TimeFrame to fill in missing data.

        Args:
            infill_method: The method to use for infilling
            column_name: The column to infill
            observation_interval: Optional time interval to limit the infilling to.
            max_gap_size: The maximum size of consecutive null gaps that should be filled. Any gap larger than this
                will not be infilled and will remain as null.
            **kwargs: Parameters specific to the infill method.

        Returns:
            A new TimeFrame containing the infilled data.
        """
        # Get the infill method instance and run the apply method
        infill_instance = InfillMethod.get(infill_method, **kwargs)
        infill_result = infill_instance.apply(
            self.df, self.time_name, self.periodicity, column_name, observation_interval, max_gap_size
        )

        # Create a copy of the current TimeFrame, and update the dataframe with the infilled data
        return self.with_df(infill_result)

    def select(
        self,
        column_names: str | Sequence[str],
        include_flag_columns: bool = True,
    ) -> Self:
        """Return a new TimeFrame instance to include only the specified columns.

        By default, this:
            - carries over TimeFrame-level metadata,
            - prunes column-level metadata to the kept columns,
            - rebuilds the flag manager to include only kept flag columns.

        The time column is always kept. The caller's ``column_names`` sequence is never modified.

        Args:
            column_names: Column name(s) to retain in the updated TimeFrame.
            include_flag_columns: If True, include any registered flag columns whose base is among the
                kept value columns.

        Returns:
            New TimeFrame instance with only selected columns.

        Raises:
            ColumnNotFoundError: If no columns are specified.
        """
        if not column_names:
            raise ColumnNotFoundError("No columns specified.")

        # Work on a private list: the caller's sequence must not be mutated, and tuples have no insert/append.
        column_names = [column_names] if isinstance(column_names, str) else list(column_names)

        check_columns_in_dataframe(self.df, column_names)

        # Include primary time column (if not already included)
        if self.time_name not in column_names:
            column_names.insert(0, self.time_name)

        # Optionally include associated flag columns
        if include_flag_columns:
            for flag_name, flag_column in self._flag_manager.flag_columns.items():
                # Include if its base (value col) is being kept and it was not already selected explicitly;
                # a repeated name would make ``DataFrame.select`` fail.
                if flag_column.base in column_names and flag_name not in column_names:
                    column_names.append(flag_name)

        # Build new frame
        new_df = self.df.select(column_names)

        # New TimeFrame
        tf = self.with_df(new_df)

        # Prune column level metadata to kept columns. Deep-copied so the new TimeFrame does not share the
        # per-column dicts of this TimeFrame (consistent with ``copy``).
        kept_metadata = {col: deepcopy(self.column_metadata[col]) for col in column_names}
        tf.column_metadata.clear()
        tf.column_metadata.update(kept_metadata)

        # Rebuild the flag registry for kept columns
        new_flag_manager = FlagManager()

        # re-register flag systems
        for name, flag_system in self._flag_manager.flag_systems.items():
            new_flag_manager.register_flag_system(name, flag_system.to_dict())

        # keep only flag columns that survived
        for flag_name, flag_column in self._flag_manager.flag_columns.items():
            if flag_name in column_names:
                new_flag_manager.register_flag_column(
                    flag_name, flag_column.base, flag_column.flag_system.system_name()
                )

        tf._flag_manager = new_flag_manager
        tf._column_metadata.sync()
        return tf

    def __getitem__(self, key: str | Sequence[str]) -> Self:
        """Access columns using indexing syntax.

        Args:
            key: Column name(s) to access

        Returns:
            A new TimeFrame instance with the specified column(s) selected.

        Notes:
            This is equivalent to ``tf.select([...], include_flag_columns=False)``.
        """
        if isinstance(key, str):
            key = [key]
        return self.select(key, include_flag_columns=False)

    def __str__(self) -> str:
        """Return the string representation of the TimeFrame dataframe."""
        return self.df.__str__()

    def __repr__(self) -> str:
        """Returns the representation of the TimeFrame"""
        return self.df.__repr__()

    def __copy__(self) -> Self:
        """Support ``copy.copy``: a copy built from the same DataFrame."""
        return self.copy(share_df=True)

    def __deepcopy__(self, memo: dict) -> Self:
        """Support ``copy.deepcopy``: a copy built from a cloned DataFrame. ``memo`` is not used."""
        return self.copy(share_df=False)

    def __eq__(self, other: object) -> bool:
        """Check if two TimeFrame instances are equal.

        Args:
            other: The object to compare.

        Returns:
            bool: True if the TimeFrame instances are equal, False otherwise.
        """
        if not isinstance(other, TimeFrame):
            return False
        return (
            self.df.equals(other.df)
            and self.time_name == other.time_name
            and self.resolution == other.resolution
            and self.periodicity == other.periodicity
            and self.time_anchor == other.time_anchor
            and self._flag_manager == other._flag_manager
            and self.metadata == other.metadata
            and self.column_metadata == other.column_metadata
        )

    # Make class instances unhashable (they are mutable and define __eq__)
    __hash__ = None