提交 699b8516 authored 作者: wangqiang's avatar wangqiang

集成mqtt客户端

上级 f5d7b383
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
*/target/
*/.settings/
.settings
target
### STS ###
.apt_generated
# built application files
*.apk
*.ap_
# Eclipse project files
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
.DS_Store
### IntelliJ IDEA ###
# idea files
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
out
bin
# Built application files and Maven
target/
pom.xml.tag
pom.xml.releaseBackup
pom.xml.versionsBackup
pom.xml.next
release.properties
dependency-reduced-pom.xml
buildNumber.properties
.mvn/timing.properties
.pdb
# Compiled class files
*.class
# Log Files
*.log
# About IntelliJ
*.iml
/.idea/
/out/
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# macOS
.DS_Store
# Package Files
#*.jar
#*.war
#*.ear
#*.zip
#*.tar.gz
#*.rar
# CMake
cmake-build-debug/
# File-based project format
*.iws
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
#PowderDesigner
*.pdb
*.pjb
*.prj
#IDEA http-client 环境文件
*.env.json
/qz-services/qz-admin/.gitignore
/qz-services/qz-admin/mvnw
/qz-services/qz-admin/mvnw.cmd
/qz-services/qz-admin/.mvn/
/qz-services/qz-pond/.gitignore
/qz-services/qz-pond/mvnw
/qz-services/qz-pond/mvnw.cmd
/qz-services/qz-pond/qz-pond.iml
/qz-services/qz-pond/.mvn/
/qz-services/qz-report/.gitignore
/qz-services/qz-report/mvnw
/qz-services/qz-report/.mvn/
/qz-services/qz-report/mvnw.cmd
/qz-services/qz-report/qz-report.iml
# apq-pc-client
客户端API-SERVER + IOT-CLIENT
# apq-pc-control
客户端硬件监控
# apq-pc-info
客户端PC信息采集器(cpu, memory。。。)
# apq-pc-register
客户端PC软件检测控制器(qq。。。)
差异被折叠。
package com.priusis.client.service;
import com.google.common.collect.Lists;
import com.priusis.client.service.conf.PcConnectionConfiguration;
import com.priusis.client.service.conf.PcPersistenceConfiguration;
import com.priusis.monitor.mqtt.MqttClient;
import com.priusis.monitor.mqtt.MqttConnectResult;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
@Slf4j
public class MqttMessageSender implements Runnable {
private MqttClient tbClient;
private PersistentFileService persistentFileService;
private PcPersistenceConfiguration persistence;
private final PcConnectionConfiguration connection;
private BlockingQueue<MessageFuturePair> incomingQueue;
private Queue<Future<Void>> outgoingQueue;
public MqttMessageSender(PcPersistenceConfiguration persistence,
PcConnectionConfiguration connection,
MqttClient tbClient,
PersistentFileService persistentFileService,
BlockingQueue<MessageFuturePair> incomingQueue) {
this.persistence = persistence;
this.connection = connection;
this.tbClient = tbClient;
this.persistentFileService = persistentFileService;
this.incomingQueue = incomingQueue;
outgoingQueue = new ConcurrentLinkedQueue();
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
checkClientConnected();
List<MqttPersistentMessage> storedMessages = getMessages();
if (!storedMessages.isEmpty()) {
Iterator<MqttPersistentMessage> iter = storedMessages.iterator();
while (iter.hasNext()) {
if (!checkClientConnected()) {
persistentFileService.saveForResend(Lists.newArrayList(iter));
break;
}
MqttPersistentMessage message = iter.next();
log.debug("Sending message [{}]", message);
publishMqttMessage(message);
}
} else {
Thread.sleep(persistence.getPollingInterval());
}
} catch (InterruptedException e) {
log.trace(e.getMessage());
Thread.currentThread().interrupt();
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}
}
private Future<Void> publishMqttMessage(MqttPersistentMessage message) {
return tbClient.publish(message.getTopic(), Unpooled.wrappedBuffer(message.getPayload()), MqttQoS.AT_MOST_ONCE).addListener(
future -> incomingQueue.put(new MessageFuturePair(future, message))
);
}
private boolean checkClientConnected() {
if (!tbClient.isConnected()) {
try {
clearOutgoingQueue();
log.info("Priusisiot MQTT connection failed. Reconnecting in [{}] milliseconds", connection.getRetryInterval());
Thread.sleep(connection.getRetryInterval());
log.info("Attempting to reconnect to Priusisiot.");
MqttConnectResult result = tbClient.reconnect().get(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
if (result.isSuccess()) {
log.info("Successfully reconnected to Priusisiot.");
}
} catch (TimeoutException e) {
log.trace(e.getMessage(), e);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return false;
}
return true;
}
private void clearOutgoingQueue() {
outgoingQueue.forEach(future -> {
try {
future.cancel(true);
} catch (CancellationException e) {
log.warn("Failed to cancel outgoing message on disconnected client. Reason: " + e.getMessage(), e);
}
});
outgoingQueue.clear();
}
private List<MqttPersistentMessage> getMessages() {
try {
List<MqttPersistentMessage> resendMessages = persistentFileService.getResendMessages();
if (!resendMessages.isEmpty()) {
return resendMessages;
} else {
return persistentFileService.getPersistentMessages();
}
} catch (IOException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
}
package com.priusis.client.service.core;
import com.priusis.client.data.kv.KvEntry;
import com.priusis.client.service.MqttDeliveryFuture;
import com.priusis.client.service.MqttRpcDataMessage;
import com.priusis.client.service.conf.PcExtensionConfiguration;
import com.priusis.client.service.data.AttributeRequest;
import com.priusis.client.service.data.AttributeResponse;
import com.priusis.client.service.data.AttributesUpdateSubscription;
import com.priusis.client.service.data.RpcCommandResponse;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
/**
* Created by priusis on 16.01.17.
*/
public interface MqttService {
void init() throws Exception;
void destroy() throws Exception;
String getTenantLabel();
/**
* Report device attributes change to Priusisiot
*
* @param attributes - the attribute values list
*/
MqttDeliveryFuture onDeviceAttributesUpdate(List<KvEntry> attributes);
/**
* Report attributes request to Priusisiot
*
* @param attributeRequest - attributes request
* @param listener - attributes response
*/
void onDeviceAttributeRequest(AttributeRequest attributeRequest, Consumer<AttributeResponse> listener);
/**
* Report response from device to the server-side RPC call from Priusisiot
*
* @param response - the device response to RPC call
*/
void onDeviceRpcResponse(RpcCommandResponse response);
/**
* Subscribe to attribute updates from Priusisiot
*
* @param subscription - the subscription
* @return true if successful, false if already subscribed
*/
boolean subscribe(AttributesUpdateSubscription subscription);
/**
* Unsubscribe to attribute updates from Priusisiot
*
* @param subscription - the subscription
* @return true if successful, false if already unsubscribed
*/
boolean unsubscribe(AttributesUpdateSubscription subscription);
/**
* Report generic error from one of gateway components
*
* @param e - the error
*/
void onError(Exception e);
/**
* Report error related to device
*
* @param deviceName - the device name
* @param e - the error
*/
void onError(String deviceName, Exception e);
/**
* Report applied configuration
*
* @param configuration - extension configuration
*/
void onAppliedConfiguration(String configuration);
/**
* Report extension configuration error
*
* @param e - the error
* @param configuration - extension configuration
*/
void onConfigurationError(Exception e, PcExtensionConfiguration configuration);
/**
* Report extension configuration status
*
* @param id - extension id
* @param status - extension status
*/
void onConfigurationStatus(String id, String status);
/**
* rpc请求落文件
*
* @param mqttRpcDataMessage
* @return
* @throws IOException
*/
File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException;
/**
* 读取
*
* @param method
* @return
* @throws IOException
*/
MqttRpcDataMessage readFromFile(String method) throws IOException;
}
package com.priusis.client.service.data;
import com.priusis.client.data.kv.KvEntry;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import java.util.List;
/**
* Created by priusis on 23.01.17.
*/
@Data
@AllArgsConstructor
@RequiredArgsConstructor
public class DeviceData {
private final List<KvEntry> attributes;
private int timeout;
}
package com.priusis.client.util.converter;
import com.fasterxml.jackson.databind.JsonNode;
import com.priusis.client.data.kv.KvEntry;
import com.priusis.client.service.data.DeviceData;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import static com.priusis.client.util.JsonTools.fromString;
import static com.priusis.client.util.JsonTools.getKvEntries;
/**
* Created by priusis on 15.05.17.
*/
@Data
@Slf4j
public class BasicJsonConverter{
protected String filterExpression;
protected String deviceNameJsonExpression;
protected String deviceTypeJsonExpression;
private ConcurrentHashMap<String, SimpleDateFormat> formatters = new ConcurrentHashMap<>();
public DeviceData parseBody(String body) {
try {
JsonNode payload = fromString(body);
return parseDeviceData(payload);
} catch (Exception e) {
log.error("Exception occurred while parsing json request body [{}]", body, e);
throw new RuntimeException(e);
}
}
protected DeviceData parseDeviceData(JsonNode payload) throws ParseException {
//long ts = System.currentTimeMillis();
List<KvEntry> attrData = getKvEntries(payload);
return new DeviceData(attrData);
}
}
差异被折叠。
差异被折叠。
差异被折叠。
apq-pc-info @ 83ec6432
差异被折叠。
apq-pc-register @ 83ec6432
差异被折叠。
差异被折叠。
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论