MQTT Cookbook: Round-trip Time

Most MQTT brokers provide statistics on what they are doing (number of messages sent etc.) but do not provide meaningful information about how well the broker and client are performing. To gather that sort of information you need to consider the entire chain between the client and the broker. Although there may be several different ways to approach this, the simplest is to measure the round-trip time or RTT. In the case of MQTT that means measuring how long it takes from the publication of a message to its distribution to subscribers.

Thankfully the loose coupling between publishers and subscribers in MQTT allows the same client to be both. This allows us to measure the entire chain in a few lines of code.

import paho.mqtt.client as mqtt
from time import time, sleep
import uuid

INTERVAL = 1
QOS = 0


def on_connect(client, userdata, flags, rc):
    client.subscribe(topic)
    client.publish(topic, time(), qos=QOS)


def on_message(client, userdata, message):
    msg = message.payload.decode('utf-8')
    rtt = time() - float(msg)
    rtt_array.append(rtt)
    rtt_max = max(rtt_array)
    rtt_average = sum(rtt_array) / len(rtt_array)
    rtt_min = min(rtt_array)
    print('Current: %s' % rtt)
    print('Maximum: %s' % rtt_max)
    print('Average: %s' % rtt_average)
    print('Minimum: %s' % rtt_min)
    sleep(INTERVAL)
    client.publish(topic, time(), qos=QOS)


def on_log(client, userdata, level, buf):
    print(level, buf)


rtt_array = []
topic = str(uuid.uuid4())
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_log = on_log
client.connect("test.mosquitto.org")
client.loop_forever()

 

The script might seem a bit odd unless you understand how the call backs are structured in MQTT. The program flow goes something like this:

  1. The client connects to the broker triggering on_connect
  2. The client subscribes to the randomly determined topic
  3. The client publishes to the randomly determined topic
  4. The reception of the message that the client published triggers the on_message callback.
  5. The payload of the message is the time the message was sent which is then compared to the current time to get the RTT
  6. The RTT is added to the list rtt_array and used to calculate average, maximum, and minimum.
  7. After sleeping for INTERVAL seconds, a new message is published to the the topic returning the script to step 3.

As this script runs indefinitely you will have to break it manually by hitting CTRL+C. Also, the preset interval is pretty short (1 second), you will likely want to extend that to 10 or 60 if you are running this for more than a short burst.

Right now the script is configured to connect to the test.mosquitto.org server but it can easily be adapted to connect to your local broker.

Another interesting use of this script is that it makes it simple to test the performance consequences of using higher QoS settings. I notice around a 50% performance hit at QoS 2.

That’s pretty much it. If anyone has suggestions about how to improve it, tell me about them in the comments.

Advertisements

MQTT Cookbook: Thingspeak to MQTT

One of the most common problems in IoT is the need to bridge between two different infrastructures. It is a long story, but I ran into a situation where I needed to bridge a pre-existing ESP8266 connected to Thingspeak with my broader MQTT network. Thankfully, hacking together a quick bridge between the two services proved fairly easy.

import requests
import paho.mqtt.client as mqtt

CHANNEL =
API = ""

def get_thingspeak_field(channel, api, field):
    _url = 'https://api.thingspeak.com/channels/%s/fields/%s.json?api_key=%s&results=1' % (channel, field, api)
    _r = requests.get(_url).json()
    return _r['feeds'][0]['field%s' % field]


def on_connect(client, userdata, flags, rc):
    client.publish('thingspeak/field1', field1)
    client.publish('thingspeak/field2', field2)
    client.disconnect()


def on_log(client, userdata, level, buf):
    print(level, buf)


field1 = get_thingspeak_field(CHANNEL, API, 1)
field2 = get_thingspeak_field(CHANNEL, API, 2)
client = mqtt.Client()
client.on_connect = on_connect
client.on_log = on_log
client.connect("test.mosquitto.org", 1883, 60)
client.loop_forever()

 

The code is fairly self-explanatory but lets walk through it. There is a Thingspeak library for Python but it is actually easier to just import requests and make a standard API request. To facilitate this I wrote a simple function which takes the channel, api key, and the field that you want to read. After reading the fields it connects to an MQTT broker–currently the Mosquitto test broker–and publishes the fields.

Simple as that! Now you can hook your Thingspeak and MQTT infrastructures together in any way you want.