Aggregation

Simple code, robust results.

Why use Time-Stream?

Aggregating time series data sounds simple - until you hit real-world edge cases, particularly when working with hydrological and environmental datasets. There are considerations such as handling leap years, anchor points, offset periods (like water years), and tracking the completeness of the data going into each aggregation.

Forget the pain of writing your own custom code; with Time-Stream, you declare the aggregation period you want (daily, monthly, yearly, or any other custom period) and the library handles the rest. Simple code, robust results.

One-liner

Rolling your own aggregation functionality can get complex. With Time-Stream you express the intent directly:

tf.aggregate("P1D", "sum", "precipitation")

That’s it, a single line with clear intent: “I want a daily sum of my precipitation data.”

Complex example

UK hydrologists often use a “UK water-year” that runs from 09:00 on 1 October through to 09:00 on 1 October the following year. Writing code to aggregate data (whether that’s 15-minute river levels or daily flow values) into water-year totals can be painful.

Take, for example, generating a water-year “annual maximum” (AMAX) series. You might want to:

  • Generate an output time series whose datetime values are the start of each water-year

  • Keep hold of the exact timestamp that the maximum occurred on

  • Know exactly how many data points were available in each water year

Here’s how you might have to do this in Pandas, Polars and then in Time-Stream:

Input:

15-minute river flow timeseries, including some missing data.

shape: (110_977, 2)
┌─────────────────────┬───────────┐
│ time                ┆ flow      │
│ ---                 ┆ ---       │
│ datetime[ns]        ┆ f64       │
╞═════════════════════╪═══════════╡
│ 2020-09-01 00:00:00 ┆ 92.860538 │
│ 2020-09-01 00:15:00 ┆ null      │
│ 2020-09-01 00:30:00 ┆ 98.103103 │
│ 2020-09-01 00:45:00 ┆ null      │
│ 2020-09-01 01:00:00 ┆ null      │
│ 2020-09-01 01:15:00 ┆ null      │
│ 2020-09-01 01:30:00 ┆ null      │
│ 2020-09-01 01:45:00 ┆ 92.085242 │
│ …                   ┆ …         │
│ 2023-10-31 22:15:00 ┆ 84.677897 │
│ 2023-10-31 22:30:00 ┆ 86.0179   │
│ 2023-10-31 22:45:00 ┆ 83.122459 │
│ 2023-10-31 23:00:00 ┆ 76.928613 │
│ 2023-10-31 23:15:00 ┆ 83.320365 │
│ 2023-10-31 23:30:00 ┆ null      │
│ 2023-10-31 23:45:00 ┆ null      │
│ 2023-11-01 00:00:00 ┆ 84.721752 │
└─────────────────────┴───────────┘

Code:

Pandas:

import pandas as pd

df = get_example_df(library="pandas")

# Shift time back by 9 hours to align water year boundaries with midnight Oct 1st
# This makes Oct 1 9am appear as Oct 1 midnight for resampling purposes
df.index = df.index - pd.Timedelta(hours=9)

# Aggregate using resample
# We can use "YS-OCT" (Year Start in October) as the resample period
resampled = df.resample("YS-OCT")

df_amax = pd.DataFrame(
    {
        "max_flow": resampled["flow"].max(),
        "count_flow": resampled["flow"].count(),
    }
)

# Get the datetime that each max value occurred on
max_dates = resampled["flow"].idxmax() + pd.Timedelta(hours=9)
df_amax["time_of_max_flow"] = max_dates

# Adjust the time column back to represent actual water year starts (Oct 1 9am)
df_amax.index = df_amax.index + pd.Timedelta(hours=9)
df_amax.index.name = "time"

# Calculate FULL water year expected counts
# Each water year should have the number of 15-min intervals (900 seconds)
#   from Oct 1 9am to next Oct 1 9am
df_amax["expected_count_time"] = (
    (
        (df_amax.index + pd.DateOffset(years=1)) - df_amax.index
    ).total_seconds() / 900
)

df_amax = df_amax.reset_index()

Polars:

import polars as pl

df = get_example_df(library="polars")

# Shift time back by 9 hours to align water year boundaries with midnight Oct 1st
# This makes Oct 1 9am appear as Oct 1 midnight for resampling purposes
df = df.with_columns(pl.col("time").dt.offset_by("-9h").alias("shifted_time"))
# Assign water year based on shifted time
df = df.with_columns(
    [
        pl.when(pl.col("shifted_time").dt.month() >= 10)
        .then(pl.col("shifted_time").dt.year())
        .otherwise(pl.col("shifted_time").dt.year() - 1)
        .alias("water_year")
    ]
)

df_amax = df.group_by("water_year").agg([
    pl.col("flow").max().alias("max_flow"),
    pl.col("flow").count().alias("count_flow"),

    # Get the datetime that each max value occurred on
    pl.col("time").filter(
        pl.col("flow") == pl.col("flow").max()
    ).first().alias("time_of_max_flow"),
]).sort("water_year")

# Adjust the time column back to represent actual water year starts (Oct 1 9am)
df_amax = df_amax.with_columns(pl.datetime(pl.col("water_year"), 10, 1, 9).alias("time"))

# Calculate FULL water year expected counts
# Each water year should have the number of 15-min intervals (900 seconds)
#   from Oct 1 9am to next Oct 1 9am
df_amax = df_amax.with_columns(
    ((pl.col("time").dt.offset_by("1y") - pl.col("time")).dt.total_seconds() // 900
     ).alias("expected_count_time")
)

Time-Stream:

import time_stream as ts

df = get_example_df(library="polars")

# Wrap the DataFrame in a TimeFrame object
tf = ts.TimeFrame(df, "time", resolution="PT15M", periodicity="PT15M")

# Perform the aggregation to a water-year
tf_amax = tf.aggregate("P1Y+9MT9H", "max", "flow")

# that's it...

Output:

Pandas:

shape: (5, 5)
┌─────────────────────┬─────────────────────┬────────────┬────────────┬─────────────────────┐
│ time                ┆ time_of_max_flow    ┆ max_flow   ┆ count_flow ┆ expected_count_time │
│ ---                 ┆ ---                 ┆ ---        ┆ ---        ┆ ---                 │
│ datetime[ns]        ┆ datetime[ns]        ┆ f64        ┆ i64        ┆ f64                 │
╞═════════════════════╪═════════════════════╪════════════╪════════════╪═════════════════════╡
│ 2019-10-01 09:00:00 ┆ 2020-09-02 13:30:00 ┆ 119.850227 ┆ 2759       ┆ 35136.0             │
│ 2020-10-01 09:00:00 ┆ 2021-04-26 05:45:00 ┆ 119.97495  ┆ 33354      ┆ 35040.0             │
│ 2021-10-01 09:00:00 ┆ 2021-10-06 19:45:00 ┆ 119.953105 ┆ 33285      ┆ 35040.0             │
│ 2022-10-01 09:00:00 ┆ 2023-09-26 19:30:00 ┆ 119.971628 ┆ 33271      ┆ 35040.0             │
│ 2023-10-01 09:00:00 ┆ 2023-10-22 22:15:00 ┆ 119.867875 ┆ 2789       ┆ 35136.0             │
└─────────────────────┴─────────────────────┴────────────┴────────────┴─────────────────────┘

Polars:

shape: (5, 5)
┌─────────────────────┬─────────────────────┬────────────┬────────────┬─────────────────────┐
│ time                ┆ time_of_max_flow    ┆ max_flow   ┆ count_flow ┆ expected_count_time │
│ ---                 ┆ ---                 ┆ ---        ┆ ---        ┆ ---                 │
│ datetime[μs]        ┆ datetime[ns]        ┆ f64        ┆ u32        ┆ i64                 │
╞═════════════════════╪═════════════════════╪════════════╪════════════╪═════════════════════╡
│ 2019-10-01 09:00:00 ┆ 2020-09-02 13:30:00 ┆ 119.850227 ┆ 2759       ┆ 35136               │
│ 2020-10-01 09:00:00 ┆ 2021-04-26 05:45:00 ┆ 119.97495  ┆ 33354      ┆ 35040               │
│ 2021-10-01 09:00:00 ┆ 2021-10-06 19:45:00 ┆ 119.953105 ┆ 33285      ┆ 35040               │
│ 2022-10-01 09:00:00 ┆ 2023-09-26 19:30:00 ┆ 119.971628 ┆ 33271      ┆ 35040               │
│ 2023-10-01 09:00:00 ┆ 2023-10-22 22:15:00 ┆ 119.867875 ┆ 2789       ┆ 35136               │
└─────────────────────┴─────────────────────┴────────────┴────────────┴─────────────────────┘

Time-Stream:

shape: (5, 5)
┌─────────────────────┬─────────────────────┬────────────┬────────────┬─────────────────────┐
│ time                ┆ time_of_max_flow    ┆ max_flow   ┆ count_flow ┆ expected_count_time │
│ ---                 ┆ ---                 ┆ ---        ┆ ---        ┆ ---                 │
│ datetime[ns]        ┆ datetime[ns]        ┆ f64        ┆ u32        ┆ u32                 │
╞═════════════════════╪═════════════════════╪════════════╪════════════╪═════════════════════╡
│ 2019-10-01 09:00:00 ┆ 2020-09-02 13:30:00 ┆ 119.850227 ┆ 2759       ┆ 35136               │
│ 2020-10-01 09:00:00 ┆ 2021-04-26 05:45:00 ┆ 119.97495  ┆ 33354      ┆ 35040               │
│ 2021-10-01 09:00:00 ┆ 2021-10-06 19:45:00 ┆ 119.953105 ┆ 33285      ┆ 35040               │
│ 2022-10-01 09:00:00 ┆ 2023-09-26 19:30:00 ┆ 119.971628 ┆ 33271      ┆ 35040               │
│ 2023-10-01 09:00:00 ┆ 2023-10-22 22:15:00 ┆ 119.867875 ┆ 2789       ┆ 35136               │
└─────────────────────┴─────────────────────┴────────────┴────────────┴─────────────────────┘

Key benefits

  • Less boilerplate: No need to wrangle custom datetime columns or write manual offset logic.

  • Fewer mistakes: Periodicity, alignment, and anchor semantics are enforced for you.

  • Domain-ready: Express hydrological conventions directly: daily at 09:00, or water year from October.

  • Readable & reproducible: Your code is self-explanatory to collaborators and reviewers.

In more detail

The aggregate() method is the entry point for performing aggregations with timeseries data in Time-Stream. It combines Polars performance with TimeFrame semantics (resolution, periodicity, anchor).

Aggregation period

The time window you want to aggregate into. This can be specified as an ISO-8601 duration string, with an optional modifier that applies a custom offset to the period.

Common examples:

  • "P1D" – calendar day

  • "P1M" – calendar month

  • "P1Y" – calendar year

  • "P1Y+9MT9H" – water year starting 1 Oct 09:00

  • "PT15M" – 15-minute buckets

Note

The resulting TimeFrame will have its resolution and periodicity set to this value.
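To make the "P1Y+9MT9H" notation concrete: it is a one-year period whose boundaries are offset forward by 9 months and 9 hours, i.e. the UK water year starting 1 October 09:00. A minimal stdlib sketch (illustrative only - not Time-Stream's period parser) of how such an offset period maps a timestamp to its window start:

```python
from datetime import datetime

def water_year_start(ts: datetime) -> datetime:
    # "P1Y+9MT9H": yearly windows anchored at 1 October 09:00.
    # A timestamp belongs to the water year whose boundary most
    # recently passed.
    boundary = datetime(ts.year, 10, 1, 9)
    year = ts.year if ts >= boundary else ts.year - 1
    return datetime(year, 10, 1, 9)

print(water_year_start(datetime(2021, 3, 15, 12)))  # 2020-10-01 09:00:00
print(water_year_start(datetime(2021, 10, 1, 9)))   # 2021-10-01 09:00:00
```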

Aggregation methods

Choose how values inside each window are summarised. Pass a string corresponding to one of the built-in functions.

sum

time_stream.aggregation.Sum

What it does: Adds up all values in each period.

When to use: Use this for quantities that accumulate over time, such as precipitation.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "sum", "precip")

mean

time_stream.aggregation.Mean

What it does: Averages all values in each period.

When to use: Useful for variables like temperature or concentration, where the average represents the period well.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "mean", "concentration")

angular_mean

time_stream.aggregation.AngularMean

What it does: Averages all angles (measured in degrees) in each period.

Results are in the range 0 to 360 degrees.

When to use: Useful for variables like wind direction ("wd"), where the average represents the period well.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "angular_mean", "wd")
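Why a dedicated aggregation for angles? An arithmetic mean of wind directions either side of north gives a nonsense answer (the mean of 350° and 10° should be 0°, not 180°). A minimal stdlib sketch of the circular-mean idea (not the library's implementation):

```python
import math

def angular_mean(degrees):
    # Convert each angle to a unit vector, sum the vectors, then
    # convert the resultant back to an angle in [0, 360).
    s = sum(math.sin(math.radians(d)) for d in degrees)
    c = sum(math.cos(math.radians(d)) for d in degrees)
    return math.degrees(math.atan2(s, c)) % 360

print(angular_mean([350, 10, 30]))  # ≈ 10.0, where a naive mean gives 130.0
```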

min

time_stream.aggregation.Min

What it does: Finds the smallest value observed in each period.

When to use: Often used to track minimum daily temperature, or low flows in rivers.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "min", "temperature")

max

time_stream.aggregation.Max

What it does: Finds the largest value observed in each period.

When to use: Common in hydrology for annual maxima (AMAX) or flood frequency analysis.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "max", "flow")

percentile

time_stream.aggregation.Percentile

What it does: Finds the ‘nth’ percentile value for each period.

When to use: Useful for capturing extremes within a period, such as the 5th or 95th percentile of streamflow.

Additional args:

p: The percentile value to be calculated, provided as an integer parameter from 0 to 100 (inclusive).

Example usage: tf_agg = tf.aggregate("P1D", "percentile", "flow", p=95)

pot

time_stream.aggregation.PeaksOverThreshold

What it does: “Peaks over threshold” calculation - counts the number of values above a given threshold in each period.

When to use: Commonly used in hydrology to extract extreme events in a given year.

Additional args:

threshold: The threshold over which to count.

Example usage: tf_agg = tf.aggregate("P1Y", "pot", "flow", threshold=65.8)

conditional_count

time_stream.aggregation.ConditionalCount

What it does: Count values that meet a specific condition within each period.

When to use: When you need flexibility in the condition being counted. Any Polars expression can be used. Examples include:

  1. Count of values that increase compared to the previous value (change detection)

  2. Count of sudden jumps greater than a threshold (spike detection)

  3. Count of categorical data

Additional args:

condition: A function that takes a Polars expression and returns a boolean expression.

Example usage: For the examples given above:

  1. tf_agg = tf.aggregate("P1Y", "conditional_count", "flow", condition=lambda col: (col - col.shift(1)) > 0)

  2. tf_agg = tf.aggregate("P1Y", "conditional_count", "flow", condition=lambda col: col.diff().abs() > 5)

  3. tf_agg = tf.aggregate("P1Y", "conditional_count", "flow", condition=lambda col: col.is_in(["ok", "good"]))
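To see what those conditions actually count, here is a plain-Python mimic of the first two applied to one window of values (illustrative only - Time-Stream evaluates the Polars expressions per aggregation window):

```python
# One window of values, containing a sudden jump
values = [3.0, 3.5, 3.2, 9.1, 9.0, 8.7]

# 1. Change detection: value increases over the previous value
increases = sum(1 for prev, cur in zip(values, values[1:]) if cur > prev)

# 2. Spike detection: absolute jump greater than a threshold of 5
spikes = sum(1 for prev, cur in zip(values, values[1:]) if abs(cur - prev) > 5)

# 3. Categorical count: values in an accepted set
labels = ["ok", "bad", "good", "ok"]
accepted = sum(1 for v in labels if v in {"ok", "good"})

print(increases, spikes, accepted)  # 2 1 3
```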

stdev

time_stream.aggregation.StDev

What it does: Captures the standard deviation - the variability or spread of values around the mean - for a set of values.

When to use: Useful for quality-control checks. A smaller standard deviation indicates less variation from the mean. If applied to a variable that is not expected to change significantly across the period, a high standard deviation suggests a possible issue with the data.

Additional args: None.

Example usage: tf_agg = tf.aggregate("P1D", "stdev", "ta")

Column selection

Specify which columns to aggregate; only these will be used by the aggregation function. This can be a single column name or a list of column names; if not provided, the method uses all columns in the timeseries.

Missing criteria

Control whether a window is considered “complete enough” to produce a value by specifying a missing-criteria policy and an associated threshold.

The policies you can specify are:

  • available: Requires at least “n” points within the aggregation window.

  • percent: Requires at least “n”% of points within the aggregation window.

  • missing: No more than “n” points can be missing within the aggregation window.

Examples

Using the 15-minute flow example data:

# Require at least 2,400 values present (25 days of 15 minute data)
tf_agg = tf.aggregate("P1M", "mean", "flow", missing_criteria=("available", 25 * 96))

# Require at least 75% of data to be present
tf_agg = tf.aggregate("P1M", "mean", "flow", missing_criteria=("percent", 75))

# Allow at most 150 missing values
tf_agg = tf.aggregate("P1M", "mean", "flow", missing_criteria=("missing", 150))

The resulting TimeFrame object will contain metadata columns that provide detail about the completeness of the aggregation windows, and whether an aggregated data point is considered valid:

  • count_<column>: The number of points found in each aggregation window

  • expected_count_<time>: The number of points expected if the aggregation window was full

  • valid_<column>: Whether the individual aggregated data points are valid or not, based on the missing criteria specified.
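The validity check itself is simple arithmetic on those counts. A sketch of the three policies (the function and its names are ours for illustration, not the library's internals), using counts like those in the monthly flow example:

```python
def window_is_valid(count: int, expected: int, policy: str, n: float) -> bool:
    if policy == "available":  # at least n points present
        return count >= n
    if policy == "percent":    # at least n% of expected points present
        return count / expected * 100 >= n
    if policy == "missing":    # no more than n points missing
        return expected - count <= n
    raise ValueError(f"unknown policy: {policy}")

print(window_is_valid(2730, 2880, "percent", 75))   # True  (~94.8% present)
print(window_is_valid(1, 2880, "percent", 75))      # False
print(window_is_valid(2730, 2880, "missing", 150))  # True  (150 missing)
```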

shape: (39, 5)
┌─────────────────────┬───────────┬────────────┬─────────────────────┬────────────┐
│ time                ┆ mean_flow ┆ count_flow ┆ expected_count_time ┆ valid_flow │
│ ---                 ┆ ---       ┆ ---        ┆ ---                 ┆ ---        │
│ datetime[ns]        ┆ f64       ┆ u32        ┆ u32                 ┆ bool       │
╞═════════════════════╪═══════════╪════════════╪═════════════════════╪════════════╡
│ 2020-09-01 00:00:00 ┆ 96.304236 ┆ 2730       ┆ 2880                ┆ true       │
│ 2020-10-01 00:00:00 ┆ 94.695939 ┆ 2838       ┆ 2976                ┆ true       │
│ 2020-11-01 00:00:00 ┆ 94.083343 ┆ 2747       ┆ 2880                ┆ true       │
│ 2020-12-01 00:00:00 ┆ 95.940811 ┆ 2833       ┆ 2976                ┆ true       │
│ 2021-01-01 00:00:00 ┆ 95.014653 ┆ 2821       ┆ 2976                ┆ false      │
│ …                   ┆ …         ┆ …          ┆ …                   ┆ …          │
│ 2023-07-01 00:00:00 ┆ 95.973884 ┆ 2835       ┆ 2976                ┆ true       │
│ 2023-08-01 00:00:00 ┆ 94.456867 ┆ 2817       ┆ 2976                ┆ false      │
│ 2023-09-01 00:00:00 ┆ 94.345374 ┆ 2746       ┆ 2880                ┆ true       │
│ 2023-10-01 00:00:00 ┆ 96.038509 ┆ 2822       ┆ 2976                ┆ false      │
│ 2023-11-01 00:00:00 ┆ 84.721752 ┆ 1          ┆ 2880                ┆ false      │
└─────────────────────┴───────────┴────────────┴─────────────────────┴────────────┘

Time anchoring

Choose the time anchor of the aggregated TimeFrame, which you may want to be different from that of the input TimeFrame. For example, meteorological observations are often end-anchored - the value is considered valid up to the given timestamp. When producing a daily mean from such data, it may make more sense for the result to use a start anchor, indicating the value is valid from the start of the day to the end of the day.

See the concepts page for more information about time anchors.

Note

If omitted, the aggregation uses the input TimeFrame’s time anchor.

Time window

Restrict which time-of-day observations are included in each aggregation period using a time window. This is useful when only observations from certain hours of the day should contribute to the aggregated value - for example, computing a daily mean from daytime-only observations.

The time window is defined by a start time and end time, with an optional closed parameter controlling which boundaries are inclusive (default: both).

Note

time_window is only supported when the aggregation period is daily or longer and the data periodicity is sub-daily.

Example

Using the 15-minute flow example data:

tf_agg = tf.aggregate("P1D", "mean", "flow", time_window=(time(9, 0), time(17, 0)))
shape: (1_156, 5)
┌─────────────────────┬────────────┬────────────┬─────────────────────┬────────────┐
│ time                ┆ mean_flow  ┆ count_flow ┆ expected_count_time ┆ valid_flow │
│ ---                 ┆ ---        ┆ ---        ┆ ---                 ┆ ---        │
│ datetime[ns]        ┆ f64        ┆ u32        ┆ u32                 ┆ bool       │
╞═════════════════════╪════════════╪════════════╪═════════════════════╪════════════╡
│ 2020-09-01 00:00:00 ┆ 104.495131 ┆ 33         ┆ 33                  ┆ true       │
│ 2020-09-02 00:00:00 ┆ 115.098678 ┆ 31         ┆ 33                  ┆ true       │
│ 2020-09-03 00:00:00 ┆ 107.760988 ┆ 32         ┆ 33                  ┆ true       │
│ 2020-09-04 00:00:00 ┆ 90.280964  ┆ 33         ┆ 33                  ┆ true       │
│ 2020-09-05 00:00:00 ┆ 76.320783  ┆ 32         ┆ 33                  ┆ true       │
│ …                   ┆ …          ┆ …          ┆ …                   ┆ …          │
│ 2023-10-27 00:00:00 ┆ 88.629423  ┆ 33         ┆ 33                  ┆ true       │
│ 2023-10-28 00:00:00 ┆ 106.304566 ┆ 31         ┆ 33                  ┆ true       │
│ 2023-10-29 00:00:00 ┆ 115.172355 ┆ 31         ┆ 33                  ┆ true       │
│ 2023-10-30 00:00:00 ┆ 106.88818  ┆ 33         ┆ 33                  ┆ true       │
│ 2023-10-31 00:00:00 ┆ 87.371564  ┆ 32         ┆ 33                  ┆ true       │
└─────────────────────┴────────────┴────────────┴─────────────────────┴────────────┘

The expected_count_<time> column reflects the number of observations expected within the window for each period - not the full period. This means that missing_criteria checks are automatically applied relative to the windowed expectation.
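The windowed expectation is easy to check by hand. For a 09:00–17:00 window with 15-minute periodicity and both boundaries inclusive (the default for closed):

```python
# Expected observations per day inside a 09:00-17:00 time window
# with PT15M periodicity, both boundaries inclusive.
window_seconds = 8 * 3600        # 09:00 to 17:00
step_seconds = 15 * 60           # one observation every 15 minutes
expected = window_seconds // step_seconds + 1  # +1: both endpoints count
print(expected)  # 33, matching expected_count_time in the output above
```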

Rolling aggregation

All aggregation functions can also be applied as rolling (sliding window) operations via rolling_aggregate(). Unlike aggregate(), rolling aggregation preserves the original resolution and timestamps.

See Rolling Aggregation for a full guide including alignment options and worked examples.