Some scenarios require messages to arrive on different MQTT topics depending on their content. For example, sensor readings above a critical threshold might need to go to an alerts topic, while normal readings go to a historian topic. With data flow graphs, you can set the output topic dynamically, even though the dataflow has a single destination.
How it works
A map transform can write to message metadata, including the MQTT topic, by using the $metadata.topic output path. The destination then uses the ${outputTopic} variable to publish to whatever topic the transform set.
Two pieces work together:
- Inside the transform: A map rule writes a string value to
$metadata.topic.
- In the destination: The
dataDestination field references ${outputTopic}, which resolves to the value the transform wrote.
The simplest approach uses one map transform with an if expression that picks the topic.
In the Operations experience, create a data flow graph:
- Add a source that reads from
sensors/temperature.
- Add a map transform with two rules:
- A wildcard passthrough rule (input
*, output *).
- A compute rule with input
temperature, output $metadata.topic, and expression if($1 > 1000, "alerts", "historian").
- Add a destination with topic
factory/${outputTopic}.
When the map transform writes "alerts" to $metadata.topic, the destination resolves factory/${outputTopic} to factory/alerts.
The Azure CLI applies a data flow graph from a single JSON config file. Create a graph.json file with the graph properties. In the graph.json file, each transform's rules are stored in the value field as an escaped JSON string. For the readable form of each transform's rules, see the how-to for that transform type.
{
"mode": "Enabled",
"nodes": [
{
"nodeType": "Source",
"name": "sensors",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"sensors/temperature"
]
}
},
{
"nodeType": "Graph",
"name": "route-by-temperature",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-map:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"map\":[{\"inputs\":[\"*\"],\"output\":\"*\"},{\"description\":\"Set topic based on temperature threshold\",\"inputs\":[\"temperature\"],\"output\":\"$metadata.topic\",\"expression\":\"if($1 > 1000, \\\"alerts\\\", \\\"historian\\\")\"}]}"
}
]
}
},
{
"nodeType": "Destination",
"name": "output",
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "factory/${outputTopic}"
}
}
],
"nodeConnections": [
{
"from": {
"name": "sensors"
},
"to": {
"name": "route-by-temperature"
}
},
{
"from": {
"name": "route-by-temperature"
},
"to": {
"name": "output"
}
}
]
}
Tip
To generate the escaped string, save the rules to a file like rules.json, then run jq -c . rules.json and paste the single-line output into the value field.
Apply the config file. The extendedLocation is added automatically from the instance and resource group, so don't include it in the file.
az iot ops dataflowgraph apply \
--name dynamic-topic-routing \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP> \
--config-file graph.json
resource dataflowGraph 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs@2026-03-01' = {
name: 'dynamic-topic-routing'
parent: dataflowProfile
properties: {
profileRef: dataflowProfileName
mode: 'Enabled'
nodes: [
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [ 'sensors/temperature' ]
}
}
{
nodeType: 'Graph'
name: 'route-by-temperature'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["*"],"output":"*"},{"description":"Set topic based on temperature threshold","inputs":["temperature"],"output":"$metadata.topic","expression":"if($1 > 1000, \\"alerts\\", \\"historian\\")"}]}'
}
]
}
}
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'factory/${outputTopic}'
}
}
]
nodeConnections: [
{ from: { name: 'sensors' }, to: { name: 'route-by-temperature' } }
{ from: { name: 'route-by-temperature' }, to: { name: 'output' } }
]
}
}
Important
The use of Kubernetes deployment manifests isn't supported in production environments and should only be used for debugging and testing.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
name: dynamic-topic-routing
namespace: azure-iot-operations
spec:
profileRef: default
nodes:
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- sensors/temperature
- nodeType: Graph
name: route-by-temperature
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{
"inputs": ["*"],
"output": "*"
},
{
"description": "Set topic based on temperature threshold",
"inputs": ["temperature"],
"output": "$metadata.topic",
"expression": "if($1 > 1000, \"alerts\", \"historian\")"
}
]
}
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: "factory/${outputTopic}"
nodeConnections:
- from: { name: sensors }
to: { name: route-by-temperature }
- from: { name: route-by-temperature }
to: { name: output }
Option 2: Branch, map each path, and merge
If you need different transformations on each path (not just a different topic), use a branch transform to split the flow, a map transform on each arm to set the topic and apply path-specific rules, and a concatenate transform to merge the paths.
In the Operations experience:
- Add a source that reads from
sensors/temperature.
- Add a branch transform with condition
$1 > 1000 on the temperature field.
- On the true path, add a map transform with a wildcard passthrough and a rule that sets
$metadata.topic to "alerts".
- On the false path, add a map transform with a wildcard passthrough and a rule that sets
$metadata.topic to "historian".
- Add a concatenate transform to merge both paths.
- Add a destination with topic
factory/${outputTopic}.
The Azure CLI applies a data flow graph from a single JSON config file. Create a graph.json file with the graph properties. In the graph.json file, each transform's rules are stored in the value field as an escaped JSON string. For the readable form of each transform's rules, see the how-to for that transform type.
{
"mode": "Enabled",
"nodes": [
{
"nodeType": "Source",
"name": "sensors",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"sensors/temperature"
]
}
},
{
"nodeType": "Graph",
"name": "check-temperature",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-branch:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"branch\":{\"inputs\":[\"temperature\"],\"expression\":\"$1 > 1000\",\"description\":\"Route critical temperatures to alerts\"}}"
}
]
}
},
{
"nodeType": "Graph",
"name": "set-alerts-topic",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-map:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"map\":[{\"inputs\":[\"*\"],\"output\":\"*\"},{\"inputs\":[],\"output\":\"$metadata.topic\",\"expression\":\"\\\"alerts\\\"\"}]}"
}
]
}
},
{
"nodeType": "Graph",
"name": "set-historian-topic",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-map:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"map\":[{\"inputs\":[\"*\"],\"output\":\"*\"},{\"inputs\":[],\"output\":\"$metadata.topic\",\"expression\":\"\\\"historian\\\"\"}]}"
}
]
}
},
{
"nodeType": "Graph",
"name": "merge",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-concatenate:1.0.0"
}
},
{
"nodeType": "Destination",
"name": "output",
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "factory/${outputTopic}"
}
}
],
"nodeConnections": [
{
"from": {
"name": "sensors"
},
"to": {
"name": "check-temperature"
}
},
{
"from": {
"name": "check-temperature.output.true"
},
"to": {
"name": "set-alerts-topic"
}
},
{
"from": {
"name": "check-temperature.output.false"
},
"to": {
"name": "set-historian-topic"
}
},
{
"from": {
"name": "set-alerts-topic"
},
"to": {
"name": "merge"
}
},
{
"from": {
"name": "set-historian-topic"
},
"to": {
"name": "merge"
}
},
{
"from": {
"name": "merge"
},
"to": {
"name": "output"
}
}
]
}
Apply the config file. The extendedLocation is added automatically from the instance and resource group, so don't include it in the file.
az iot ops dataflowgraph apply \
--name dynamic-topic-routing-branched \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP> \
--config-file graph.json
resource dataflowGraph 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs@2026-03-01' = {
name: 'dynamic-topic-routing-branched'
parent: dataflowProfile
properties: {
profileRef: dataflowProfileName
mode: 'Enabled'
nodes: [
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [ 'sensors/temperature' ]
}
}
{
nodeType: 'Graph'
name: 'check-temperature'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-branch:1.0.0'
configuration: [
{
key: 'rules'
value: '{"branch":{"inputs":["temperature"],"expression":"$1 > 1000","description":"Route critical temperatures to alerts"}}'
}
]
}
}
{
nodeType: 'Graph'
name: 'set-alerts-topic'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["*"],"output":"*"},{"inputs":[],"output":"$metadata.topic","expression":"\\"alerts\\""}]}'
}
]
}
}
{
nodeType: 'Graph'
name: 'set-historian-topic'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["*"],"output":"*"},{"inputs":[],"output":"$metadata.topic","expression":"\\"historian\\""}]}'
}
]
}
}
{
nodeType: 'Graph'
name: 'merge'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-concatenate:1.0.0'
}
}
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'factory/${outputTopic}'
}
}
]
nodeConnections: [
{ from: { name: 'sensors' }, to: { name: 'check-temperature' } }
{ from: { name: 'check-temperature.output.true' }, to: { name: 'set-alerts-topic' } }
{ from: { name: 'check-temperature.output.false' }, to: { name: 'set-historian-topic' } }
{ from: { name: 'set-alerts-topic' }, to: { name: 'merge' } }
{ from: { name: 'set-historian-topic' }, to: { name: 'merge' } }
{ from: { name: 'merge' }, to: { name: 'output' } }
]
}
}
Important
The use of Kubernetes deployment manifests isn't supported in production environments and should only be used for debugging and testing.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
name: dynamic-topic-routing-branched
namespace: azure-iot-operations
spec:
profileRef: default
nodes:
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- sensors/temperature
- nodeType: Graph
name: check-temperature
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-branch:1.0.0
configuration:
- key: rules
value: |
{
"branch": {
"inputs": ["temperature"],
"expression": "$1 > 1000",
"description": "Route critical temperatures to alerts"
}
}
- nodeType: Graph
name: set-alerts-topic
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{ "inputs": ["*"], "output": "*" },
{ "inputs": [], "output": "$metadata.topic", "expression": "\"alerts\"" }
]
}
- nodeType: Graph
name: set-historian-topic
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{ "inputs": ["*"], "output": "*" },
{ "inputs": [], "output": "$metadata.topic", "expression": "\"historian\"" }
]
}
- nodeType: Graph
name: merge
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-concatenate:1.0.0
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: "factory/${outputTopic}"
nodeConnections:
- from: { name: sensors }
to: { name: check-temperature }
- from: { name: check-temperature.output.true }
to: { name: set-alerts-topic }
- from: { name: check-temperature.output.false }
to: { name: set-historian-topic }
- from: { name: set-alerts-topic }
to: { name: merge }
- from: { name: set-historian-topic }
to: { name: merge }
- from: { name: merge }
to: { name: output }
Which option to choose
| Consideration |
Option 1 (single map) |
Option 2 (branch + maps) |
| Simplicity |
Fewer nodes, simpler to read |
More nodes, more explicit |
| Topic-only routing |
Ideal |
Works, but more setup than needed |
| Different transforms per path |
Possible with nested if(), gets complex |
Natural: each branch has its own map rules |
| Adding more paths |
Chain if() calls |
Requires nested branches |
For straightforward topic routing based on a single condition, option 1 is simpler. Use option 2 when each path needs different processing beyond the topic name.
Topic translation details
The ${outputTopic} variable in dataDestination resolves to the full value of $metadata.topic as set by the last transform in the pipeline. You can also use segments with ${outputTopic.N} (1-indexed). For example, if the transform sets $metadata.topic to "region/west":
dataDestination |
Resolved topic |
factory/${outputTopic} |
factory/region/west |
factory/${outputTopic.1} |
factory/region |
factory/${outputTopic.2} |
factory/west |
If the topic variable can't be resolved (for example, $metadata.topic was never set), the message is dropped and an error is logged.
Next steps