Consolidation Filter Plugin¶
Overview¶
The Consolidation Filter is designed to consolidate multiple asset readings into a single reading. This is particularly useful for reducing the number of MQTT messages sent when processing statistics history data or any scenario where multiple assets need to be combined.
The filter monitors incoming assets and stores them in a buffer until one of two events occurs:
Duplicate Asset Detection: When an asset name that already exists in the buffer is encountered
Timeout: When a configured timeout period has elapsed since the oldest asset in the buffer was received
When either event occurs, all buffered readings are consolidated into a single reading and sent onwards. The buffer is then cleared and the process repeats.
On shutdown, any remaining buffered readings are automatically flushed and sent.
Configuration¶
|
The filter supports the following configuration options:
- enable (boolean, default: false)
Enable or disable the filter execution.
- assetName (string, default: “consolidated”)
The name of the asset to use for the consolidated reading output.
- datapointNaming (enumeration, default: “original”)
Specifies how datapoints in the consolidated reading should be named:
original: Use the original datapoint name. If a name clash occurs (same datapoint name from different assets), an error is logged and the conflicting datapoint is skipped.
prepend: Prepend the asset name to the datapoint name, separated by an underscore. For example, if asset “sensor1” has datapoint “value”, the consolidated datapoint will be named “sensor1_value”.
asset: Use the asset name as the datapoint name. This mode requires that each reading contains exactly one datapoint. If a reading has multiple datapoints, an error is logged and the reading is skipped.
- timeout (integer, default: 0)
Timeout in seconds. If set to a positive value, the filter will automatically flush the buffer when the timeout period has elapsed since the oldest asset in the buffer was received. If set to 0, the timeout mechanism is disabled and the filter will only flush on duplicate asset detection.
Use Cases¶
Statistics History Consolidation: Combine multiple statistics assets into a single consolidated message to reduce MQTT traffic.
Batch Processing: Accumulate readings from multiple assets over a time period before sending them as a batch.
Data Aggregation: Combine related readings from different sources into a unified output format.
Examples¶
Example 1: Basic Consolidation¶
- Configuration:
enable: true assetName: “statistics” datapointNaming: “prepend” timeout: “”
This configuration will: - Consolidate readings when a duplicate asset name is detected - Name output asset as “statistics” - Prepend asset names to datapoint names (e.g., “cpu_temperature” + “value” = “cpu_temperature_value”) - No timeout (only flushes on duplicate detection)
Example 2: Time-based Consolidation¶
- Configuration:
enable: true assetName: “batch” datapointNaming: “original” timeout: “60”
This configuration will: - Consolidate readings every 60 seconds OR when a duplicate is detected (whichever occurs first) - Name output asset as “batch” - Use original datapoint names (will skip on name clashes) - Timeout is set to 60 seconds
Example 3: Asset Name as Datapoint¶
- Configuration:
enable: true assetName: “combined” datapointNaming: “asset” timeout: “30”
This configuration will: - Consolidate readings every 30 seconds OR when a duplicate is detected - Name output asset as “combined” - Use asset name as the datapoint name (requires single datapoint per reading) - Timeout is set to 30 seconds
Notes¶
The filter does not require predefined knowledge of the assets it will process
Each reading in the buffer is stored independently until consolidation
On shutdown, all remaining buffered readings are automatically sent
The timestamp of the consolidated reading is set to the current time
Original reading timestamps are preserved in the buffered copies but the final consolidated reading uses the current timestamp
