MQTT Cookbook: Logging Activity

It’s been a while since I actually posted here. Between finishing my Ph.D., the frustrations of the academic job market, and working on my book, I haven’t had nearly as much time for hobbies. But I have recently been learning about MQTT and have decided to post some simple “recipe” style Python scripts that would have helped me when I was just starting out.

The first script was inspired by the sudden upsurge of questions on Stack Exchange (such as this one and this one) that deal with logging activity on an MQTT topic. Since Mosquitto, like most other brokers, does not include a database to store messages, you have to implement logging on top of MQTT.

The simplest way to do this is to write each message on a topic to a text file. For testing purposes I am using the Mosquitto test broker and a few of the server’s internal metrics.

import paho.mqtt.client as mqtt
from time import time

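def on_connect(client, userdata, flags, rc):
    # rc == 0 means the broker accepted the connection.
    print(flags, rc)
    # Subscribing here renews the subscriptions after a reconnect.
    client.subscribe('$SYS/broker/load/messages/received/1min')
    client.subscribe('$SYS/broker/load/messages/sent/1min')
    client.subscribe('$SYS/broker/load/bytes/received/1min')
    client.subscribe('$SYS/broker/load/bytes/sent/1min')

def on_message(client, userdata, message):
    timestamp = time()  # seconds since the epoch, as a float
    topic = message.topic
    payload = message.payload.decode('utf-8')
    # Append one line per message: timestamp, topic, payload.
    with open('log.txt', 'a') as f:
        f.write('%s %s %s\n' % (timestamp, topic, payload))


# With paho-mqtt 2.x, pass mqtt.CallbackAPIVersion.VERSION1 to mqtt.Client()
# to keep the callback signatures used above.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("test.mosquitto.org", 1883, 60)  # host, port, keepalive (seconds)
client.loop_forever()  # blocks and processes network traffic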

 

This Python 3 script is pretty self-explanatory, but it illustrates the basic process. The important function is on_message, which is assigned to client.on_message and therefore runs whenever a message arrives on a subscribed topic. It decodes the payload and appends it, together with a timestamp and the topic, as a new line in the log file.
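Because each line has the form "timestamp topic payload", the log can be read back with a few lines of Python. What follows is only a minimal sketch and not part of the script above: the helper names parse_log_line and filter_by_topic are made up for illustration, and it assumes that topics contain no spaces and that payloads contain no newlines.

```python
def parse_log_line(line):
    """Split one 'timestamp topic payload' line into its three fields."""
    # maxsplit=2 keeps any spaces inside the payload intact.
    timestamp, topic, payload = line.rstrip('\n').split(' ', 2)
    return float(timestamp), topic, payload


def filter_by_topic(lines, topic):
    """Return the parsed entries whose topic matches exactly."""
    # Skip blank lines; every other line is assumed to be well formed.
    entries = (parse_log_line(line) for line in lines if line.strip())
    return [entry for entry in entries if entry[1] == topic]
```

To run it against the real log, open log.txt and pass the file object as lines, since iterating over a file yields one line at a time.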

Sending the output of a topic to a file like this works if you are dealing with a few messages, but it becomes unwieldy once you need to search through thousands. The most common idea people have is to use a MySQL database, but that seems like overkill to me. To deal with a logging problem in my home lab I decided instead to use TinyDB, a minimal, JSON-based database. This simplifies implementation, portability, and maintenance while still allowing fairly complex queries.

import paho.mqtt.client as mqtt
from time import time
from tinydb import TinyDB


def on_connect(client, userdata, flags, rc):
    print(flags, rc)
    client.subscribe('$SYS/broker/load/messages/received/1min')
    client.subscribe('$SYS/broker/load/messages/sent/1min')
    client.subscribe('$SYS/broker/load/bytes/received/1min')
    client.subscribe('$SYS/broker/load/bytes/sent/1min')


# Open the database once, not on every message.
db = TinyDB('db.json')


def on_message(client, userdata, message):
    timestamp = time()
    topic = message.topic
    payload = message.payload.decode('utf-8')
    # One record per message; TinyDB stores it as JSON in db.json.
    entry = {'time': timestamp, 'topic': topic, 'payload': payload}
    print(entry)
    db.insert(entry)


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("test.mosquitto.org", 1883, 60)
client.loop_forever()

 

Instead of simply writing to a file, this script inserts a record into db.json containing the timestamp, topic, and payload of each message received on a subscribed topic. You can then query the database with a few lines of Python. For example, here is a script that prints all messages in the database that were received in the last day.

from tinydb import TinyDB, Query
from time import time

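# Timestamp of the moment exactly 24 hours before now.
one_day_ago = time() - (24 * 60 * 60)
print(one_day_ago)
db = TinyDB('db.json')
qu = Query()
# Keep only records whose 'time' field is later than the cutoff.
results = db.search(qu.time > one_day_ago)
for x in results:
    print('%s %s %s' % (x['time'], x['topic'], x['payload']))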

 

The code is fairly self-explanatory: after loading the database, db.search returns all of the matching records as a list in results, and the for loop then prints each one. Here the query matches every record whose timestamp is later than the current time minus 24 hours.
