Real-time Data Agent
The Real-time Data Agent provides a unified, real-time way to read and write data through shared memory, along with a demo program that parses, packs, and sends the data to a container stack for storage and visualization.
It includes the following three major parts:
- Block-based lockless shared memory ring buffer library
- Python3 example program
- Example time series database container stack
Real-time Data Agent supports both the Xenomai kernel and the Preempt RT kernel.
Install Real-time Data Agent
You can install this component from the ECI APT repository. Set up the ECI APT repository, then run one of the following commands to install this component:
- Install from meta-package
$ sudo apt install eci-realtime-data-agent
- Install from individual Deb packages
# For non-Xenomai kernels
$ sudo apt install rt-data-agent libshmringbuf
# For Xenomai kernels
$ sudo apt install rt-data-agent libshmringbuf-xenomai
Install Real-time Data Agent Development Headers
This component also provides header files for development use. Set up the ECI APT repository, then run the command that matches your kernel to install the headers:
- Install from individual Deb packages
# For non-Xenomai kernels
$ sudo apt install libshmringbuf-dev
# For Xenomai kernels
$ sudo apt install libshmringbuf-xenomai-dev
How to Use
In an Intel® Edge Controls for Industrial (Intel® ECI or ECI) installation, the library is in /usr/lib/libshmringbuf.so and the header file is in /usr/include/shmringbuf.h.
Generic usage
- Call shm_blkbuf_init to initialize a named ring buffer with the provided maximum block size and block number. It returns a handle.
- Use that handle, or call shm_blkbuf_open with the name to get a handle.
- Use shm_blkbuf_write to write data to the ring buffer. Only one writer is accepted per opened ring buffer handle.
- Use shm_blkbuf_read to read data from the ring buffer. Only one reader is accepted per opened ring buffer handle.
- Use shm_blkbuf_empty and shm_blkbuf_full to check the status of the ring buffer.
Sample code
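#include <stdint.h>
#include <stdio.h>
#include <shmringbuf.h>

int main(int argc, char **argv)
{
    int i, len;
    uint8_t data[1200] = {0};   /* scratch buffer shared by the writes and reads */
    shm_handle_t handle;

    /* Initialize the ring buffer named "shm_test" and get its handle. */
    handle = shm_blkbuf_init("shm_test", 10, 1024);

    /* Write five 800-byte blocks. */
    for (i = 0; i < 5; i++) {
        len = shm_blkbuf_write(handle, data, 800);
        printf("len written %d\n", len);
    }
    shm_dump(handle);

    /* Attempt six reads into a 1100-byte buffer, one more than was written. */
    for (i = 0; i < 6; i++) {
        len = shm_blkbuf_read(handle, data, 1100);
        printf("len read %d\n", len);
    }
    shm_dump(handle);

    /* Attempt twelve 600-byte writes. */
    for (i = 0; i < 12; i++) {
        len = shm_blkbuf_write(handle, data, 600);
        printf("len written %d\n", len);
    }
    shm_dump(handle);

    /* Attempt seven reads into a 350-byte buffer, smaller than the blocks written. */
    for (i = 0; i < 7; i++) {
        len = shm_blkbuf_read(handle, data, 350);
        printf("len read %d\n", len);
    }
    shm_dump(handle);

    shm_blkbuf_close(handle);
    return 0;
}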
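To reason about what the sample exercises (writes until the buffer fills, reads until it drains), a small in-memory model can help. The following Python sketch is not the libshmringbuf implementation and does not use shared memory. The class name BlockRingBuffer and its policies (a write returns 0 when the buffer is full or the data exceeds the block size, a read returns nothing when the buffer is empty or the caller's buffer is too small) are assumptions made for illustration; the real return values are defined by the library.

```python
from collections import deque


class BlockRingBuffer:
    """In-memory model of a fixed-capacity block ring buffer (illustration only)."""

    def __init__(self, blk_num: int, blk_size: int) -> None:
        self.blk_num = blk_num      # maximum number of blocks held at once
        self.blk_size = blk_size    # maximum payload size of one block, in bytes
        self._blocks = deque()

    def empty(self) -> bool:
        return len(self._blocks) == 0

    def full(self) -> bool:
        return len(self._blocks) >= self.blk_num

    def write(self, data: bytes) -> int:
        """Store one block; return bytes stored, or 0 if rejected (assumed policy)."""
        if self.full() or len(data) > self.blk_size:
            return 0
        self._blocks.append(bytes(data))
        return len(data)

    def read(self, max_len: int) -> bytes:
        """Remove and return the oldest block, or b"" if none fits (assumed policy)."""
        if self.empty() or len(self._blocks[0]) > max_len:
            return b""
        return self._blocks.popleft()


if __name__ == "__main__":
    buf = BlockRingBuffer(blk_num=10, blk_size=1024)
    written = [buf.write(bytes(800)) for _ in range(5)]
    read = [len(buf.read(1100)) for _ in range(6)]
    print("written:", written)
    print("read:", read)
```

In this model, the sixth read finds the buffer empty. Comparing the model with the shm_dump output of the real sample is one way to check which policies the library actually applies.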
Demonstration: Agent Program to Process Real-time Data
In the ECI installation, the example demonstration is in /opt/rt-data-agent/agent/. The program entry point is agent.py.
python3 agent.py --host <mqtt broker ip> --port <mqtt port> --qos <mqtt qos> --lib <shared memory library path>
- <mqtt broker ip>: Default is localhost
- <mqtt port>: Default is 1883
- <mqtt qos>: Default is 1
- <shared memory library path>: Default is /usr/lib/libshmringbuf.so.0
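The options and defaults listed above could be declared with argparse roughly as follows. This is a sketch of one possible parser, not the code in agent.py: the function name build_parser, the help strings, and the restriction of --qos to the MQTT levels 0, 1, and 2 are assumptions.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser mirroring the documented flags and defaults (sketch)."""
    parser = argparse.ArgumentParser(description="Real-time data agent (sketch)")
    parser.add_argument("--host", default="localhost", help="MQTT broker IP")
    parser.add_argument("--port", type=int, default=1883, help="MQTT port")
    # Restricting QoS to 0, 1, 2 is an assumption; the real agent may not validate.
    parser.add_argument("--qos", type=int, default=1, choices=(0, 1, 2),
                        help="MQTT QoS")
    parser.add_argument("--lib", default="/usr/lib/libshmringbuf.so.0",
                        help="shared memory library path")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(args.host, args.port, args.qos, args.lib)
```

Running the script with no arguments prints the four default values, which matches starting the agent with a bare `python3 agent.py`.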
By default, the following three tasks start:
- ShmTask: Wraps libshmringbuf.so in Python3 using ctypes, continuously reads data from the shared memory (usually data written by another real-time thread), and places it in a predefined data queue.
- DataFmtTask: Reads data from the data queue, parses it (based on the provided config.json), re-packs it in MQTT format, and places it in a predefined MQTT queue.
- MqttPubTask: Reads data from the MQTT queue and publishes it to the MQTT broker.
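The three-task, two-queue structure described above can be sketched with the standard library alone. This is an illustration of the pattern, not the agent's source: the shared memory read is replaced by a list of pre-packed blocks, the MQTT client by a publish callback, and the block layout (BLOCK_FORMAT), the topic name rt-data/demo, and all function names are hypothetical. The real layout comes from config.json.

```python
import json
import queue
import struct
import threading

STOP = object()  # sentinel that tells a downstream stage to finish

# Hypothetical block layout: little-endian uint32 sequence number + float64 value.
BLOCK_FORMAT = "<Id"


def shm_task(blocks, data_q):
    """Stand-in for ShmTask: forwards raw blocks instead of reading shared memory."""
    for block in blocks:
        data_q.put(block)
    data_q.put(STOP)


def data_fmt_task(data_q, mqtt_q, topic="rt-data/demo"):
    """Stand-in for DataFmtTask: unpacks each block and re-packs it as JSON."""
    while (block := data_q.get()) is not STOP:
        seq, value = struct.unpack(BLOCK_FORMAT, block)
        mqtt_q.put((topic, json.dumps({"seq": seq, "value": value})))
    mqtt_q.put(STOP)


def mqtt_pub_task(mqtt_q, publish):
    """Stand-in for MqttPubTask: hands each message to a publish callback."""
    while (msg := mqtt_q.get()) is not STOP:
        publish(*msg)


def run_pipeline(blocks, publish):
    """Run the three stages in their own threads and wait for them to drain."""
    data_q, mqtt_q = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=shm_task, args=(blocks, data_q)),
        threading.Thread(target=data_fmt_task, args=(data_q, mqtt_q)),
        threading.Thread(target=mqtt_pub_task, args=(mqtt_q, publish)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    sample = [struct.pack(BLOCK_FORMAT, i, i * 0.5) for i in range(3)]
    run_pipeline(sample, lambda topic, payload: print(topic, payload))
```

Decoupling the stages with queues means a slow publish does not block the reader directly. In this sketch the queues are unbounded, so a real deployment would likely want a size limit or a drop policy.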
Example: Time Series Database Container Stack
In the ECI installation, there is an example time series database container stack in /opt/rt-data-agent/stack. It includes the following four major container images:
- mqtt broker: A default MQTT broker runs in the ECI image at http://localhost:1883. If the stack is not running in the ECI image, start the MQTT broker using /opt/rt-data-agent/stack/mqtt-broker.yml.
- telegraf: Configuration is in /opt/rt-data-agent/stack/telegraf/. It sets the input plugin as the MQTT broker and the output plugin as the InfluxDB database.
- influxdb: The InfluxDB database, which records the time series data.
- grafana: Visualizes the time series data from the InfluxDB database. The example configuration and dashboard are in /opt/rt-data-agent/stack/grafana.
Run Example Simulation
The ECI installation includes a simulation tool (/opt/rt-data-agent/sim/sim_rt_send) that sends data (in the format described in config.json) to a shared memory block ring buffer named shm_test.
In a real scenario, replace the simulation tool with the actual data generator, in which EtherCAT reads the data and then sends it to shared memory using the shared memory ring buffer library.
Then, start the agent to read the data from the shm_test shared memory block ring buffer and to parse, pack, and send the MQTT-format data to the MQTT broker.
The container stack then receives the data from the MQTT broker and saves it to the InfluxDB database, where it can be visualized with Grafana.
Do the following:
- In one terminal, start the simulation tool to send data at a 10 ms interval.
$ cd /opt/rt-data-agent/sim
$ sudo ./sim_rt_send -i 10
- In another terminal, start the container stack.
$ cd /opt/rt-data-agent/stack
$ docker-compose up
- In a third terminal, start the agent.
$ cd /opt/rt-data-agent/agent
$ sudo python3 agent.py
- If there are no issues, visualize the data at http://localhost:3000.
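If the dashboard does not load, a quick first check is whether the services are listening at all. The following sketch, which is not part of the ECI package, uses only the Python standard library to test TCP reachability of the two ports named in this document (1883 for the MQTT broker and 3000 for Grafana). The helper name port_open is hypothetical, and an open port only shows that something is listening, not that data is flowing.

```python
import socket


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for name, port in (("MQTT broker", 1883), ("Grafana", 3000)):
        state = "reachable" if port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port} is {state}")
```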