Source code for time_stream.base

"""
Core TimeFrame Data Model.

This module defines the :class:`TimeFrame` class, the central abstraction of the time_stream package.
A :class:`TimeFrame` wraps a Polars DataFrame and adds functionality for handling temporal data, quality flags,
metadata, and derived operations.

Main responsibilities
---------------------
1. **Time management**:
   - Validates that the time column exists and has a temporal dtype.
   - Enforces resolution and periodicity of the time series.
   - Uses time anchoring to define the period over which values are valid.
   - Detects duplicates and resolves them according to a strategy.
   - Prevents mutation of time values between operations.

2. **Metadata management**:
   - TimeFrame-level metadata (`.metadata`).
   - Per-column metadata (`.column_metadata`) that stays in sync with the DataFrame.

3. **Flag management**:
   - Register reusable flag systems.
   - Initialise flag columns linked to data columns.
   - Add/remove flags with Polars expressions.
   - Filter TimeFrame based on flag values

4. **Data operations**:
   - Aggregation: run aggregation pipelines with support for missing-data criteria and time anchoring.
   - Quality control: apply quality control routines to detect anomalies or enforce validation rules.
   - Infilling: apply infill methods to fill missing values according to defined strategies.
   - Column selection: return reduced :class:`TimeFrame` with metadata and flags synced consistently.
"""

from __future__ import annotations

from copy import deepcopy
from datetime import datetime, time
from typing import Any, Sequence, Type, overload

import polars as pl

from time_stream.aggregation import (
    AggregationCtx,
    AggregationFunction,
    RollingAggregationPipeline,
    StandardAggregationPipeline,
)
from time_stream.calculations import calculate_min_max_envelope
from time_stream.enums import ClosedInterval, DuplicateOption, RollingAlignment, TimeAnchor, ValidationErrorOptions
from time_stream.exceptions import (
    ColumnNotFoundError,
    ColumnTypeError,
    MetadataError,
)
from time_stream.flags.flag_manager import (
    CategoricalSingleFlagColumn,
    FlagColumn,
    FlagManager,
    FlagSystemType,
)
from time_stream.flags.flag_system import FlagSystemBase, FlagSystemLiteral
from time_stream.formatting import timeframe_repr
from time_stream.infill import InfillMethod
from time_stream.metadata import ColumnMetadataDict
from time_stream.period import Period
from time_stream.qc import QCCheck
from time_stream.time_manager import TimeManager
from time_stream.utils import TimeWindow, check_columns_in_dataframe, configure_period_object, pad_time


[docs] class TimeFrame: """A class representing a time series data model, with data held in a Polars DataFrame. Args: df: The :class:`polars.DataFrame` containing the time-series data. time_name: The name of the time column in ``df``. resolution: Sampling interval for the timeseries; the unit of time step allowable between consecutive data points. Accepts a :class:`Period` or ISO-8601 duration string (e.g. ``"PT15M"``, ``"P1D"``, ``"P1Y"``). If ``None``, defaults to microsecond step (PT0.000001S) (effectively allows any set of datetime values). offset: Offset applied from the natural boundary of ``resolution`` to position the datetime values along the timeline. For example, you may have daily data (``resolution="P1D"``), but all the values are measured at 9:00am, an offset of 9 hours (``"+T9H"``) from the natural boundary of midnight 00:00. Accepts an offset string, following the principles of ISO-8601 but replacing the "P" with a "+" (e.g. ``"+T9H"``, `"+9MT9H"`). If ``None``, no offset is applied. periodicity: Defines the allowed "frequency" of datetimes in your timeseries, i.e., how many datetime entries are allowed within a given period of time. For example, you may have an annual maximum timeseries, where the individual data points are considered to be at daily resolution (``resolution="P1D"``), but are limited to only one data point per year (``periodicity="P1Y"``). Accepts a :class:`Period` or ISO-8601 duration string (e.g. ``"PT15M"``, ``"P1D"``, ``"P1Y"``) with an optional offset syntax (e.g. ``"P1D+T9H"``, ``"P1Y+9MT9H"``). If ``None``, it defaults to the period defined by ``resolution + offset``. time_anchor: Defines the window of time over which a given timestamp refers to. In the descriptions below, "t" is the time value, "r" stands for a single unit of the resolution of the data: - ``POINT``: The time stamp is anchored for the instant of time "t". A value at "t" is considered valid only for the instant of time "t". - ``START``: The time stamp is anchored starting at "t". A value at "t" is considered valid starting at "t" (inclusive) and ending at "t+r" (exclusive). - ``END``: The time stamp is anchored ending at "t". A value at "t" is considered valid starting at "t-r" (exclusive) and ending at "t" (inclusive) on_duplicates: What to do if duplicate rows are found in the data: - ``ERROR`` (default): Raise error - ``KEEP_FIRST``: Keep the first row of any duplicate groups. - ``KEEP_LAST``: Keep the last row of any duplicate groups. - ``DROP``: Drop all duplicate rows. - ``MERGE``: Merge duplicate rows using coalesce (the first non-null value for each column takes precedence) on_misaligned_rows: What to do if misaligned rows are found in the data: - ``ERROR`` (default): Raise error - ``RESOLVE``: Remove any misaligned rows Examples: >>> # Simple 15 minute timeseries: >>> tf = TimeFrame( >>> df, "timestamp", resolution="PT15M" >>> ) >>> print( >>> "resolution=", tf.resolution, >>> " alignment=", tf.alignment, >>> " periodicity=", tf.periodicity >>> ) resoution=PT15M alignment=PT15M periodicity=PT15M >>> # Daily water day (09:00 to 09:00) with default uniqueness per water day: >>> >>> tf = TimeFrame( >>> df, "timestamp", resolution="P1D", offset="+T9H" >>> ) >>> print( >>> "resolution=", tf.resolution, >>> " alignment=", tf.alignment, >>> " periodicity=", tf.periodicity >>> ) resoution=P1D alignment=P1D+T9H periodicity=P1D+T9H >>> # Daily timestamps but uniqueness per water-year: >>> >>> tf = TimeFrame( >>> df, "timestamp", resolution="P1D", offset="+T9H", periodicity="P1Y+P9MT9H" >>> ) >>> print( >>> "resolution=", tf.resolution, >>> " alignment=", tf.alignment, >>> " periodicity=", tf.periodicity >>> ) resoution=P1D alignment=P1D+T9H periodicity=P1Y+P9MT9H >>> # Annual series stored directly on water-year boundary: >>> >>> tf = TimeFrame( >>> df, "timestamp", resolution="P1Y", offset="+9MT9H" >>> ) >>> print( >>> "resolution=", tf.resolution, >>> " alignment=", tf.alignment, >>> " periodicity=", tf.periodicity >>> ) resoution=P1Y alignment=P1D+9MT9H periodicity=P1Y+P9MT9H """ _df: pl.DataFrame _time_manager: TimeManager _flag_manager: FlagManager _metadata: dict[str, Any] _column_metadata: ColumnMetadataDict def __init__( self, df: pl.DataFrame, time_name: str, resolution: Period | str | None = None, offset: str | None = None, periodicity: Period | str | None = None, time_anchor: TimeAnchor | str = TimeAnchor.START, on_duplicates: DuplicateOption | str = DuplicateOption.ERROR, on_misaligned_rows: ValidationErrorOptions | str = ValidationErrorOptions.ERROR, ) -> None: self._time_manager = TimeManager( time_name=time_name, resolution=resolution, offset=offset, periodicity=periodicity, on_duplicates=on_duplicates, on_misaligned_rows=on_misaligned_rows, time_anchor=time_anchor, ) self._df = self._time_manager._handle_time_duplicates(df) self._df = self._time_manager._handle_misaligned_rows(self._df) self._time_manager.validate(self.df) self.sort_time() self._metadata = {} self._column_metadata = ColumnMetadataDict(lambda: self.df.columns) self._flag_manager = FlagManager() def copy(self, share_df: bool = True) -> TimeFrame: """Return a shallow copy of this ``TimeFrame``, either sharing or cloning the underlying DataFrame. Args: share_df: If True, the copy references the same DataFrame object. If False, a cloned DataFrame is used. Returns: A copy of this TimeFrame """ df = self.df if share_df else self.df.clone() out = TimeFrame( df, time_name=self.time_name, resolution=self.resolution, offset=self.offset, periodicity=self.periodicity, time_anchor=self.time_anchor, ) out.metadata = deepcopy(self._metadata) out.column_metadata.update(deepcopy(self._column_metadata)) out._flag_manager = self._flag_manager.copy() return out
[docs] def with_df(self, new_df: pl.DataFrame) -> TimeFrame: """Return a new TimeFrame with a new DataFrame, checking the integrity of the time values hasn't been compromised between the old and new TimeFrame. Args: new_df: The new Polars DataFrame to set as the new time series data. """ old_df = self._df.clone() self._time_manager._check_time_integrity(old_df, new_df) tf = self.copy() tf._df = new_df tf._column_metadata.sync() return tf
[docs] def with_metadata(self, metadata: dict[str, Any]) -> TimeFrame: """Return a new TimeFrame with TimeFrame-level metadata. Args: metadata: Mapping of arbitrary keys/values describing the time series as a whole. Returns: A new TimeFrame with timeFrame-level metadata has set to the provided metadata. """ tf = self.copy() tf.metadata = metadata return tf
[docs] def with_column_metadata(self, metadata: dict[str, dict[str, Any]]) -> TimeFrame: """Return a new TimeFrame with column-level metadata. Args: metadata: Mapping of column names to a dict of arbitrary keys/values describing the column. Returns: A new TimeFrame with column-level metadata has set to the provided metadata. """ tf = self.copy() tf.column_metadata.update(metadata) tf._column_metadata.sync() return tf
[docs] def with_flag_system( self, name: str, flag_system: FlagSystemType, flag_type: FlagSystemLiteral = "bitwise" ) -> TimeFrame: """Return a new TimeFrame, with a flag system registered. Args: name: Short name for the flag system. flag_system: The flag system definition. See ``register_flag_system`` for accepted types. flag_type: Whether to create a ``"bitwise"`` or ``"categorical"`` flag system. Only relevant when ``flag_system`` is a ``dict[str, int]`` or ``list[str]``. Returns: A new TimeFrame with the flag system registered. """ tf = self.copy() tf.register_flag_system(name, flag_system, flag_type) return tf
[docs] def with_periodicity(self, periodicity: str | Period) -> TimeFrame: """Return a new TimeFrame, with a new periodicity registered. Args: periodicity: The new periodicity Returns: A new TimeFrame with a new periodicity set. """ tf = self.copy() tf._time_manager._periodicity = configure_period_object(periodicity) tf._time_manager.validate(tf.df) return tf
@property def metadata(self) -> dict[str, Any]: """TimeFrame-level metadata.""" return self._metadata @metadata.setter def metadata(self, value: dict[str, Any] | None) -> None: """Set the TimeFrame-level metadata. This method checks type of object being set to ensure we continue to work with expected dicts. Args: value: The new metadata to set. """ if value is None: self._metadata = {} elif isinstance(value, dict): self._metadata = value else: raise MetadataError(f"TimeFrame-level metadata must be a dict object. Got: '{type(value)}'") @metadata.deleter def metadata(self) -> None: """Clear TimeFrame-level metadata.""" self._metadata.clear() @property def column_metadata(self) -> dict[str, dict[str, Any]]: """Per-column metadata.""" return self._column_metadata @column_metadata.setter def column_metadata(self, value: dict[str, dict[str, Any]] | None) -> None: """Set the column-level metadata. This method checks type of object being set to ensure we continue to work with expected dicts. Args: value: The new metadata to set. """ if value is None: # Reset all the columns metadata to empty dicts self._column_metadata = ColumnMetadataDict(lambda: self.df.columns) elif isinstance(value, dict): self.column_metadata.update(value) else: raise MetadataError(f"Column-level metadata must be a dict object. Got: '{type(value)}'") @column_metadata.deleter def column_metadata(self) -> None: """Clear all per-column metadata.""" self._column_metadata.clear() @property def time_name(self) -> str: """The name of the primary datetime column in the underlying TimeFrame DataFrame.""" return self._time_manager.time_name @property def resolution(self) -> Period: """The resolution of the timeseries data within the TimeFrame""" return self._time_manager.resolution @property def offset(self) -> str | None: """The offset of the time steps within the TimeFrame""" return self._time_manager.offset @property def alignment(self) -> Period: """The alignment of the time steps within the TimeFrame""" return self._time_manager.alignment @property def periodicity(self) -> Period: """The periodicity of the timeseries data within the TimeFrame""" return self._time_manager.periodicity @property def time_anchor(self) -> TimeAnchor: """The time anchor of the timeseries data within the TimeFrame""" return self._time_manager.time_anchor @property def df(self) -> pl.DataFrame: """The underlying ``Polars`` DataFrame containing the timeseries data.""" return self._df @property def columns(self) -> list[str]: """All column labels of the DataFrame within the TimeFrame.""" return [c for c in self.df.columns if c != self.time_name] @property def flag_columns(self) -> list[str]: """Only the labels for any flag columns within the TimeFrame.""" return list(self._flag_manager.flag_columns.keys()) @property def flag_systems(self) -> dict[str, type[FlagSystemBase]]: """The registered flag systems of this TimeFrame.""" return self._flag_manager.flag_systems @property def data_columns(self) -> list[str]: """Only the labels for the data columns within the TimeFrame.""" return [c for c in self.columns if c not in self.flag_columns]
[docs] def sort_time(self) -> None: """Sort the TimeFrame DataFrame by the time column.""" self._df = self.df.sort(self.time_name)
[docs] def pad(self, start: datetime | None = None, end: datetime | None = None) -> TimeFrame: """Pad the time series with missing datetime rows, filling in NULLs for missing values. Args: start: The starting datetime value to pad time values from (inclusive). If not provided then the beginning of the dataframe will be used. end: The final datetime value to pad time values to (inclusive). If not provided then the beginning of the dataframe will be used. Returns: Padded TimeFrame """ tf = self.copy() tf._df = pad_time( df=self.df, time_name=self.time_name, periodicity=self.periodicity, time_anchor=self.time_anchor, start=start, end=end, ) tf.sort_time() return tf
[docs] def register_flag_system( self, name: str, flag_system: FlagSystemType = None, flag_type: FlagSystemLiteral = "bitwise", ) -> None: """Register a named flag system with the internal flag manager. Args: name: Short name for the flag system. flag_system: The flag system definition. Accepted types are ``None`` (default bitwise system with a single ``FLAGGED`` flag at value 1), ``dict[str, int]`` (bitwise or categorical depending on ``flag_type`` - bitwise values must be powers of two), ``dict[str, str]`` (always categorical; ``flag_type`` is ignored), or ``list[str]`` (flag names are sorted; bitwise assigns powers of two, categorical uses each name as both key and value - an empty list produces the default ``FLAGGED`` flag). flag_type: Whether to create a ``"bitwise"`` or ``"categorical"`` flag system. Only relevant when ``flag_system`` is a ``dict[str, int]`` or ``list[str]``. """ self._flag_manager.register_flag_system(name, flag_system, flag_type)
[docs] def get_flag_system(self, name: str) -> type[FlagSystemBase]: """Return a registered flag system. Args: name: The registered flag system name. Returns: The ``BitwiseFlag`` or ``CategoricalFlag`` enum class that defines the flag system. """ return self._flag_manager.get_flag_system(name)
[docs] def register_flag_column(self, column_name: str, flag_system_name: str) -> None: """Mark the specified existing column as a flag column. All non-null values in the column must be valid flag values for the given flag system. Args: column_name: A column name to mark as a flag column. flag_system_name: The name of the registered flag system. Raises: BitwiseFlagUnknownError: If the column contains values with bits not in the bitwise flag system. CategoricalFlagUnknownError: If the column contains values not in the categorical flag system. """ check_columns_in_dataframe(self.df, column_name) flag_system = self.get_flag_system(flag_system_name) flag_system.validate_column(self.df[column_name]) self._flag_manager.register_flag_column(column_name, flag_system_name)
[docs] def init_flag_column( self, flag_system_name: str, column_name: str | None = None, data: int | str | Sequence[int | str] | None = None, ) -> None: """Add a new column to the TimeFrame DataFrame, setting it as a Flag Column. For bitwise flag systems, the column is initialised with integer values (default 0). For ``CategoricalSingleFlag`` systems, the column is initialised with null values. For ``CategoricalListFlag`` systems, the column is initialised with empty lists. Args: flag_system_name: The name of the registered flag system. column_name: Optional name for the new flag column. If omitted, a name of the form ``__flag__{flag_system_name}`` is used, with an integer suffix appended if the name already exists (e.g. ``__flag__CORE_FLAGS__1``). data: The default value(s) to populate the flag column with. Can be a scalar or list-like. For bitwise systems defaults to ``0``; for categorical systems defaults to ``None`` (null) in single mode or an empty list in list mode. """ flag_sys = self.get_flag_system(flag_system_name) flag_type = flag_sys.flag_type # 1. Resolve dtype if flag_type in ("categorical", "categorical_list"): inner_dtype = pl.Int32 if flag_sys.value_type() is int else pl.Utf8 col_dtype = pl.List(inner_dtype) if flag_type == "categorical_list" else inner_dtype else: col_dtype = pl.Int64 # 2. Build column - if it's a scalar or missing, use pl.lit; otherwise it's a sequence so cast to a Series if isinstance(data, (int, str)) or data is None: if data is None: if flag_type == "categorical_list": data = [] elif flag_type == "bitwise": data = 0 col_data = pl.lit(data, dtype=col_dtype) else: col_data = pl.Series(data, dtype=col_dtype) # 3. Determine name of flag column if not column_name: column_name = f"__flag__{flag_system_name}" if column_name in self.df.columns: col_suffix = 1 while f"{column_name}__{col_suffix}" in self.df.columns: col_suffix += 1 column_name = f"{column_name}__{col_suffix}" # 4. Add and register as a flag column self._df = self.df.with_columns(col_data.alias(column_name)) self._flag_manager.register_flag_column(column_name, flag_system_name) self._column_metadata.sync()
[docs] def get_flag_column(self, flag_column_name: str) -> FlagColumn: """Look up a registered flag column by name. Args: flag_column_name: Flag column name. Returns: The corresponding ``BitwiseFlagColumn`` or ``CategoricalFlagColumn`` object. """ return self._flag_manager.get_flag_column(flag_column_name)
[docs] def add_flag( self, flag_column_name: str, flag_value: int | str, expr: pl.Expr | pl.Series = pl.lit(True), overwrite: bool = True, ) -> None: """Add flag value to flag column, where expression is True. For bitwise flag columns, applies the flag via bitwise OR. ``overwrite`` is not applicable for bitwise columns. For categorical flag columns in scalar mode, sets the column value. When ``overwrite`` is ``False``, only rows whose current value is null are updated. For categorical flag columns in list mode, appends the value to the list. ``overwrite`` is not applicable in list mode. Args: flag_column_name: The name of the flag column. flag_value: The flag value to add. expr: Polars expression for which rows to add flag to. overwrite: Categorical scalar mode only. If ``True`` (default), replaces any existing value. If ``False``, only updates rows whose current value is null. """ flag_column = self.get_flag_column(flag_column_name) if isinstance(flag_column, CategoricalSingleFlagColumn): self._df = flag_column.add_flag(self.df, flag_value, expr, overwrite) else: self._df = flag_column.add_flag(self.df, flag_value, expr)
[docs] def remove_flag(self, column_name: str, flag_value: int | str, expr: pl.Expr | pl.Series = pl.lit(True)) -> None: """Remove a flag value from a flag column, where expression is True. For bitwise flag columns, clears the flag bit via bitwise AND NOT. For categorical flag columns in scalar mode, sets the column value to null. For categorical flag columns in list mode, removes all occurrences of the value from the list. Args: column_name: The name of the flag column. flag_value: The flag value to remove. expr: Polars expression for which rows to remove the flag from. """ flag_column = self.get_flag_column(column_name) self._df = flag_column.remove_flag(self.df, flag_value, expr)
[docs] def decode_flag_column(self, flag_column_name: str) -> TimeFrame: """Decode a flag column from raw values to human-readable flag names. For ``BitwiseFlagColumn``: replaces the integer column with a ``List(String)`` column of active flag names, sorted by ascending bit value. A value of 0 produces an empty list. For ``CategoricalFlagColumn`` in scalar mode: replaces each raw value with its flag name (e.g. ``123 -> "good"``), producing a ``Utf8`` column. For ``CategoricalFlagColumn`` in list mode: replaces each value in the list with its flag name, producing a ``List(Utf8)`` column. The column remains registered as a flag column. ``add_flag`` and ``remove_flag`` continue to work transparently on the decoded column. Args: flag_column_name: Name of the registered flag column to decode. Returns: A new TimeFrame with the flag column replaced by its decoded form. Raises: ColumnNotFoundError: If flag_column_name is not a registered flag column. ColumnTypeError: If the flag column is already in decoded form. """ flag_column = self.get_flag_column(flag_column_name) if flag_column.is_decoded: raise ColumnTypeError(f"Flag column '{flag_column_name}' is already in decoded form.") tf = self.with_df(flag_column.decode(self.df)) tf._flag_manager.flag_columns[flag_column_name].is_decoded = True return tf
[docs] def encode_flag_column(self, flag_column_name: str) -> TimeFrame: """Encode a decoded flag column back to raw values. For ``BitwiseFlagColumn``: replaces the ``List(String)`` column back to a bitwise integer column. Each flag name in the list contributes its bit value; an empty list produces 0. For ``CategoricalFlagColumn`` in scalar mode: replaces each flag name with its original value (e.g. ``"good" -> 123``). For ``CategoricalFlagColumn`` in list mode: replaces each flag name in the list with its original value. Args: flag_column_name: Name of the registered decoded flag column to encode. Returns: A new TimeFrame with the flag column replaced by its encoded form. Raises: ColumnNotFoundError: If flag_column_name is not a registered flag column. ColumnTypeError: If the flag column is not in decoded form. BitwiseFlagUnknownError: If a bitwise column contains any flag name not in the flag system. CategoricalFlagUnknownError: If a categorical column contains any flag name not in the flag system. """ flag_column = self.get_flag_column(flag_column_name) if not flag_column.is_decoded: raise ColumnTypeError(f"Flag column '{flag_column_name}' is not in decoded form.") tf = self.with_df(flag_column.encode(self.df)) tf._flag_manager.flag_columns[flag_column_name].is_decoded = False return tf
[docs] def filter_by_flag( self, flag_column_name: str, flag: int | str | list[int | str], include: bool = True, ) -> TimeFrame: """Return a new TimeFrame filtered to rows that have (or lack) specific flags set. For bitwise flag columns, a row matches if any of the given flags are set (bitwise OR check). For categorical flag columns, a row matches if its value (or any list element in list mode) is any of the given flag values. Args: flag_column_name: The name of the registered flag column to filter on. flag: One or more flag names or values to filter against. include: Whether to keep only rows that have the flag(s) (``True``) or keep only rows that do not have the flag(s) (``False``) Returns: A new TimeFrame containing only the rows that satisfy the filter condition. """ flags = flag if isinstance(flag, list) else [flag] flag_column = self.get_flag_column(flag_column_name) expr = flag_column.filter_expr(flags) if not include: # Fill null ensures that rows that don't have any flag values (null) are kept expr = ~expr.fill_null(False) tf = self.copy() tf._df = self.df.filter(expr) tf._column_metadata.sync() return tf
[docs] def aggregate( self, aggregation_period: Period | str, aggregation_function: str | Type[AggregationFunction] | AggregationFunction, columns: str | list[str] | None = None, missing_criteria: tuple[str, float | int] | None = None, aggregation_time_anchor: TimeAnchor | None = None, time_window: tuple[time, time] | tuple[time, time, str | ClosedInterval] | TimeWindow | None = None, **kwargs, ) -> TimeFrame: """Apply an aggregation function to a column in this TimeFrame, check the aggregation satisfies user requirements and return a new derived TimeFrame containing the aggregated data. Args: aggregation_period: The period over which to aggregate the data aggregation_function: The aggregation function to apply columns: The column(s) containing the data to be aggregated. If omitted, will use all data columns. missing_criteria: How the aggregation handles missing data aggregation_time_anchor: The time anchor for the aggregation result. time_window: Optional restriction of which time-of-day observations are included in each aggregation period. Only supported when the aggregation period is daily or longer and the data periodicity is sub-daily. Accepts a :class:`TimeWindow` instance, or a ``(start, end, closed[optional])`` tuple, where: - ``start``: :class:`datetime.time` object for start of the window - ``end``: :class:`datetime.time` object for end of the window - ``closed``: Define which sides of the interval are closed (inclusive) {'both', 'left', 'right', 'none'} (default = "both") **kwargs: Parameters specific to the aggregation function. Returns: A TimeFrame containing the aggregated data. """ # Normalise time_window tuple to a TimeWindow instance normalised_time_window = TimeWindow.from_tuple(time_window) if isinstance(time_window, tuple) else time_window agg_func = AggregationFunction.get(aggregation_function, **kwargs) aggregation_period = configure_period_object(aggregation_period) aggregation_time_anchor = TimeAnchor(aggregation_time_anchor) if aggregation_time_anchor else self.time_anchor if not columns: columns = self.data_columns ctx = AggregationCtx( df=self.df, time_name=self.time_name, time_anchor=self.time_anchor, periodicity=self.periodicity, ) agg_df = StandardAggregationPipeline( agg_func, ctx, aggregation_period, columns, missing_criteria=missing_criteria, aggregation_time_anchor=aggregation_time_anchor, time_window=normalised_time_window, ).execute() # The resulting resolution and offset needs to be extracted from the aggregation period new_resolution = aggregation_period.without_offset() new_offset = aggregation_period.offset tf = TimeFrame( df=agg_df, time_name=self.time_name, resolution=new_resolution, offset=new_offset, periodicity=aggregation_period, time_anchor=aggregation_time_anchor, ) tf.metadata = deepcopy(self.metadata) return tf
[docs] def rolling_aggregate( self, window_size: Period | str, aggregation_function: str | Type[AggregationFunction] | AggregationFunction, columns: str | list[str] | None = None, missing_criteria: tuple[str, float | int] | None = None, alignment: RollingAlignment | str = RollingAlignment.TRAILING, **kwargs, ) -> TimeFrame: """Apply a rolling aggregation function to this TimeFrame and return a new TimeFrame with the same temporal structure. Unlike :meth:`aggregate`, which reduces the resolution of the data, rolling aggregation preserves the original timestamps: each row in the output corresponds to a row in the input, with the aggregation computed over a sliding window of the specified size. Args: window_size: The size of the rolling window. aggregation_function: The aggregation function to apply. columns: The column(s) containing the data to be aggregated. If omitted, will use all data columns. missing_criteria: How the aggregation handles missing data. When the actual number of values in a window is below the threshold the result is flagged as invalid via a ``valid_<column>`` column. alignment: Where the window is positioned relative to each timestamp: - ``TRAILING`` (default): window looks backward - ``(t - window_size, t]``. Edge effects appear at the start of the series. - ``LEADING``: window looks forward - ``[t, t + window_size)``. Edge effects appear at the end of the series. - ``CENTER``: window is centered - ``[t - window_size/2, t + window_size/2]``. Edge effects appear at both ends. Not supported for calendar-based window sizes. Accepts a :class:`~time_stream.enums.RollingAlignment` instance or a string (``'trailing'``, ``'leading'``, ``'center'``). **kwargs: Parameters specific to the aggregation function. Returns: A TimeFrame with the same resolution, periodicity, and time anchor as this TimeFrame, containing the rolling aggregation results. """ agg_func = AggregationFunction.get(aggregation_function, **kwargs) window_size = configure_period_object(window_size) alignment = RollingAlignment(alignment) if isinstance(alignment, str) else alignment if not columns: columns = self.data_columns ctx = AggregationCtx( df=self.df, time_name=self.time_name, time_anchor=self.time_anchor, periodicity=self.periodicity, ) agg_df = RollingAggregationPipeline( agg_func, ctx, window_size, columns, missing_criteria=missing_criteria, alignment=alignment, ).execute() tf = TimeFrame( df=agg_df, time_name=self.time_name, resolution=self.resolution, offset=self.offset, periodicity=self.periodicity, time_anchor=self.time_anchor, ) tf.metadata = deepcopy(self.metadata) return tf
# @overload lets type checkers know the return type depends on whether flag_params is provided. # Without overloads, callers always get TimeFrame | Series, making e.g. .df access after qc_check a type error. @overload def qc_check( self, check: str | Type[QCCheck] | QCCheck, column_name: str, observation_interval: tuple[datetime, datetime | None] | None = ..., flag_params: None = ..., **kwargs, ) -> pl.Series: ... @overload def qc_check( self, check: str | Type[QCCheck] | QCCheck, column_name: str, observation_interval: tuple[datetime, datetime | None] | None = ..., *, flag_params: tuple[str, str | int], **kwargs, ) -> "TimeFrame": ...
[docs] def qc_check( self, check: str | Type[QCCheck] | QCCheck, column_name: str, observation_interval: tuple[datetime, datetime | None] | None = None, flag_params: tuple[str, str | int] | None = None, **kwargs, ) -> "TimeFrame | pl.Series": """Apply a quality control check to the TimeFrame. Args: check: The QC check to apply. column_name: The column to perform the check on. observation_interval: Optional time interval to limit the check to. flag_params: Tuple of (flag column name [str], flag value [str | int]. If provided, add given flag value to the flag column where the QC check returns ``True``. If not provided, the result of the QC check is returned as a boolean series. **kwargs: Parameters specific to the check type. Returns: Result of the QC check, either as a boolean Series or added to the TimeFrame dataframe """ # Get the QC check instance and run the apply method check_instance = QCCheck.get(check, **kwargs) qc_result = check_instance.apply(self.df, self.time_name, column_name, observation_interval) if not flag_params: # Return the boolean series, if requested return qc_result else: # Otherwise, create a copy of the current TimeFrame, and update the dataframe with the QC result flag_column_name, flag_value = flag_params tf_result = self.copy() tf_result.add_flag(flag_column_name, flag_value, qc_result) return tf_result
[docs] def infill( self, infill_method: str | Type[InfillMethod] | InfillMethod, column_name: str, max_gap_size: int | None = None, observation_interval: tuple[datetime, datetime | None] | None = None, flag_params: tuple[str, str | int] | None = None, **kwargs, ) -> TimeFrame: """Apply an infilling method to a column in the TimeFrame to fill in missing data. Args: infill_method: The method to use for infilling column_name: The column to infill max_gap_size: The maximum size of consecutive null gaps that should be filled. Any gap larger than this will not be infilled and will remain as null. observation_interval: Optional time interval to limit the check to. flag_params: Tuple of (flag column name [str], flag value [str | int]. If provided, add given flag value to the flag column on rows that were infilled. If not provided, no flags added. **kwargs: Parameters specific to the infill method. Returns: A TimeFrame containing the aggregated data. """ # Get the infill method instance and run the apply method infill_instance = InfillMethod.get(infill_method, **kwargs) infill_result = infill_instance.apply( self.df, self.time_name, self.periodicity, column_name, observation_interval, max_gap_size ) # Create a copy of the current TimeFrame, and update the dataframe with the infilled data tf_result = self.with_df(infill_result) if flag_params: # Add flag where we have infilled data flag_column_name, flag_value = flag_params before = self.df[column_name] after = tf_result.df[column_name] before_is_null = before.is_null() | before.is_nan() after_is_null = after.is_null() | after.is_nan() mask = before_is_null.ne(after_is_null) tf_result.add_flag(flag_column_name, flag_value, mask) return tf_result
[docs] def select( self, column_names: str | list[str], ) -> TimeFrame: """Return a new TimeFrame instance to include only the specified columns. This: - carries over TimeFrame-level metadata, - prunes column-level metadata to the kept columns, - rebuilds the flag manager to include only kept flag columns. Flag columns are not automatically included; name them explicitly if you want them retained. Args: column_names: Column name(s) to retain in the updated TimeFrame. Returns: New TimeFrame instance with only selected columns. """ if not column_names: raise ColumnNotFoundError("No columns specified.") if isinstance(column_names, str): column_names = [column_names] check_columns_in_dataframe(self.df, column_names) # Include primary time column (if not already included) if self.time_name not in column_names: column_names.insert(0, self.time_name) # Build new frame new_df = self.df.select(column_names) # New TimeFrame tf = self.with_df(new_df) # Prune column level metadata to kept columns kept_metadata = {col: self.column_metadata[col] for col in column_names} tf.column_metadata.clear() tf.column_metadata.update(kept_metadata) # Rebuild the flag registry for kept columns new_flag_manager = FlagManager() # re-register flag systems for name, flag_system in self._flag_manager.flag_systems.items(): new_flag_manager.register_flag_system(name, flag_system.to_dict(), flag_system.flag_type) # keep only flag columns that survived for flag_name, flag_column in self._flag_manager.flag_columns.items(): if flag_name in column_names: new_flag_manager.register_flag_column(flag_name, flag_column.flag_system.system_name()) tf._flag_manager = new_flag_manager tf._column_metadata.sync() return tf
[docs] def calculate_min_max_envelope(self, columns: list[str] | None = None) -> TimeFrame: """Calculate the min-max envelope for the TimeFrame. For each unique date-time find the historical min and max values across the time series. For example, for a daily time series, the min-max envelope would be calculated from every instance of 01-Jan. For sub-daily time series, the min-max envelope is calculated from ever instance of the day-time across the time series. For example, for hourly resolution, the min-max envelope would be calculated for all instances of 01-Jan 00:00, 01-Jan 01:00 etc. Args: columns: The columns to calculate the min-max envelope on. If None, then all data columns will be used. Returns: TimeFrame: A new copy of the TimeFrame instance with an updated df attribute. """ df_with_min_max = calculate_min_max_envelope(tf=self, columns=columns) tf = self.with_df(df_with_min_max) return tf
def __getitem__(self, key: str | list[str]) -> TimeFrame: """Access columns using indexing syntax. Args: key: Column name(s) to access Returns: A new TimeFrame instance with the specified column(s) selected. Notes: This is equivalent to ``tf.select([...])``. """ if isinstance(key, str): key = [key] return self.select(key) def __str__(self) -> str: """Return the string representation of the TimeFrame dataframe.""" return timeframe_repr(self) def __repr__(self) -> str: """Returns the representation of the TimeFrame""" return timeframe_repr(self) def __copy__(self) -> TimeFrame: return self.copy(share_df=True) def __deepcopy__(self, memo: dict) -> TimeFrame: return self.copy(share_df=False) def __eq__(self, other: object) -> bool: """Check if two TimeFrame instances are equal. Args: other: The object to compare. Returns: bool: True if the TimeFrame instances are equal, False otherwise. """ if not isinstance(other, TimeFrame): return False return ( self.df.equals(other.df) and self.time_name == other.time_name and self.resolution == other.resolution and self.periodicity == other.periodicity and self.time_anchor == other.time_anchor and self._flag_manager == other._flag_manager and self.metadata == other.metadata and self.column_metadata == other.column_metadata ) # Make class instances unhashable __hash__ = None # type: ignore[assignment]