提交 1cfad9d3 authored 作者: wangqiang's avatar wangqiang

mqtt netty

上级 a31790c1
......@@ -514,6 +514,7 @@ public class MqttServiceImpl implements MqttService, MqttHandler, MqttClientCall
tbClient.on(DEVICE_ATTRIBUTES_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_EVENT_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_PLUS_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_GET_ATTRIBUTES_RESPONSE_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
tbClient.on(DEVICE_RPC_TOPIC, this).await(connection.getConnectionTimeout(), TimeUnit.MILLISECONDS);
......
......@@ -109,14 +109,16 @@ public class JsonTools {
Map.Entry<String, JsonNode> field = it.next();
String key = field.getKey();
JsonNode value = field.getValue();
if (value.isBoolean()) {
if (value.isObject()) {
attributes.add(new JsonDataEntry(key, value.toPrettyString()));
} else if (value.isBoolean()) {
attributes.add(new BooleanDataEntry(key, value.asBoolean()));
} else if (value.isDouble()) {
attributes.add(new DoubleDataEntry(key, value.asDouble()));
} else if (value.canConvertToLong()) {
attributes.add(new LongDataEntry(key, value.asLong()));
} else {
attributes.add(new StringDataEntry(key, value.asText()));
attributes.add(new StringDataEntry(key, value.toString()));
}
}
return attributes;
......
......@@ -15,6 +15,7 @@
<module>apq-pc-control</module>
<module>apq-pc-info</module>
<module>apq-pc-register</module>
<module>tools</module>
</modules>
</project>
\ No newline at end of file
.idea/
*.ipr
*.iws
*.ids
*.iml
logs
target
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.priusis</groupId>
<version>1.0.0-SNAPSHOT</version>
<artifactId>monitor-tools</artifactId>
</parent>
<groupId>com.priusis</groupId>
<version>1.0.0-SNAPSHOT</version>
<artifactId>netty-mqtt</artifactId>
<packaging>jar</packaging>
<name>Netty MQTT Client</name>
<url>https://priusis.io</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.build.timestamp.format>yyyyMMdd</maven.build.timestamp.format>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<jdk.version>1.8</jdk.version>
<netty.version>4.1.49.Final</netty.version>
<guava.version>28.2-jre</guava.version>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.6</version>
</extension>
</extensions>
</build>
</project>
\ No newline at end of file
package com.priusis.monitor.mqtt;
/**
* Created by Valerii Sosliuk on 12/26/2017.
*/
public class ChannelClosedException extends RuntimeException {
private static final long serialVersionUID = 6266638352424706909L;
public ChannelClosedException() {
}
public ChannelClosedException(String message) {
super(message);
}
public ChannelClosedException(String message, Throwable cause) {
super(message, cause);
}
public ChannelClosedException(Throwable cause) {
super(cause);
}
public ChannelClosedException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
package com.priusis.monitor.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future;
public interface MqttClient {
/**
* Connect to the specified hostname/ip. By default uses port 1883.
* If you want to change the port number, see {@link #connect(String, int)}
*
* @param host The ip address or host to connect to
* @return A future which will be completed when the connection is opened and we received an CONNACK
*/
Future<MqttConnectResult> connect(String host);
/**
* Connect to the specified hostname/ip using the specified port
*
* @param host The ip address or host to connect to
* @param port The tcp port to connect to
* @return A future which will be completed when the connection is opened and we received an CONNACK
*/
Future<MqttConnectResult> connect(String host, int port);
/**
* @return boolean value indicating if channel is active
*/
boolean isConnected();
/**
* Attempt reconnect to the host that was attempted with {@link #connect(String, int)} method before
*
* @return A future which will be completed when the connection is opened and we received an CONNACK
* @throws IllegalStateException if no previous {@link #connect(String, int)} calls were attempted
*/
Future<MqttConnectResult> reconnect();
/**
* Retrieve the netty {@link EventLoopGroup} we are using
*
* @return The netty {@link EventLoopGroup} we use for the connection
*/
EventLoopGroup getEventLoop();
/**
* By default we use the netty {@link NioEventLoopGroup}.
* If you change the EventLoopGroup to another type, make sure to change the {@link Channel} class using {@link MqttClientConfig#setChannelClass(Class)}
* If you want to force the MqttClient to use another {@link EventLoopGroup}, call this function before calling {@link #connect(String, int)}
*
* @param eventLoop The new eventloop to use
*/
void setEventLoop(EventLoopGroup eventLoop);
/**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
* @return A future which will be completed when the server acknowledges our subscribe request
*/
Future<Void> on(String topic, MqttHandler handler);
/**
* Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
* @param qos The qos to request to the server
* @return A future which will be completed when the server acknowledges our subscribe request
*/
Future<Void> on(String topic, MqttHandler handler, MqttQoS qos);
/**
* Subscribe on the given topic. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
* This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
* @return A future which will be completed when the server acknowledges our subscribe request
*/
Future<Void> once(String topic, MqttHandler handler);
/**
* Subscribe on the given topic, with the given qos. When a message is received, MqttClient will invoke the {@link MqttHandler#onMessage(String, ByteBuf)} function of the given handler
* This subscription is only once. If the MqttClient has received 1 message, the subscription will be removed
*
* @param topic The topic filter to subscribe to
* @param handler The handler to invoke when we receive a message
* @param qos The qos to request to the server
* @return A future which will be completed when the server acknowledges our subscribe request
*/
Future<Void> once(String topic, MqttHandler handler, MqttQoS qos);
/**
* Remove the subscription for the given topic and handler
* If you want to unsubscribe from all handlers known for this topic, use {@link #off(String)}
*
* @param topic The topic to unsubscribe for
* @param handler The handler to unsubscribe
* @return A future which will be completed when the server acknowledges our unsubscribe request
*/
Future<Void> off(String topic, MqttHandler handler);
/**
* Remove all subscriptions for the given topic.
* If you want to specify which handler to unsubscribe, use {@link #off(String, MqttHandler)}
*
* @param topic The topic to unsubscribe for
* @return A future which will be completed when the server acknowledges our unsubscribe request
*/
Future<Void> off(String topic);
/**
* Publish a message to the given payload
*
* @param topic The topic to publish to
* @param payload The payload to send
* @return A future which will be completed when the message is sent out of the MqttClient
*/
Future<Void> publish(String topic, ByteBuf payload);
/**
* Publish a message to the given payload, using the given qos
*
* @param topic The topic to publish to
* @param payload The payload to send
* @param qos The qos to use while publishing
* @return A future which will be completed when the message is delivered to the server
*/
Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos);
/**
* Publish a message to the given payload, using optional retain
*
* @param topic The topic to publish to
* @param payload The payload to send
* @param retain true if you want to retain the message on the server, false otherwise
* @return A future which will be completed when the message is sent out of the MqttClient
*/
Future<Void> publish(String topic, ByteBuf payload, boolean retain);
/**
* Publish a message to the given payload, using the given qos and optional retain
*
* @param topic The topic to publish to
* @param payload The payload to send
* @param qos The qos to use while publishing
* @param retain true if you want to retain the message on the server, false otherwise
* @return A future which will be completed when the message is delivered to the server
*/
Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain);
/**
* Retrieve the MqttClient configuration
*
* @return The {@link MqttClientConfig} instance we use
*/
MqttClientConfig getClientConfig();
/**
* Construct the MqttClientImpl with additional config.
* This config can also be changed using the {@link #getClientConfig()} function
*
* @param config The config object to use while looking for settings
* @param defaultHandler The handler for incoming messages that do not match any topic subscriptions
*/
static MqttClient create(MqttClientConfig config, MqttHandler defaultHandler) {
return new MqttClientImpl(config, defaultHandler);
}
/**
* Send disconnect and close channel
*/
void disconnect();
/**
* Sets the {@see #MqttClientCallback} object for this MqttClient
*
* @param callback The callback to be set
*/
void setCallback(MqttClientCallback callback);
}
package com.priusis.monitor.mqtt;
/**
* Created by Valerii Sosliuk on 12/30/2017.
*/
public interface MqttClientCallback {
/**
* This method is called when the connection to the server is lost.
*
* @param cause the reason behind the loss of connection.
*/
void connectionLost(Throwable cause);
/**
* This method is called when the connection to the server is recovered.
*/
void onSuccessfulReconnect();
}
package com.priusis.monitor.mqtt;
import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslContext;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Random;
@SuppressWarnings({"WeakerAccess", "unused"})
public final class MqttClientConfig {
private final SslContext sslContext;
private final String randomClientId;
private String clientId;
private int timeoutSeconds = 60;
private MqttVersion protocolVersion = MqttVersion.MQTT_3_1;
@Nullable
private String username = null;
@Nullable
private String password = null;
private boolean cleanSession = true;
@Nullable
private MqttLastWill lastWill;
private Class<? extends Channel> channelClass = NioSocketChannel.class;
private boolean reconnect = true;
private long reconnectDelay = 1L;
private int maxBytesInMessage = 8092;
public MqttClientConfig() {
this(null);
}
public MqttClientConfig(SslContext sslContext) {
this.sslContext = sslContext;
Random random = new Random();
String id = "netty-mqtt/";
String[] options = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".split("");
for (int i = 0; i < 8; i++) {
id += options[random.nextInt(options.length)];
}
this.clientId = id;
this.randomClientId = id;
}
@Nonnull
public String getClientId() {
return clientId;
}
public void setClientId(@Nullable String clientId) {
if (clientId == null) {
this.clientId = randomClientId;
} else {
this.clientId = clientId;
}
}
public int getTimeoutSeconds() {
return timeoutSeconds;
}
public void setTimeoutSeconds(int timeoutSeconds) {
if (timeoutSeconds != -1 && timeoutSeconds <= 0) {
throw new IllegalArgumentException("timeoutSeconds must be > 0 or -1");
}
this.timeoutSeconds = timeoutSeconds;
}
public MqttVersion getProtocolVersion() {
return protocolVersion;
}
public void setProtocolVersion(MqttVersion protocolVersion) {
if (protocolVersion == null) {
throw new NullPointerException("protocolVersion");
}
this.protocolVersion = protocolVersion;
}
@Nullable
public String getUsername() {
return username;
}
public void setUsername(@Nullable String username) {
this.username = username;
}
@Nullable
public String getPassword() {
return password;
}
public void setPassword(@Nullable String password) {
this.password = password;
}
public boolean isCleanSession() {
return cleanSession;
}
public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}
@Nullable
public MqttLastWill getLastWill() {
return lastWill;
}
public void setLastWill(@Nullable MqttLastWill lastWill) {
this.lastWill = lastWill;
}
public Class<? extends Channel> getChannelClass() {
return channelClass;
}
public void setChannelClass(Class<? extends Channel> channelClass) {
this.channelClass = channelClass;
}
public SslContext getSslContext() {
return sslContext;
}
public boolean isReconnect() {
return reconnect;
}
public void setReconnect(boolean reconnect) {
this.reconnect = reconnect;
}
public long getReconnectDelay() {
return reconnectDelay;
}
/**
* Sets the reconnect delay in seconds. Defaults to 1 second.
*
* @param reconnectDelay
* @throws IllegalArgumentException if reconnectDelay is smaller than 1.
*/
public void setReconnectDelay(long reconnectDelay) {
if (reconnectDelay <= 0) {
throw new IllegalArgumentException("reconnectDelay must be > 0");
}
this.reconnectDelay = reconnectDelay;
}
public int getMaxBytesInMessage() {
return maxBytesInMessage;
}
/**
* Sets the maximum number of bytes in the message for the {@link io.netty.handler.codec.mqtt.MqttDecoder}.
* Default value is 8092 as specified by Netty. The absolute maximum size is 256MB as set by the MQTT spec.
*
* @param maxBytesInMessage
* @throws IllegalArgumentException if maxBytesInMessage is smaller than 1 or greater than 256_000_000.
*/
public void setMaxBytesInMessage(int maxBytesInMessage) {
if (maxBytesInMessage <= 0 || maxBytesInMessage > 256_000_000) {
throw new IllegalArgumentException("maxBytesInMessage must be > 0 or < 256_000_000");
}
this.maxBytesInMessage = maxBytesInMessage;
}
}
package com.priusis.monitor.mqtt;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
@SuppressWarnings({"WeakerAccess", "unused"})
public final class MqttConnectResult {
private final boolean success;
private final MqttConnectReturnCode returnCode;
private final ChannelFuture closeFuture;
MqttConnectResult(boolean success, MqttConnectReturnCode returnCode, ChannelFuture closeFuture) {
this.success = success;
this.returnCode = returnCode;
this.closeFuture = closeFuture;
}
public boolean isSuccess() {
return success;
}
public MqttConnectReturnCode getReturnCode() {
return returnCode;
}
public ChannelFuture getCloseFuture() {
return closeFuture;
}
}
package com.priusis.monitor.mqtt;
import io.netty.buffer.ByteBuf;
public interface MqttHandler {
void onMessage(String topic, ByteBuf payload);
}
package com.priusis.monitor.mqtt;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.function.Consumer;
final class MqttIncomingQos2Publish {
private final MqttPublishMessage incomingPublish;
private final RetransmissionHandler<MqttMessage> retransmissionHandler = new RetransmissionHandler<>();
MqttIncomingQos2Publish(MqttPublishMessage incomingPublish, MqttMessage originalMessage) {
this.incomingPublish = incomingPublish;
this.retransmissionHandler.setOriginalMessage(originalMessage);
}
MqttPublishMessage getIncomingPublish() {
return incomingPublish;
}
void startPubrecRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
this.retransmissionHandler.start(eventLoop);
}
void onPubrelReceived() {
this.retransmissionHandler.stop();
}
}
package com.priusis.monitor.mqtt;
import io.netty.handler.codec.mqtt.MqttQoS;
@SuppressWarnings({"WeakerAccess", "unused", "SimplifiableIfStatement", "StringBufferReplaceableByString"})
public final class MqttLastWill {
private final String topic;
private final String message;
private final boolean retain;
private final MqttQoS qos;
public MqttLastWill(String topic, String message, boolean retain, MqttQoS qos) {
if (topic == null) {
throw new NullPointerException("topic");
}
if (message == null) {
throw new NullPointerException("message");
}
if (qos == null) {
throw new NullPointerException("qos");
}
this.topic = topic;
this.message = message;
this.retain = retain;
this.qos = qos;
}
public String getTopic() {
return topic;
}
public String getMessage() {
return message;
}
public boolean isRetain() {
return retain;
}
public MqttQoS getQos() {
return qos;
}
public static MqttLastWill.Builder builder() {
return new MqttLastWill.Builder();
}
public static final class Builder {
private String topic;
private String message;
private boolean retain;
private MqttQoS qos;
public String getTopic() {
return topic;
}
public Builder setTopic(String topic) {
if (topic == null) {
throw new NullPointerException("topic");
}
this.topic = topic;
return this;
}
public String getMessage() {
return message;
}
public Builder setMessage(String message) {
if (message == null) {
throw new NullPointerException("message");
}
this.message = message;
return this;
}
public boolean isRetain() {
return retain;
}
public Builder setRetain(boolean retain) {
this.retain = retain;
return this;
}
public MqttQoS getQos() {
return qos;
}
public Builder setQos(MqttQoS qos) {
if (qos == null) {
throw new NullPointerException("qos");
}
this.qos = qos;
return this;
}
public MqttLastWill build() {
return new MqttLastWill(topic, message, retain, qos);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MqttLastWill that = (MqttLastWill) o;
if (retain != that.retain) return false;
if (!topic.equals(that.topic)) return false;
if (!message.equals(that.message)) return false;
return qos == that.qos;
}
@Override
public int hashCode() {
int result = topic.hashCode();
result = 31 * result + message.hashCode();
result = 31 * result + (retain ? 1 : 0);
result = 31 * result + qos.hashCode();
return result;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("MqttLastWill{");
sb.append("topic='").append(topic).append('\'');
sb.append(", message='").append(message).append('\'');
sb.append(", retain=").append(retain);
sb.append(", qos=").append(qos.name());
sb.append('}');
return sb.toString();
}
}
package com.priusis.monitor.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Promise;
import java.util.function.Consumer;
final class MqttPendingPublish {
private final int messageId;
private final Promise<Void> future;
private final ByteBuf payload;
private final MqttPublishMessage message;
private final MqttQoS qos;
private final RetransmissionHandler<MqttPublishMessage> publishRetransmissionHandler = new RetransmissionHandler<>();
private final RetransmissionHandler<MqttMessage> pubrelRetransmissionHandler = new RetransmissionHandler<>();
private boolean sent = false;
MqttPendingPublish(int messageId, Promise<Void> future, ByteBuf payload, MqttPublishMessage message, MqttQoS qos) {
this.messageId = messageId;
this.future = future;
this.payload = payload;
this.message = message;
this.qos = qos;
this.publishRetransmissionHandler.setOriginalMessage(message);
}
int getMessageId() {
return messageId;
}
Promise<Void> getFuture() {
return future;
}
ByteBuf getPayload() {
return payload;
}
boolean isSent() {
return sent;
}
void setSent(boolean sent) {
this.sent = sent;
}
MqttPublishMessage getMessage() {
return message;
}
MqttQoS getQos() {
return qos;
}
void startPublishRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
this.publishRetransmissionHandler.setHandle(((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttPublishMessage(fixedHeader, originalMessage.variableHeader(), this.payload.retain()))));
this.publishRetransmissionHandler.start(eventLoop);
}
void onPubackReceived() {
this.publishRetransmissionHandler.stop();
}
void setPubrelMessage(MqttMessage pubrelMessage) {
this.pubrelRetransmissionHandler.setOriginalMessage(pubrelMessage);
}
void startPubrelRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
this.pubrelRetransmissionHandler.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttMessage(fixedHeader, originalMessage.variableHeader())));
this.pubrelRetransmissionHandler.start(eventLoop);
}
void onPubcompReceived() {
this.pubrelRetransmissionHandler.stop();
}
}
package com.priusis.monitor.mqtt;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.util.concurrent.Promise;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
final class MqttPendingSubscription {
private final Promise<Void> future;
private final String topic;
private final Set<MqttPendingHandler> handlers = new HashSet<>();
private final MqttSubscribeMessage subscribeMessage;
private final RetransmissionHandler<MqttSubscribeMessage> retransmissionHandler = new RetransmissionHandler<>();
private boolean sent = false;
MqttPendingSubscription(Promise<Void> future, String topic, MqttSubscribeMessage message) {
this.future = future;
this.topic = topic;
this.subscribeMessage = message;
this.retransmissionHandler.setOriginalMessage(message);
}
Promise<Void> getFuture() {
return future;
}
String getTopic() {
return topic;
}
boolean isSent() {
return sent;
}
void setSent(boolean sent) {
this.sent = sent;
}
MqttSubscribeMessage getSubscribeMessage() {
return subscribeMessage;
}
void addHandler(MqttHandler handler, boolean once) {
this.handlers.add(new MqttPendingHandler(handler, once));
}
Set<MqttPendingHandler> getHandlers() {
return handlers;
}
void startRetransmitTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
if (this.sent) { //If the packet is sent, we can start the retransmit timer
this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttSubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
this.retransmissionHandler.start(eventLoop);
}
}
void onSubackReceived() {
this.retransmissionHandler.stop();
}
final class MqttPendingHandler {
private final MqttHandler handler;
private final boolean once;
MqttPendingHandler(MqttHandler handler, boolean once) {
this.handler = handler;
this.once = once;
}
MqttHandler getHandler() {
return handler;
}
boolean isOnce() {
return once;
}
}
}
package com.priusis.monitor.mqtt;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.util.concurrent.Promise;
import java.util.function.Consumer;
final class MqttPendingUnsubscription {
private final Promise<Void> future;
private final String topic;
private final RetransmissionHandler<MqttUnsubscribeMessage> retransmissionHandler = new RetransmissionHandler<>();
MqttPendingUnsubscription(Promise<Void> future, String topic, MqttUnsubscribeMessage unsubscribeMessage) {
this.future = future;
this.topic = topic;
this.retransmissionHandler.setOriginalMessage(unsubscribeMessage);
}
Promise<Void> getFuture() {
return future;
}
String getTopic() {
return topic;
}
void startRetransmissionTimer(EventLoop eventLoop, Consumer<Object> sendPacket) {
this.retransmissionHandler.setHandle((fixedHeader, originalMessage) ->
sendPacket.accept(new MqttUnsubscribeMessage(fixedHeader, originalMessage.variableHeader(), originalMessage.payload())));
this.retransmissionHandler.start(eventLoop);
}
void onUnsubackReceived() {
this.retransmissionHandler.stop();
}
}
package com.priusis.monitor.mqtt;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
final class MqttPingHandler extends ChannelInboundHandlerAdapter {
private final int keepaliveSeconds;
private ScheduledFuture<?> pingRespTimeout;
MqttPingHandler(int keepaliveSeconds) {
this.keepaliveSeconds = keepaliveSeconds;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof MqttMessage)) {
ctx.fireChannelRead(msg);
return;
}
MqttMessage message = (MqttMessage) msg;
if (message.fixedHeader().messageType() == MqttMessageType.PINGREQ) {
this.handlePingReq(ctx.channel());
} else if (message.fixedHeader().messageType() == MqttMessageType.PINGRESP) {
this.handlePingResp();
} else {
ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE:
break;
case WRITER_IDLE:
this.sendPingReq(ctx.channel());
break;
}
}
}
private void sendPingReq(Channel channel) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(fixedHeader));
if (this.pingRespTimeout != null) {
this.pingRespTimeout = channel.eventLoop().schedule(() -> {
MqttFixedHeader fixedHeader2 = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(fixedHeader2)).addListener(ChannelFutureListener.CLOSE);
//TODO: what do when the connection is closed ?
}, this.keepaliveSeconds, TimeUnit.SECONDS);
}
}
private void handlePingReq(Channel channel) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
channel.writeAndFlush(new MqttMessage(fixedHeader));
}
private void handlePingResp() {
if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
this.pingRespTimeout.cancel(true);
this.pingRespTimeout = null;
}
}
}
package com.priusis.monitor.mqtt;
import java.util.regex.Pattern;
final class MqttSubscription {
private final String topic;
private final Pattern topicRegex;
private final MqttHandler handler;
private final boolean once;
private boolean called;
MqttSubscription(String topic, MqttHandler handler, boolean once) {
if (topic == null) {
throw new NullPointerException("topic");
}
if (handler == null) {
throw new NullPointerException("handler");
}
this.topic = topic;
this.handler = handler;
this.once = once;
this.topicRegex = Pattern.compile(topic.replace("+", "[^/]+").replace("#", ".+") + "$");
}
String getTopic() {
return topic;
}
public MqttHandler getHandler() {
return handler;
}
boolean isOnce() {
return once;
}
boolean isCalled() {
return called;
}
boolean matches(String topic) {
return this.topicRegex.matcher(topic).matches();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MqttSubscription that = (MqttSubscription) o;
return once == that.once && topic.equals(that.topic) && handler.equals(that.handler);
}
@Override
public int hashCode() {
int result = topic.hashCode();
result = 31 * result + handler.hashCode();
result = 31 * result + (once ? 1 : 0);
return result;
}
void setCalled(boolean called) {
this.called = called;
}
}
package com.priusis.monitor.mqtt;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
final class RetransmissionHandler<T extends MqttMessage> {
private ScheduledFuture<?> timer;
private int timeout = 10;
private BiConsumer<MqttFixedHeader, T> handler;
private T originalMessage;
void start(EventLoop eventLoop) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (this.handler == null) {
throw new NullPointerException("handler");
}
this.timeout = 10;
this.startTimer(eventLoop);
}
private void startTimer(EventLoop eventLoop) {
this.timer = eventLoop.schedule(() -> {
this.timeout += 5;
boolean isDup = this.originalMessage.fixedHeader().isDup();
if (this.originalMessage.fixedHeader().messageType() == MqttMessageType.PUBLISH && this.originalMessage.fixedHeader().qosLevel() != MqttQoS.AT_MOST_ONCE) {
isDup = true;
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(this.originalMessage.fixedHeader().messageType(), isDup, this.originalMessage.fixedHeader().qosLevel(), this.originalMessage.fixedHeader().isRetain(), this.originalMessage.fixedHeader().remainingLength());
handler.accept(fixedHeader, originalMessage);
startTimer(eventLoop);
}, timeout, TimeUnit.SECONDS);
}
void stop() {
if (this.timer != null) {
this.timer.cancel(true);
}
}
void setHandle(BiConsumer<MqttFixedHeader, T> runnable) {
this.handler = runnable;
}
void setOriginalMessage(T originalMessage) {
this.originalMessage = originalMessage;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<version>1.0.0-SNAPSHOT</version>
<groupId>com.priusis</groupId>
<artifactId>monitor-tools</artifactId>
<name>monitor-tools</name>
<packaging>pom</packaging>
<description>
tools
</description>
<modules>
<module>netty-mqtt</module>
</modules>
</project>
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论