Device Telemetry
Device telemetry is the most common way to send data from a device to the server. It is the hot path, used for data that does not require a reply. The data is a time series: each datapoint carries a timestamp whose precision you choose according to your needs. The Mir telemetry module ingests this data and stores it in InfluxDB:
InfluxDB is a time series database designed to handle high write and query loads. InfluxDB is meant to be used as a backing store for any use case involving large amounts of timestamped data, including DevOps monitoring, application metrics, IoT sensor data, and real-time analytics.
Editing the Schema
First, let's define a telemetry message in your schema:
message Env {
  option (mir.device.v1.message_type) = MESSAGE_TYPE_TELEMETRY;
  int64 ts = 1 [(mir.device.v1.timestamp) = TIMESTAMP_TYPE_NANO];
  int32 temperature = 2;
  int32 pressure = 3;
  int32 humidity = 4;
}
Here we define a message, Env, that carries the telemetry. The options annotate the message with metadata:
mir.device.v1.message_type: tells the server that this message is of telemetry type.
mir.device.v1.timestamp: tells the server that the field ts is the main timestamp and that its precision is nanoseconds. Second, microsecond, and millisecond precisions are also available.
Let's regenerate the schema:
just proto
#or
make proto
Send Telemetry to Mir
Let's create a function that sends telemetry data to the server every 3 seconds.
To do so, we use the m.SendTelemetry function, which takes any proto message:
package main

import (
	"context"
	"math/rand/v2"
	"os"
	"os/signal"
	"syscall"
	"time"

	// Generated schema package; adjust the path to match your module.
	"mir-device/schemav1"

	"github.com/maxthom/mir/pkgs/device/mir"
)
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	m, err := mir.Builder().
		DeviceId("weather").
		Target("nats://127.0.0.1:4222").
		LogPretty(false).
		Schema(schemav1.File_schema_proto).
		Build()
	if err != nil {
		panic(err)
	}

	wg, err := m.Launch(ctx)
	if err != nil {
		panic(err)
	}

	dataRate := 3

	// Register the goroutine so the wg.Done() below is balanced
	// (assumes wg is a *sync.WaitGroup).
	wg.Add(1)

	// Send telemetry from a goroutine so the main thread is not blocked.
	go func() {
		for {
			select {
			case <-ctx.Done():
				// If the context gets cancelled, stop sending telemetry and
				// decrement the wait group for a graceful shutdown.
				wg.Done()
				return
			case <-time.After(time.Duration(dataRate) * time.Second):
				// Random values stand in for real sensor readings.
				if err := m.SendTelemetry(&schemav1.Env{
					Ts:          time.Now().UTC().UnixNano(),
					Temperature: rand.Int32N(101),
					Pressure:    rand.Int32N(101),
					Humidity:    rand.Int32N(101),
				}); err != nil {
					m.Logger().Error().Err(err).Msg("error sending telemetry")
				}
			}
		}
	}()

	// Block until SIGINT or SIGTERM, then shut down gracefully.
	osSignal := make(chan os.Signal, 1)
	signal.Notify(osSignal, syscall.SIGINT, syscall.SIGTERM)
	<-osSignal
	cancel()
	wg.Wait()
}
Run the project:
just run
#or
make run
Visualize the Data
Just like that, telemetry is now stored server side. List it with the CLI:
mir dev tlm list weather
1. weather/default
schema.v1.Env{} localhost:3000/explore
Click on the link to open Grafana and visualize the data. The default user/password is:
user: admin
password: mir-operator
You can also see the data in InfluxDB:
localhost:8086
user: admin
password: mir-operator
Voilà! You have successfully sent telemetry data to the server. Add more messages to the schema and send more data! Use the CLI to quickly get links to the telemetry data in Grafana, and use the generated queries to create powerful dashboards.
Note: All protobuf definitions are supported except oneof.