提交 6eb6fb00 authored 作者: yangli's avatar yangli

Merge remote-tracking branch 'origin/master'

package com.priusis.client.extensions.http; package com.priusis.client.extensions.http;
import com.fasterxml.jackson.databind.JsonNode;
import com.priusis.client.data.kv.KvEntry; import com.priusis.client.data.kv.KvEntry;
import com.priusis.client.extensions.ExtensionUpdate; import com.priusis.client.extensions.ExtensionUpdate;
import com.priusis.client.extensions.http.conf.HttpConfiguration; import com.priusis.client.extensions.http.conf.HttpConfiguration;
...@@ -69,6 +70,19 @@ public class DefaultHttpService extends ExtensionUpdate implements HttpService { ...@@ -69,6 +70,19 @@ public class DefaultHttpService extends ExtensionUpdate implements HttpService {
} }
} }
@Override
public void processEventRequest(String token, String body) throws Exception {
log.trace("[{}] Processing request body [{}] for token [{}]", mqttService.getTenantLabel(), body, token);
if (configuration != null) {
if (StringUtils.isEmpty(configuration.getToken()) || configuration.getToken().equals(token)) {
processEventBody(body);
} else {
log.error("[{}] Request token [{}] doesn't match configuration token!", mqttService.getTenantLabel(), token);
throw new SecurityException("Request token [" + token + "] doesn't match configuration token!");
}
}
}
@Override @Override
public File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException { public File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException {
return mqttService.flushRpcDataToFile(mqttRpcDataMessage); return mqttService.flushRpcDataToFile(mqttRpcDataMessage);
...@@ -79,6 +93,32 @@ public class DefaultHttpService extends ExtensionUpdate implements HttpService { ...@@ -79,6 +93,32 @@ public class DefaultHttpService extends ExtensionUpdate implements HttpService {
return mqttService.readFromFile(method); return mqttService.readFromFile(method);
} }
private void processEventBody(String body) throws Exception {
/*JsonNode jsonObject = fromString(body);
String methodName = jsonObject.get("methodName").asText();
int requestId = jsonObject.get("requestId").asInt();
String paramsStr = null;
JsonNode params = jsonObject.get("params");
if (null != params) {
paramsStr = params.toString();
}*/
List<KvEntry> attrData = getKvEntries(fromString(body));
DeviceData dd = new DeviceData(attrData);
if (dd != null) {
List<MqttDeliveryFuture> futures = new ArrayList<>();
if (!dd.getAttributes().isEmpty()) {
futures.add(mqttService.onDeviceEventUpdate(dd.getAttributes()));
}
for (Future future : futures) {
waitWithTimeout(future);
}
} else {
log.error("[{}] DeviceData is null. Body [{}] was not parsed successfully!", mqttService.getTenantLabel(), body);
throw new IllegalArgumentException("Device Data is null. Body [" + body + "] was not parsed successfully!");
}
}
private void processBody(String body) throws Exception { private void processBody(String body) throws Exception {
List<KvEntry> attrData = getKvEntries(fromString(body)); List<KvEntry> attrData = getKvEntries(fromString(body));
DeviceData dd = new DeviceData(attrData); DeviceData dd = new DeviceData(attrData);
......
...@@ -30,6 +30,11 @@ public class HttpController { ...@@ -30,6 +30,11 @@ public class HttpController {
service.processRequest(token, body); service.processRequest(token, body);
} }
@RequestMapping(value = "/uplink_event/{token}", method = RequestMethod.POST)
public void handleEventRequest(@PathVariable String token, @RequestBody String body) throws Exception {
service.processEventRequest(token, body);
}
@RequestMapping(value = "/rpc_cmd/{method}", method = RequestMethod.GET) @RequestMapping(value = "/rpc_cmd/{method}", method = RequestMethod.GET)
public MqttRpcDataMessage getRpcCmdDataRequest(@PathVariable String method) throws Exception { public MqttRpcDataMessage getRpcCmdDataRequest(@PathVariable String method) throws Exception {
MqttRpcDataMessage mqttRpcDataMessage = service.readFromFile(method); MqttRpcDataMessage mqttRpcDataMessage = service.readFromFile(method);
......
...@@ -10,6 +10,8 @@ public interface HttpService extends ExtensionService { ...@@ -10,6 +10,8 @@ public interface HttpService extends ExtensionService {
void processRequest(String token, String body) throws Exception; void processRequest(String token, String body) throws Exception;
void processEventRequest(String token, String body) throws Exception;
File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException; File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException;
MqttRpcDataMessage readFromFile(String method) throws IOException; MqttRpcDataMessage readFromFile(String method) throws IOException;
......
...@@ -67,6 +67,11 @@ public abstract class DefaultTenantManagerService implements TenantManagerServic ...@@ -67,6 +67,11 @@ public abstract class DefaultTenantManagerService implements TenantManagerServic
httpService.processRequest(token, body); httpService.processRequest(token, body);
} }
@Override
public void processEventRequest(String token, String body) throws Exception {
httpService.processEventRequest(token, body);
}
@Override @Override
public File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException { public File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException {
return httpService.flushRpcDataToFile(mqttRpcDataMessage); return httpService.flushRpcDataToFile(mqttRpcDataMessage);
......
...@@ -10,6 +10,8 @@ public interface TenantManagerService { ...@@ -10,6 +10,8 @@ public interface TenantManagerService {
void processRequest(String token, String body) throws Exception; void processRequest(String token, String body) throws Exception;
void processEventRequest(String token, String body) throws Exception;
File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException; File flushRpcDataToFile(MqttRpcDataMessage mqttRpcDataMessage) throws IOException;
MqttRpcDataMessage readFromFile(String method) throws IOException; MqttRpcDataMessage readFromFile(String method) throws IOException;
......
...@@ -32,6 +32,13 @@ public interface MqttService { ...@@ -32,6 +32,13 @@ public interface MqttService {
*/ */
MqttDeliveryFuture onDeviceAttributesUpdate(List<KvEntry> attributes); MqttDeliveryFuture onDeviceAttributesUpdate(List<KvEntry> attributes);
/**
* Report device event change to Priusisiot
*
* @param attributes - the attribute values list
*/
MqttDeliveryFuture onDeviceEventUpdate(List<KvEntry> attributes);
/** /**
* Report attributes request to Priusisiot * Report attributes request to Priusisiot
* *
......
...@@ -57,6 +57,7 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall ...@@ -57,6 +57,7 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall
private static final String DEVICE_TELEMETRY_TOPIC = "devices/telemetry"; private static final String DEVICE_TELEMETRY_TOPIC = "devices/telemetry";
private static final String DEVICE_ATTRIBUTES_TOPIC = "devices/attrs"; private static final String DEVICE_ATTRIBUTES_TOPIC = "devices/attrs";
private static final String DEVICE_RPC_TOPIC = "devices/rpc"; private static final String DEVICE_RPC_TOPIC = "devices/rpc";
private static final String DEVICE_EVENT_TOPIC = "devices/event";
private static final String DEVICE_GET_ATTRIBUTES_REQUEST_TOPIC = "devices/attrs/req/1"; private static final String DEVICE_GET_ATTRIBUTES_REQUEST_TOPIC = "devices/attrs/req/1";
private static final String DEVICE_GET_ATTRIBUTES_RESPONSE_PLUS_TOPIC = "devices/attrs/res/+"; private static final String DEVICE_GET_ATTRIBUTES_RESPONSE_PLUS_TOPIC = "devices/attrs/res/+";
private static final String DEVICE_GET_ATTRIBUTES_RESPONSE_TOPIC = "devices/attrs/res/1"; private static final String DEVICE_GET_ATTRIBUTES_RESPONSE_TOPIC = "devices/attrs/res/1";
...@@ -157,6 +158,22 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall ...@@ -157,6 +158,22 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall
error -> log.warn("[{}] Failed to report device attributes!", msgId, error)); error -> log.warn("[{}] Failed to report device attributes!", msgId, error));
} }
@Override
public MqttDeliveryFuture onDeviceEventUpdate(List<KvEntry> attributes) {
final int msgId = msgIdSeq.incrementAndGet();
log.trace("[{}] Updating device event: {}", msgId, attributes);
ObjectNode node = newNode();
//ObjectNode deviceNode = node.putObject(deviceName);
attributes.forEach(kv -> putToNode(node, kv));
final int packSize = attributes.size();
return persistMessage(DEVICE_EVENT_TOPIC, msgId, toNodeBytes(node),
message -> {
log.debug("[{}] Device event were delivered!", msgId);
attributesCount.addAndGet(packSize);
},
error -> log.warn("[{}] Failed to report device event!", msgId, error));
}
private MqttDeliveryFuture persistMessage(String topic, private MqttDeliveryFuture persistMessage(String topic,
int msgId, int msgId,
byte[] payload, byte[] payload,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论