Filter Plugins

Filter plugins provide a mechanism to alter the data stream as it flows through a foglamp instance, filters may be applied in south or north micro-services and may form a pipeline of multiple processing elements through which the data flows. Filters applied in a south service will only process data that is received by the south service, whilst filters placed in the north will process all data that flows out of that north interface.

Filters may;

  • augment data by adding static metadata or calculated values to the data
  • remove data from the stream
  • add data to the stream
  • modify data in the stream

It should be noted that there are some alternatives to creating a filter if you wish to make simple changes to the data stream. There are a number of existing filters that provide a degree of programmability. These include the expression filter which allows an arbitrary mathematical formula to be applied to the data or the Python 3.5 filter which allows a small include Python script to be applied to the data.

Filter plugins may be written in C++ or Python and have a very simple interface. The plugin mechanism and a subset of the API is common between all types of plugins including filters.

Configuration

Filters use the same configuration mechanism as the rest of FogLAMP, using a JSON document to describe the configuration parameters. As with any other plugin the structure is defined by the plugin and retrieve by the plugin_info entry point. This is then matched with the database content to pass the configured values to the plugin_init entry point.

C++ Filter Plugin API

The filter API consists of a small number of C function entry points, these are called in a strict order and based on the same set of common API entry points for all FogLAMP plugins.

Plugin Information

The plugin_info entry point is the first entry point that is called in a filter plugin and returns the plugin information structure. This is the exact same call that every FogLAMP plugin must support and is used to determine the type of the plugin and the configuration category defaults for the plugin.

A typical implementation of plugin_info would merely return a pointer to a static PLUGIN_INFORMATION structure.

PLUGIN_INFORMATION *plugin_info()
{
     return &info;
}

Plugin Initialise

The plugin_init entry point is called after plugin_info has been called and before any data is passed to the filter. It is called at the phase where the service is setting up the filter pipeline and provides the filter with its configuration category that now contains the user supplied values and the destination to which the filter will send the output of the filter.

PLUGIN_HANDLE plugin_init(ConfigCategory* config,
                       OUTPUT_HANDLE *outHandle,
                       OUTPUT_STREAM output)
{
}

The config parameter is the configuration category with the user supplied values inserted, the outHandle is a handle for the next filter in the chain and the output is a function pointer to call to send the data to the next filter in the chain. The outHandle and output arguments should be stored for future use in the plugin_ingest when data is to be forwarded within the pipeline.

The plugin_init function returns a handle that will be passed to all subsequent plugin calls. This handle can be used to store state that needs to be passed between calls. Typically the plugin_init call will create a C++ class that implements the filter and return a point to the instance as the handle. The instance can then be used to store the state of the filter, including the output handle and callback that needs to be used.

Filter classes can also be used to buffer data between calls to the plugin_ingest entry point, allowing a filter to defer the processing of the data until it has a sufficient quantity of buffered data available to it.

Plugin Ingest

The plugin_ingest entry point is the workhorse of the filter, it is called with sets of readings to process and then passes on the new set of readings to the next filter in the pipeline. The process of passing on the data to the next filter is via the OUTPUT_STREAM function pointer. A filter does not have to output data each time it ingests data, it is free to output no data or to output more or less data than it was called with.

void plugin_ingest(PLUGIN_HANDLE *handle,
                READINGSET *readingSet)
{
}

The number of readings that a filter is called with will depend on the environment it is run in and what any filters earlier in the filter pipeline have produced. A filter that requires a particular sample size in order to process a result should therefore be prepared to buffer data across multiple calls to plugin_ingest. Several examples of filters that so this are available for reference.

The plugin_ingest call may send data onwards in the filter pipeline by using the stored output and outHandle parameters passed to plugin_init.

(*output)(outHandle, readings);

Plugin Reconfigure

As with other plugin types the filter may be reconfigured during its lifetime. When a reconfiguration operation occurs the plugin_reconfigure method will be called with the new configuration for the filter.

void plugin_reconfigure(PLUGIN_HANDLE *handle, const std::string& newConfig)
{
}

Plugin Shutdown

As with other plugins a shutdown call exists which may be used by the plugin to perform any cleanup that is required when the filter is shut down.

void plugin_shutdown(PLUGIN_HANDLE *handle)
{
}

C++ Helper Class

It is expected that filters will be written as C++ classes, with the plugin handle being used a a mechanism to store and pass the pointer to the instance of the filter class. In order to make it easier to write filters a base FogLAMPFilter class has been provided, it is recommended that you derive your specific filter class from this base class in order to simplify the implementation

class FogLAMPFilter {
        public:
                FogLAMPFilter(const std::string& filterName,
                              ConfigCategory& filterConfig,
                              OUTPUT_HANDLE *outHandle,
                              OUTPUT_STREAM output);
                ~FogLAMPFilter() {};
                const std::string&
                                    getName() const { return m_name; };
                bool                isEnabled() const { return m_enabled; };
                ConfigCategory&     getConfig() { return m_config; };
                void                disableFilter() { m_enabled = false; };
                void                setConfig(const std::string& newConfig);
        public:
                OUTPUT_HANDLE*      m_data;
                OUTPUT_STREAM       m_func;
        protected:
                std::string         m_name;
                ConfigCategory      m_config;
                bool                m_enabled;
};

C++ Filter Example

The following example is a simple data processing example. It applies the log() function to numeric data in the data stream

Plugin Interface

Most plugins written in C++ have a source file that encapsulates the C API to the plugin, this is traditionally called plugin.cpp. The example plugin follows this model with the content of plugin.cpp shown below.

The first section includes the filter class that is the actual implementation of the filter logic and defines the JSON configuration category. This uses the QUOTE macro in order to make the JSON definition more readable.

/*
 * FogLAMP "log" filter plugin.
 *
 * Copyright (c) 2020 Dianomic Systems
 *
 * Released under the Apache 2.0 Licence
 *
 * Author: Mark Riddoch
 */

#include <logFilter.h>
#include <version.h>

#define FILTER_NAME "log"
const static char *default_config = QUOTE({
                "plugin" : {
                        "description" : "Log filter plugin",
                        "type" : "string",
                        "default" : FILTER_NAME,
                        "readonly": "true"
                        },
                 "enable": {
                        "description": "A switch that can be used to enable or disable execution of the log filter.",
                        "type": "boolean",
                        "displayName": "Enabled",
                        "default": "false"
                        },
                "match" : {
                        "description" : "An optional regular expression to match in the asset name.",
                        "type": "string",
                        "default": "",
                        "order": "1",
                        "displayName": "Asset filter"}
                });

using namespace std;

We then define the plugin information contents that will be returned by the plugin_info call.

/**
 * The Filter plugin interface
 */
extern "C" {

/**
 * The plugin information structure
 */
static PLUGIN_INFORMATION info = {
        FILTER_NAME,              // Name
        VERSION,                  // Version
        0,                        // Flags
        PLUGIN_TYPE_FILTER,       // Type
        "1.0.0",                  // Interface version
        default_config                 // Default plugin configuration
};

The final section of this file consists of the entry points themselves and the implementation. The majority of this consist of calls to the LogFilter class that in this case implements the logic of the filter.

/**
 * Return the information about this plugin
 */
PLUGIN_INFORMATION *plugin_info()
{
        return &info;
}

/**
 * Initialise the plugin, called to get the plugin handle.
 * We merely create an instance of our LogFilter class
 *
 * @param config     The configuration category for the filter
 * @param outHandle  A handle that will be passed to the output stream
 * @param output     The output stream (function pointer) to which data is passed
 * @return           An opaque handle that is used in all subsequent calls to the plugin
 */
PLUGIN_HANDLE plugin_init(ConfigCategory* config,
                          OUTPUT_HANDLE *outHandle,
                          OUTPUT_STREAM output)
{
        LogFilter *log = new LogFilter(FILTER_NAME,
                                        *config,
                                        outHandle,
                                        output);

        return (PLUGIN_HANDLE)log;
}

/**
 * Ingest a set of readings into the plugin for processing
 *
 * @param handle     The plugin handle returned from plugin_init
 * @param readingSet The readings to process
 */
void plugin_ingest(PLUGIN_HANDLE *handle,
                   READINGSET *readingSet)
{
        LogFilter *log = (LogFilter *) handle;
        log->ingest(readingSet);
}

/**
 * Plugin reconfiguration method
 *
 * @param handle     The plugin handle
 * @param newConfig  The updated configuration
 */
void plugin_reconfigure(PLUGIN_HANDLE *handle, const std::string& newConfig)
{
        LogFilter *log = (LogFilter *)handle;
        log->reconfigure(newConfig);
}

/**
 * Call the shutdown method in the plugin
 */
void plugin_shutdown(PLUGIN_HANDLE *handle)
{
        LogFilter *log = (LogFilter *) handle;
        delete log;
}

// End of extern "C"
};

Filter Class

Although it is not mandatory it is good practice to encapsulate the filter login in a class, these classes are derived from the FogLAMPFilter class

#ifndef _LOG_FILTER_H
#define _LOG_FILTER_H
/*
 * FogLAMP "Log" filter plugin.
 *
 * Copyright (c) 2020 Dianomic Systems
 *
 * Released under the Apache 2.0 Licence
 *
 * Author: Mark Riddoch
 */
#include <filter.h>
#include <reading_set.h>
#include <config_category.h>
#include <string>
#include <logger.h>
#include <mutex>
#include <regex>
#include <math.h>


/**
 * Convert the incoming data to use a logarithmic scale
 */
class LogFilter : public FogLAMPFilter {
        public:
                LogFilter(const std::string& filterName,
                        ConfigCategory& filterConfig,
                        OUTPUT_HANDLE *outHandle,
                        OUTPUT_STREAM output);
                ~LogFilter();
                void ingest(READINGSET *readingSet);
                void reconfigure(const std::string& newConfig);
        private:
                void                         handleConfig(ConfigCategory& config);
                std::string                  m_match;
                std::regex                   *m_regex;
                std::mutex                   m_configMutex;
};


#endif

Filter Class Implementation

The following is the code that implements the filter logic

/*
 * FogLAMP "Log" filter plugin.
 *
 * Copyright (c) 2020 Dianomic Systems
 *
 * Released under the Apache 2.0 Licence
 *
 * Author: Mark Riddoch
 */
#include <logFilter.h>

using namespace std;

/**
 * Constructor for the LogFilter.
 *
 * We call the constructor of the base class and handle the initial
 * configuration of the filter.
 *
 * @param    filterName      The name of the filter
 * @param    filterConfig    The configuration category for this filter
 * @param    outHandle       The handle of the next filter in the chain
 * @param    output          A function pointer to call to output data to the next filter
 */
LogFilter::LogFilter(const std::string& filterName,
                        ConfigCategory& filterConfig,
                        OUTPUT_HANDLE *outHandle,
                        OUTPUT_STREAM output) : m_regex(NULL),
                                FogLAMPFilter(filterName, filterConfig, outHandle, output)
{
        handleConfig(filterConfig);
}

/**
 * Destructor for this filter class
 */
LogFilter::~LogFilter()
{
        if (m_regex)
                delete m_regex;
}

/**
 * The actual filtering code
 *
 * @param readingSet The reading data to filter
 */
void
LogFilter::ingest(READINGSET *readingSet)
{
        lock_guard<mutex> guard(m_configMutex);

        if (isEnabled())     // Filter enable, process the readings
        {
                const vector<Reading *>& readings = ((ReadingSet *)readingSet)->getAllReadings();
                for (vector<Reading *>::const_iterator elem = readings.begin();
                                elem != readings.end(); ++elem)
                {
                        // If we set a matching regex then compare to the name of this asset
                        if (!m_match.empty())
                        {
                                string asset = (*elem)->getAssetName();
                                if (!regex_match(asset, *m_regex))
                                {
                                        continue;
                                }
                        }

                        // We are modifying this asset so put an entry in the asset tracker
                        AssetTracker::getAssetTracker()->addAssetTrackingTuple(getName(), (*elem)->getAssetName(), string("Filter"));

                        // Get a reading DataPoints
                        const vector<Datapoint *>& dataPoints = (*elem)->getReadingData();

                        // Iterate over the datapoints
                        for (vector<Datapoint *>::const_iterator it = dataPoints.begin(); it != dataPoints.end(); ++it)
                        {
                                // Get the reference to a DataPointValue
                                DatapointValue& value = (*it)->getData();

                                /*
                                 * Deal with the T_INTEGER and T_FLOAT types.
                                 * Try to preserve the type if possible but
                                 * if a floating point log function is applied
                                 * then T_INTEGER values will turn into T_FLOAT.
                                 * If the value is zero we do not apply the log function
                                 */
                                if (value.getType() == DatapointValue::T_INTEGER)
                                {
                                        long ival = value.toInt();
                                        if (ival != 0)
                                        {
                                                double newValue = log((double)ival);
                                                value.setValue(newValue);
                                        }
                                }
                                else if (value.getType() == DatapointValue::T_FLOAT)
                                {
                                        double dval = value.toDouble();
                                        if (dval != 0.0)
                                        {
                                                value.setValue(log(dval));
                                        }
                                }
                                else
                                {
                                        // do nothing for other types
                                }
                        }
                }
        }

        // Pass on all readings in this case
        (*m_func)(m_data, readingSet);
}

/**
 * Reconfiguration entry point to the filter.
 *
 * This method runs holding the configMutex to prevent
 * ingest using the regex class that may be destroyed by this
 * call.
 *
 * Pass the configuration to the base FilterPlugin class and
 * then call the private method to handle the filter specific
 * configuration.
 *
 * @param newConfig  The JSON of the new configuration
 */
void
LogFilter::reconfigure(const std::string& newConfig)
{
        lock_guard<mutex> guard(m_configMutex);
        setConfig(newConfig);                // Pass the configuration to the base class
        handleConfig(m_config);
}

/**
 * Handle the filter specific configuration. In this case
 * it is just the single item "match" that is a regex
 * expression
 *
 * @param config     The configuration category
 */
void
LogFilter::handleConfig(ConfigCategory& config)
{
        if (config.itemExists("match"))
        {
                m_match = config.getValue("match");
                if (m_regex)
                        delete m_regex;
                m_regex = new regex(m_match);
        }
}

Python Filter API

Filters may also be written in Python, the API is very similar to that of a C++ filter and consists of the same set of entry points.

Plugin Information

As with C++ filters this is the first entry point called, it returns a Python dictionary that describes the filter.

def plugin_info():
    """ Returns information about the plugin
    Args:
    Returns:
        dict: plugin information
    Raises:
    """

Plugin Initialisation

The plugin_init call is used to pass the resolved configuration to the plugin and also pass in the handle of the next filter in the pipeline and a callback that should be called with the output data of the filter.

def plugin_init(config, ingest_ref, callback):
    """ Initialise the plugin
    Args:
        config: JSON configuration document for the Filter plugin configuration category
        ingest_ref:
        callback:
    Returns:
        data: JSON object to be used in future calls to the plugin
    Raises:
    """

Plugin Ingestion

The plugin_ingest method is used to pass data into the plugin, the plugin will then process that data and call the callback that was passed into the plugin_init entry point with the ingest_ref handle and the data to send along the filter pipeline.

def plugin_ingest(handle, data):
    """ Modify readings data and pass it onward

    Args:
        handle: handle returned by the plugin initialisation call
        data: readings data
    """

The data is arranged as an array of Python dictionaries, each of which is a Reading. Typically the data can be processed by traversing the array

for elem in data:
    process(elem)

Plugin Reconfigure

The plugin_reconfigure entry point is called whenever a configuration change occurs for the filters configuration category.

def plugin_reconfigure(handle, new_config):
    """ Reconfigures the plugin

    Args:
        handle: handle returned by the plugin initialisation call
        new_config: JSON object representing the new configuration category for the category
    Returns:
        new_handle: new handle to be used in the future calls
    """

Plugin Shutdown

Called when the plugin is to be shutdown to allow it to perform any cleanup operations.

def plugin_shutdown(handle):
    """ Shutdowns the plugin doing required cleanup.

    Args:
        handle: handle returned by the plugin initialisation call
    Returns:
        plugin shutdown
    """

Python Filter Example

The following is an example of a Python filter that calculates an exponential moving average.

# -*- coding: utf-8 -*-

# FogLAMP_BEGIN
# See: http://foglamp.readthedocs.io/
# FogLAMP_END

""" Module for EMA filter plugin

Generate Exponential Moving Average
The rate value (x) allows to include x% of current value
and (100-x)% of history
A datapoint called 'ema' is added to each reading being filtered
"""

import time
import copy
import logging

from foglamp.common import logger
import filter_ingest

__author__ = "Massimiliano Pinto"
__copyright__ = "Copyright (c) 2020 Dianomic Systems"
__license__ = "Apache 2.0"
__version__ = "${VERSION}"

_LOGGER = logger.setup(__name__, level = logging.WARN)

# Filter specific objects
the_callback = None
the_ingest_ref = None

# latest ema value
latest = None
# rate value
rate = None
# datapoint name
datapoint = None
# plugin shutdown indicator
shutdown_in_progress = False

_DEFAULT_CONFIG = {
    'plugin': {
        'description': 'Exponential Moving Average filter plugin',
        'type': 'string',
        'default': 'ema',
        'readonly': 'true'
    },
    'enable': {
        'description': 'Enable ema plugin',
        'type': 'boolean',
        'default': 'false',
        'displayName': 'Enabled',
        'order': "3"
    },
    'rate': {
        'description': 'Rate value: include % of current value',
        'type': 'float',
        'default': '0.07',
        'displayName': 'Rate',
        'order': "2"
    },
    'datapoint': {
        'description': 'Datapoint name for calculated ema value',
        'type': 'string',
        'default': 'ema',
        'displayName': 'EMA datapoint',
        'order': "1"
    }
}


def compute_ema(reading):
    """ Compute EMA

    Args:
        A reading data
    """
    global rate, latest, datapoint
    for attribute in list(reading):
        if not latest:
            latest = reading[attribute]
        latest = reading[attribute] * rate + latest * (1 - rate)
        reading[datapoint] = latest


def plugin_info():
    """ Returns information about the plugin
    Args:
    Returns:
        dict: plugin information
    Raises:
    """
    return {
        'name': 'ema',
        'version': '1.8.2',
        'mode': "none",
        'type': 'filter',
        'interface': '1.0',
        'config': _DEFAULT_CONFIG
    }


def plugin_init(config, ingest_ref, callback):
    """ Initialise the plugin
    Args:
        config: JSON configuration document for the Filter plugin configuration category
        ingest_ref:
        callback:
    Returns:
        data: JSON object to be used in future calls to the plugin
    Raises:
    """
    data = copy.deepcopy(config)

    global the_callback, the_ingest_ref, rate, datapoint

    the_callback = callback
    the_ingest_ref = ingest_ref
    rate = float(config['rate']['value'])
    datapoint = config['datapoint']['value']

    _LOGGER.debug("plugin_init for filter EMA called")

    return data


def plugin_reconfigure(handle, new_config):
    """ Reconfigures the plugin

    Args:
        handle: handle returned by the plugin initialisation call
        new_config: JSON object representing the new configuration category for the category
    Returns:
        new_handle: new handle to be used in the future calls
    """
    global rate, datapoint
    rate = float(new_config['rate']['value'])
    datapoint = new_config['datapoint']['value']
    _LOGGER.debug("Old config for ema plugin {} \n new config {}".format(handle, new_config))
    new_handle = copy.deepcopy(new_config)

    return new_handle


def plugin_shutdown(handle):
    """ Shutdowns the plugin doing required cleanup.

    Args:
        handle: handle returned by the plugin initialisation call
    Returns:
        plugin shutdown
    """
    global shutdown_in_progress, the_callback, the_ingest_ref, rate, latest, datapoint
    shutdown_in_progress = True
    time.sleep(1)
    the_callback = None
    the_ingest_ref = None
    rate = None
    latest = None
    datapoint = None

    _LOGGER.info('filter ema plugin shutdown.')


def plugin_ingest(handle, data):
    """ Modify readings data and pass it onward

    Args:
        handle: handle returned by the plugin initialisation call
        data: readings data
    """
    global shutdown_in_progress, the_callback, the_ingest_ref
    if shutdown_in_progress:
        return

    if handle['enable']['value'] == 'false':
        # Filter not enabled, just pass data onwards
        filter_ingest.filter_ingest_callback(the_callback, the_ingest_ref, data)
        return

    # Filter is enabled: compute EMA for each reading
    for elem in data:
        compute_ema(elem['readings'])

    # Pass data onwards
    filter_ingest.filter_ingest_callback(the_callback, the_ingest_ref, data)

    _LOGGER.debug("ema filter_ingest done")