Aggregation

Simple code, robust results.

Why use Time-Stream?

Aggregating time series data sounds simple, until you hit real-world edge cases, particularly when working with hydrological and environmental datasets: handling leap years, anchor points, offset periods (like water years), and tracking the completeness of the data going into each aggregation.

Forget the pain of writing your own custom code; with Time-Stream, you declare the aggregation period you want (daily, monthly, yearly, or any other custom period) and the library handles the rest. Simple code, robust results.

One-liner

Rolling your own aggregation functionality can get complex. With Time-Stream you express the intent directly:

tf.aggregate("P1D", "sum", "precipitation")

That’s it. A single line with clear intent: “I want a daily sum of my precipitation data.”

Complex example

UK hydrologists often use a “UK water-year” that runs from 1 October at 09:00 through to 09:00 on 1 October of the following year. Writing code to aggregate data (whether 15-minute river levels or daily flow values) into water-year totals can be painful.

Take, for example, generating a water-year “annual maximum” (AMAX) series. You might want to:

  • Generate an output time series whose datetime values mark the start of each water-year

  • Keep hold of the exact timestamp that the maximum occurred on

  • Know exactly how many data points were available in each water year

Here’s how you might have to do this in Pandas, Polars and then in Time-Stream:

Input:

15-minute river flow timeseries, including some missing data.

shape: (110_977, 2)
┌─────────────────────┬───────────┐
│ time                ┆ flow      │
│ ---                 ┆ ---       │
│ datetime[ns]        ┆ f64       │
╞═════════════════════╪═══════════╡
│ 2020-09-01 00:00:00 ┆ 92.860538 │
│ 2020-09-01 00:15:00 ┆ null      │
│ 2020-09-01 00:30:00 ┆ 98.103103 │
│ 2020-09-01 00:45:00 ┆ null      │
│ 2020-09-01 01:00:00 ┆ null      │
│ 2020-09-01 01:15:00 ┆ null      │
│ 2020-09-01 01:30:00 ┆ null      │
│ 2020-09-01 01:45:00 ┆ 92.085242 │
│ …                   ┆ …         │
│ 2023-10-31 22:15:00 ┆ 84.677897 │
│ 2023-10-31 22:30:00 ┆ 86.0179   │
│ 2023-10-31 22:45:00 ┆ 83.122459 │
│ 2023-10-31 23:00:00 ┆ 76.928613 │
│ 2023-10-31 23:15:00 ┆ 83.320365 │
│ 2023-10-31 23:30:00 ┆ null      │
│ 2023-10-31 23:45:00 ┆ null      │
│ 2023-11-01 00:00:00 ┆ 84.721752 │
└─────────────────────┴───────────┘

Code:

Pandas:

import pandas as pd

df = get_example_df(library="pandas")

# Shift time back by 9 hours to align water year boundaries with midnight Oct 1st
# This makes Oct 1 9am appear as Oct 1 midnight for resampling purposes
df.index = df.index - pd.Timedelta(hours=9)

# Aggregate using resample
# We can use "YS-OCT" (Year Start in October) as the resample period
resampled = df.resample("YS-OCT")

df_amax = pd.DataFrame(
    {
        "max_flow": resampled["flow"].max(),
        "count_flow": resampled["flow"].count(),
    }
)

# Get the datetime that each max value occurred on
max_dates = resampled["flow"].idxmax() + pd.Timedelta(hours=9)
df_amax["time_of_max_flow"] = max_dates

# Adjust the time column back to represent actual water year starts (Oct 1 9am)
df_amax.index = df_amax.index + pd.Timedelta(hours=9)
df_amax.index.name = "time"

# Calculate FULL water year expected counts
# Each water year should have the number of 15-min intervals (900 seconds)
#   from Oct 1 9am to next Oct 1 9am
df_amax["expected_count_time"] = (
    (
        (df_amax.index + pd.DateOffset(years=1)) - df_amax.index
    ).total_seconds() / 900
)

df_amax = df_amax.reset_index()

Polars:

import polars as pl

df = get_example_df(library="polars")

# Shift time back by 9 hours to align water year boundaries with midnight Oct 1st
# This makes Oct 1 9am appear as Oct 1 midnight for resampling purposes
df = df.with_columns(pl.col("time").dt.offset_by("-9h").alias("shifted_time"))
# Assign water year based on shifted time
df = df.with_columns(
    [
        pl.when(pl.col("shifted_time").dt.month() >= 10)
        .then(pl.col("shifted_time").dt.year())
        .otherwise(pl.col("shifted_time").dt.year() - 1)
        .alias("water_year")
    ]
)

df_amax = df.group_by("water_year").agg([
    pl.col("flow").max().alias("max_flow"),
    pl.col("flow").count().alias("count_flow"),

    # Get the datetime that each max value occurred on
    pl.col("time").filter(
        pl.col("flow") == pl.col("flow").max()
    ).first().alias("time_of_max_flow"),
]).sort("water_year")

# Adjust the time column back to represent actual water year starts (Oct 1 9am)
df_amax = df_amax.with_columns(pl.datetime(pl.col("water_year"), 10, 1, 9).alias("time"))

# Calculate FULL water year expected counts
# Each water year should have the number of 15-min intervals (900 seconds)
#   from Oct 1 9am to next Oct 1 9am
df_amax = df_amax.with_columns(
    ((pl.col("time").dt.offset_by("1y") - pl.col("time")).dt.total_seconds() // 900
     ).alias("expected_count_time")
)

Time-Stream:

import time_stream as ts

df = get_example_df(library="polars")

# Wrap the DataFrame in a TimeFrame object
tf = ts.TimeFrame(df, "time", resolution="PT15M", periodicity="PT15M")

# Perform the aggregation to a water-year
tf_amax = tf.aggregate("P1Y+9MT9H", "max", "flow")

# that's it...

Output:

Pandas:

shape: (5, 5)
┌─────────────────────┬─────────────────────┬────────────┬────────────┬─────────────────────┐
│ time                ┆ time_of_max_flow    ┆ max_flow   ┆ count_flow ┆ expected_count_time │
│ ---                 ┆ ---                 ┆ ---        ┆ ---        ┆ ---                 │
│ datetime[ns]        ┆ datetime[ns]        ┆ f64        ┆ i64        ┆ f64                 │
╞═════════════════════╪═════════════════════╪════════════╪════════════╪═════════════════════╡
│ 2019-10-01 09:00:00 ┆ 2020-09-02 13:30:00 ┆ 119.850227 ┆ 2759       ┆ 35136.0             │
│ 2020-10-01 09:00:00 ┆ 2021-04-26 05:45:00 ┆ 119.97495  ┆ 33354      ┆ 35040.0             │
│ 2021-10-01 09:00:00 ┆ 2021-10-06 19:45:00 ┆ 119.953105 ┆ 33285      ┆ 35040.0             │
│ 2022-10-01 09:00:00 ┆ 2023-09-26 19:30:00 ┆ 119.971628 ┆ 33271      ┆ 35040.0             │
│ 2023-10-01 09:00:00 ┆ 2023-10-22 22:15:00 ┆ 119.867875 ┆ 2789       ┆ 35136.0             │
└─────────────────────┴─────────────────────┴────────────┴────────────┴─────────────────────┘

Polars:

shape: (5, 5)
┌─────────────────────┬─────────────────────┬────────────┬────────────┬─────────────────────┐
│ time                ┆ time_of_max_flow    ┆ max_flow   ┆ count_flow ┆ expected_count_time │
│ ---                 ┆ ---                 ┆ ---        ┆ ---        ┆ ---                 │
│ datetime[μs]        ┆ datetime[ns]        ┆ f64        ┆ u32        ┆ i64                 │
╞═════════════════════╪═════════════════════╪════════════╪════════════╪═════════════════════╡
│ 2019-10-01 09:00:00 ┆ 2020-09-02 13:30:00 ┆ 119.850227 ┆ 2759       ┆ 35136               │
│ 2020-10-01 09:00:00 ┆ 2021-04-26 05:45:00 ┆ 119.97495  ┆ 33354      ┆ 35040               │
│ 2021-10-01 09:00:00 ┆ 2021-10-06 19:45:00 ┆ 119.953105 ┆ 33285      ┆ 35040               │
│ 2022-10-01 09:00:00 ┆ 2023-09-26 19:30:00 ┆ 119.971628 ┆ 33271      ┆ 35040               │
│ 2023-10-01 09:00:00 ┆ 2023-10-22 22:15:00 ┆ 119.867875 ┆ 2789       ┆ 35136               │
└─────────────────────┴─────────────────────┴────────────┴────────────┴─────────────────────┘

Time-Stream:

shape: (5, 5)
┌─────────────────────┬─────────────────────┬────────────┬────────────┬─────────────────────┐
│ time                ┆ time_of_max_flow    ┆ max_flow   ┆ count_flow ┆ expected_count_time │
│ ---                 ┆ ---                 ┆ ---        ┆ ---        ┆ ---                 │
│ datetime[ns]        ┆ datetime[ns]        ┆ f64        ┆ u32        ┆ u32                 │
╞═════════════════════╪═════════════════════╪════════════╪════════════╪═════════════════════╡
│ 2019-10-01 09:00:00 ┆ 2020-09-02 13:30:00 ┆ 119.850227 ┆ 2759       ┆ 35136               │
│ 2020-10-01 09:00:00 ┆ 2021-04-26 05:45:00 ┆ 119.97495  ┆ 33354      ┆ 35040               │
│ 2021-10-01 09:00:00 ┆ 2021-10-06 19:45:00 ┆ 119.953105 ┆ 33285      ┆ 35040               │
│ 2022-10-01 09:00:00 ┆ 2023-09-26 19:30:00 ┆ 119.971628 ┆ 33271      ┆ 35040               │
│ 2023-10-01 09:00:00 ┆ 2023-10-22 22:15:00 ┆ 119.867875 ┆ 2789       ┆ 35136               │
└─────────────────────┴─────────────────────┴────────────┴────────────┴─────────────────────┘

Key benefits

  • Less boilerplate: No need to wrangle custom datetime columns or write manual offset logic.

  • Fewer mistakes: Periodicity, alignment, and anchor semantics are enforced for you.

  • Domain-ready: Express hydrological conventions directly, such as a daily period starting at 09:00 or a water year starting in October.

  • Readable & reproducible: Your code is self-explanatory to collaborators and reviewers.

In more detail

The aggregate() method is the entry point for performing aggregations on time series data in Time-Stream. It combines Polars performance with TimeFrame semantics (resolution, periodicity, anchor).
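
As a rough sketch of the overall call shape, combining arguments that appear throughout this page (the combination is illustrative, not prescriptive):

import time_stream as ts

# Illustrative call: each argument is documented in the sections below.
tf = ts.TimeFrame(df, "time", resolution="PT15M", periodicity="PT15M")
tf_agg = tf.aggregate(
    "P1M",                             # aggregation period (ISO-8601 duration)
    "mean",                            # aggregation method
    "flow",                            # column(s) to aggregate
    missing_criteria=("percent", 75),  # optional completeness policy
)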

Aggregation period

The time window you want to aggregate into. This is specified as an ISO-8601 duration string, optionally modified with a custom offset to the period.

Common examples:

  • "P1D" – calendar day

  • "P1M" – calendar month

  • "P1Y" – calendar year

  • "P1Y+9MT9H"water year starting 1 Oct 09:00

  • "PT15M" – 15-minute buckets

Note

The resulting TimeFrame will have its resolution and periodicity set to this value.

Aggregation methods

Choose how values inside each window are summarised. Pass a string corresponding to one of the built-in functions.

sum

time_stream.aggregation.Sum

What it does: Adds up all values in each period.

When to use: Use this for quantities that accumulate over time, such as precipitation.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "sum", "precip")

mean

time_stream.aggregation.Mean

What it does: Averages all values in each period.

When to use: Useful for variables like temperature or concentration, where the average represents the period well.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "mean", "concentration")

min

time_stream.aggregation.Min

What it does: Finds the smallest value observed in each period.

When to use: Often used to track minimum daily temperature, or low flows in rivers.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "min", "temperature")

max

time_stream.aggregation.Max

What it does: Finds the largest value observed in each period.

When to use: Common in hydrology for annual maxima (AMAX) or flood frequency analysis.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "max", "flow")

percentile

time_stream.aggregation.Percentile

What it does: Finds the ‘nth’ percentile value for each period.

When to use: Useful for capturing extremes within a period, such as the 5th or 95th percentile of streamflow.

Additional args:

p: The percentile to calculate, an integer from 0 to 100 (inclusive).

Example usage: tf_agg = tf.aggregate("P1D", "percentile", "flow", p=95)

pot

time_stream.aggregation.PeaksOverThreshold

What it does: Counts the number of values above a given threshold in each period (a “peaks over threshold” calculation).

When to use: Commonly used in hydrology to extract extreme events in a given year.

Additional args:

threshold: The threshold above which values are counted.

Example usage: tf_agg = tf.aggregate("P1Y", "pot", "flow", threshold=65.8)

conditional_count

time_stream.aggregation.ConditionalCount

What it does: Counts values that meet a specific condition within each period.

When to use: When you need flexibility in what is counted; any Polars expression can be used. Examples include:

  1. Count of values that increase compared to the previous value (change detection)

  2. Count of sudden jumps greater than a threshold (spike detection)

  3. Count of categorical data

Additional args:

condition: A function that takes a Polars expression and returns a boolean expression.

Example usage: For the examples given above:

  1. tf_agg = tf.aggregate("P1Y", "conditional_count", "flow", condition=lambda col: (col - col.shift(1)) > 0)

  2. tf_agg = tf.aggregate("P1Y", "conditional_count", "flow", condition=lambda col: col.diff().abs() > 5)

  3. tf_agg = tf.aggregate("P1Y", "conditional_count", "flow", condition=lambda col: col.is_in(["ok", "good"]))

Column selection

Specify which columns to aggregate; only these will be used by the aggregation function. This can be a single column name or a list of column names; if not provided, the method uses all columns in the time series.
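
For example (the column names are illustrative):

# A single column
tf_agg = tf.aggregate("P1D", "mean", "flow")

# A list of columns, aggregated in one call
tf_agg = tf.aggregate("P1D", "mean", ["flow", "level"])

# No columns given: aggregate every column in the TimeFrame
tf_agg = tf.aggregate("P1D", "mean")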

Missing criteria

Control whether a window is considered “complete enough” to produce a value by specifying a missing-criteria policy and an associated threshold.

The policies you can specify are:

  • available: Requires at least “n” points within the aggregation window.

  • percent: Requires at least “n”% of points within the aggregation window.

  • missing: No more than “n” points can be missing within the aggregation window.

Examples

Using the 15-minute flow example data:

# Require at least 2,400 values present (25 days of 15 minute data)
tf_agg = tf.aggregate("P1M", "mean", "flow", missing_criteria=("available", 25 * 96))

# Require at least 75% of data to be present
tf_agg = tf.aggregate("P1M", "mean", "flow", missing_criteria=("percent", 75))

# Allow at most 150 missing values
tf_agg = tf.aggregate("P1M", "mean", "flow", missing_criteria=("missing", 150))

The resulting TimeFrame object will contain metadata columns that provide detail about the completeness of the aggregation windows, and whether an aggregated data point is considered valid:

  • count_<column>: The number of points found in each aggregation window

  • expected_count_<time column>: The number of points expected if the aggregation window were completely full

  • valid_<column>: Whether the individual aggregated data points are valid or not, based on the missing criteria specified.

tf.aggregate("P1M", "mean", "flow", missing_criteria=("missing", 150))

<time_stream.TimeFrame> Size (estimated): 989.00 B
Time properties:
    Time column         : time  [2020-09-01 00:00:00, ..., 2023-11-01 00:00:00]
    Type                : Datetime(time_unit='ns', time_zone=None)
    Resolution          : P1M
    Offset              : 
    Alignment           : P1M
    Periodicity         : P1M
    Anchor              : TimeAnchor.START
Columns:
    mean_flow           : Float64  312.00 B  [96.30423558869707, ..., 84.72175228556347]
    count_flow          : UInt32  156.00 B  [2730, ..., 1]
    expected_count_time : UInt32  156.00 B  [2880, ..., 2880]
    valid_flow          : Boolean  5.00 B  [True, ..., False]

Time anchoring

Choose the time anchor of the aggregated TimeFrame, which you may want to differ from that of the input TimeFrame. For example, meteorological observations are often considered end-anchored, where the value is valid up to the given timestamp. When producing a daily mean from such data, it may make more sense for the result to use a start anchor, indicating the value is valid from the start of the day to the end of the day.
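
As a sketch only (the keyword name and value below are assumptions, not confirmed API; check the API reference for the actual signature):

# Hypothetical: "time_anchor" and "start" are assumed names, used here
# purely to illustrate overriding the anchor of the aggregated TimeFrame.
tf_daily = tf.aggregate("P1D", "mean", "flow", time_anchor="start")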

See the concepts page for more information about time anchors.

Note

If omitted, the aggregation uses the input TimeFrame’s time anchor.