提交 65e6961d authored 作者: wangqiang's avatar wangqiang

reset

上级 0b37f68d
...@@ -3,8 +3,11 @@ package com.priusis.client.service; ...@@ -3,8 +3,11 @@ package com.priusis.client.service;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.priusis.client.service.conf.PcConnectionConfiguration; import com.priusis.client.service.conf.PcConnectionConfiguration;
import com.priusis.client.service.conf.PcPersistenceConfiguration; import com.priusis.client.service.conf.PcPersistenceConfiguration;
import com.priusis.client.service.core.MqttService;
import com.priusis.client.service.core.MqttServiceImpl;
import com.priusis.monitor.mqtt.MqttClient; import com.priusis.monitor.mqtt.MqttClient;
import com.priusis.monitor.mqtt.MqttConnectResult; import com.priusis.monitor.mqtt.MqttConnectResult;
import com.priusis.monitor.mqtt.MqttHandler;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
...@@ -23,20 +26,28 @@ public class MqttMessageSender implements Runnable { ...@@ -23,20 +26,28 @@ public class MqttMessageSender implements Runnable {
private PersistentFileService persistentFileService; private PersistentFileService persistentFileService;
private PcPersistenceConfiguration persistence; private PcPersistenceConfiguration persistence;
private MqttHandler mqttService;
private final PcConnectionConfiguration connection; private final PcConnectionConfiguration connection;
private BlockingQueue<MessageFuturePair> incomingQueue; private BlockingQueue<MessageFuturePair> incomingQueue;
private Queue<Future<Void>> outgoingQueue; private Queue<Future<Void>> outgoingQueue;
private static final String DEVICE_RPC_RES_TOPIC = "devices/rpc/req";
private static final String DEVICE_RPC_TOPIC = DEVICE_RPC_RES_TOPIC + "/+";
private static final String DEVICE_GET_ATTRIBUTES_RESPONSE_PLUS_TOPIC = "devices/attrs/res/+";
private static final String DEVICE_GET_ATTRIBUTES_RESPONSE_TOPIC = "devices/attrs/res/1";
public MqttMessageSender(PcPersistenceConfiguration persistence, public MqttMessageSender(PcPersistenceConfiguration persistence,
PcConnectionConfiguration connection, PcConnectionConfiguration connection,
MqttClient tbClient, MqttClient tbClient,
MqttHandler mqttService,
PersistentFileService persistentFileService, PersistentFileService persistentFileService,
BlockingQueue<MessageFuturePair> incomingQueue) { BlockingQueue<MessageFuturePair> incomingQueue) {
this.persistence = persistence; this.persistence = persistence;
this.connection = connection; this.connection = connection;
this.tbClient = tbClient; this.tbClient = tbClient;
this.mqttService = mqttService;
this.persistentFileService = persistentFileService; this.persistentFileService = persistentFileService;
this.incomingQueue = incomingQueue; this.incomingQueue = incomingQueue;
outgoingQueue = new ConcurrentLinkedQueue(); outgoingQueue = new ConcurrentLinkedQueue();
...@@ -87,6 +98,10 @@ public class MqttMessageSender implements Runnable { ...@@ -87,6 +98,10 @@ public class MqttMessageSender implements Runnable {
log.info("Attempting to reconnect to Priusisiot."); log.info("Attempting to reconnect to Priusisiot.");
MqttConnectResult result = tbClient.reconnect().get(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS); MqttConnectResult result = tbClient.reconnect().get(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
if (result.isSuccess()) { if (result.isSuccess()) {
tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_PLUS_TOPIC, mqttService).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_TOPIC, mqttService).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_RPC_TOPIC, mqttService).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
log.info("Successfully reconnected to Priusisiot."); log.info("Successfully reconnected to Priusisiot.");
} }
} catch (TimeoutException e) { } catch (TimeoutException e) {
......
...@@ -356,8 +356,8 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall ...@@ -356,8 +356,8 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall
private void onRpcCommand(String message) { private void onRpcCommand(String message) {
JsonNode payload = fromString(message); JsonNode payload = fromString(message);
MqttRpcDataMessage mqttRpcDataMessage = MqttRpcDataMessage.builder() MqttRpcDataMessage mqttRpcDataMessage = MqttRpcDataMessage.builder()
.method(payload.get("method").asText()) .method(payload.get("method").asText()).sendTime(System.currentTimeMillis())
.params(Optional.ofNullable(payload.get("params")).map(JsonNode::asText).orElse(null)).build(); .params(Optional.ofNullable(payload.get("params")).map(JsonNode::toString).orElse(null)).build();
// 存储rpc下发的数据 // 存储rpc下发的数据
...@@ -503,7 +503,7 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall ...@@ -503,7 +503,7 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall
private void initMqttSender(BlockingQueue<MessageFuturePair> incomingQueue) { private void initMqttSender(BlockingQueue<MessageFuturePair> incomingQueue) {
mqttSenderExecutor = Executors.newSingleThreadExecutor(); mqttSenderExecutor = Executors.newSingleThreadExecutor();
mqttSenderExecutor.submit(new MqttMessageSender(persistence, connection, tbClient, persistentFileService, incomingQueue)); mqttSenderExecutor.submit(new MqttMessageSender(persistence, connection, tbClient, this, persistentFileService, incomingQueue));
} }
private void initMqttReceiver(BlockingQueue<MessageFuturePair> incomingQueue) { private void initMqttReceiver(BlockingQueue<MessageFuturePair> incomingQueue) {
...@@ -558,7 +558,6 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall ...@@ -558,7 +558,6 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall
//tbClient.on(DEVICE_ATTRIBUTES_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS); //tbClient.on(DEVICE_ATTRIBUTES_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_EVENT_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_PLUS_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS); tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_PLUS_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS); tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_RPC_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS); tbClient.on(DEVICE_RPC_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
......
#apq:
# iot-gateway: 192.168.124.19:7002
#PC_HOST: 192.168.124.29
apq: apq:
iot-gateway: 192.168.6.23:7033 iot-gateway: 192.168.124.19:7002
PC_HOST: 192.168.6.23 PC_HOST: 192.168.124.29
\ No newline at end of file #apq:
# iot-gateway: 192.168.6.23:7033
#PC_HOST: 192.168.6.23
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论