今回はラズパイに接続した温度・湿度・気圧センサーの情報をAWS IoT CoreにPublishしてみます。 センサーは2つ使用しました。接続の取り回しが簡単なI2Cインターフェースで使用できるものを選びました。
使用した環境:
ラズパイにセンサーを接続する
温湿度・気圧センサー(BME280)
配線方法
BME280 | ラズパイ | 備考 |
---|---|---|
1(VDD) | 1(3.3V) | |
2(GND) | 6(GND) | |
3(CSB) | --- | 未使用 |
4(SDI) | 3(SDA) | |
5(SDO) | 6(GND) | アドレス選択 GND:0x76, VDD:0x77 |
6(SCK) | 5(SCL) |
i2cdetectで信号が取れているか確認します。
$ sudo i2cdetect -y 1 0 1 2 3 4 5 6 7 8 9 a b c d e f 00: -- -- -- -- -- -- -- -- -- -- -- -- -- 10: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 20: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 30: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 40: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 50: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 60: -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- 70: -- -- -- -- -- -- 76 --
アドレス0x76が検出されていれば、配線はOKです。
動作確認
python3用のサンプルコードはここで配布されています。 https://github.com/PrinzEugen7/Lesson/tree/master/Python/raspi3/gpio/sensor/bem280
$ wget https://raw.githubusercontent.com/PrinzEugen7/Lesson/master/Python/raspi3/gpio/sensor/bem280/bme280.py $ pip3 install smbus2 $ python3 bme280.py Temp: 28.272287298791344 C Humid: 52.54910825396235 % Pressure: 1003.1744785362799 [%]
floatの精度で温度、湿度、気圧が取得できます。
照度センサー(BH1750)
配線
BH1750 | ラズパイ | 備考 |
---|---|---|
1(VCC) | 1(3.3V) | |
2(GND) | 6(GND) | |
3(SCL) | 5(SCL) | |
4(SDA) | 3(SDA) | |
5(ADDR) | 6(GND) | アドレス選択 OPEN or GND:0x23, VCC:0x5c |
5ピンのADDRでは開放時は0x23ですが、安定しない場合があるので、しっかりとGNDに落とします。
sudo i2cdetect -y 1
で0x23が検出されることを確認します。
動作確認
BH1750は単純にSMBusでデータを読み出すだけで照度を取得することができます。
bh1750.py:
import smbus class Bh1750(): def get_lux(): bus = smbus.SMBus(1) addr = 0x23 luxRead = bus.read_i2c_block_data(addr, 0x11) lux = luxRead[1] * 10 return lux if __name__ == '__main__': lux = Bh1750.get_lux() print("Lux:", lux)
$ pip3 install smbus $ python3 bh1750.py Lux: 1100
野外の温湿度と気圧
野外の気温と室内の温度をモニタリングしたかったので、天気サイトから気温、湿度、気圧をスクレイピングするコードを用意しました。天気の地域はWeathernewsのサイトで地名を検索し、表示されたページのURLを使います。また、パースしやすいようにURLの末尾のlang指定は lang=en
で英語表示に切り替えます。
weather.py:
import urllib.request import cchardet from bs4 import BeautifulSoup class Weathernews(): def get_weather(url): with urllib.request.urlopen(url) as res: byte = res.read() html = byte.decode(cchardet.detect(byte)['encoding']) soup = BeautifulSoup(html, 'html.parser') for block in soup.find_all('div', {'class':{'weather-now__cont'}}): items = block.text.split() #print('[items]:', items, '\n') #['14:30', 'WeatherCloudy', 'Temp.31.4℃', 'RH68%', 'Pres.999hPa', 'Wind', 'S', '3m/s', 'Sunrise', '05:05', '|', 'Sunset', '18:27'] # 0 1 2 3 4 5 6 7 8 9 10 11 12 time = items[0] weather = items[1].replace('Weather', '') temp = items[2].replace('Temp.', '').replace('℃', '') humid = items[3].replace('RH', '').replace('%', '') pres = items[4].replace('Pres.', '').replace('hPa', '') return time, weather, temp, humid, pres def main(): #千代田区 url = 'https://weathernews.jp/onebox/35.680030/139.762025/q=%E6%9D%B1%E4%BA%AC%E9%83%BD%E5%8D%83%E4%BB%A3%E7%94%B0%E5%8C%BA&temp=c&lang=en' time, weather, temp, humid, pres = Weathernews.get_weather(url) print('time:', time) print('weather:', weather) print('temp:', temp) print('humid:', humid) print('pres:', pres) if __name__ == '__main__': main()
AWS IoT Device SDK for Pythonのインストール
テストコード
「AWS IoT Device SDK for Python on GitHub」のreadmeを参考に作成します。 https://github.com/aws/aws-iot-device-sdk-python
証明書ファイルやキーファイルはAWSコンソールからダウンロードしたものを、あらかじめラズパイに転送しておきます。
client.py:
# Import SDK packages from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient # For certificate based connection myMQTTClient = AWSIoTMQTTClient("myClientID") # For Websocket connection # myMQTTClient = AWSIoTMQTTClient("myClientID", useWebsocket=True) # Configurations # For TLS mutual authentication myMQTTClient.configureEndpoint("YOUR.ENDPOINT", 8883) # For Websocket # myMQTTClient.configureEndpoint("YOUR.ENDPOINT", 443) # For TLS mutual authentication with TLS ALPN extension # myMQTTClient.configureEndpoint("YOUR.ENDPOINT", 443) myMQTTClient.configureCredentials("YOUR/ROOT/CA/PATH", "PRIVATE/KEY/PATH", "CERTIFICATE/PATH") # For Websocket, we only need to configure the root CA # myMQTTClient.configureCredentials("YOUR/ROOT/CA/PATH") myMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing myMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz myMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec myMQTTClient.configureMQTTOperationTimeout(5) # 5 sec myMQTTClient.connect() myMQTTClient.publish("myTopic", "myPayload", 0) myMQTTClient.disconnect()
動作確認
ラズパイに「AWS IoT Device SDK for Python」をインストールします。
$ pip3 install --user AWSIoTPythonSDK
AWS IoT Coreのコンソールの「テスト」タブでトピックをサブスクリプションします。
ラズパイからトッピングを発行します。
$ python3 client.py
AWSコンソールにメッセージが表示されます。
「myPayload」が表示されれば準備OKです。
センサー情報を発行する
準備ができたので、いよいよ実際にセンサー情報をAWS IoT Coreに送ってみます。SDKのサンプルを参考にクライアントコードを作成します。
クライアントコード
import json import time from datetime import datetime from bme280 import Bme280 from bh1750 import Bh1750 from weathernews import Weathernews # Import SDK packages from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient def support_datetime_default(o): if isinstance(o, datetime): return o.isoformat() def get_data(): # 温湿度センサー bme280 = Bme280(0x76, 1) temp, humid, pres = bme280.get_data() # 照度センサー lux = Bh1750.get_lux() # 天気情報 # 千代田区の天気 url = 'https://weathernews.jp/onebox/35.680030/139.762025/q=%E6%9D%B1%E4%BA%AC%E9%83%BD%E5%8D%83%E4%BB%A3%E7%94%B0%E5%8C%BA&temp=c&lang=en' wt_time, wt_tenki, wt_temp, wt_humid, wt_pres = Weathernews.get_weather(url) sensor = {} sensor['temp'] = temp sensor['humid'] = humid sensor['pres'] = pres sensor['lux'] = lux weather = {} weather['time'] = wt_time weather['tenki'] = wt_tenki weather['temp'] = wt_temp weather['humid'] = wt_humid weather['pres'] = wt_pres message = {} message['timestamp'] = datetime.now() message['sensor'] = sensor message['weather'] = weather messageJson = json.dumps(message, default=support_datetime_default) return messageJson def main(): # For certificate based connection myMQTTClient = AWSIoTMQTTClient("myClientID") myMQTTClient.configureEndpoint("xxxxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com", 8883) myMQTTClient.configureCredentials("AmazonRootCA1.pem", "private.pem.key", "certificate.pem.crt") myMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing myMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz myMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec myMQTTClient.configureMQTTOperationTimeout(5) # 5 sec myMQTTClient.connect() messageJson = get_data() myMQTTClient.publish("myTopic", messageJson, 1) myMQTTClient.disconnect() if __name__ == '__main__': main()
メモ:Date側をJSONに変換する方法はこの記事を参考にしました。 pythonでjson出力する際で対応していない型(e.g. datetime)の値を変換しながら出力したい - Qiita
センサー情報をPublishする
ラズパイからPublishします。
$ python3 publish.py
JSON形式でセンサー情報と天気情報が表示されます。
まとめ
ラズパイを使って手軽にAWSのIoT Coreを試すことができました。
次回はセンサーからの情報をグラフを使って視覚化してみたいと思います。