Real-time Data Agent

The Real-time Data Agent provides a unified and real-time way to read and write data from and to a shared memory and a demo program to parse, pack, and send data to a container stack for storage and visualization.

It includes the following three major parts:

  • Block-based lockless shared memory ring buffer library

  • Python3 example program

  • Example time series database container stack

Real-time Data Agent supports both Xenomai kernel, Preempt RT kernel.

Main Structure and Data Flow

The following figure illustrates the structure and data flow.

../_images/structure.png

Shared Memory Ring Buffer Library

This library implements unified and real-time APIs to read and write data from and to a shared memory without requiring a lock.

Design Principle

  • Block based ring buffer, fixed maximum block size on initialization

  • Block number set on initialization

  • No lock required, one producer (write), one consumer (read) for one named ring buffer

  • Buffer empty: write index == read index

  • Buffer full: write index + 1 == read index

  • Each read/write never across block boundary

Example:

The following figure shows an example with eight blocks ring buffer states.

../_images/ringbuf.png

Install Real-time Data Agent

  1. Real-time Data Agent requires the Docker Engine. Install Docker if not already done.

  2. Install the Docker Compose plugin on the target system.

  3. Setup the ECI repository, then perform either of the following commands to install this component:

    Install from meta-package
    $ sudo apt install eci-realtime-data-agent
    
    Install from individual Deb packages
    # For non-Xenomai kernels
    $ sudo apt install rt-data-agent libshmringbuf
    
    # For Xenomai kernels
    $ sudo apt install rt-data-agent libshmringbuf-xenomai
    

Install Real-time Data Agent Development Headers

This component also provides header files for development use. Setup the ECI repository, then perform the following command to install this component:

Install from individual Deb packages
# For non-Xenomai kernels
$ sudo apt install libshmringbuf-dev

# For Xenomai kernels
$ sudo apt install libshmringbuf-xenomai-dev

How to Use

The following section is applicable to:

../_images/target_generic.png

In Intel® Edge Controls for Industrial (Intel® ECI or ECI) installation, the library is in /usr/lib/libshmringbuf.so and the header file is in /usr/include/shmringbuf.h.

Generic usage

  • Call shm_blkbuf_init to initialize a named ring buffer with provided maximum block size and block number and return a handle.

  • Use that handle or call shm_blkbuf_open with the name to get the handle.

  • Use shm_blkbuf_write to write data to the ring buffer, only one writer is accepted for one opened ring buffer handle.

  • Use shm_blkbuf_read to read data from the ring buffer, only one reader is accepted for one opened ring buffer handle.

  • Use shm_blkbuf_empty and shm_blkbuf_full to check the status of the ring buffer.

Sample code

#include <stdio.h>
#include <stdint.h>
#include "shmringbuf.h"

int main(int argc, char **argv)
{
    int i, len;
    uint8_t data[1200];
    shm_handle_t handle;

    handle = shm_blkbuf_init("shm_test", 10, 1024);

    for (i=0; i<5; i++)
    {
        len = shm_blkbuf_write(handle, data, 800);
        printf("len written %d\n", len);
    }

    shm_dump(handle);

    for (i=0; i<6; i++)
    {
        len = shm_blkbuf_read(handle, data, 1100);
        printf("len read %d\n", len);
    }

    shm_dump(handle);

    for (i=0; i<12; i++)
    {
        len = shm_blkbuf_write(handle, data, 600);
        printf("len written %d\n", len);
    }

    shm_dump(handle);

    for (i=0; i<7; i++)
    {
        len = shm_blkbuf_read(handle, data, 350);
        printf("len read %d\n", len);
    }

    shm_dump(handle);
    shm_blkbuf_close(handle);
}

Compile the code like so:

$ gcc example.c -lshmringbuf -o example

Run the example application like so:

$ ./example

The expected output should be:

Click to view the output of the example application

$ ./example
len written 800
len written 800
len written 800
len written 800
len written 800
share memory name shm_test
share memory size 11800
share memory blocks 11
share memory block size 1024
p_ring->rd 0
p_ring->wr 5
blocks 0: read 0, len 800
blocks 1: read 0, len 800
blocks 2: read 0, len 800
blocks 3: read 0, len 800
blocks 4: read 0, len 800
blocks 5: read 0, len 0
blocks 6: read 0, len 0
blocks 7: read 0, len 0
blocks 8: read 0, len 0
blocks 9: read 0, len 0
blocks 10: read 0, len 0
len read 800
len read 800
len read 800
len read 800
len read 800
len read 0
share memory name shm_test
share memory size 11800
share memory blocks 11
share memory block size 1024
p_ring->rd 5
p_ring->wr 5
blocks 0: read 0, len 800
blocks 1: read 0, len 800
blocks 2: read 0, len 800
blocks 3: read 0, len 800
blocks 4: read 0, len 800
blocks 5: read 0, len 0
blocks 6: read 0, len 0
blocks 7: read 0, len 0
blocks 8: read 0, len 0
blocks 9: read 0, len 0
blocks 10: read 0, len 0
len written 600
len written 600
len written 600
len written 600
len written 600
len written 600
len written 600
len written 600
len written 600
len written 600
len written 0
len written 0
share memory name shm_test
share memory size 11800
share memory blocks 11
share memory block size 1024
p_ring->rd 5
p_ring->wr 4
blocks 0: read 0, len 600
blocks 1: read 0, len 600
blocks 2: read 0, len 600
blocks 3: read 0, len 600
blocks 4: read 0, len 800
blocks 5: read 0, len 600
blocks 6: read 0, len 600
blocks 7: read 0, len 600
blocks 8: read 0, len 600
blocks 9: read 0, len 600
blocks 10: read 0, len 600
len read 350
len read 250
len read 350
len read 250
len read 350
len read 250
len read 350
share memory name shm_test
share memory size 11800
share memory blocks 11
share memory block size 1024
p_ring->rd 8
p_ring->wr 4
blocks 0: read 0, len 600
blocks 1: read 0, len 600
blocks 2: read 0, len 600
blocks 3: read 0, len 600
blocks 4: read 0, len 800
blocks 5: read 0, len 600
blocks 6: read 0, len 600
blocks 7: read 0, len 600
blocks 8: read 350, len 600
blocks 9: read 0, len 600
blocks 10: read 0, len 600

Demonstration: Agent Program to Process Real-time Data

The following section is applicable to:

../_images/target_generic.png

In the ECI installation, the example demonstration is in /opt/rt-data-agent/agent/. The program entry is the agent.py.

$ python3 agent.py --host <mqtt broker ip> --port <mqtt port> --qos <mqtt qos> --lib < shared memory library path>
  • <mqtt broker ip>: Default is localhost

  • <mqtt port>: Default is 1883

  • <mqtt qos>: Default is 1

  • <shared memory library path>: Default is /usr/lib/libshmringbuf.so.0

By default, the following three tasks start:

  • ShmTask: Wraps the libshmringbuf.so in Python3 by ctypes, continuously reads data from the shared memory (usually data written by another real-time thread), and then places in a predefined data queue.

  • DataFmtTask: Reads data from the data queue, parses it (based on provided config.json) and re-packs it in MQTT format, and then places in a predefined MQTT queue.

  • MqttPubTask: Reads data from MQTT queue, and then publishes it to MQTT broker.

Example: Time Series Database Container Stack

The following section is applicable to:

../_images/target_generic.png

In the ECI installation, there is an example time series database container stack in /opt/rt-data-agent/stack. It includes the following four major container images:

  • mqtt broker: There is a default MATT broker running in the ECI image, http://localhost:1883. If the stack is not running in the ECI image, start the MQTT broker using /opt/rt-data-agent/stack/mqtt-broker.yml.

  • telegraf: Configuration is in /opt/rt-data-agent/stack/telegraf/. It sets the input plugin as the MQTT broker and output plugin as the InfluxDB database.

  • influxdb: The InfluxDB database, recording the time series data.

  • grafana: Visualizes the time series data from the InfluxDB database. The example configuration or dashboard is in /opt/rt-data-agent/stack/grafana.

Run Example Simulation

The following section is applicable to:

../_images/target_generic.png

The ECI installation includes a simulation tool (/opt/rt-data-agent/sim/sim_rt_send) to send data (as per the format described in config.json) to a shared memory block ring buffer (name shm_test).

In real scenario, replace the simulation tool with the actual data generator, in which EtherCat will read the data, and then send the data to a shared memory with the Shared memory ring buffer library.

Then, start the agent to read the data from shm_test shared memory block ring buffer, and parse, pack, send the MQTT format data to the MQTT broker. The container stack starts to receive the data from the MQTT broker, saves to InfluxDB database, which could be visualized by Grafana.

Do the following:

  1. In one terminal, start the simulation tool to send data in 10ms interval.

    $ cd /opt/rt-data-agent/sim
    $ sudo ./sim_rt_send -i 10
    
  2. In another terminal, start the container stack.

    $ cd /opt/rt-data-agent/stack
    $ docker compose up
    
  3. In the third terminal, start the agent.

    $ cd /opt/rt-data-agent/agent
    $ sudo python3 agent.py
    
  4. If there are no issues, visualize the data in http://localhost:3000.