CSV Writer

The plugin collects readings from a south service into csv files and compresses each file once the limit set per file is exceeded. The files are collected in date-wise directories, where a single directory contains all the files collected on that day. The directories are rotated once the configured limit is exceeded. Data may be collected continuously, periodically or according to a cron string.

Configuration

  • ‘inputAssets’: type: string default: ‘’:

    The names of the assets (comma separated) to be stored in the csv file. All the data points of these assets will become columns in the csv file. Assets that are not included will simply be forwarded. If left empty, all assets will be taken.

  • ‘forwardData’: type: boolean default: false:

    Forces data to be forwarded upstream; normally data is written to the csv file and not sent on to storage.

  • ‘destDir’: type: string default: ‘FOGLAMP_DATA/readings-out’:

    Destination directory for the files. If the path is prefixed with FOGLAMP_DATA/ it is created inside $FOGLAMP_DATA, e.g. /usr/local/foglamp/data/readings-out; otherwise the directory is created as specified, inside $FOGLAMP_ROOT/services/<path>. The path can be recursive. The default is FOGLAMP_DATA/readings-out.

  • ‘filterName’ (Subdirectory name): type: string default: ‘south-storage’:

    Name of the sub-directory under the destination directory where records are written. This is useful when there are multiple instances of the csv writer filter. By convention, use the name source-destination to indicate that the filter is applied between a source and a destination; for example, use the filterName rms-database if the filter sits between an RMS filter and the SQLite database.

  • ‘fileType’: type: string default: ‘csv’:

    The file type (csv or pickle) in which to store the readings.

  • ‘samplingRate’: type: integer default: ‘8000’:

    The number of readings per second to be stored in the csv/pickle file.

  • ‘cronMode’: type: enumeration default: continuous:

    The collection mode, either periodic, continuous or table. In continuous mode files are collected continuously, in periodic mode the files are collected at a given (configurable) interval of time, and in table mode the files are collected according to a cron string, similar to cron in Unix environments.

  • ‘cronPeriodStart’: type: string default: ‘’:

    The time at which the collection should start. If empty, collection will start immediately after the first reading. If a timestamp is given (a sample timestamp could be 2021-04-27 09:25:35.300875+00:00), the plugin will start collecting when the timestamp of an arriving reading (user_ts, or ts if there is no user_ts) becomes greater than this value. Note that cronPeriodStart is useful for periodic mode and will not be used when cronMode is table. For periodic mode cronPeriodStart may be given either in the past or in the future.

  • ‘cronPeriodDuration’: type: string default: 100ms:

    The amount of time for which records are written into each file (in ms, sec[s], min[s], hr[s], day[s], week[s]). For example, if cronPeriodDuration is 3 mins and the sampling rate is 8000, each file will have 3*60*8000 = 1440000 records. Note: it is not used when cronMode is table, where the limit is taken from the cron string, and it is also not used in periodic mode.

  • ‘eventRepetitionTime’: type: string default: 16min:

    Only for periodic mode. The time after which collection starts again (in ms, sec[s], min[s], hr[s], day[s], week[s]).

  • ‘eventPreTime’: type: string default: 1min:

    Only for periodic mode. The amount of time to consider for collection before the desired event [e.g., 1min].

  • ‘eventDuration’: type: string default: 1min:

    Only for periodic mode. The duration of the desired event [e.g., 1min].

  • ‘eventPostTime’: type: string default: 1min:

    Only for periodic mode. The amount of time to consider for collection after the desired event [e.g., 1min]. Note that for periodic mode cronPeriodDuration is the sum of eventPreTime, eventDuration and eventPostTime.

  • ‘rotateAfter’: type: string default: 10min:

    The total time after which rotation will occur (e.g., 4wks). For example, if rotateAfter is 7 days, the first directory will be deleted at 12:00:00 AM on the ninth day.

  • ‘cronTabSpec’: type: string default: ‘’:

    This parameter controls the time at which collection takes place.

    Note

    It is only used when cronMode is table. It is a string consisting of seven parts separated by spaces, of the form ‘seconds(0-59) minutes(0-59) hours(0-23) day-of-month(1-31) month(1-12 or names) day-of-week(0-7 or names) duration(seconds[float])’.

    Examples:

    1. Use the string ‘0 0,15,30,45 * * * * 60’ to collect one minute’s worth of data at the zeroth, fifteenth, thirtieth and forty-fifth minute of every hour.

    2. Use the string ‘0,15,30,45 * * * * * 5’ to collect five seconds’ worth of data at the zeroth, fifteenth, thirtieth and forty-fifth second of every minute.

  • ‘addTimestamp’: type: boolean default: false:

    Add a timestamp to each csv entry.

  • ‘enableCompress’: type: boolean default: true:

    Compress files after they have reached their maximum size.

  • ‘compressionType’: type: enumeration [‘bzip2’, ‘gzip’, ‘7za’] default: bzip2:

    The compression type to be used when ‘enableCompress’ is true. If 7za is selected the files will also be encrypted (see the example after this configuration list).

  • ‘encryptPw’: type:password default: ‘’:

    The password used to encrypt the files when 7za compression is selected.

  • ‘enable’: type: boolean default: ‘false’:

    Enable / Disable plugin operation.
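
For illustration, the following command sketches a csv_writer instance that compresses with 7za and encrypts the output files. It follows the same pattern as the commands in the Execution section below; the filter name csv_writer_encrypted, the password password123 and the timing values are examples only.

curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_encrypted","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","compressionType":"7za","encryptPw":"password123","addTimestamp":"true","filterName":"encrypted","cronMode":"continuous","cronPeriodDuration":"5min","rotateAfter":"7days","forwardData":"true"}}' |jq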

Execution

Part 1: Get a south service running

To start a south service, use any of the following commands.

  1. Use csvplayback

    Assume you have a csv file named vibration.csv inside FOGLAMP_ROOT/data/csv_data (a pattern such as vib may also be given; the plugin will search for all files starting with vib and will therefore find vibration.csv), that the file has a fixed number of columns per row, and that the column names are present in the first line. The plugin will rename the file with the suffix .tmp after playing it. Here is the curl command for that.

res=$(curl -sX POST http://localhost:8081/foglamp/service -d @- << EOF | jq '.'
{
  "name": "My_south",
  "type": "south",
  "plugin": "csvplayback",
  "enabled": false,
  "config": {
    "assetName": {"value": "My_csv_asset"},
    "csvDirName": {"value": "FOGLAMP_DATA/csv_data"},
    "csvFileName": {"value": "vib"},
    "headerMethod": {"value": "do_not_skip"},
    "variableCols": {"value": "false"},
    "columnMethod": {"value": "pick_from_file"},
    "rowIndexForColumnNames": {"value": "0"},
    "ingestMode": {"value": "burst"},
    "sampleRate": {"value": "8000"},
    "postProcessMethod": {"value": "rename"},
    "suffixName": {"value": ".tmp"}
  }
}
EOF
)

echo $res
    
  2. Use dt9837 plugin

    Assuming you have connected accelerometers to the DAQ, run the following command. This command uses 4-channel data.

    curl -sX POST http://localhost:8081/foglamp/service -d '{"name": "My_south", "type": "south", "plugin": "dt9837", "enabled": "true", "config": {"range": {"value": "BiPolar 10 Volts"}, "lowChannel": {"value": "0"}, "highChannel": {"value": "3"}}}' |jq
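
Whichever south service you create, you can optionally confirm that it exists before attaching the filter (this assumes the standard FogLAMP REST API on port 8081):

curl -sX GET http://localhost:8081/foglamp/service | jq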
    

Part 2: Add the filter & attach to service

curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_continuous","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","cronTabSpec":"","addTimestamp":"true","filterName":"continuous" ,"cronMode":"continuous", "cronPeriodDuration":"5min","rotateAfter":"7days", "forwardData":"true"}}' |jq
curl -sX PUT 'http://localhost:8081/foglamp/filter/My_south/pipeline?allow_duplicates=true&append_filter=true' -d '{"pipeline":["csv_writer_continuous"]}' |jq
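
To check that the filter has been created and attached to the service pipeline, the filter list and the pipeline can be queried (again assuming the standard FogLAMP filter API):

curl -sX GET http://localhost:8081/foglamp/filter | jq
curl -sX GET http://localhost:8081/foglamp/filter/My_south/pipeline | jq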

Modes

Periodic

The following command will collect data every 16 minutes, with 3 minutes (pre + event + post duration) worth of data in every file. The data will be rotated after 14 + 1 = 15 days. Collection will start at 2021-07-05 11:00:00.000000+00:00 (the pre time of 1 minute is subtracted from the configured start time). If this time is in the past, the plugin will calculate the next collection time accordingly. Note that times are handled in UTC; the plugin will convert the given time zone to UTC.

# assign start time to a variable.
start_time="2021-07-05 11:01:00.000000+00:00"
curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_periodic","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","cronTabSpec":"","addTimestamp":"true","filterName":"periodic" , "cronPeriodStart":"'"$start_time"'" ,"cronMode":"periodic", "eventRepetitionTime":"16min","eventDuration":"1min","eventPreTime":"1min","eventPostTime":"1min" ,"rotateAfter":"14days", "forwardData":"true"}}' |jq
curl -sX PUT 'http://localhost:8081/foglamp/filter/My_south/pipeline?allow_duplicates=true&append_filter=true' -d '{"pipeline":["csv_writer_periodic"]}' |jq
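
The cronPeriodStart value must be a UTC timestamp in the format shown above. A minimal sketch (assuming GNU date is available) that builds such a timestamp one hour in the future:

# build a cronPeriodStart value one hour from now, in UTC
start_time=$(date -u -d '+1 hour' '+%Y-%m-%d %H:%M:%S.%6N+00:00')
echo "$start_time"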

Some sample files will be as follows:

foglamp@foglamp:~/usr/local/foglamp/data/readings-out/periodic/2021-07-05.d$ ls
    2021-07-05-11-00-00-0000.csv.bz2
    2021-07-05-11-16-00-0000.csv.bz2
    2021-07-05-11-32-00-0000.csv.bz2
    ..
    ..
    ..

Continuous

The following command collects files continuously. Each file has 5 minutes' worth of data.

curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_continuous","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","cronTabSpec":"","addTimestamp":"true","filterName":"continuous" ,"cronMode":"continuous", "cronPeriodDuration":"5min","rotateAfter":"7days", "forwardData":"true"}}' |jq
curl -sX PUT 'http://localhost:8081/foglamp/filter/My_south/pipeline?allow_duplicates=true&append_filter=true' -d '{"pipeline":["csv_writer_continuous"]}' |jq

Some sample files will be as follows:

foglamp@foglamp:~/usr/local/foglamp/data/readings-out/continuous/2021-05-07.d$ ls
2021-05-07-10-00-00-0000.csv.bz2
2021-05-07-10-05-00-0000.csv.bz2
2021-05-07-10-10-00-0000.csv.bz2
..
..
..

Cron style collection

The following command collects 5 minutes of data every two hours.

curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_discontinuous","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","cronTabSpec":"0 0 0,2,4,6,8,10,12,14,16,18,20,22 * * * 300","addTimestamp":"true","filterName":"discontinuous" ,"cronMode":"table","rotateAfter":"4weeks", "forwardData":"true"}}' |jq
curl -sX PUT 'http://localhost:8081/foglamp/filter/My_south/pipeline?allow_duplicates=true&append_filter=true' -d '{"pipeline":["csv_writer_discontinuous"]}' |jq

Some sample files will be as follows:

foglamp@foglamp:~/usr/local/foglamp/data/readings-out/discontinuous/2021-05-07.d$ ls
2021-05-07-10-00-00-0000.csv.bz2
2021-05-07-12-00-00-0000.csv.bz2
2021-05-07-14-00-00-0000.csv.bz2
..
..
..

Cascading CSV writer filter

We can apply multiple instances of the csv writer filter. Say we want to apply three filters: then forwardData of the first two filters must be kept true, while the third filter's forwardData may or may not be true.

Consider the following example:

# continuous
curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_continuous","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","cronTabSpec":"","addTimestamp":"true","filterName":"continuous" ,"cronMode":"continuous", "cronPeriodDuration":"5min","rotateAfter":"7days", "forwardData":"true"}}' |jq
curl -sX PUT 'http://localhost:8081/foglamp/filter/My_south/pipeline?allow_duplicates=true&append_filter=true' -d '{"pipeline":["csv_writer_continuous"]}' |jq

# discontinuous
curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_discontinuous","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","cronTabSpec":"0 0 0,2,4,6,8,10,12,14,16,18,20,22 * * * 300","addTimestamp":"true","filterName":"discontinuous" ,"cronMode":"table","rotateAfter":"4weeks", "forwardData":"true"}}' |jq
curl -sX PUT 'http://localhost:8081/foglamp/filter/My_south/pipeline?allow_duplicates=true&append_filter=true' -d '{"pipeline":["csv_writer_discontinuous"]}' |jq

# periodic
# assigning the start time to a variable.
start_time="2021-07-05 11:01:00.000000+00:00"
curl -sX POST http://localhost:8081/foglamp/filter -d '{"name":"csv_writer_periodic","plugin":"csv_writer","filter_config":{"samplingRate":"8000","enable":"true","enableCompress":"true","cronTabSpec":"","addTimestamp":"true","filterName":"periodic" , "cronPeriodStart":"'"$start_time"'" ,"cronMode":"periodic", "eventRepetitionTime":"16min","eventDuration":"1min","eventPreTime":"1min","eventPostTime":"1min" ,"rotateAfter":"14days", "forwardData":"true"}}' |jq
curl -sX PUT 'http://localhost:8081/foglamp/filter/My_south/pipeline?allow_duplicates=true&append_filter=true' -d '{"pipeline":["csv_writer_periodic"]}' |jq

If forwardData of the first filter is false, only the first filter will collect data.

If forwardData of the first filter is true and that of the second filter is false, only the first and second filters will collect data.

The forwardData of the third filter may or may not be true. It is advisable to switch it off to prevent ingestion into the database.

The following table sums it up.

Two CSV filters cascaded together

Filter 1 forwardData   Filter 2 forwardData   Behaviour
True                   True                   Both filters collect data and data is ingested into the database.
True                   False                  Both filters collect data and data is NOT ingested into the database.
False                  True                   Only filter 1 collects data and data is NOT ingested into the database.
False                  False                  Only filter 1 collects data and data is NOT ingested into the database.

Behaviour on restart and reconfigure

After a restart, collection resumes normally, which means collection will continue in the same directory as before. However, it may happen that the plugin was writing a file that was left uncompressed; this file will be compressed when the plugin restarts.

Note that the plugin will not wait for compression to finish, as it is offloaded to another thread.

If the leftover file is a csv file and is empty, it will be deleted.

It may also happen that the directory name was changed in the plugin configuration; collection will then begin inside the new directory without deleting the existing files.

On reconfigure the plugin behaves in the same way as on restart.

How is data rotated?

The plugin takes the rotateAfter config parameter and converts it into days. Since each day's collection has its own directory, when the number of directories exceeds this limit the oldest directory is deleted, and so on. (It is assumed that these files have already been transferred elsewhere before rotation.)

Note

If rotateAfter is 1week, the calculated limit will be 8. (One extra day is taken to compensate for the case where collection started at, say, 2 PM on the first day; had we taken exactly 7 days this would really amount to only 6 days of data.) At 12:00:00 AM on the ninth day the first directory will be deleted, and so on.
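
A minimal sketch of the limit calculation described above (the variable names are illustrative, not the plugin's internals):

# rotateAfter of 1 week = 7 days; one extra day is kept, so 8 daily
# directories exist before the oldest one is deleted at midnight
rotate_after_days=7
dir_limit=$((rotate_after_days + 1))
echo "keep at most $dir_limit daily directories"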

Decryption

If you selected 7za for compression, the files you obtain will be encrypted.

Use the following command to decrypt the file.

7za x -p<password> <file_name>

# example: 7za x -ppassword123 vibration.7z
# assuming the password is password123 and the file name is vibration.7z

For bzip2 and gzip compression, use the -d flag to uncompress the file.

bzip2 -d <file_name>
gzip -d <file_name>
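
To uncompress a whole day's worth of files in one go, a loop such as the following can be used (the directory path is taken from the sample listings above):

# decompress every bzip2 file collected on one day
cd /usr/local/foglamp/data/readings-out/continuous/2021-05-07.d
for f in *.csv.bz2; do bzip2 -d "$f"; done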