Raspberry Pi(MQTT連携)

目次Raspberry Piロボット|センサー(温度・気圧・湿度人感)|赤外線リモコン読み上げMQTT連携IFTTT連携

「MQTT」(Message Queueing Telemetry Transport)はネットワークで利用できる、Publish(パブリッシュ)/Subscribe(サブスクライブ)型の通信プロトコルです。
家電や自動車など多種多様な「モノ」がインターネットにつながり、お互いに情報をやり取りするIoT(Internet of Things)を実現するのに適しており、
短いメッセージを頻繁に送受信することを想定して、シンプルで軽量(通信量やCPU負荷、少電力消費量)なプロトコルです。
ここでは、Raspberry Piとの相互通信を試します。

MQTTの構成

Publish(パブリッシュ)/Subscribe(サブスクライブ)モデルではメッセージの送信者をPublisher、メッセージの受信者をSubscriber、メッセージの仲介をするのがBrokerです。
PublisherはTopicを指定してメッセージを送信し、SubscriberはBrokerから受信したいTopicをfilterとして指定することで、 必要なメッセージを手に入れることができ、n対nの通信ができます。
Topicは「/」で区切られた階層構造になっており、メッセージの交換のキーになります。

ライブラリーのインストール

多くで利用しているMQTTのオープンソースであるMosquitto(モスキート)を利用します。
Mosquitto(Broker)とクライアントをインストールします。

$ sudo apt-get install mosquitto
$ sudo apt-get install mosquitto-clients

状態を「$ sudo service mosquitto status」コマンドで確認出来ます。

Mosquittoで通信の確認

コンソールよりメッセージ受信状態にして、別コンソールよりメッセージを送信すると、メッセージ受信が出来ることが確認出来ます。

pi@raspberrypi:~ $ mosquitto_sub -h localhost -t tp1/sub1
Hello

別コンソールよりメッセージ送信します。

$ mosquitto_pub -h localhost -t tp1/sub1 -m Hello

PythonプログラムでPub/Sub通信をする

ローカルネットワークで、Raspberry Pi相互の通信を確認します。

Raspberry Pi上にpub/subのPythonプログラムを書いてみます。ここでは以下のPahoを使います。 Eclipse Paho Raspberry Piからは、以下のようにインストール出来ます。

$ sudo pip install paho-mqtt

メッセージ受信プログラム(Subscriveプログラム(pisub.py))は、下記の通りです。

import paho.mqtt.client as mqtt
 
def on_connect(client,userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("pi/sub")
 
def on_message(client, userdata, msg):
     print(msg.topic+" "+str(msg.payload))
 
client = mqtt.Client()

client.on_connect = on_connect
client.on_message = on_message 
client.connect("localhost",1883)
client.loop_forever() 

メッセージ送信プログラム(publishプログラム(pipub.py))は、下記の通りです。

import paho.mqtt.client as mqtt
 
def on_connect(client,userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.publish("pi/sub", "Hello from localhost", 0)
 
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
 
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
 
client.connect("localhost", 1883) 
client.loop_forever()

subscrive側のコンソールで(pisub.py)プログラムを実行して、publish側のコンソールで(pipub.py)を実行します。

pi@raspberrypi:~ $ python3 mqtt/pisub.py
Connected with result code 0
pi/sub b'Hello from localhost'

publish側のコンソールで(pipub.py)を実行します。

pi@raspberrypi:~ $ python3 mqtt/pipub.py
Connected with result code 0

CloudMQTTを試す。

MQTTのBrokerであるCloudMQTTのサービスを使ってRaspberry PiとWeb Client相互の通信を確認します。

CloudMQTTサービスの設定します。

Raspberry Pi間の通信

CloudMQTTの詳細情報より、ユーザー名、パスワード、Server、SSL Portを設定して接続します。

メッセ時受信プログラム(Subscriberプログラム(cloudsub.py))は、下記の通りです。

import paho.mqtt.client as mqtt
 
#def on_connect(client, userdata, rc):
def on_connect(client,userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("pi/sub1")
 
def on_message(client, userdata, msg):
     print(msg.topic+" "+str(msg.payload))
 
client = mqtt.Client()

client.on_connect = on_connect
client.on_message = on_message
 
client.tls_set("/etc/ssl/certs/ca-certificates.crt")

client.username_pw_set("ユーザー名", "パスワード")
client.connect("driver.cloudmqtt.com", 28607)
 
client.loop_forever()

メッセージ送信プログラム(Publisherプログラム(cloudpub.py))は、下記の通りです。

import paho.mqtt.client as mqtt
 
def on_connect(client,userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.publish("pi/sub1", "Hello from cloud(localhost)", 0)
 
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))
 
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
 
client.tls_set("/etc/ssl/certs/ca-certificates.crt")
 
client.username_pw_set("ユーザー名", "パスワード")
client.connect("driver.cloudmqtt.com", 28607)
 
client.loop_forever()

subscrive側のコンソールで(cloudsub.py)プログラムを実行して、publish側のコンソールで(cloudpub.py)を実行します。

pi@raspberrypi:~ $ python3 mqtt/cloudsub.py
Connected with result code 0
pi/sub1 b'Hello from cloud(localhost)'

publish側のコンソールで(cloudpub.py)を実行します。

pi@raspberrypi:~ $ python3 mqtt/cloudpub.py
Connected with result code 0

RaspberryPiとWeb Clientとの通信

RaspberryPi側のメッセージ受信/送信プログラム(cloudsub2.py)は下記の通りです。

import paho.mqtt.client as mqtt
 
#def on_connect(client, userdata, rc):
def on_connect(client,userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("pi/sub2")
 
def on_message(client, userdata, msg):
     print(msg.topic+" "+str(msg.payload))
     client.publish("cloud/sub2", "received ok form cloud(localhost)", 0) 

client = mqtt.Client(protocol=mqtt.MQTTv311)

client.on_connect = on_connect
client.on_message = on_message
 
client.tls_set("/etc/ssl/certs/ca-certificates.crt")

client.username_pw_set("ユーザー名", "パスワード")
client.connect("driver.cloudmqtt.com", 28607)
 
client.loop_forever()

Web Client側のメッセ時送信/受信プログラム(cloudpub.html)の場合、サーバの接続はWebsockets Portを指定します。

<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>CloudMQTT</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<script type="text/javascript">
  // Create a client instance
  //client = new Paho.MQTT.Client("host", port,"client_id");
  client = new Paho.MQTT.Client("driver.cloudmqtt.com", 38607,"web_" + parseInt(Math.random() * 100, 10));
  //Example client = new Paho.MQTT.Client("m11.cloudmqtt.com", 32903, "web_" + parseInt(Math.random() * 100, 10));

  // set callback handlers
  client.onConnectionLost = onConnectionLost;
  client.onMessageArrived = onMessageArrived;
  var options = {
    useSSL: true,
    userName: "ユーザー名",
    password: "パスワード",
    onSuccess:onConnect,
    onFailure:doFail
  }
  // connect the client
  client.connect(options);
  tmp = "" ;
  // called when the client connects
  function onConnect() {
    // Once a connection has been made, make a subscription and send a message.
    console.log("onConnect");
    client.subscribe("cloud/sub2");
  }

  function doFail(e){ console.log(e); }

  // called when the client loses its connection
  function onConnectionLost(responseObject) {
    if (responseObject.errorCode !== 0) {
      console.log("onConnectionLost:"+responseObject.errorMessage);
    }
  }
  function sendm() {
    message = new Paho.MQTT.Message("Hello from cloud(web)");
    message.destinationName = "pi/sub2" ;
    client.send(message);
  }
  // called when a message arrives
  function onMessageArrived(message) {
    console.log("onMessageArrived:"+message.payloadString);
    tmp += message.payloadString + "<br>" ;
    msg.innerHTML = tmp ;
  }
</script>
</head>
<body>
<h1>CloudMQTT</h1>
<button onclick="sendm()">送信</button>
<p id="msg"></p>
</body>
</html>

subscrive側のコンソールで(cloudsub2.py)プログラムを実行して、WebClientで(cloudpub.html)を実行します。

pi@raspberrypi:~ $ python3 mqtt/cloudsub2.py
Connected with result code 0
pi/sub2 b'Hello from cloud(web)'

WebClientで(cloudpub.html)を実行します。

Beebotteを試す。

MQTTのBrokerであるBeebotteサービスを使ってメッセージの送受信を試してみます。

BeeBotteサービスの設定します

Beebotte(https://beebotte.com)にアクセスしてEmaiアドレスとパスワードを登録してアカウントを作成します。
作成したアカウントのメニューよりChannelsを選択して、設定画面より、新規にチャンネルを作ります。

コンソール画面よりからメッセージを送受信して確認します。

RaspberryPi間の通信

メッセージ送信プログラム(Subscriber(bbtsub.py)プログラム)は、下記の通りです。
接続先のHost、Port、Topic(Channel、Resource)、該当するTokenを指定します。

#!/usr/bin/python3
# -*- coding: utf-8 -*- 
#smartremocon.py
 
import paho.mqtt.client as mqtt  # MQTTのライブラリをインポート
import subprocess
import json
from time import sleep
 
HOST = 'mqtt.beebotte.com'
PORT = 8883
CA_CERTS = 'mqtt.beebotte.com.pem'
TOKEN = 'token_xxxxxxxx'    #Beebotteで作成したチャンネルのトークンを入力
TOPIC = 'xxxxx/xxxxxxxx'    #Beebotteで作成したトピック名を入力
 
def on_connect(client, userdata, flags, respons_code):  # ブローカーに接続できたときの処理
    print('status {0}'.format(respons_code))
 
def on_disconnect(client, userdata, flags, respons_code):# ブローカーが切断したときの処理
    print("Unexpected disconnection.")
    client.loop_stop()
 
def on_message(client, userdata, msg):  # メッセージが届いたときの処理
    print(msg.topic + ' ' + str(msg.payload))
    print(msg.payload.decode("utf-8"))
    try:
        data = json.loads(msg.payload.decode("utf-8"))["data"]
        print(data)
        for dat in data:
            print(dat)
    except :
        print("jsonフォーマットエラー")
#   finally:

if __name__ == '__main__':
    client = mqtt.Client()        # クラスのインスタンスの作成
    client.on_connect = on_connect     # 接続時のコールバック関数を登録
    client.on_disconnect = on_disconnect   # 切断時のコールバックを登録
    client.on_message = on_message  # メッセージ到着時のコールバック
    client.username_pw_set('token:%s' % TOKEN)
    client.tls_set("/etc/ssl/certs/ca-certificates.crt")
    client.connect(HOST, PORT)   # 接続
    client.subscribe(TOPIC)
    client.loop_forever() # 永久ループして待ち続ける

メッセージj送信プログラム(Publisher(bbtpub.py)プログラム)は、下記の通りです。

#!/usr/bin/python3
# -*- coding: utf-8 -*- 
import time
#smartremocon.py
 
import paho.mqtt.client as mqtt     # MQTTのライブラリをインポート
import subprocess
import json
import time
#from time import sleep
 
HOST = 'mqtt.beebotte.com'
PORT = 8883
CA_CERTS = 'mqtt.beebotte.com.pem'
TOKEN = 'token_xxxxxxxx'    #Beebotteで作成したチャンネルのトークンを入力
TOPIC = 'xxxxx/xxxxxxxx'    #Beebotteで作成したトピック名を入力
 
def on_connect(client, userdata, flags, respons_code):    # ブローカーに接続できたときの処理
    print('status {0}'.format(respons_code))

def on_publish(client, userdata, mid):    # publishが完了したときの処理
    print("publish: {0}".format(mid))

def on_disconnect(client, userdata, flags, respons_code): # ブローカーが切断したときの処理
    print("Unexpected disconnection.")
    client.loop_stop()

if __name__ == '__main__':
    client = mqtt.Client()                # クラスのインスタンス(実体)の作成
    client.on_connect = on_connect        # 接続時のコールバック関数を登録
    client.on_disconnect = on_disconnect  # 切断時のコールバックを登録
    client.on_publish = on_publish        # メッセージ送信時のコールバック
    client.username_pw_set('token:%s' % TOKEN)
    client.tls_set("/etc/ssl/certs/ca-certificates.crt")
    client.connect(HOST, PORT)            # 接続

    client.loop_start()   # 通信処理スタート
    client.publish(TOPIC, '{"data":["message1","message2","message3"]}', 0)
    time.sleep(3)
    client.loop_stop()    # 通信処理ストップ

subscrive側のコンソールで(bbtsub.py)プログラムを実行して、publish側のコンソールで(bbtpub.py)を実行します。

pi@raspberrypi:~ $ python3 mqtt/bbtsub.py
status 0
{"data":["message1","message2","message3"]}
message1
message2
message3

publish側のコンソールで(cloudpub.py)を実行します。

pi@raspberrypi:~ $ python3 mqtt/bbtpub.py
publish: 1
status 0

WebClientのメッセージ送信(bbtpub.html)プログラムは、下記の通りです。
アクセスは、指定のURL(Channel、Resource、Token情報を含みます。)でアクセスします。

<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Beebotte</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<script type="text/javascript">
//ボタンが押された時のイベント
//document.getElementById("release").addEventListener('click', function(){ releaseShot();});
function releaseShot(){
    var url = 'https://api.beebotte.com/v1/data/publish/xxxxx/xxxxxxxx?token=token_xxxxxxxx';
    var param = {"data" : [form.inp.value],};
    const headers = new Headers( { "Content-type" : "application/json" } );
    return new Promise((resolve, reject) =>{
        fetch(url, {
            method : 'POST',
            body : JSON.stringify(param),
            headers: headers
        }).then((response) => {
            target = document.getElementById("output");
            target.innerText = "完了しました。"
        })
    });
}
</script>
</head>
<body>
<h1>Beebotte</h1>
<div class="container">
<form name="form"><input type="text" name="inp"></form>  
<button id="release" class="button" onclick="releaseShot();">実行</button> 
<span id="output"></span>
 
</div>
<p id="msg"></p>
</body>
</html>

subscrive側のコンソールで(bbtsub.py)プログラムを実行して、Web Clientで(bbtpub.py)を実行します。

pi@raspberrypi:~ $ python3 mqtt/bbtsub.py
status 0
{"data":["室温の要求"],"ispublic":true,"ts":1646114996849}
室温の要求

Web Clientで(bbtpub.html)を実行します。

RaspberryPiよりセンサー情報を取得する

Webプラウザより、RaspberryPiに室内温度の取得要求を出して、Webプラウザに表示します。
プログラムのソース作成したHTML(cloudsensor.html)とRaspberryPiのPythonスクリプト(cloudsensor.py)です。
Raspberry Piのセンサー情報取得は「Raspberry Pi(温度・気圧・湿度センサー)」を参照してください。

目次Raspberry Piロボット|センサー(温度・気圧・湿度人感)|赤外線リモコン読み上げMQTT連携IFTTT連携