A window transform collects messages over a fixed time interval and produces a single output message with aggregated values. Instead of forwarding every reading individually, you can compute statistics like averages, minimums, or counts and send one consolidated result downstream.
Prerequisites
- A default registry endpoint named default that points to mcr.microsoft.com is automatically created during deployment.
Scaling limitation for stateful graphs
Important
Data flow graphs that contain a window transform are stateful: each instance accumulates messages independently and doesn't share aggregation state with other instances. When the data flow profile has an instance count greater than one, messages are distributed across instances through shared subscriptions, so each instance sees only a subset of the messages. As a result, aggregation results such as averages, sums, and counts are computed over a partial data set and are incorrect.
To ensure correct aggregation results, set the data flow profile instance count to 1 for any data flow graph that uses a window transform.
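The effect of splitting one window across instances can be illustrated with a small simulation. This is only a sketch: the readings and the way they are split between two instances are made up for illustration, and the real distribution is decided by the shared subscription.

```python
from statistics import mean

# Six temperature readings published during one window (illustrative values).
readings = [20.0, 21.0, 22.0, 30.0, 31.0, 32.0]

# Hypothetical split: with an instance count of 2, each instance
# receives only part of the stream.
instance_a = readings[:3]
instance_b = readings[3:]

# Each instance aggregates only the messages it saw and emits its own output.
partial_averages = [mean(instance_a), mean(instance_b)]

# With an instance count of 1, a single instance sees every message.
full_average = mean(readings)

print("per-instance averages:", partial_averages)
print("single-instance average:", full_average)
```

Neither per-instance average matches the average over the full window, and the per-instance counts are likewise too low.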
Use a window transform when you receive high-frequency sensor data and want to reduce the volume before sending it downstream. Common scenarios include:
- Compute averages: A temperature sensor publishes every second, but your cloud application only needs a 30-second average.
- Track extremes: You want the minimum and maximum pressure readings over each one-minute interval.
- Count events: You need to know how many door-open events occurred in the last five minutes.
The window transform has two internal steps connected in sequence:
- Delay: Sets the tumbling window duration. Incoming messages are assigned to a window boundary based on their timestamp.
- Accumulate: Applies your aggregation rules when the window closes. All messages in the window are reduced to a single output message.
Note
The delay step aligns message timestamps to window boundaries. If a message arrives 7 seconds into a 10-second window, it's assigned to the 10-second boundary.
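The alignment described in the note can be sketched as follows. This is an illustration rather than the transform's implementation; in particular, it assumes that a message arriving exactly on a boundary belongs to the next window, which the text above doesn't specify. The function names are hypothetical.

```python
from collections import defaultdict


def window_boundary(arrival_seconds: float, window_seconds: int) -> int:
    """Return the closing boundary of the tumbling window for an arrival time.

    Assumption: windows are half-open [start, end), so an arrival exactly
    on a boundary is assigned to the following window.
    """
    if window_seconds <= 0:
        raise ValueError("window_seconds must be greater than 0")
    return (int(arrival_seconds // window_seconds) + 1) * window_seconds


def group_by_window(messages, window_seconds):
    """Group (arrival_seconds, payload) pairs by their window boundary."""
    windows = defaultdict(list)
    for arrival, payload in messages:
        windows[window_boundary(arrival, window_seconds)].append(payload)
    return dict(windows)


# A message that arrives 7 seconds into a 10-second window
# is assigned to the 10-second boundary.
print(window_boundary(7, 10))
print(group_by_window([(1, "a"), (7, "b"), (12, "c")], 10))
```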
The delay step controls how long each tumbling window lasts. The configuration key is delay (not rules).
In the window transform configuration, set the Window duration in seconds. For example, set it to 30 for a 30-second tumbling window.
The CLI applies the whole graph from one config file, so add this to the window transform node's configuration in your graph.json and apply it with az iot ops dataflowgraph apply.
The rules are a JSON object:
{
  "type": "duration",
  "delaySeconds": 30
}
These rules go in the value field as an escaped string:
"configuration": [
  {
    "key": "delay",
    "value": "{\"type\":\"duration\",\"delaySeconds\":30}"
  }
]
Tip
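To generate the escaped string, save the rules to a file like rules.json, then run jq 'tojson' rules.json. The command prints the rules as a single-line JSON string with the inner quotes escaped, which you can paste into the value field. Running jq -c . rules.json only compacts the JSON onto one line and leaves the quotes unescaped.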
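If jq isn't available, the same escaped string can be produced with Python's standard json module. The sketch below serializes the rules twice: once to compact JSON, and once more to turn that text into a quoted, escaped JSON string.

```python
import json

rules = {"type": "duration", "delaySeconds": 30}

# Compact, single-line JSON without spaces after separators.
compact = json.dumps(rules, separators=(",", ":"))

# Encoding the compact text as a JSON string adds the surrounding quotes
# and escapes the inner ones, giving the text that follows "value": in graph.json.
escaped = json.dumps(compact)

print(compact)
print(escaped)
```

Decoding the escaped string twice with json.loads returns the original rules, which is a quick way to check a value before applying the graph.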
configuration: [
  {
    key: 'delay'
    value: '{"type":"duration","delaySeconds":30}'
  }
]
Important
The use of Kubernetes deployment manifests isn't supported in production environments and should only be used for debugging and testing.
{
  "type": "duration",
  "delaySeconds": 30
}
| Property | Type | Description |
| --- | --- | --- |
| type | string | Must be "duration". |
| delaySeconds | uint64 | The tumbling window size in seconds. Must be greater than 0. |
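The constraints in the table can be checked before a graph is applied. The helper below is a hypothetical pre-flight check, not part of any Azure tooling; it only mirrors the rules stated in the table.

```python
import json

UINT64_MAX = 2**64 - 1


def parse_delay_config(value: str) -> int:
    """Validate the string stored under the delay key and return the window size in seconds."""
    config = json.loads(value)
    if config.get("type") != "duration":
        raise ValueError('type must be "duration"')
    seconds = config.get("delaySeconds")
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(seconds, bool) or not isinstance(seconds, int):
        raise ValueError("delaySeconds must be an integer")
    if not 0 < seconds <= UINT64_MAX:
        raise ValueError("delaySeconds must be greater than 0 and fit in uint64")
    return seconds


print(parse_delay_config('{"type":"duration","delaySeconds":30}'))
```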
Define accumulation rules
Each accumulation rule specifies how to reduce a window of messages into a single output value. The configuration key is rules.
In the window transform configuration, add accumulation rules. For each rule, specify:
| Setting | Description |
| --- | --- |
| Input | The field to aggregate (for example, temperature). |
| Output | The output field name (for example, avgTemperature). |
| Expression | The aggregation function (for example, average($1)). |
The CLI applies the whole graph from one config file, so add this to the window transform node's configuration in your graph.json and apply it with az iot ops dataflowgraph apply.
The rules are a JSON object:
{
  "accumulate": [
    {
      "inputs": ["temperature"],
      "output": "avgTemperature",
      "expression": "average($1)"
    }
  ]
}
These rules go in the value field as an escaped string:
{
  "key": "rules",
  "value": "{\"accumulate\":[{\"inputs\":[\"temperature\"],\"output\":\"avgTemperature\",\"expression\":\"average($1)\"}]}"
}
{
  key: 'rules'
  value: '{"accumulate":[{"inputs":["temperature"],"output":"avgTemperature","expression":"average($1)"}]}'
}
Important
The use of Kubernetes deployment manifests isn't supported in production environments and should only be used for debugging and testing.
{
  "accumulate": [
    {
      "inputs": ["temperature"],
      "output": "avgTemperature",
      "expression": "average($1)"
    }
  ]
}
| Property | Required | Description |
| --- | --- | --- |
| inputs | Yes | List of field paths to read from each incoming message. |
| output | Yes | Field path for the aggregated result. Each rule must have a unique output. |
| expression | Yes | Formula that reduces input values across the window to a single scalar. Must contain at least one aggregation function. |
| description | No | Human-readable description. |
Unlike map rules, expression is required for every accumulation rule. Using $1 alone isn't valid because it references a collection of values, not a single scalar. You must wrap it in an aggregation function like average($1).
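The requirements above (required properties, unique outputs, and at least one aggregation function per expression) can be checked locally with a short script. This is a hypothetical lint, not the service's validation: the regular expression only looks for a call such as average($1) and doesn't parse the full expression language.

```python
import re

AGGREGATIONS = ("average", "sum", "min", "max", "count", "first", "last")
# Matches an aggregation call on one positional variable, such as average($1).
AGGREGATION_CALL = re.compile(r"\b(?:%s)\(\s*\$\d+\s*\)" % "|".join(AGGREGATIONS))


def validate_rules(rules: list[dict]) -> list[str]:
    """Return the problems found; an empty list means the rules pass these checks."""
    problems = []
    seen_outputs = set()
    for index, rule in enumerate(rules):
        for key in ("inputs", "output", "expression"):
            if not rule.get(key):
                problems.append(f"rule {index}: missing required property '{key}'")
        output = rule.get("output")
        if output in seen_outputs:
            problems.append(f"rule {index}: duplicate output '{output}'")
        elif output:
            seen_outputs.add(output)
        expression = rule.get("expression", "")
        if expression and not AGGREGATION_CALL.search(expression):
            problems.append(f"rule {index}: expression has no aggregation function")
    return problems


# A bare $1 is rejected because it isn't wrapped in an aggregation function.
print(validate_rules([{"inputs": ["temperature"], "output": "raw", "expression": "$1"}]))
```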
Aggregation functions
| Function | Returns | Empty window behavior |
| --- | --- | --- |
| average | Mean of numeric values | Error |
| sum | Sum of numeric values | 0.0 |
| min | Minimum numeric value | Error |
| max | Maximum numeric value | Error |
| count | Count of messages where the field exists | 0 |
| first | First value in the window | Error |
| last | Last value in the window | Error |
Each function takes a single positional variable as its argument ($1 for the first input, $2 for the second, and so on).
Non-numeric values: The average, sum, min, and max functions silently skip non-numeric values.
Presence-based functions: count, first, and last operate on field presence regardless of value type.
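The behavior in the table and the two notes above can be modeled in a few lines of Python. This is a reference sketch, not the transform's implementation. It assumes that booleans count as non-numeric and that a window whose values are all non-numeric behaves like an empty window for average, min, and max; neither point is stated above. EmptyWindowError is a hypothetical stand-in for the "Error" result.

```python
from numbers import Real


class EmptyWindowError(ValueError):
    """Hypothetical stand-in for the "Error" result listed in the table."""


def _numeric(values):
    # Silently skip non-numeric values. Assumption: booleans are non-numeric.
    return [v for v in values if isinstance(v, Real) and not isinstance(v, bool)]


def _non_empty(values, name):
    if not values:
        raise EmptyWindowError(f"{name}: no values in the window")
    return values


# In every function, `values` holds the field's value from each message
# in the window where the field exists.
def average(values):
    nums = _non_empty(_numeric(values), "average")
    return sum(nums) / len(nums)


def total(values):  # models sum(); renamed to avoid shadowing the built-in
    return float(sum(_numeric(values)))


def minimum(values):
    return min(_non_empty(_numeric(values), "min"))


def maximum(values):
    return max(_non_empty(_numeric(values), "max"))


def count(values):
    return len(values)


def first(values):
    return _non_empty(values, "first")[0]


def last(values):
    return _non_empty(values, "last")[-1]


window = [21.5, "n/a", 23.0]
print(average(window), count(window), last(window))
```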
Combine aggregations
You can combine multiple aggregation functions in a single expression:
Add a rule with inputs temperature and humidity, and expression average($1) + max($2).
The CLI applies the whole graph from one config file, so add this to the corresponding place in your graph.json and apply it with az iot ops dataflowgraph apply:
{
  "inputs": [
    "temperature",
    "humidity"
  ],
  "output": "tempHumidityIndex",
  "expression": "average($1) + max($2)"
}
{
  inputs: [ 'temperature', 'humidity' ]
  output: 'tempHumidityIndex'
  expression: 'average($1) + max($2)'
}
Important
The use of Kubernetes deployment manifests isn't supported in production environments and should only be used for debugging and testing.
- inputs:
    - temperature  # $1
    - humidity     # $2
  output: tempHumidityIndex
  expression: "average($1) + max($2)"
To convert an aggregated value, apply the conversion function outside the aggregation. For example, cToF(average($1)) converts the average temperature to Fahrenheit.
Each aggregation function must reference a single positional variable directly. average($1) + max($2) is valid, but average($1 + $2) isn't.
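To make the order of operations concrete, the sketch below evaluates the two expressions mentioned above by hand. The sample values are made up, and c_to_f is a local helper that stands in for cToF using the standard Celsius-to-Fahrenheit formula.

```python
def c_to_f(celsius: float) -> float:
    """Local stand-in for cToF: convert Celsius to Fahrenheit."""
    return celsius * 9 / 5 + 32


# Values collected over one window (illustrative).
temperature = [18.0, 20.0, 22.0]  # $1
humidity = [40.0, 55.0, 50.0]     # $2

# average($1) + max($2): each collection is reduced to a scalar first,
# then the scalars are combined.
temp_humidity_index = sum(temperature) / len(temperature) + max(humidity)

# cToF(average($1)): aggregate first, then convert the single result.
avg_fahrenheit = c_to_f(sum(temperature) / len(temperature))

print(temp_humidity_index)  # 20.0 + 55.0
print(avg_fahrenheit)       # c_to_f(20.0)
```

In both cases the arithmetic or conversion is applied to the scalar that an aggregation returns, never to the collection inside the aggregation call.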
Differences from map rules
| Capability | Map rules | Accumulation rules |
| --- | --- | --- |
| Expression required | No | Yes |
| Wildcard inputs | Supported | Not supported |
| $metadata access | Supported | Not supported |
| $context enrichment | Supported | Not supported |
| ? $last directive | Supported | Not supported |
| Output content type | Matches input | Always application/json |
Full configuration example
The following complete window configuration computes temperature statistics over a 30-second window.
If the window receives these three messages:
{ "temperature": 21.5 }
{ "temperature": 23.0 }
{ "temperature": 19.8 }
The output message is:
{
  "avgTemperature": 21.433333333333334,
  "minTemperature": 19.8,
  "maxTemperature": 23.0,
  "readingCount": 3,
  "tempRange": 3.2
}
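The output above can be reproduced by hand to check the arithmetic. The sketch below applies the five aggregations to the three sample messages in plain Python; it isn't the transform's code, and binary floating-point subtraction can leave tempRange a hair away from 3.2, so the value is rounded for display.

```python
messages = [
    {"temperature": 21.5},
    {"temperature": 23.0},
    {"temperature": 19.8},
]

# $1: the temperature value from every message where the field exists.
values = [m["temperature"] for m in messages if "temperature" in m]

output = {
    "avgTemperature": sum(values) / len(values),  # average($1)
    "minTemperature": min(values),                # min($1)
    "maxTemperature": max(values),                # max($1)
    "readingCount": len(values),                  # count($1)
    "tempRange": max(values) - min(values),       # max($1) - min($1)
}

for name, value in output.items():
    print(name, round(value, 6))
```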
In the Operations experience, create a data flow graph with a window transform:
- Add a source that reads from telemetry/temperature.
- Add a window transform. Set the window duration to 30 seconds. Configure accumulation rules for average, min, max, count, and range on the temperature field.
- Add a destination that sends to telemetry/aggregated.
The Azure CLI applies a data flow graph from a single JSON config file. Create a graph.json file with the graph properties. In the graph.json file, each transform's rules are stored in the value field as an escaped JSON string. For the readable form of each transform's rules, see the how-to for that transform type.
{
  "mode": "Enabled",
  "nodes": [
    {
      "nodeType": "Source",
      "name": "sensors",
      "sourceSettings": {
        "endpointRef": "default",
        "dataSources": [
          "telemetry/temperature"
        ]
      }
    },
    {
      "nodeType": "Graph",
      "name": "aggregate",
      "graphSettings": {
        "registryEndpointRef": "default",
        "artifact": "azureiotoperations/graph-dataflow-window:1.0.0",
        "configuration": [
          {
            "key": "delay",
            "value": "{\"type\":\"duration\",\"delaySeconds\":30}"
          },
          {
            "key": "rules",
            "value": "{\"accumulate\":[{\"inputs\":[\"temperature\"],\"output\":\"avgTemperature\",\"expression\":\"average($1)\"},{\"inputs\":[\"temperature\"],\"output\":\"minTemperature\",\"expression\":\"min($1)\"},{\"inputs\":[\"temperature\"],\"output\":\"maxTemperature\",\"expression\":\"max($1)\"},{\"inputs\":[\"temperature\"],\"output\":\"readingCount\",\"expression\":\"count($1)\"},{\"inputs\":[\"temperature\"],\"output\":\"tempRange\",\"expression\":\"max($1) - min($1)\"}]}"
          }
        ]
      }
    },
    {
      "nodeType": "Destination",
      "name": "output",
      "destinationSettings": {
        "endpointRef": "default",
        "dataDestination": "telemetry/aggregated"
      }
    }
  ],
  "nodeConnections": [
    {
      "from": {
        "name": "sensors"
      },
      "to": {
        "name": "aggregate"
      }
    },
    {
      "from": {
        "name": "aggregate"
      },
      "to": {
        "name": "output"
      }
    }
  ]
}
Apply the config file. The extendedLocation is added automatically from the instance and resource group, so don't include it in the file.
az iot ops dataflowgraph apply \
  --name temperature-window \
  --instance <INSTANCE_NAME> \
  --resource-group <RESOURCE_GROUP> \
  --config-file graph.json
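Hand-escaping the long rules string in graph.json is error-prone. One option is to generate the file from readable Python structures, as in the sketch below. The helper names (as_config_value, build_graph) are hypothetical; the node names, endpoint references, and artifact are copied from the config file above.

```python
import json


def as_config_value(obj) -> str:
    """Serialize rules to the compact JSON text stored in a value field."""
    return json.dumps(obj, separators=(",", ":"))


def build_graph() -> dict:
    delay = {"type": "duration", "delaySeconds": 30}
    expressions = {
        "avgTemperature": "average($1)",
        "minTemperature": "min($1)",
        "maxTemperature": "max($1)",
        "readingCount": "count($1)",
        "tempRange": "max($1) - min($1)",
    }
    rules = {
        "accumulate": [
            {"inputs": ["temperature"], "output": output, "expression": expression}
            for output, expression in expressions.items()
        ]
    }
    return {
        "mode": "Enabled",
        "nodes": [
            {"nodeType": "Source", "name": "sensors",
             "sourceSettings": {"endpointRef": "default",
                                "dataSources": ["telemetry/temperature"]}},
            {"nodeType": "Graph", "name": "aggregate",
             "graphSettings": {
                 "registryEndpointRef": "default",
                 "artifact": "azureiotoperations/graph-dataflow-window:1.0.0",
                 "configuration": [
                     {"key": "delay", "value": as_config_value(delay)},
                     {"key": "rules", "value": as_config_value(rules)},
                 ]}},
            {"nodeType": "Destination", "name": "output",
             "destinationSettings": {"endpointRef": "default",
                                     "dataDestination": "telemetry/aggregated"}},
        ],
        "nodeConnections": [
            {"from": {"name": "sensors"}, "to": {"name": "aggregate"}},
            {"from": {"name": "aggregate"}, "to": {"name": "output"}},
        ],
    }


if __name__ == "__main__":
    # json.dump escapes the quotes inside each value string when writing the file.
    with open("graph.json", "w", encoding="utf-8") as handle:
        json.dump(build_graph(), handle, indent=2)
```

The generated graph.json can then be passed to the apply command shown above.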
resource dataflowGraph 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs@2025-10-01' = {
  name: 'temperature-window'
  parent: dataflowProfile
  properties: {
    mode: 'Enabled'
    nodes: [
      {
        nodeType: 'Source'
        name: 'sensors'
        sourceSettings: {
          endpointRef: 'default'
          dataSources: [ 'telemetry/temperature' ]
        }
      }
      {
        nodeType: 'Graph'
        name: 'aggregate'
        graphSettings: {
          registryEndpointRef: 'default'
          artifact: 'azureiotoperations/graph-dataflow-window:1.0.0'
          configuration: [
            {
              key: 'delay'
              value: '{"type":"duration","delaySeconds":30}'
            }
            {
              key: 'rules'
              value: '{"accumulate":[{"inputs":["temperature"],"output":"avgTemperature","expression":"average($1)"},{"inputs":["temperature"],"output":"minTemperature","expression":"min($1)"},{"inputs":["temperature"],"output":"maxTemperature","expression":"max($1)"},{"inputs":["temperature"],"output":"readingCount","expression":"count($1)"},{"inputs":["temperature"],"output":"tempRange","expression":"max($1) - min($1)"}]}'
            }
          ]
        }
      }
      {
        nodeType: 'Destination'
        name: 'output'
        destinationSettings: {
          endpointRef: 'default'
          dataDestination: 'telemetry/aggregated'
        }
      }
    ]
    nodeConnections: [
      { from: { name: 'sensors' }, to: { name: 'aggregate' } }
      { from: { name: 'aggregate' }, to: { name: 'output' } }
    ]
  }
}
Important
The use of Kubernetes deployment manifests isn't supported in production environments and should only be used for debugging and testing.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
  name: temperature-window
  namespace: azure-iot-operations
spec:
  profileRef: default
  nodes:
    - nodeType: Source
      name: sensors
      sourceSettings:
        endpointRef: default
        dataSources:
          - telemetry/temperature
    - nodeType: Graph
      name: aggregate
      graphSettings:
        registryEndpointRef: default
        artifact: azureiotoperations/graph-dataflow-window:1.0.0
        configuration:
          - key: delay
            value: |
              {
                "type": "duration",
                "delaySeconds": 30
              }
          - key: rules
            value: |
              {
                "accumulate": [
                  {
                    "inputs": ["temperature"],
                    "output": "avgTemperature",
                    "expression": "average($1)"
                  },
                  {
                    "inputs": ["temperature"],
                    "output": "minTemperature",
                    "expression": "min($1)"
                  },
                  {
                    "inputs": ["temperature"],
                    "output": "maxTemperature",
                    "expression": "max($1)"
                  },
                  {
                    "inputs": ["temperature"],
                    "output": "readingCount",
                    "expression": "count($1)"
                  },
                  {
                    "inputs": ["temperature"],
                    "output": "tempRange",
                    "expression": "max($1) - min($1)"
                  }
                ]
              }
    - nodeType: Destination
      name: output
      destinationSettings:
        endpointRef: default
        dataDestination: telemetry/aggregated
  nodeConnections:
    - from: { name: sensors }
      to: { name: aggregate }
    - from: { name: aggregate }
      to: { name: output }
Next steps