「MQTT」(Message Queueing Telemetry Transport)はネットワークで利用できる、Publish(パブリッシュ)/Subscribe(サブスクライブ)型の通信プロトコルです。
家電や自動車など多種多様な「モノ」がインターネットにつながり、お互いに情報をやり取りするIoT(Internet of Things)を実現するのに適しており、
短いメッセージを頻繁に送受信することを想定して、シンプルで軽量(通信量やCPU負荷、少電力消費量)なプロトコルです。
ここでは、Raspberry Piとの相互通信を試します。
Publish(パブリッシュ)/Subscribe(サブスクライブ)モデルではメッセージの送信者をPublisher、メッセージの受信者をSubscriber、メッセージの仲介をするのがBrokerです。
PublisherはTopicを指定してメッセージを送信し、SubscriberはBrokerから受信したいTopicをfilterとして指定することで、
必要なメッセージを手に入れることができ、n対nの通信ができます。
Topicは「/」で区切られた階層構造になっており、メッセージの交換のキーになります。
多くで利用しているMQTTのオープンソースであるMosquitto(モスキート)を利用します。
Mosquitto(Broker)とクライアントをインストールします。
状態を「$ sudo service mosquitto status」コマンドで確認出来ます。
コンソールよりメッセージ受信状態にして、別コンソールよりメッセージを送信すると、メッセージ受信が出来ることが確認出来ます。
別コンソールよりメッセージ送信します。
ローカルネットワークで、Raspberry Pi相互の通信を確認します。
Raspberry Pi上にpub/subのPythonプログラムを書いてみます。ここでは以下のPahoを使います。 Eclipse Paho Raspberry Piからは、以下のようにインストール出来ます。
メッセージ受信プログラム(Subscriveプログラム(pisub.py))は、下記の通りです。
import paho.mqtt.client as mqtt def on_connect(client,userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("pi/sub") def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("localhost",1883) client.loop_forever()
メッセージ送信プログラム(publishプログラム(pipub.py))は、下記の通りです。
import paho.mqtt.client as mqtt def on_connect(client,userdata, flags, rc): print("Connected with result code "+str(rc)) client.publish("pi/sub", "Hello from localhost", 0) def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect("localhost", 1883) client.loop_forever()
subscrive側のコンソールで(pisub.py)プログラムを実行して、publish側のコンソールで(pipub.py)を実行します。
publish側のコンソールで(pipub.py)を実行します。
MQTTのBrokerであるCloudMQTTのサービスを使ってRaspberry PiとWeb Client相互の通信を確認します。
CloudMQTTの詳細情報より、ユーザー名、パスワード、Server、SSL Portを設定して接続します。
メッセ時受信プログラム(Subscriberプログラム(cloudsub.py))は、下記の通りです。
import paho.mqtt.client as mqtt #def on_connect(client, userdata, rc): def on_connect(client,userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("pi/sub1") def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.tls_set("/etc/ssl/certs/ca-certificates.crt") client.username_pw_set("ユーザー名", "パスワード") client.connect("driver.cloudmqtt.com", 28607) client.loop_forever()
メッセージ送信プログラム(Publisherプログラム(cloudpub.py))は、下記の通りです。
import paho.mqtt.client as mqtt def on_connect(client,userdata, flags, rc): print("Connected with result code "+str(rc)) client.publish("pi/sub1", "Hello from cloud(localhost)", 0) def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.tls_set("/etc/ssl/certs/ca-certificates.crt") client.username_pw_set("ユーザー名", "パスワード") client.connect("driver.cloudmqtt.com", 28607) client.loop_forever()
subscrive側のコンソールで(cloudsub.py)プログラムを実行して、publish側のコンソールで(cloudpub.py)を実行します。
publish側のコンソールで(cloudpub.py)を実行します。
RaspberryPi側のメッセージ受信/送信プログラム(cloudsub2.py)は下記の通りです。
import paho.mqtt.client as mqtt #def on_connect(client, userdata, rc): def on_connect(client,userdata, flags, rc): print("Connected with result code "+str(rc)) client.subscribe("pi/sub2") def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client.publish("cloud/sub2", "received ok form cloud(localhost)", 0) client = mqtt.Client(protocol=mqtt.MQTTv311) client.on_connect = on_connect client.on_message = on_message client.tls_set("/etc/ssl/certs/ca-certificates.crt") client.username_pw_set("ユーザー名", "パスワード") client.connect("driver.cloudmqtt.com", 28607) client.loop_forever()
Web Client側のメッセ時送信/受信プログラム(cloudpub.html)の場合、サーバの接続はWebsockets Portを指定します。
<html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>CloudMQTT</title> <meta name="viewport" content="width=device-width,initial-scale=1"> <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script> <script type="text/javascript"> // Create a client instance //client = new Paho.MQTT.Client("host", port,"client_id"); client = new Paho.MQTT.Client("driver.cloudmqtt.com", 38607,"web_" + parseInt(Math.random() * 100, 10)); //Example client = new Paho.MQTT.Client("m11.cloudmqtt.com", 32903, "web_" + parseInt(Math.random() * 100, 10)); // set callback handlers client.onConnectionLost = onConnectionLost; client.onMessageArrived = onMessageArrived; var options = { useSSL: true, userName: "ユーザー名", password: "パスワード", onSuccess:onConnect, onFailure:doFail } // connect the client client.connect(options); tmp = "" ; // called when the client connects function onConnect() { // Once a connection has been made, make a subscription and send a message. console.log("onConnect"); client.subscribe("cloud/sub2"); } function doFail(e){ console.log(e); } // called when the client loses its connection function onConnectionLost(responseObject) { if (responseObject.errorCode !== 0) { console.log("onConnectionLost:"+responseObject.errorMessage); } } function sendm() { message = new Paho.MQTT.Message("Hello from cloud(web)"); message.destinationName = "pi/sub2" ; client.send(message); } // called when a message arrives function onMessageArrived(message) { console.log("onMessageArrived:"+message.payloadString); tmp += message.payloadString + "<br>" ; msg.innerHTML = tmp ; } </script> </head> <body> <h1>CloudMQTT</h1> <button onclick="sendm()">送信</button> <p id="msg"></p> </body> </html>
subscrive側のコンソールで(cloudsub2.py)プログラムを実行して、WebClientで(cloudpub.html)を実行します。
WebClientで(cloudpub.html)を実行します。
MQTTのBrokerであるBeebotteサービスを使ってメッセージの送受信を試してみます。
Beebotte(https://beebotte.com)にアクセスしてEmaiアドレスとパスワードを登録してアカウントを作成します。
作成したアカウントのメニューよりChannelsを選択して、設定画面より、新規にチャンネルを作ります。
コンソール画面よりからメッセージを送受信して確認します。
メッセージ送信プログラム(Subscriber(bbtsub.py)プログラム)は、下記の通りです。
接続先のHost、Port、Topic(Channel、Resource)、該当するTokenを指定します。
#!/usr/bin/python3 # -*- coding: utf-8 -*- #smartremocon.py import paho.mqtt.client as mqtt # MQTTのライブラリをインポート import subprocess import json from time import sleep HOST = 'mqtt.beebotte.com' PORT = 8883 CA_CERTS = 'mqtt.beebotte.com.pem' TOKEN = 'token_xxxxxxxx' #Beebotteで作成したチャンネルのトークンを入力 TOPIC = 'xxxxx/xxxxxxxx' #Beebotteで作成したトピック名を入力 def on_connect(client, userdata, flags, respons_code): # ブローカーに接続できたときの処理 print('status {0}'.format(respons_code)) def on_disconnect(client, userdata, flags, respons_code):# ブローカーが切断したときの処理 print("Unexpected disconnection.") client.loop_stop() def on_message(client, userdata, msg): # メッセージが届いたときの処理 print(msg.topic + ' ' + str(msg.payload)) print(msg.payload.decode("utf-8")) try: data = json.loads(msg.payload.decode("utf-8"))["data"] print(data) for dat in data: print(dat) except : print("jsonフォーマットエラー") # finally: if __name__ == '__main__': client = mqtt.Client() # クラスのインスタンスの作成 client.on_connect = on_connect # 接続時のコールバック関数を登録 client.on_disconnect = on_disconnect # 切断時のコールバックを登録 client.on_message = on_message # メッセージ到着時のコールバック client.username_pw_set('token:%s' % TOKEN) client.tls_set("/etc/ssl/certs/ca-certificates.crt") client.connect(HOST, PORT) # 接続 client.subscribe(TOPIC) client.loop_forever() # 永久ループして待ち続ける
メッセージj送信プログラム(Publisher(bbtpub.py)プログラム)は、下記の通りです。
#!/usr/bin/python3 # -*- coding: utf-8 -*- import time #smartremocon.py import paho.mqtt.client as mqtt # MQTTのライブラリをインポート import subprocess import json import time #from time import sleep HOST = 'mqtt.beebotte.com' PORT = 8883 CA_CERTS = 'mqtt.beebotte.com.pem' TOKEN = 'token_xxxxxxxx' #Beebotteで作成したチャンネルのトークンを入力 TOPIC = 'xxxxx/xxxxxxxx' #Beebotteで作成したトピック名を入力 def on_connect(client, userdata, flags, respons_code): # ブローカーに接続できたときの処理 print('status {0}'.format(respons_code)) def on_publish(client, userdata, mid): # publishが完了したときの処理 print("publish: {0}".format(mid)) def on_disconnect(client, userdata, flags, respons_code): # ブローカーが切断したときの処理 print("Unexpected disconnection.") client.loop_stop() if __name__ == '__main__': client = mqtt.Client() # クラスのインスタンス(実体)の作成 client.on_connect = on_connect # 接続時のコールバック関数を登録 client.on_disconnect = on_disconnect # 切断時のコールバックを登録 client.on_publish = on_publish # メッセージ送信時のコールバック client.username_pw_set('token:%s' % TOKEN) client.tls_set("/etc/ssl/certs/ca-certificates.crt") client.connect(HOST, PORT) # 接続 client.loop_start() # 通信処理スタート client.publish(TOPIC, '{"data":["message1","message2","message3"]}', 0) time.sleep(3) client.loop_stop() # 通信処理ストップ
subscrive側のコンソールで(bbtsub.py)プログラムを実行して、publish側のコンソールで(bbtpub.py)を実行します。
publish側のコンソールで(cloudpub.py)を実行します。
WebClientのメッセージ送信(bbtpub.html)プログラムは、下記の通りです。
アクセスは、指定のURL(Channel、Resource、Token情報を含みます。)でアクセスします。
<!DOCTYPE html> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Beebotte</title> <meta name="viewport" content="width=device-width,initial-scale=1"> <script type="text/javascript"> //ボタンが押された時のイベント //document.getElementById("release").addEventListener('click', function(){ releaseShot();}); function releaseShot(){ var url = 'https://api.beebotte.com/v1/data/publish/xxxxx/xxxxxxxx?token=token_xxxxxxxx'; var param = {"data" : [form.inp.value],}; const headers = new Headers( { "Content-type" : "application/json" } ); return new Promise((resolve, reject) =>{ fetch(url, { method : 'POST', body : JSON.stringify(param), headers: headers }).then((response) => { target = document.getElementById("output"); target.innerText = "完了しました。" }) }); } </script> </head> <body> <h1>Beebotte</h1> <div class="container"> <form name="form"><input type="text" name="inp"></form> <button id="release" class="button" onclick="releaseShot();">実行</button> <span id="output"></span> </div> <p id="msg"></p> </body> </html>
subscrive側のコンソールで(bbtsub.py)プログラムを実行して、Web Clientで(bbtpub.py)を実行します。
Web Clientで(bbtpub.html)を実行します。
Webプラウザより、RaspberryPiに室内温度の取得要求を出して、Webプラウザに表示します。
プログラムのソース作成したHTML(cloudsensor.html)とRaspberryPiのPythonスクリプト(cloudsensor.py)です。
Raspberry Piのセンサー情報取得は「Raspberry Pi(温度・気圧・湿度センサー)」を参照してください。