租户事件处理
在通过事件源将数据拉入系统后,标准化数据由事件处理子系统处理。事件被交给一个入站处理策略 实现,该实现控制着待处理事件的排队和线程等方面。这些事件被传递给入站处理链 - 入站事件处理器的有序列表 ,这些处理器以链接的方式处理每个未保存的事件。如果事件存储处理器配置在入站链中,则事件将保存到基础数据存储区。成功保存后,事件将转交给出站处理策略它处理出站处理的排队和线程。然后,每个事件都由出站处理链处理,该出站处理链是出站事件处理器的有序链。每个出站处理器都处理已保存的事件以启动事件的自定义处理。
入站处理策略
入站处理策略负责将事件从事件源移入入站处理链。它负责处理线程并可靠地交付处理事件。入站处理策略必须实现 IInboundProcessingStrategy接口。
阻塞队列入站处理策略
入站处理链
在事件源将数据解码为SiteWhere设备事件后,入站处理策略将事件排队等待 入站处理链处理。链是一系列入站事件处理器(实现IInboundEventProcessor),每个事件处理器都处理串联的入站事件。新的入站事件处理器可以添加到链中以增强现有功能。例如,度量处理器可以保持每秒处理事件的数量。
由于REST调用(或直接调用设备管理API的其他调用)不会通过事件源进入系统,因此不会由入站处理链进行处理。
事件存储处理器
默认情况下,在入站链中配置事件存储处理器的实例。该处理器通过设备管理服务提供商接口来维护设备事件。如果这个处理器被移除,事件将不会被存储。默认配置如下所示:
<!-- Add processing logic to inbound events -->
<sw:inbound-processing-chain>
<sw:event-storage-processor/>
</sw:inbound-processing-chain>
注册处理器
默认情况下,在入站链中配置注册处理器的实例。该处理器处理设备的动态注册,其中包括为请求注册的设备创建新设备和分配。如果该处理器被移除,注册请求将被忽略。默认配置如下所示:
<!-- Add processing logic to inbound events -->
<sw:inbound-processing-chain>
<!-- Allow devices to dynamically register -->
<sw:registration-processor/>
</sw:inbound-processing-chain>
设备流处理器
默认情况下,在入站链中配置设备流处理器的实例。该处理器处理来自设备的流数据。如果该处理器被移除,则流创建请求以及将数据添加到流的请求将被忽略。默认配置如下所示:
<!-- Add processing logic to inbound events -->
<sw:inbound-processing-chain>
<!-- Allow devices to create streams and send stream data -->
<sw:device-stream-processor/>
</sw:inbound-processing-chain>
Hazelcast队列处理器
可以在入站处理链中配置榛树 - 队列处理器的实例,以将所有解码的事件转发到Hazelcast队列中。这允许多个从属SiteWhere实例使用hazelcast-queue-event-source来提取事件并对其进行处理。事件以循环方式传递给下级实例,因此可以分配处理负载。如果配置了此处理器,则通常会删除用于存储,注册和流处理的其他缺省处理器,因为处理发生在下级实例中。
<!-- Add processing logic to inbound events -->
<sw:inbound-processing-chain>
<!-- Note that other processors have been removed -->
<!-- Send all events to a Hazelcast queue -->
<sw:hazelcast-queue-processor/>
</sw:inbound-processing-chain>
出站处理策略
出站处理策略负责将事件保存到数据存储后进行后处理。它负责处理线程并可靠地将事件传递给出站事件处理器链。入站处理策略必须实现IOutboundProcessingStrategy 接口。
阻塞队列出站处理策略
SiteWhere CE的此默认出站处理策略使用有界队列在事件存储后保存用于后处理的事件。它创建一个线程池,该线程池使用队列将事件传递到入站处理链。如果事件的传递速度比线程池可以处理它们的速度快,则队列终将开始阻塞进入处理的线程。增加出站事件处理的线程数量会从队列中加载负载,但会增加核心系统的处理负载。SiteWhere CE不保留出站队列,因此关闭服务器可能会导致数据丢失。SiteWhere EE通过持久队列和事务语义提供更的出站处理策略实施。
<!-- Outbound Processing Strategy -->
<sw:outbound-processing-strategy>
<sw:blocking-queue-outbound-processing-strategy
maxQueueSize="10000" numEventProcessorThreads="10"/>
</sw:outbound-processing-strategy>
可以为blocking-queue-outbound-processing-strategy元素指定以下属性。
属性 | 需要 | 描述 |
---|
maxQueueSize | 可选的 | 阻塞前队列中的大项目数。默认为10000。 |
numEventProcessorThreads | 可选的 | 用于处理传入事件的线程数。默认为10。 |
出站处理链
在默认配置实现中,每次通过设备管理服务提供者接口保存事件时,调用出站事件处理链。以同样的方式,入站处理链对未保存的入站事件数据起作用,可疑处理链作用于已成功保存到数据存储区的数据。每个出站事件处理器(实现 IOutboundEventProcessor)都是按顺序执行的。可以将新的出站事件处理器添加到链中以增强现有功能。例如,SiteWhere有一个事件处理器,用于将所有出站事件发送给Hazelcast用户,从而允许外部客户对事件采取行动。
REST调用(或直接调用设备管理API的其他调用)由出站处理链以与来自事件源的事件相同的方式进行处理。
可用的事件处理器
Apache Solr事件处理器
Azure事件中心事件处理器
命令传递事件处理器
默认情况下,在出站链中配置命令交付事件处理器的实例。该处理器将设备命令调用切换到通信子系统进行处理。如果该处理器被移除,设备命令调用将被保留,但不会被处理。默认配置如下所示:
<sw:device-communication>
<sw:outbound-processing-chain>
<!-- Routes commands for outbound processing -->
<sw:command-delivery-event-processor/>
<!-- Send outbound device events over Hazelcast -->
<sw:outbound-event-processor ref="hazelcastDeviceEventProcessor"/>
</sw:outbound-processing-chain>
这个例子还显示了添加一个自定义出站事件处理器,该处理器引用了配置中其他地方定义的Spring bean。事件将在配置处理器处理后传递给自定义处理器。
Dweet.io事件处理器
Groovy事件处理器
Hazelcast事件处理器
InitialState.com事件处理器
MQTT事件处理器
事件可以使用mqtt事件处理器元素直接转发到MQTT主题。这些事件被封送为JSON数据并发送到主题。通过配置关联的出站事件处理器过滤器,只能转发某些事件。示例配置如下所示:
<sw:outbound-processing-chain>
<sw:mqtt-event-processor hostname="localhost" port="1883"
protocol="tcp" topic="interested.devices"></sw:mqtt-event-processor>
</sw:outbound-processing-chain>
可以为mqtt事件处理器元素指定以下属性。
属性 | 需要 | 描述 |
---|
主机名 | 需要 | MQTT代理服务器主机名或IP地址。 |
港口 | 需要 | MQTT代理服务器端口。 |
协议 | 可选的 | 用于连接(tcp或tls)的协议。默认为tcp。 |
trustStorePath | 可选的 | 通过tls连接时信任商店的路径。 |
trustStorePassword | 可选的 | 通过tls连接时,信任商店的密码。 |
话题 | 需要 | MQTT主题,其中事件将被转发。 |
Siddhi(CEP)事件处理器
区域测试事件处理器
注:待补充
参考:http://documentation.sitewhere.io/userguide/tenant/event-processing.html