「MQTT」(Message Queueing Telemetry Transport)はネットワークで利用できる、Publish(パブリッシュ)/Subscribe(サブスクライブ)型の通信プロトコルです。
家電や自動車など多種多様な「モノ」がインターネットにつながり、お互いに情報をやり取りするIoT(Internet of Things)を実現するのに適しており、
短いメッセージを頻繁に送受信することを想定して、シンプルで軽量(通信量やCPU負荷、少電力消費量)なプロトコルです。
ここでは、Raspberry Piとの相互通信を試します。
Publish(パブリッシュ)/Subscribe(サブスクライブ)モデルではメッセージの送信者をPublisher、メッセージの受信者をSubscriber、メッセージの仲介をするのがBrokerです。
PublisherはTopicを指定してメッセージを送信し、SubscriberはBrokerから受信したいTopicをfilterとして指定することで、
必要なメッセージを手に入れることができ、n対nの通信ができます。
Topicは「/」で区切られた階層構造になっており、メッセージの交換のキーになります。

多くで利用しているMQTTのオープンソースであるMosquitto(モスキート)を利用します。
Mosquitto(Broker)とクライアントをインストールします。
状態を「$ sudo service mosquitto status」コマンドで確認出来ます。
コンソールよりメッセージ受信状態にして、別コンソールよりメッセージを送信すると、メッセージ受信が出来ることが確認出来ます。
別コンソールよりメッセージ送信します。
ローカルネットワークで、Raspberry Pi相互の通信を確認します。
Raspberry Pi上にpub/subのPythonプログラムを書いてみます。ここでは以下のPahoを使います。 Eclipse Paho Raspberry Piからは、以下のようにインストール出来ます。
メッセージ受信プログラム(Subscriveプログラム(pisub.py))は、下記の通りです。
import paho.mqtt.client as mqtt
def on_connect(client,userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("pi/sub")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost",1883)
client.loop_forever()
メッセージ送信プログラム(publishプログラム(pipub.py))は、下記の通りです。
import paho.mqtt.client as mqtt
def on_connect(client,userdata, flags, rc):
print("Connected with result code "+str(rc))
client.publish("pi/sub", "Hello from localhost", 0)
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883)
client.loop_forever()
subscrive側のコンソールで(pisub.py)プログラムを実行して、publish側のコンソールで(pipub.py)を実行します。
publish側のコンソールで(pipub.py)を実行します。
MQTTのBrokerであるCloudMQTTのサービスを使ってRaspberry PiとWeb Client相互の通信を確認します。

CloudMQTTの詳細情報より、ユーザー名、パスワード、Server、SSL Portを設定して接続します。
メッセ時受信プログラム(Subscriberプログラム(cloudsub.py))は、下記の通りです。
import paho.mqtt.client as mqtt
#def on_connect(client, userdata, rc):
def on_connect(client,userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("pi/sub1")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.tls_set("/etc/ssl/certs/ca-certificates.crt")
client.username_pw_set("ユーザー名", "パスワード")
client.connect("driver.cloudmqtt.com", 28607)
client.loop_forever()
メッセージ送信プログラム(Publisherプログラム(cloudpub.py))は、下記の通りです。
import paho.mqtt.client as mqtt
def on_connect(client,userdata, flags, rc):
print("Connected with result code "+str(rc))
client.publish("pi/sub1", "Hello from cloud(localhost)", 0)
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.tls_set("/etc/ssl/certs/ca-certificates.crt")
client.username_pw_set("ユーザー名", "パスワード")
client.connect("driver.cloudmqtt.com", 28607)
client.loop_forever()
subscrive側のコンソールで(cloudsub.py)プログラムを実行して、publish側のコンソールで(cloudpub.py)を実行します。
publish側のコンソールで(cloudpub.py)を実行します。
RaspberryPi側のメッセージ受信/送信プログラム(cloudsub2.py)は下記の通りです。
import paho.mqtt.client as mqtt
#def on_connect(client, userdata, rc):
def on_connect(client,userdata, flags, rc):
print("Connected with result code "+str(rc))
client.subscribe("pi/sub2")
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client.publish("cloud/sub2", "received ok form cloud(localhost)", 0)
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.on_connect = on_connect
client.on_message = on_message
client.tls_set("/etc/ssl/certs/ca-certificates.crt")
client.username_pw_set("ユーザー名", "パスワード")
client.connect("driver.cloudmqtt.com", 28607)
client.loop_forever()
Web Client側のメッセ時送信/受信プログラム(cloudpub.html)の場合、サーバの接続はWebsockets Portを指定します。
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>CloudMQTT</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<script type="text/javascript">
// Create a client instance
//client = new Paho.MQTT.Client("host", port,"client_id");
client = new Paho.MQTT.Client("driver.cloudmqtt.com", 38607,"web_" + parseInt(Math.random() * 100, 10));
//Example client = new Paho.MQTT.Client("m11.cloudmqtt.com", 32903, "web_" + parseInt(Math.random() * 100, 10));
// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
var options = {
useSSL: true,
userName: "ユーザー名",
password: "パスワード",
onSuccess:onConnect,
onFailure:doFail
}
// connect the client
client.connect(options);
tmp = "" ;
// called when the client connects
function onConnect() {
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("cloud/sub2");
}
function doFail(e){ console.log(e); }
// called when the client loses its connection
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:"+responseObject.errorMessage);
}
}
function sendm() {
message = new Paho.MQTT.Message("Hello from cloud(web)");
message.destinationName = "pi/sub2" ;
client.send(message);
}
// called when a message arrives
function onMessageArrived(message) {
console.log("onMessageArrived:"+message.payloadString);
tmp += message.payloadString + "<br>" ;
msg.innerHTML = tmp ;
}
</script>
</head>
<body>
<h1>CloudMQTT</h1>
<button onclick="sendm()">送信</button>
<p id="msg"></p>
</body>
</html>
subscrive側のコンソールで(cloudsub2.py)プログラムを実行して、WebClientで(cloudpub.html)を実行します。
WebClientで(cloudpub.html)を実行します。
MQTTのBrokerであるBeebotteサービスを使ってメッセージの送受信を試してみます。

Beebotte(https://beebotte.com)にアクセスしてEmaiアドレスとパスワードを登録してアカウントを作成します。
作成したアカウントのメニューよりChannelsを選択して、設定画面より、新規にチャンネルを作ります。
コンソール画面よりからメッセージを送受信して確認します。
メッセージ送信プログラム(Subscriber(bbtsub.py)プログラム)は、下記の通りです。
接続先のHost、Port、Topic(Channel、Resource)、該当するTokenを指定します。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#smartremocon.py
import paho.mqtt.client as mqtt # MQTTのライブラリをインポート
import subprocess
import json
from time import sleep
HOST = 'mqtt.beebotte.com'
PORT = 8883
CA_CERTS = 'mqtt.beebotte.com.pem'
TOKEN = 'token_xxxxxxxx' #Beebotteで作成したチャンネルのトークンを入力
TOPIC = 'xxxxx/xxxxxxxx' #Beebotteで作成したトピック名を入力
def on_connect(client, userdata, flags, respons_code): # ブローカーに接続できたときの処理
print('status {0}'.format(respons_code))
def on_disconnect(client, userdata, flags, respons_code):# ブローカーが切断したときの処理
print("Unexpected disconnection.")
client.loop_stop()
def on_message(client, userdata, msg): # メッセージが届いたときの処理
print(msg.topic + ' ' + str(msg.payload))
print(msg.payload.decode("utf-8"))
try:
data = json.loads(msg.payload.decode("utf-8"))["data"]
print(data)
for dat in data:
print(dat)
except :
print("jsonフォーマットエラー")
# finally:
if __name__ == '__main__':
client = mqtt.Client() # クラスのインスタンスの作成
client.on_connect = on_connect # 接続時のコールバック関数を登録
client.on_disconnect = on_disconnect # 切断時のコールバックを登録
client.on_message = on_message # メッセージ到着時のコールバック
client.username_pw_set('token:%s' % TOKEN)
client.tls_set("/etc/ssl/certs/ca-certificates.crt")
client.connect(HOST, PORT) # 接続
client.subscribe(TOPIC)
client.loop_forever() # 永久ループして待ち続ける
メッセージj送信プログラム(Publisher(bbtpub.py)プログラム)は、下記の通りです。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import time
#smartremocon.py
import paho.mqtt.client as mqtt # MQTTのライブラリをインポート
import subprocess
import json
import time
#from time import sleep
HOST = 'mqtt.beebotte.com'
PORT = 8883
CA_CERTS = 'mqtt.beebotte.com.pem'
TOKEN = 'token_xxxxxxxx' #Beebotteで作成したチャンネルのトークンを入力
TOPIC = 'xxxxx/xxxxxxxx' #Beebotteで作成したトピック名を入力
def on_connect(client, userdata, flags, respons_code): # ブローカーに接続できたときの処理
print('status {0}'.format(respons_code))
def on_publish(client, userdata, mid): # publishが完了したときの処理
print("publish: {0}".format(mid))
def on_disconnect(client, userdata, flags, respons_code): # ブローカーが切断したときの処理
print("Unexpected disconnection.")
client.loop_stop()
if __name__ == '__main__':
client = mqtt.Client() # クラスのインスタンス(実体)の作成
client.on_connect = on_connect # 接続時のコールバック関数を登録
client.on_disconnect = on_disconnect # 切断時のコールバックを登録
client.on_publish = on_publish # メッセージ送信時のコールバック
client.username_pw_set('token:%s' % TOKEN)
client.tls_set("/etc/ssl/certs/ca-certificates.crt")
client.connect(HOST, PORT) # 接続
client.loop_start() # 通信処理スタート
client.publish(TOPIC, '{"data":["message1","message2","message3"]}', 0)
time.sleep(3)
client.loop_stop() # 通信処理ストップ
subscrive側のコンソールで(bbtsub.py)プログラムを実行して、publish側のコンソールで(bbtpub.py)を実行します。
publish側のコンソールで(cloudpub.py)を実行します。
WebClientのメッセージ送信(bbtpub.html)プログラムは、下記の通りです。
アクセスは、指定のURL(Channel、Resource、Token情報を含みます。)でアクセスします。
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Beebotte</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<script type="text/javascript">
//ボタンが押された時のイベント
//document.getElementById("release").addEventListener('click', function(){ releaseShot();});
function releaseShot(){
var url = 'https://api.beebotte.com/v1/data/publish/xxxxx/xxxxxxxx?token=token_xxxxxxxx';
var param = {"data" : [form.inp.value],};
const headers = new Headers( { "Content-type" : "application/json" } );
return new Promise((resolve, reject) =>{
fetch(url, {
method : 'POST',
body : JSON.stringify(param),
headers: headers
}).then((response) => {
target = document.getElementById("output");
target.innerText = "完了しました。"
})
});
}
</script>
</head>
<body>
<h1>Beebotte</h1>
<div class="container">
<form name="form"><input type="text" name="inp"></form>
<button id="release" class="button" onclick="releaseShot();">実行</button>
<span id="output"></span>
</div>
<p id="msg"></p>
</body>
</html>
subscrive側のコンソールで(bbtsub.py)プログラムを実行して、Web Clientで(bbtpub.py)を実行します。
Web Clientで(bbtpub.html)を実行します。
Webプラウザより、RaspberryPiに室内温度の取得要求を出して、Webプラウザに表示します。
プログラムのソース作成したHTML(cloudsensor.html)とRaspberryPiのPythonスクリプト(cloudsensor.py)です。
Raspberry Piのセンサー情報取得は「Raspberry Pi(温度・気圧・湿度センサー)」を参照してください。