租户设备通信
通信子系统配置SiteWhere与设备的通信方式。在入站侧,设备数据通过事件源被引入系统。入站数据转换成SiteWhere事件和传递到入站处理链由入站处理策略。在出站端(作为出站处理链的一部分),命令通过命令目标发送到外部设备。出站命令路由选择哪个目的命令将被用于驱动命令的有效负载
事件来源
事件源负责将数据带入SiteWhere。所有事件源都实现 IInboundEventSource 接口,它由一个或多个事件接收器(实现IInboundEventReceiver)和一个事件解码器(实现 IDeviceEventDecoder)组成。事件接收器负责处理收集数据的协议。数据然后由事件解码器处理,以创建SiteWhere事件,这些事件提供设备数据的通用表示,以便可以由入站处理链处理。
mqtt代理主题:SiteWhere/input/json
MQTT事件源
由于使用MQTT数据在IoT应用程序中很常见,因此SiteWhere包含可简化流程的组件。在下面的示例中,将事件源配置为侦听给定主题上的消息,然后使用标准SiteWhere Google Protocol Buffers消息格式使用protobuf事件解码器解码消息负载。
<sw:device-communication>
<!-- Inbound event sources -->
<!-- Event source for protobuf messages over MQTT -->
<sw:mqtt-event-source sourceId="protobuf" hostname="localhost"
port="1883" topic="SiteWhere/input/protobuf">
<sw:protobuf-event-decoder/>
可以为mqtt-event-source元素指定以下属性。
属性 | 需要 | 描述 |
---|
SourceID | 需要 | 事件来源ID。 |
hostname | 需要 | MQTT代理服务器主机名或IP地址。 |
port | 需要 | MQTT代理服务器端口。 |
protocol | 可选的 | 用于连接(tcp或tls)的协议。默认为tcp。 |
trustStorePath | 可选的 | 通过tls连接时信任商店的路径。 |
trustStorePassword | 可选的 | 通过tls连接时,信任商店的密码。 |
topic | 需要 | 设备将发布事件的MQTT主题。 |
ActiveMQ事件源
Apache ActiveMQ是一个开源消息平台,支持许多有线格式,如AMQP,OpenWire,XMPP和MQTT。它还支持用于消息处理的标准Java JMS API。SiteWhere包含一个创建嵌入式ActiveMQ代理的事件源,该代理用于监听已配置的传输。多线程的消费者池监听已配置的主题,并将二进制有效负载交给已配置的解码器。
<sw:device-communication>
<!-- Inbound event sources -->
<!-- Event source for protobuf messages over ActiveMQ queue -->
<sw:activemq-event-source sourceId="activemq" transportUri="tcp://localhost:1234"
queueName="SITEWHERE.IN" numConsumers="150">
<sw:protobuf-event-decoder/>
</sw:activemq-event-source>
上面的示例通过TCP / IP侦听JMS连接,其中有150个消费者线程从配置的队列中读取数据,使用SiteWhere Google协议缓冲区格式解码数据,然后发送解码后的事件进行处理。
可以为activemq-event-source元素指定以下属性。
属性 | 需要 | 描述 |
---|
SourceID | 需要 | 事件来源ID。 |
transportUri | 需要 | 配置可供客户端连接的ActiveMQ传输。 |
QUEUENAME | 需要 | 排队外部客户端发送事件。 |
numConsumers | 需要 | 用于处理队列中数据的线程使用者数。默认为3。 |
套接字事件源
许多设备通过直接套接字连接来连接以报告事件。例如,许多GPS追踪器具有蜂窝连接并通过GPRS报告位置或其他事件。所述插座事件源可以被用来创建监听一个给定端口上,接收客户端的连接,并使用多线程的方法处理它们的服务器套接字。套接字交互通常是复杂且有状态的,因此处理委托给 处理设备和服务器之间对话的ISocketInteractionHandler实现 。套接字交互处理程序返回一个有效载荷,该有效载荷被传递给配置的解码器以构建SiteWhere事件。
<sw:device-communication>
<!-- Inbound event sources -->
<!-- Event source for protobuf messages from socket connections -->
<sw:socket-event-source port="8585" numThreads="10" sourceId="socket">
<sw:read-all-interaction-handler-factory/>
<sw:protobuf-event-decoder/>
</sw:socket-event-source>
配置read-all-interaction-handler-factory将读取来自客户端套接字的所有输入,并将二进制信息传递给配置的解码器。在某些情况下(例如以标准SiteWhere Google Protocol Buffers格式发送有效载荷),这就足够了。但是,在大多数情况下,用户需要创建一个理解设备和服务器之间会话逻辑的交互处理程序。通过使用引用包含套接字交互处理程序工厂的Spring bean 的interaction-handler-factory元素可以引用自定义实现。工厂实现了 ISocketInteractionHandlerFactory 接口并创建管理设备对话的套接字交互处理程序的实例。
可以为socket-event-source元素指定以下属性。
属性 | 需要 | 描述 |
---|
的SourceID | 需要 | 事件来源ID。 |
港口 | 可选的 | 监听服务器端口。默认为8484。 |
numThreads | 需要 | 排队外部客户端发送事件。 |
numConsumers | 需要 | 用于处理客户端请求的线程数。默认为5。 |
轮询REST事件源
许多系统不提供实时流数据的选项,但提供REST服务来访问其数据。在这种情况下,polling-rest-event-source 可以配置为以给定的时间间隔对外部REST服务进行调用,并将结果数据解析为有效载荷进行解码。事件源配置用于访问REST服务的基本URL以及用于控制对服务的请求的脚本。Groovy脚本每隔一段时间调用一次,进行REST调用,然后返回用于解码的有效负载列表。然后每个有效载荷与任何其他事件源一样传递给解码器。配置如下所示:
<sw:device-communication>
<!-- Inbound event sources -->
<!-- Event source for polling an external REST service -->
<sw:polling-rest-event-source baseUrl="http://rest.example.com/service"
username="admin" password="password" pollIntervalMs="10000"
scriptPath="parseREST.groovy" sourceId="rest" />
在上面的配置中,SiteWhere将每10秒钟调用一次parseREST.groovy脚本(它应位于conf / globals / scripts / groovy文件夹中)。其中一个变量,休息传递给Groovy脚本是一个辅助类的句柄,该类 包含一个已用基本URL初始化的Spring RestTemplate。通过访问剩余 对象的getJsonNode方法并将子路径添加到基本URL中,REST调用被创建并 返回一个 JsonNode。在使用JsonNode解析JSON响应之后,应将有效负载添加到有效负载中 变量,其中包含将传递给解码器以生成SiteWhere事件的有效负载列表。
import com.fasterxml.jackson.databind.*
import com.fasterxml.jackson.databind.node.*
def nodes = rest.getJsonNode "nodes"
for (JsonNode node : nodes) {
def deveui = node.get("deveui")
def entries = rest.getJsonNode "nodes/${deveui}/payloads/ul"
for (JsonNode entry : entries) {
((ObjectNode) entry).put('id', deveui)
def payload = entry.toString().getBytes()
可以为polling-rest-event-source元素指定以下属性。
属性 | 需要 | 描述 |
---|
的SourceID | 需要 | 事件来源ID。 |
pollIntervalMs | 需要 | 在轮询请求之间等待的时间间隔(以毫秒为单位)。 |
SCRIPTPATH | 需要 | Groovy脚本的路径,该脚本执行生成REST请求和解析响应所需的逻辑。 |
的baseUrl | 需要 | 用于REST请求的基本URL。在Groovy脚本中,任何REST调用都是相对于此基值而言的。 |
用户名 | 需要 | 用户名,如果REST服务需要验证。 |
密码 | 需要 | 如果REST服务需要验证,则为密码。 |
WebSocket事件源
物联网应用程序的常见连接选项是与远程WebSocket的交互 。的网络套接字事件源可用于数据连接到WebSocket和流进入系统。数据有效载荷可以是二进制或文本,事件解码器应根据预期的数据类型进行配置。
<sw:device-communication>
<!-- Inbound event sources -->
<!-- Event source for WebSocket connectivity -->
<sw:web-socket-event-source sourceId="websocket"
webSocketUrl="ws://localhost:6543/sitewhere/stringsender" payloadType="string">
<sw:groovy-string-event-decoder scriptPath="customDecoder.groovy"/>
</sw:web-socket-event-source>
请注意,有效负载类型是'字符串',并且groovy-string-event-decoder解码器需要一个字符串输入。如果二进制解码器配置为字符串负载类型,反之亦然,系统将在启动时生成错误。
可以为web-socket-event-source元素指定以下属性。
属性 | 需要 | 描述 |
---|
的SourceID | 需要 | 事件来源ID。 |
webSocketUrl | 需要 | 要连接的WebSocket的URL。 |
载荷类型 | 需要 | 根据从服务器套接字发送的消息类型,可以是“字符串”或“二进制”。 |
Hazelcast队列事件源
该事件源用于从Hazelcast队列中提取已解码的设备事件。通常的使用场景是,一个SiteWhere实例使用入站处理链上的 hazelcast-queue-processor将所有解码的事件发送到队列,而下级实例使用hazelcast-queue-event-source 元素来处理事件。多个从属实例可以附加到相同的队列,从而可以并行处理事件。请注意,为了处理队列,所有从属实例必须位于同一个Hazelcast组中。
<sw:device-communication>
<!-- Inbound event sources -->
<!-- Event source for pulling events from Hazelcast queue -->
<sw:hazelcast-queue-event-source sourceId="hzQueue"/>
可以为hazelcast-queue-event-source元素指定以下属性。
RabbitMQ事件源
此事件源使用AMQP协议连接到RabbitMQ实例,并注册自己以处理排队的消息。可配置数量的使用者设置为从队列中读取二进制有效负载,并将它们传递给为事件源配置的二进制解码器。
注:如果连接到现有队列,则持久属性必须与队列的设置一致。
<sw:device-communication>
<!-- Inbound event sources -->
<sw:rabbit-mq-event-source sourceId="rabbit" connectionUri="amqp://localhost"
durable="false" numConsumers="10" queueName="sitewhere.input">
<sw:protobuf-event-decoder/>
</sw:rabbit-mq-event-source>
属性 | 需要 | 描述 |
---|
的SourceID | 需要 | 事件来源ID。 |
connectionUri | 需要 | 确定RabbitMQ连接设置的URI |
耐用 | 可选的 | 指示队列在重新启动时是否持续。默认为false。 |
numConsumers | 可选的 | 从队列中拉出的消费者线程的数量。默认为5。 |
QUEUENAME | 可选的 | 正在消耗的队列的名称。默认为sitewhere.input。 |
自定义事件源
如果需要自定义协议来支持设备的入站事件,SiteWhere可以轻松插入自定义事件源。自定义事件源类必须实现IInboundEventSource 接口。SiteWhere提供了提供大部分常用事件源功能的基类。例如sitewhere-core中的com.sitewhere.device.communication.BinaryInboundEventSource提供了一个处理二进制数据的事件源。通过创建一个BinaryInboundEventSource的实例并插入一个自定义的 IInboundEventReceiver 和IDeviceEventDecoder 实施后,行为可以完全自定义。事件接收器负责从设备接收二进制数据,解码器将数据转换为可以处理的SiteWhere事件。
<sw:device-communication>
<!-- Inbound event sources -->
<!-- Custom event source referencing a Spring bean -->
<sw:event-source ref="customEventSourceBean"/>
可以为event-source元素指定以下属性。
属性 | 需要 | 描述 |
---|
REF | 需要 | 引用外部定义的Spring bean。 |
批量操作管理器
批处理操作管理器负责应用于许多设备的异步处理操作。批处理操作可以通过管理控制台或通过REST服务提交。批处理操作管理器循环遍历批处理操作元素列表,执行每个操作元素并保持关于执行进度的状态。默认批处理操作管理器可以通过使用default-batch-operation-manager元素进行配置, 如下所示。
<sw:device-communication>
<!-- Batch operation management -->
<sw:default-batch-operation-manager throttleDelayMs="10000"/>
节流延迟值可用于减慢处理元素的速率,以便系统不会因大操作而过载。
可以通过创建实现IBatchOperationManager的类 并使用batch-operation-manager元素添加对其的引用来添加自定义批量操作管理 器。
可以为default-batch-operation-manager元素指定以下属性。
属性 | 需要 | 描述 |
---|
throttleDelayMs | 可选的 | 处理批处理操作元素之间等待的毫秒数。默认为。 |
MQTT命令目的地
对于在MQTT主题上侦听命令的设备,可以使用mqtt-command-destination元素轻松配置目标。编码器和参数提取器应根据预期的命令格式和MQTT路由信息的位置进行配置。所述 硬件-ID-主题提取器元件配置基于包括所述设备的硬件ID加以解决的表达式用于递送MQTT主题。在不适合的情况下,可以注入自定义参数提取器。
<sw:device-communication>
<!-- Outbound command destinations -->
<sw:command-destinations>
<!-- Delivers commands via MQTT -->
<sw:mqtt-command-destination destinationId="default" hostname="localhost" port="1883">
<sw:protobuf-command-encoder/>
<sw:hardware-id-topic-extractor commandTopicExpr="SiteWhere/commands/%s"
systemTopicExpr="SiteWhere/system/%s"/>
</sw:mqtt-command-destination>
可以为mqtt-command-destination元素指定以下属性。
属性 | 需要 | 描述 |
---|
destinationId | 需要 | 目的地的ID。 |
hostname | 需要 | MQTT代理主机名。 |
port | 需要 | MQTT代理端口。 |
Twilio命令目的地
对于通过SMS消息接收命令的设备,可以使用twilio-command-destination通过Twilio在线服务传送命令。要使用该服务,您需要创建一个Twilio帐户并支付出站SMS服务(包括将发送邮件的电话号码)。
<sw:device-communication>
<!-- Outbound command destinations -->
<sw:command-destinations>
<!-- Delivers commands via Twilio SMS messages -->
<sw:twilio-command-destination destinationId="laipac"
accountSid="${twilio.account.sid}" authToken="${twilio.auth.token}"
fromPhoneNumber="${twilio.from.phone.number}">
<sw:protobuf-command-encoder/>
<sw:parameter-extractor ref="laipacExtractor"/>
</sw:twilio-command-destination>
帐户SID,身份验证令牌和发送电话号码都是与Twilio帐户相关的所有数据。参数提取器实现应该是提供SmsParameters类型参数的参数,供应商通过它来确定传递命令的SMS电话号码。
可以为twilio-command-destination元素指定以下属性。
属性 | 需要 | 描述 |
---|
目标-ID | 需要 | 目的地的ID。 |
accountSid | 需要 | Twilio帐户SID(来自Twilio网站)。 |
的authToken | 需要 | Twilio帐户身份验证令牌(来自Twilio网站)。 |
fromPhoneNumber | 需要 | 用于发起短信的Twilio电话号码。 |
调试设备通信
在开发使用设备通信子系统的解决方案时,通常有助于准确了解SiteWhere处理入站和出站数据的过程。要打开通信调试,请向下滚动到lib / log4j.xml文件中的以下块:
<sw:device-communication>
<category name="com.sitewhere.device.communication">
<priority value="INFO" />
将INFO值更新为DEBUG并重新启动服务器以查看更详细的通信信息。
参考:http://documentation.sitewhere.io/userguide/tenant/device-communication.html