Telegraf e MQTT per lo storage di dati

Nel precedente post sullo stack TICK (Telegraf, InfluxDB, Chronograf, Kapacitor) ci siamo lasciati con la promessa di vedere all’opera Telegraf e uno dei suoi plugin per l’acquisizione di dati da una sorgente IoT.

In questi giorni ho ripreso il progetto relativo alla Weather Station con Arduino per cui vediamo di mettere insieme tutti i pezzi del puzzle e tirar fuori qualcosa di utile.

Facciamo un piccolo riepilogo:

  • lo stack TICK, limitandoci per ora a due sole componenti, ci mette a disposizione il database per time series InfluxDB e Telegraf, ossia il collector che utilizzeremo per acquisire i dati dalla stazione meteo
  • la Weather Station pubblica i dati letti dai sensori su un broker MQTT rappresentato nell’esempio proposto da un container Docker basato su un’immagine di mosquitto.

Non sto ad elencare tutti i vantaggi di Docker ma mi limito a dire che nel momento in cui dovessimo decidere di cancellare tutto ci basterà fermare e rimuovere i container, cancellare le immagini scaricate senza che il nostro sistema abbia subito alcuna modifica.

Per utilizzare Telegraf con MQTT dobbiamo abilitare uno specifico plugin all’interno della sezione “INPUTS” del file di configurazione telegraf.conf. A voler essere pignoli ho inserito il  codice nella sezione “SERVICE INPUTS” ma solo per una migliore leggibilità e per distinguerlo dagli input di default che Telegraf aggiunge per monitorare l’host su cui è in esecuzione. Questi ultimi possono comunque essere rimossi se ritenuti superflui.

Ecco il codice che ho lasciato nella sua forma più completa con tutte le opzioni che si possono abilitare rimuovendo il “#” dalla rispettiva riga.

[[inputs.mqtt_consumer]]
servers = ["192.168.1.50:1883"]
## MQTT QoS, must be 0, 1, or 2
qos = 0

## Topics to subscribe to
topics = [
"/weathershield/temperature",
"/weathershield/humidity",
"/weathershield/pressure"
]

# if true, messages that can't be delivered while the subscriber is offline
# will be delivered when it comes back (such as on service restart).
# NOTE: if true, client_id MUST be set
persistent_session = false
# If empty, a random client ID will be generated.
client_id = "weathershield"

## username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "value"
data_type="float"

In realtà la configurazione è estremamente semplice e sono richieste pochissime informazioni.

In primo luogo specifichiamo l’indirizzo del broker

servers = ["192.168.1.50:1883"]

Una piccola nota al riguardo: sia il broker che lo stack TICK girano sulla stessa macchina quindi sarebbe logico utilizzare “localhost:1883” oppure “127.0.0.1:1883” ma questo non funziona perchè siamo in un contesto Docker dove l’indirizzo di loopback fa riferimento al container stesso. Quindi Telegraf si aspetterebbe di trovare il broker nello stesso container in cui sta girando. Per tale motivo occorre specificare l’indirizzo dell’host su cui è in esecuzione mosquitto.

Proseguiamo elencando i topic MQTT di interesse

topics = [
"/weathershield/temperature",
"/weathershield/humidity",
"/weathershield/pressure"
]

e indichiamo un nome per il client

client_id = "weathershield"

In questo esempio non stiamo dando particolare importanza alla sicurezza, che invece non andrebbe assolutamente trascurata in un ambiente di produzione, per cui tralasciamo tutte le opzioni relative all’autenticazione sul broker e alla configurazione SSL.

Infine specifichiamo il formato dei dati da “consumare”

data_format = "value"
data_type="float"

In questo caso si tratta di stringhe che riportano il valore letto dai sensori (“value”) e che va interpretato come un numero decimale (“float”).
Sono possibili altri formati come json ma in questo caso occorre pubblicare i dati sul topic opportunamente formattati per evitare errori in fase di parsing.

A tal proposito suggerisco di abilitare il debug mode, sempre nel file telegraf.conf,

## Run telegraf in debug mode
debug = true

in modo da individuare facilmente eventuali errori di configurazione.
Telegraf può risultare talvolta criptico nel senso che il container può stopparsi in fase di avvio senza che si riesca a capirne la reale causa.

Siamo pronti per una prova sul campo.

Facciamo partire il broker mosquitto con

sudo docker run -d -p 1883:1883 -p 9001:9001 --name=mosquitto eclipse-mosquitto

mosquitto

Dopo esserci assicurati che la stazione meteo sia alimentata e connessa alla lan possiamo utilizzare un client MQTT come mqtt-spy (che oltre ad essere un ottimo tool raccoglie donazioni per l’Unicef) per verificare il flusso di dati.

mqtt-spy

E’ tutto OK!

Quindi diamo il via allo stack TICK, dopo esserci posizionati nella cartella contenente il file docker-compose.yaml, con

sudo docker-compose up -d

e come ulteriore check assicuriamoci che tutti i container siano attivi con

sudo docker-compose ps

tick-stack

Considerato che i dati vengono pubblicati dalla stazione ad intervalli di 5 secondi, lasciamo trascorrere un po’ di tempo e poi andiamo a sbirciare nel database InfluxDB utilizzando la CLI fornita di serie ed avviabile con

sudo docker-compose run influxdb-cli

influxdb-data

La precedente immagine racchiude i comandi da impartire per visualizzare i dati che sono memorizzati nel database telegraf sotto la measurement  mqtt_consumer (che in InfluxDB è l’equivalente di una tabella del modello relazionale).

Con una semplice query

select * from mqtt_consumer

recuperiamo tutti i dati archiviati nel database ed organizzati nel seguente modo:

  • time: timestamp aggiunto automaticamente da InfluxDB e che funge da primary key
  • host: identificatore dell’host utile per discriminare sorgenti di dati multiple
  • topic: topic MQTT
  • value: valore letto dal topic

Per i più esperti di InfluxDB host e topic sono due tag mentre value è un field.

Per ora è tutto. Vedremo in seguito come creare query più complesse e generare dei grafici per monitorare in tempo reale lo stato dei sensori, ma anche come analizzare le serie storiche dei dati.

PS. Per chi volesse fare dei test trova tutto l’occorrente sul mio repository GitHub. Non disponendo dell’hardware si può comunque ovviare simulando l’invio dei dati tramite un client come mqtt-spy. Basterà pubblicare i “fake values” sui topic specificati nel file di configurazione di Telegraf.

 

Ritieni che il post sia interessante? Se ti va puoi confermare le mie competenze o aggiungere una segnalazione sul mio profilo LinkedIn.

Lascia una risposta

L'indirizzo email non verrà pubblicato. I campi obbligatori sono contrassegnati *

Utilizzando il sito, accetti l'invio dei cookies da parte nostra. Maggiori informazioni

Questo sito utilizza i cookies per fornire la migliore esperienza di navigazione possibile. Continuando ad utilizzarlo senza modificare le impostazioni o cliccando su "Accetta" acconsenti al loro utilizzo.

Chiudi