Mastering Secure MQTT with Python
Do you need secure communication via MQTT? If so, this is the right article for you!
We show you how to implement an MQTT Publisher and Subscriber in Python. We will then establish a secure connection via an MQTT broker. If you want to know how to set up an MQTT broker with Docker, check out our article on this topic.
We recommend reading the article on the MQTT broker first, as it introduces some basics.
First, we recap the basic functionality of MQTT. Then, we show you how to create an MQTT Publisher and Subscriber in Python. The steps are the following:
- Recap
- Technical requirements
- Create an MQTT Publisher in Python
- Create an MQTT Subscriber in Python
- Conclusion
- Useful literature
Recap
MQTT stands for Message Queuing Telemetry Transport. It is a messaging protocol for IoT devices with resource constraints. It is designed for low-bandwidth, high-latency environments, which makes it well suited for machine-to-machine (M2M) communication.
MQTT works according to the publish/subscribe principle with a central broker. This principle means that the sender and receiver have no direct connection. The senders publish the data on a topic, and all recipients who have subscribed to this topic receive the data.
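To make the decoupling concrete, here is a minimal in-memory sketch of the publish/subscribe principle. It is not MQTT and involves no network. The class name TinyBroker and its methods are made up for illustration, and it only matches exact topic names.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class TinyBroker:
    """Toy stand-in for a broker: routes messages by exact topic name."""

    def __init__(self) -> None:
        # topic -> list of callbacks registered by subscribers
        self._subscribers: DefaultDict[str, List[Callable[[str, str], None]]] = defaultdict(list)

    def subscribe(self, topic: str, callback: Callable[[str, str], None]) -> None:
        self._subscribers[topic].append(callback)

    def publish(self, topic: str, payload: str) -> int:
        """Deliver payload to every subscriber of topic; return the number of deliveries."""
        callbacks = self._subscribers.get(topic, [])
        for callback in callbacks:
            callback(topic, payload)
        return len(callbacks)


received = []
broker = TinyBroker()
broker.subscribe("Param1", lambda topic, payload: received.append((topic, payload)))

broker.publish("Param1", "3.1")  # delivered: one subscriber on this topic
broker.publish("Param2", "0.7")  # not delivered: nobody subscribed to Param2
print(received)
```

The publisher never references the subscriber directly. Both only know the broker and the topic name, which is the property the real MQTT broker provides over the network.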
Technical requirements
Prerequisites
You will need the following prerequisites:
- Installed Python (≥ 3.7)
- Installed conda and pip
- Access to a bash shell (macOS, Linux or Windows)
- Code editor of your choice (we use VS Code)
The implementation was tested on macOS Ventura 13.2.1.
Setup
Enter the following in your terminal:
- Create a conda environment (env):
conda create -n mqtt-env python=3.9.12
-> Answer the question Proceed ([y]/n)? with y.
- Activate the conda env:
conda activate mqtt-env
- Install the necessary libraries:
pip install pandas==1.5.3 paho-mqtt==1.6.1
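If you want to confirm that the environment contains the pinned versions, a small check like the following may help. It is only a sketch: the helper installed_version is our own name, and importlib.metadata requires Python 3.8 or newer, which the conda environment above satisfies.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


for name in ("pandas", "paho-mqtt"):
    print(name, installed_version(name) or "not installed")
```

Run it inside the activated mqtt-env environment. The code in this article was written against paho-mqtt 1.6.1, so a different major version may need adjustments.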
Create an MQTT Publisher in Python
In this section, we implement the MQTT Publisher with Python.
First, we create an empty Python file and call it pub.py. In this file, we start a menu. Insert the following code into the file.
from menu import Menu
menu = Menu()
menu.start_menu()
Then we create the file menu.py. In this file, we implement the menu.
from simulator import Simulator


class Menu:
    def start_menu(self):
        value = True
        while value:
            try:
                print("### Simulator ###")
                print("(1) Simulate data")
                print("(0) Exit")
                print("> ", end="")
                choice = int(input())
                if choice == 0:
                    value = False
                if choice == 1:
                    Simulator.simulate_data("data")
            except ValueError:
                print("Wrong input!!!")
You can start the data simulation under menu item 1. In the next step, we create the simulator. Create a file called simulator.py and insert the following code:
import json
from os import path
import time

import pandas as pd

from mqtt_publisher import Publisher


class Simulator:
    @staticmethod
    def simulate_data(file_name):
        # read data
        data = pd.read_csv(path.join(file_name + ".csv"), parse_dates=["Date"])
        # MQTT publisher
        mqtt_publisher = Publisher()
        while True:
            for rows in range(len(data.iloc[:, 1])):
                # columns 0 and 1 hold the ID and the date, the parameters start at column 2
                for cols in range(len(data.iloc[1, 2:])):
                    cols = cols + 2
                    sensor_value = str(data.iloc[rows, cols])
                    key_value = str(data.columns[cols])
                    mqtt_publisher.set_topic(key_value)
                    mqtt_msg = json.dumps({
                        "type": "sample data",
                        "cycle_counter": str(data.iloc[rows, 0]),
                        "timestamp": str(data.iloc[rows, 1]),
                        "value": sensor_value
                    })
                    mqtt_publisher.publish_value(mqtt_msg)
                time.sleep(1)  # the cycle time is 1 second.
First, we import all the necessary libraries. Then we create the class Simulator that contains the method simulate_data(). The method reads sample data from a CSV file. Create a file with the name data.csv and the following content (you can also use your own data):
"ID","Date","Param1","Param2","Param3"
"0","2023-03-24 20:31:00","3.1","0.7","4.3"
"1","2023-03-24 20:31:01","3.2","0.6","4.3"
"2","2023-03-24 20:31:02","3.2","0.6","4.4"
"3","2023-03-24 20:31:03","3.2","0.6","4.4"
"4","2023-03-24 20:31:04","3.1","0.6","4.4"
"5","2023-03-24 20:31:05","3.1","0.6","4.3"
"6","2023-03-24 20:31:06","3","0.6","4.3"
"7","2023-03-24 20:31:07","3","0.6","4.3"
Next, we create an object of the Publisher class. In the while-loop, we publish each parameter individually to the MQTT broker. The function publish_value() publishes the parameters as a JSON string to the MQTT broker.
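The following sketch shows, without a broker, which topic and JSON payload result from one CSV row. It is an illustration only: it uses the standard csv module instead of pandas, so values stay exactly as written in the file, and the helper messages_for_row is a hypothetical name that does not exist in the project files.

```python
import csv
import io
import json

SAMPLE = '''"ID","Date","Param1","Param2","Param3"
"0","2023-03-24 20:31:00","3.1","0.7","4.3"
'''


def messages_for_row(header, row):
    """Return one (topic, JSON payload) pair per parameter column of a CSV row."""
    messages = []
    # columns 0 and 1 are ID and Date, the parameters start at column 2
    for name, value in zip(header[2:], row[2:]):
        payload = json.dumps({
            "type": "sample data",
            "cycle_counter": row[0],
            "timestamp": row[1],
            "value": value,
        })
        messages.append((name, payload))
    return messages


reader = csv.reader(io.StringIO(SAMPLE))
header = next(reader)
for row in reader:
    for topic, payload in messages_for_row(header, row):
        print(topic, payload)
```

Each column name becomes its own topic, so a subscriber can pick only the parameters it is interested in.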
In the next step, we look at the MQTT Publisher. Create a Python file with the name mqtt_publisher.py and add the following:
import paho.mqtt.client as mqtt

import mqtt_constants


def on_connect(client, userdata, flags, rc):
    """
    The callback for when the client receives a CONNACK response from the server.
    """
    print("Client: " + str(client))
    print("User data: " + str(userdata))
    print("Flags: " + str(flags))
    print("Connected with result code " + str(rc))


class Publisher:
    """
    This class contains the attributes and functions for MQTT publisher.
    For more information about the paho.mqtt.python library see:
    https://github.com/eclipse/paho.mqtt.python
    """
    username = mqtt_constants.USER_NAME
    password = mqtt_constants.PASSWORD
    ip_broker = mqtt_constants.IP_ADDRESS
    port_broker = mqtt_constants.PORT_BROKER
    keep_alive = mqtt_constants.KEEP_ALIVE
    qos_level = mqtt_constants.QOS_LEVEL
    topic = ""
    client = mqtt.Client()

    def __init__(self):
        print("Broker-IP: " + str(self.ip_broker) + ", Broker-Port: " + str(self.port_broker))
        if mqtt_constants.TLS_ON:
            self.client.tls_set(ca_certs="./certs/ca.crt",
                                certfile="./certs/client.crt",
                                keyfile="./certs/client.key")
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.ip_broker, self.port_broker, self.keep_alive)  # mqtt broker
        self.client.on_connect = on_connect
        self.client.loop_start()

    def __del__(self):
        self.client.disconnect()  # disconnect
        self.client.loop_stop()  # stop loop

    def set_topic(self, topic):
        """
        Setter method for the MQTT topic.
        :param topic: This parameter contains the MQTT topic.
        :return: None
        """
        self.topic = topic

    def publish_value(self, value):
        """
        This function publishes the JSON string to MQTT broker.
        :param value: This parameter contains the JSON string to be published.
        :return: None
        """
        print("Published JSON: " + str(value))
        self.client.publish(self.topic, value, self.qos_level)
The on_connect() function reports the connection status. It is a callback for when the client receives a CONNACK response from the server. In the Publisher class, we set the connection data, which comes from the file mqtt_constants.py. You still have to create this file. Then insert the following into the file.
"""
This file contains environment variables for the MQTT broker.
Further information:
No TLS: PORT_BROKER = 1883, TLS_ON = False
TLS: PORT_BROKER = 8883, TLS_ON = True
"""
USER_NAME = "mosquitto"
PASSWORD = "mosquitto"
IP_ADDRESS = "<ip> or <computer-name>"
PORT_BROKER = 8883
QOS_LEVEL = 2
KEEP_ALIVE = 8000
TLS_ON = True
The file contains the connection data, the QoS level and whether encryption should be used or not. If we set the variable TLS_ON to True and the port to 8883, there is an encrypted connection between the Publisher and the MQTT Broker.
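Because the port and the TLS switch have to be changed together, a small consistency check can catch a mismatch early. This is only a sketch: the helper names are made up, and the pairing 8883 with TLS and 1883 without TLS is the convention noted in the file's docstring, so a broker configured with other ports would need different values.

```python
def expected_port(tls_on: bool) -> int:
    """Conventional MQTT port for the given TLS setting (8883 with TLS, 1883 without)."""
    return 8883 if tls_on else 1883


def settings_are_consistent(tls_on: bool, port: int) -> bool:
    """Return True if the port matches the convention for the TLS setting."""
    return port == expected_port(tls_on)


# Values as in mqtt_constants.py
print(settings_are_consistent(tls_on=True, port=8883))
# A typical mistake: TLS switched off, but the TLS port left in place
print(settings_are_consistent(tls_on=False, port=8883))
```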
In the Publisher class, we then create a Client object. On this client object, we set the connection data in the constructor. Furthermore, we set the certificates for an encrypted connection. To do this, create a folder called certs and place the three files (ca.crt, client.crt and client.key) there. We generated the certificate files in a previous article.
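A missing certificate file only shows up as an error when the client sets up TLS, so it can be convenient to check the folder beforehand. The following is a minimal sketch under the assumption that the files use the names from the Publisher code; the helper missing_cert_files is our own and not part of paho-mqtt.

```python
from pathlib import Path

# File names as used in the tls_set() call of the Publisher
REQUIRED_FILES = ("ca.crt", "client.crt", "client.key")


def missing_cert_files(cert_dir="./certs"):
    """Return the names of the required certificate files that are not present."""
    folder = Path(cert_dir)
    return [name for name in REQUIRED_FILES if not (folder / name).is_file()]


missing = missing_cert_files()
if missing:
    print("Missing in ./certs:", ", ".join(missing))
else:
    print("All certificate files found.")
```

The check only looks at whether the files exist. It does not validate that the certificates match the broker.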
Then we define a setter method to set the MQTT topic and a publish method to publish the JSON string.
If you created everything correctly, your MQTT Publisher is ready to use. To start the MQTT Publisher, run the following command in the terminal:
$ python pub.py
For testing, you can use the MQTT Broker from the previous article. The article describes how you can use the client application MQTT Explorer. With the MQTT Explorer, you can test whether your publisher publishes the data correctly on the broker.
Create an MQTT Subscriber in Python
In this section, we implement the MQTT Subscriber with Python. First, we create a new Python file called sub.py. Insert the following content into this file:
from mqtt_subscriber import Subscriber
sub = Subscriber()
We import the class Subscriber and create an object of the class. Next, create the file in which we implement the Subscriber class. We call this file mqtt_subscriber.py. In this file, we add the following:
import paho.mqtt.client as mqtt

import mqtt_constants


def on_connect(client, userdata, flags, rc):
    """
    The callback for when the client receives a CONNACK response from the server.
    """
    relevant_topics = [("Param1", 2), ("Param2", 2), ("Param3", 2)]
    print("Client: " + str(client))
    print("User data: " + str(userdata))
    print("Flags: " + str(flags))
    print("Connected with result code " + str(rc))
    client.subscribe(relevant_topics)


def on_message(client, userdata, msg):
    """
    This function is called when a message is received on a topic.
    """
    print("Client: " + str(client))
    print("User data: " + str(userdata))
    print("Topic: " + msg.topic + " and " + "QoS level: " + str(msg.qos))
    print(str(msg.payload))


class Subscriber:
    """
    This class contains the attributes and functions for MQTT subscriber.
    For more information about the paho.mqtt.python library see:
    https://github.com/eclipse/paho.mqtt.python
    """
    username = mqtt_constants.USER_NAME
    password = mqtt_constants.PASSWORD
    ip_broker = mqtt_constants.IP_ADDRESS
    port_broker = mqtt_constants.PORT_BROKER
    keep_alive = mqtt_constants.KEEP_ALIVE
    qos_level = mqtt_constants.QOS_LEVEL
    topic = ""
    client = mqtt.Client()

    def __init__(self):
        print("Broker-IP: " + str(self.ip_broker) + ", Broker-Port: " + str(self.port_broker))
        if mqtt_constants.TLS_ON:
            self.client.tls_set(ca_certs="./certs/ca.crt",
                                certfile="./certs/client.crt",
                                keyfile="./certs/client.key")
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.ip_broker, self.port_broker, self.keep_alive)  # mqtt broker
        self.client.on_message = on_message
        self.client.on_connect = on_connect
        self.client.loop_forever()

    def __del__(self):
        self.client.disconnect()  # disconnect
        self.client.loop_stop()  # stop loop
First, we import the paho MQTT client module and our file mqtt_constants.py. The Subscriber uses the same mqtt_constants.py that we created for the Publisher, with the connection data, the QoS level and the TLS switch. If you keep the Subscriber in a separate folder, create the file there with the same content. If we set the variable TLS_ON to True and the port to 8883, there is an encrypted connection between the Subscriber and the MQTT Broker.
Now, back to the mqtt_subscriber.py file. In this file, we first define the on_connect() method. In this method, we subscribe to the relevant topics (“Param1”, “Param2”, and “Param3”). The number 2 indicates the QoS level.
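For reference, MQTT defines three QoS levels. The sketch below maps them to their delivery guarantees and prints what the subscription list from on_connect() requests. The guarantee names follow the MQTT specification; the helper describe_subscriptions is made up for illustration.

```python
# Delivery guarantees of the three MQTT QoS levels
QOS_GUARANTEES = {
    0: "at most once",
    1: "at least once",
    2: "exactly once",
}


def describe_subscriptions(topics):
    """Turn (topic, qos) tuples into readable lines."""
    return [f"{topic}: QoS {qos} ({QOS_GUARANTEES[qos]})" for topic, qos in topics]


relevant_topics = [("Param1", 2), ("Param2", 2), ("Param3", 2)]
for line in describe_subscriptions(relevant_topics):
    print(line)
```

QoS 2 is the most reliable level and also the one with the most protocol overhead, so a lower level may be enough for data that is sent every second anyway.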
Next, we define the on_message() method. In this method, we receive messages from the MQTT Broker. There we can print the message payload (JSON string).
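The payload arrives as bytes, so printing it shows the raw JSON string. If you want to work with the individual fields, you could decode it first. The following is a small sketch, assuming the payload has the structure that our Publisher sends; the helper decode_payload is a hypothetical name.

```python
import json


def decode_payload(payload: bytes) -> dict:
    """Decode a UTF-8 JSON payload, as sent by the Publisher, into a dict."""
    return json.loads(payload.decode("utf-8"))


# Example payload with the same fields the Simulator publishes
raw = b'{"type": "sample data", "cycle_counter": "0", "timestamp": "2023-03-24 20:31:00", "value": "3.1"}'
message = decode_payload(raw)
print(message["timestamp"], float(message["value"]))
```

Inside on_message() you would pass msg.payload to such a helper. Note that the Publisher sends all values as strings, so numeric fields need an explicit conversion.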
In addition, we define the class Subscriber. In this class, we set the class attributes based on the values in the file mqtt_constants.py. Then we create an object of the class Client. In the constructor, we check whether an encrypted connection should be used. If so, we load the certificates from the certs folder. Then we set the connection data and the relevant callbacks. After that, we start the subscriber with loop_forever(), which blocks and processes incoming messages until the client disconnects. In the destructor, we terminate the connection to the MQTT broker.
If you created everything correctly, your MQTT Subscriber is ready to use. You can start the Subscriber with the following command (Make sure you are in the Python environment mqtt-env.):
$ python sub.py
In your terminal, you should then see the transmitted JSON string, information about the client, the topic and the QoS level. Congratulations, you have implemented a secure MQTT Publisher and Subscriber in Python.
Conclusion
In this article, we learned how to implement an MQTT Publisher and Subscriber in Python. Both can establish a secure connection to the MQTT broker via TLS. You can use the article as a code base for a secure MQTT setup in your next project.
Thanks so much for reading. Have a great day!
Useful literature
Books
- Practical Python Programming for IoT: Build advanced IoT projects using a Raspberry Pi 4, MQTT, RESTful APIs, WebSockets, and Python 3
- Hands-On MQTT Programming with Python: Work with the lightweight IoT protocol in Python
Links
The authors are NOT LIABLE for any damages arising from the software. Use of the software is at your own risk.