2023-07-25
“物联网”(Internet of Things,缩写 IoT)一词诞生于 15 年前。近几年,在 5G、窄带物联网、云计算、大数据、人工智能、区块链和边缘计算等新一代信息技术加持下,物联网获得高速发展。目前,物联网的实际应用,已在制造业、农业、家居、交通和车联网、医疗健康等多个领域取得显著成果。
在物联网多年高速发展中,MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)成为使用最广泛的传输协议。
基于 MQTT,如何简单高效的实现端到端、一站式解决“万物互联”的方案,也备受关注。
我们观察到,在工业物联网和车联网行业中,有诸多采用 EMQ 来实现端到端、一站式解决“万物互联”的方案,本文将为大家演示在亚马逊云平台上如何利用 EMQ 实现数据接入和展现。
云边一体的 IoT 接入解决方案
EMQ 是一个云原生 MQTT 消息服务器和流处理数据库,为企业云边端的海量物联网数据提供高可靠、高性能的实时连接、移动、处理与集成,助力构建「面向未来」的物联网平台与应用。
我们就以工业物联网解决方案为例,展示整个方案的实施方式,和效果展现。
完整的物联网端到端数据解决方案
为了更便于阅读和理解,本文通过四部分,为大家讲解。
组件名称 | 版本 | 功能介绍 | 部署方式 |
EMQX | Enterprise 4.4.19 | 分布式物联网接入平台 | Amazon Linux 2 |
Neuron | 2.5.1 | 开源的、轻量级工业协议网关软件,支持数十种工业协议的一站式设备连接、数据接入、MQTT 协议转换 | Docker |
HstreamDB Cloud | N/A | 实时数据流的接入、存储、处理、分发等环节进行全生命周期管理的流数据库 | Cloud platform |
HstreamDB | v0.15.0 | Docker | |
MQTTX | 1.9.3 | MQTTX 跨平台 MQTT 桌面和 CLI 客户端,用于学习和验证 EMQX 功能,发布订阅消息等。 | Windows Server |
PeakHMI Slave Simulators | 3.0.0 | 模拟器,用于模拟 Modbus 设备数据 | Windows Server |
资源类型 | 系统类型 | 运行组件 |
EC2 | Amazon Linux 2 | EMQX |
Neuron | ||
HstreamDB | ||
EC2 | Windows Server | PeakHMI Slave Simulators |
MQTTX |
EMQX 企业版是一款「随处运行,无限连接,任意集成」云原生分布式物联网接入平台。EMQX 企业版提供一体化的分布式 MQTT 消息服务和强大的 IoT 规则引擎,为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业快速构建关键业务的 IoT 平台与应用。
本文我们使用 EC2 的方式进行部署。
版本选择链接,本文选择 4.4.19 版本
https://www.emqx.com/zh/downloads/enterprise
下载
wget https://www.emqx.com/zh/downloads/enterprise/v4.4.19/emqx-ee-4.4.19-otp24.3.4.2-1-amzn2-amd64.rpm
安装
yum install emqx-ee-4.4.19-otp24.3.4.2-1-amzn2-amd64.rpm -y
运行
sudo systemctl start emqx
在安装完成后,使用 IP+18083 端口,即可通过浏览器访问 EMQX 的控制台
http://<EC2 IP>:18083
1.3.1 初次登录用户名:admin,密码 public
1.3.2 初次登录后,需要修改密码
1.3.3 EMQX 控制台展示
MQTTX 是由 EMQ 开发的一款开源跨平台 MQTT 5.0 桌面客户端,它兼容 macOS,Linux 以及 Windows 系统。
MQTTX 的用户界面 UI 采用聊天式设计,使得操作逻辑更加简明直观。它支持用户快速创建和保存多个 MQTT 连接,便于测试 MQTT/MQTTS 连接,以及 MQTT 消息的订阅和发布。
https://mqttx.app/zh/downloads
创建 EMQX 连接,输入 EMQX IP,用户名密码。然后点击 connect 即可。
选择新建订阅,输入消息,并发送到主题 topic。在对话框内,可以立即接收到主题 topic 的信息。说明 EMQX 运行正常。
HStreamDB 是一款专为流式数据设计的, 针对大规模实时数据流的接入、存储、处理、分发等环节进行全生命周期管理的流数据库。 它使用标准 SQL(及其流式拓展)作为主要接口语言,以实时性作为主要特征,旨在简化数据流的运维管理以及实时应用的开发。
HStreamDB 可以通过 Docker,Amazon EKS 部署,也可以使用 HStreamDB Cloud。
3.1.1 创建一个 quick-start.yaml
version: '3.5'
services:
hserver:
image: hstreamdb/hstream:latest
depends_on:
- zookeeper
- hstore
ports:
- '127.0.0.1:6570:6570'
expose:
- 6570
networks:
- hstream-quickstart
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /tmp:/tmp
- data_store:/data/store
command:
- bash
- '-c'
- |
set -e
/usr/local/script/wait-for-storage.sh hstore 6440 zookeeper 2181 600
/usr/local/bin/hstream-server
--bind-address 0.0.0.0 --port 6570
--internal-port 6571
--server-id 100
--seed-nodes '$$(hostname -I | awk '{print $$1}'):6571'
--advertised-address $$(hostname -I | awk '{print $$1}')
--metastore-uri zk://zookeeper:2181
--store-config /data/store/logdevice.conf
--store-admin-host hstore --store-admin-port 6440
--store-log-level warning
--io-tasks-path /tmp/io/tasks
--io-tasks-network hstream-quickstart
hstore:
image: hstreamdb/hstream:latest
networks:
- hstream-quickstart
volumes:
- data_store:/data/store
command:
- bash
- '-c'
- |
set -ex
# N.B. 'enable-dscp-reflection=false' is required for linux kernel which
# doesn't support dscp reflection, e.g. centos7.
/usr/local/bin/ld-dev-cluster --root /data/store
--use-tcp --tcp-host $$(hostname -I | awk '{print $$1}')
--user-admin-port 6440
--param enable-dscp-reflection=false
--no-interactive
zookeeper:
image: zookeeper
expose:
- 2181
networks:
- hstream-quickstart
volumes:
- data_zk_data:/data
- data_zk_datalog:/datalog
networks:
hstream-quickstart:
name: hstream-quickstart
volumes:
data_store:
name: quickstart_data_store
data_zk_data:
name: quickstart_data_zk_data
data_zk_datalog:
name: quickstart_data_zk_datalog
3.1.2 在同一个文件夹中运行,即可后台启动 HstreamDB
docker-compose -f quick-start.yaml up -d
3.1.3 启动 HStreamDB 的 SQL 命令行界面
docker exec -it some-hstream-cli hstream --port 6570 sql
3.1.4 如果所有的步骤都正确运行,您将会进入到命令行界面,并且能看见一下帮助信息
终端 1,创建一个 stream,然后执行一个持久查询
CREATE STREAM test;
SELECT * FROM test WHERE humidity > 70 EMIT CHANGES;
终端 2,插入数据
docker exec -it some-hstream-cli hstream --port 6570 sql
INSERT INTO test (temperature, humidity) VALUES (22, 80);
INSERT INTO test (temperature, humidity) VALUES (15, 20);
INSERT INTO test (temperature, humidity) VALUES (31, 76);
终端 1,输出结果如下:
> SELECT * FROM test WHERE humidity > 70 EMIT CHANGES;
{'humidity':{'$numberLong':'80'},'temperature':{'$numberLong':'22'}}
{'humidity':{'$numberLong':'76'},'temperature':{'$numberLong':'31'}}
更多操作命令,请参考:
https://docs.hstream.io/zh/platform/write-in-platform.html
HStream Platform 是以 SaaS 的方式在云上摄取、存储、处理和分发你的海量数据流。基于开源的 HStreamDB 构建。有更清晰易用的控制台访问和使用。并且当前处于免费使用阶段。
Neuron 是一款开源的、轻量级工业协议网关软件,支持数十种工业协议的一站式设备连接、数据接入、MQTT 协议转换,为工业设备赋予工业 4.0 时代关键的物联网连接能力。
Neuron 提供多种安装方式,本实例采用容器化部署的方式,以便于最快开始体验 Neuron。
获取 Docker 镜像
docker pull emqx/neuron:latest
启动 Docker 容器
docker run -d --name neuron -p 7000:7000 --privileged=true --restart=always emqx/neuron:latest
同步浏览器访问 Neuron http://neuron-ip:7000
初始用户名 admin,密码 0000
控制台页面
安装 PeakHMI Slave Simulators 软件,安装包可在 PeakHMI 官网(opens new window)中下载。
安装后,运行 Modbus TCP slave EX。设置模拟器点位数值及站点号,如下图所示:
提示:须保证 Neuron 与模拟器运行在同一局域网内。
Windows 中尽量关闭防火墙,否则可能会导致 Neuron 连接不上模拟器。
至此,我们已经完成 EMQ 相关组件的部署。
本节演示工业物联网数据接入,以及数据的查询展示。
物联网通常会采集很多信息,这里以一些常见指标来演示。会通过 Modbus 模拟器,以下面格式生成数据。
采集指标 | 指标缩略名 |
id | ID |
电量 | Elec |
转数 | Speed |
温度 | Temp |
经纬度 | GPS |
风速 | Air-Wind |
温度 | Air-temp |
添加南向设备和采集指标
配置->南向设备->添加设备
选择 Modbus TCP
配置设备信息,填入 PeakHMI Slave Simulator IP
创建设备组
添加点位列表,即需要采集数据指标
南向设备信息
配置->北向应用->添加应用
输入名称,插件选择 MQTT
应用配置
输入主题名称,EMQX 服务器地址,用户名及密码。
添加订阅
Neuron 配置完成。
Neuron 针对北向应用和南向设备,有大量的插件支持,只需点选,既可以完成适配。
在 HStream Plarform 获取 Service URL,下载 Certificates 备用
创建 stream
资源类型:HStreamDB
HStreamDB 服务器:hstreams://us-east1.endpoint.hstream.io
开启SSL:true
HStreamDB 证书分别选择:root_ca.crt、client.key、client.crt
输入规则的 SQL,为实验目的,这里选择全部数据。可根据实际业务场景,输入相应 SQL
响应动作->添加动作
响应动作,是根据规则的 SQL 查询结果,输出到消费的应用。这里选择 HStreamDB
注意:消息内容模版,针对 HStreamDB,需要输入“${payload}”
至此,已经完成全部配置。可以通过 EMQX,HStreamDB 进行数据流状态,及数据结果的查询。
5.3.1 Streams 的详细信息
5.3.2 Hstream 查询数据记录
具体连接方式,可以点击相应的编程语言类型获取。
我们也可以通过 MQTTX 直接订阅 EMQX 的信息,也可以看到相应的内容。
作为物联网的细分领域,车联网也是备受关注。本小节,会针对车联网场景,进行演示。
车联网数据传输通常使用的是 MQTT 协议。我们使用 MQTTX 来模拟车载数据的发布,进行测试。
https://mqttx.app/zh/downloads
命令行下载
curl -LO https://www.emqx.com/en/downloads/MQTTX/v1.9.3/mqttx-cli-linux-x64
sudo install ./mqttx-cli-linux-x64 /usr/local/bin/mqttx
MQTTX 内置测试场景列表:
场景名称 | 描述 |
tesla | EMQ tesla module,用于模拟车辆数据 |
IEM | 模拟工业能源监控数据 |
smart_home | 模拟生成智能家居数据 |
weather | 模拟生成天气站数据 |
在 Linux 终端执行以下命令,模拟 100 辆车的数据上报
mqttx simulate -sc tesla -c 100 -t 'Topic'
终端输出
[7/11/2023] [5:25:24 PM] › Start simulation publishing, scenario: tesla, connections: 100, req interval: 10ms, message interval: 1000ms success [100/100] - Connected
[7/11/2023] [5:25:25 PM] › Created 100 connections in 1.075s
[7/11/2023] [5:25:40 PM] › Published total: 1500, message rate: 100/s
HStreamDB 中,查询记录,可以看到模拟的车载数据会持续写入。
单条数据值内容
{
'car_id': 'Z5HCY53YSYLP71542',
'display_name': 'Celestine's Tesla',
'model': 'X',
'trim_badging': 'officia',
'exterior_color': 'orange',
'wheel_type': 'veritatis',
'spoiler_type': 'adipisci',
'geofence': 'Vaughntown',
'state': 'asleep',
'since': '2023-07-11T11:53:13.912Z',
'healthy': true,
'version': '1.6.5',
'update_available': true,
'update_version': '9.1.4',
'latitude': '4.7053',
'longitude': '-146.0241',
'shift_state': 'D',
'power': 5124,
'speed': 69,
'heading': 11,
'elevation': 3710,
'locked': false,
'sentry_mode': true,
'windows_open': true,
'doors_open': true,
'trunk_open': true,
'frunk_open': true,
'is_user_present': false,
'is_climate_on': true,
'inside_temp': -1.8,
'outside_temp': 16,
'is_preconditioning': true,
'odometer': 453885,
'est_battery_range_km': 433.2,
'rated_battery_range_km': 979.1,
'ideal_battery_range_km': 904.4,
'battery_level': 52,
'usable_battery_level': 34,
'plugged_in': true,
'charge_energy_added': 27.41,
'charge_limit_soc': 24,
'charge_port_door_open': true,
'charger_actual_current': 71.05,
'charger_power': 17,
'charger_voltage': 223,
'charge_current_request': 24,
'charge_current_request_max': 34,
'scheduled_charging_start_time': '2026-01-09T05:51:00.807Z',
'time_to_full_charge': 2.25,
'tpms_pressure_fl': 3,
'tpms_pressure_fr': 2.2,
'tpms_pressure_rl': 3.3,
'tpms_pressure_rr': 2.9,
'timestamp': 1689095696286
}
至此,我们已经完整的展示数据是如何从终端传感器,上传到云端。通过 Neuron,EMQX,HStream 我们可以快捷的实现物联网的接入需求。
在数据接入之后,我们可以进一步把数据存放到 S3,依托亚马逊云科技的智能湖仓架构,通过 Athena,Redshift,Quicksight 等组件,挖掘更多的数据价值。