From ed608e11daacdfaa8bd42068e11ae251ecf18977 Mon Sep 17 00:00:00 2001 From: hylexus Date: Sun, 14 Jan 2024 13:58:05 +0800 Subject: [PATCH] =?UTF-8?q?feat(AttachmentServer):=20=E8=8B=8F=E6=A0=87?= =?UTF-8?q?=E9=99=84=E4=BB=B6=E6=9C=8D=E5=8A=A1=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 18 ++ .../script/common-maven-config.gradle | 4 +- .../jt808/handler/AttachmentFileHandler.java | 77 ++++++++ gradle.properties | 2 +- ...t808AttachmentServerAutoConfiguration.java | 113 ++++++++++++ .../Jt808NettyServerAutoConfiguration.java | 3 +- .../jt/jt808/boot/props/Jt808ServerProps.java | 6 +- .../attachment/AttachmentServerProps.java | 26 +++ .../MsgProcessorExecutorGroupProps.java | 4 +- .../hylexus/jt/jt808/JtProtocolConstant.java | 1 + .../hylexus/jt/jt808/spec/Jt808Request.java | 4 + .../jt/jt808/spec/Jt808RequestHeader.java | 4 + .../builtin/msg/req/BuiltinMsg1210Alias.java | 60 +++++++ .../builtin/msg/req/BuiltinMsg1211Alias.java | 30 ++++ .../builtin/msg/req/BuiltinMsg1212Alias.java | 30 ++++ .../msg/req/BuiltinMsg30316364Alias.java | 46 +++++ .../builtin/msg/resp/BuiltinMsg9208Alias.java | 43 +++++ .../builtin/msg/resp/BuiltinMsg9212Alias.java | 54 ++++++ .../jt808/spec/impl/BuiltinJt808MsgType.java | 8 + .../impl/request/DefaultJt808Request.java | 4 + .../spec/session/DefaultJt808Session.java | 4 + .../session/DefaultJt808SessionManager.java | 2 +- .../session/InternalJt808SessionManager.java | 164 ++++++++++++++++++ .../jt/jt808/spec/session/Jt808Session.java | 18 ++ .../spec/session/Jt808SessionManager.java | 2 +- ...entJt808DispatchChannelHandlerAdapter.java | 72 ++++++++ .../AttachmentJt808RequestProcessor.java | 8 + .../AttachmentJt808SessionManager.java | 6 + ...dLengthFieldBasedByteToMessageDecoder.java | 74 ++++++++ .../Jt808AttachmentServerNettyConfigure.java | 54 ++++++ .../Jt808NettyTcpAttachmentServer.java | 79 +++++++++ .../DefaultAttachmentJt808SessionManager.java | 22 +++ ...SimpleAttachmentJt808RequestProcessor.java | 156 +++++++++++++++++ .../support/netty/Jt808NettyTcpServer.java | 2 +- .../jt/netty/JtServerNettyConfigure.java | 8 +- .../DebugAnnotationBasedExceptionHandler.java | 3 +- .../src/main/resources/application.yml | 15 ++ 37 files changed, 1214 insertions(+), 12 deletions(-) create mode 100644 demos/jt-demo-808-server-webflux-boot3/src/main/java/io/github/hylexus/jt/demos/jt808/handler/AttachmentFileHandler.java create mode 100644 jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808AttachmentServerAutoConfiguration.java create mode 100644 jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/attachment/AttachmentServerProps.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1210Alias.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1211Alias.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1212Alias.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg30316364Alias.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9208Alias.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9212Alias.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/InternalJt808SessionManager.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808DispatchChannelHandlerAdapter.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808RequestProcessor.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808SessionManager.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/DelimiterAndLengthFieldBasedByteToMessageDecoder.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808AttachmentServerNettyConfigure.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808NettyTcpAttachmentServer.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/DefaultAttachmentJt808SessionManager.java create mode 100644 jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/SimpleAttachmentJt808RequestProcessor.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b76611ce..3a1b3789 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,21 @@ +## 2.1.4-beta1(2024-01-14) + +### ⭐ New Features + +- 支持苏标附件服务器 + +## 2.1.3(2023-10-03) + +### 🐞 Bug Fixes + +`BuiltinCommonHandler.processTerminalHeartBeatMsg` 应该回复 `0x8001` 消息,而不是不回复消息 + +## 2.1.2(2023-09-01) + +### ⭐ New Features + +- [#78](https://github.com/hylexus/jt-framework/issues/78) + ## 2.1.1 ### ⭐ New Features diff --git a/build-script/script/common-maven-config.gradle b/build-script/script/common-maven-config.gradle index 71a39266..2f9d98c9 100644 --- a/build-script/script/common-maven-config.gradle +++ b/build-script/script/common-maven-config.gradle @@ -141,6 +141,6 @@ def isSnapshot() { } def static publishToMavenCentralRepo() { - // gradle clean build publish -Dpublish2MavenCentralRepo=true - System.getProperty("publish2MavenCentralRepo") ?: "false" == "true" + // ./gradlew clean build publish -DpublishToMavenCentralRepo=true + System.getProperty("publishToMavenCentralRepo") ?: "false" == "true" } \ No newline at end of file diff --git a/demos/jt-demo-808-server-webflux-boot3/src/main/java/io/github/hylexus/jt/demos/jt808/handler/AttachmentFileHandler.java b/demos/jt-demo-808-server-webflux-boot3/src/main/java/io/github/hylexus/jt/demos/jt808/handler/AttachmentFileHandler.java new file mode 100644 index 00000000..0eca1486 --- /dev/null +++ b/demos/jt-demo-808-server-webflux-boot3/src/main/java/io/github/hylexus/jt/demos/jt808/handler/AttachmentFileHandler.java @@ -0,0 +1,77 @@ +package io.github.hylexus.jt.demos.jt808.handler; + +import io.github.hylexus.jt.jt808.Jt808ProtocolVersion; +import io.github.hylexus.jt.jt808.spec.Jt808CommandSender; +import io.github.hylexus.jt.jt808.spec.Jt808MsgBuilder; +import io.github.hylexus.jt.jt808.spec.Jt808Request; +import io.github.hylexus.jt.jt808.spec.builtin.msg.req.BuiltinMsg1210Alias; +import io.github.hylexus.jt.jt808.spec.builtin.msg.req.BuiltinMsg1211Alias; +import io.github.hylexus.jt.jt808.spec.builtin.msg.req.BuiltinMsg1212Alias; +import io.github.hylexus.jt.jt808.spec.builtin.msg.req.BuiltinMsg30316364Alias; +import io.github.hylexus.jt.jt808.spec.builtin.msg.resp.BuiltinMsg9208Alias; +import io.github.hylexus.jt.jt808.spec.builtin.msg.resp.BuiltinMsg9212Alias; +import io.github.hylexus.jt.jt808.spec.builtin.msg.resp.BuiltinServerCommonReplyMsg; +import io.github.hylexus.jt.jt808.spec.session.Jt808Session; +import io.github.hylexus.jt.jt808.spec.session.Jt808SessionManager; +import io.github.hylexus.jt.jt808.support.annotation.handler.Jt808RequestHandler; +import io.github.hylexus.jt.jt808.support.annotation.handler.Jt808RequestHandlerMapping; +import io.netty.buffer.ByteBuf; +import lombok.extern.slf4j.Slf4j; +import org.springframework.lang.Nullable; +import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@Slf4j +@Component +@Jt808RequestHandler +public class AttachmentFileHandler { + private final Jt808SessionManager sessionManager; + + public AttachmentFileHandler(Jt808SessionManager sessionManager) { + this.sessionManager = sessionManager; + } + + @Jt808RequestHandlerMapping(msgType = 0x1210, versions = Jt808ProtocolVersion.AUTO_DETECTION) + public BuiltinServerCommonReplyMsg processMsg0x1210(Jt808Request request, BuiltinMsg1210Alias body) { + log.info("0x1210 ==> {}", body); + return BuiltinServerCommonReplyMsg.success(request.header().msgId(), request.flowId()); + } + + @Jt808RequestHandlerMapping(msgType = 0x1211, versions = Jt808ProtocolVersion.AUTO_DETECTION) + public BuiltinServerCommonReplyMsg processMsg0x1211(Jt808Request request, BuiltinMsg1211Alias body) { + log.info("0x1211 ==> {}", body); + return BuiltinServerCommonReplyMsg.success(request.header().msgId(), request.flowId()); + } + + @Jt808RequestHandlerMapping(msgType = 0x1212, versions = Jt808ProtocolVersion.AUTO_DETECTION) + public BuiltinServerCommonReplyMsg processMsg0x1212(Jt808Request request, BuiltinMsg1212Alias reqBody) { + log.info("0x1211 ==> {}", reqBody); + // final BuiltinMsg9212Alias respBody = new BuiltinMsg9212Alias(); + // respBody.setFileNameLength(reqBody.getFileNameLength()); + // respBody.setFileName(reqBody.getFileName()); + // respBody.setFileType(reqBody.getFileType()); + // respBody.setUploadResult((byte) 0x00); + // respBody.setPackageCountToReTransmit((short) 0); + + // this.sendMsg9212(request.terminalId(), respBody); + return BuiltinServerCommonReplyMsg.success(request.header().msgId(), request.flowId()); + } + + @Jt808RequestHandlerMapping(msgType = 0x30316364) + public void processMsg30316364(Jt808Request request, BuiltinMsg30316364Alias body, @Nullable Jt808Session session) { + log.info("0x30316364 ==> session:{}, msg:{}", session, body); + } + + public void sendMsg9212(String terminalId, BuiltinMsg9212Alias body) { + final Jt808Session session = this.sessionManager.findByTerminalId(terminalId).orElseThrow(); + + final ByteBuf byteBuf = Jt808MsgBuilder.newEntityBuilder(session) + .version(session.protocolVersion()) + .terminalId(terminalId) + .body(body) + .build(); + session.sendMsgToClient(byteBuf); + } +} diff --git a/gradle.properties b/gradle.properties index 6a939527..ff539a0c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ projectGroup=io.github.hylexus.jt -projectVersion=2.2.0-beta2 +projectVersion=2.1.4-beta1 # scm projectScmUrl=https://github.com/hylexus/jt-framework projectScmConnection=scm:git:git@github.com:hylexus/jt-framework.git diff --git a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808AttachmentServerAutoConfiguration.java b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808AttachmentServerAutoConfiguration.java new file mode 100644 index 00000000..b0e507bc --- /dev/null +++ b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808AttachmentServerAutoConfiguration.java @@ -0,0 +1,113 @@ +package io.github.hylexus.jt.jt808.boot.config.configuration; + +import io.github.hylexus.jt.core.OrderedComponent; +import io.github.hylexus.jt.jt808.boot.props.Jt808ServerProps; +import io.github.hylexus.jt.jt808.boot.props.attachment.AttachmentServerProps; +import io.github.hylexus.jt.jt808.boot.props.msg.processor.MsgProcessorExecutorGroupProps; +import io.github.hylexus.jt.jt808.spec.session.Jt808FlowIdGeneratorFactory; +import io.github.hylexus.jt.jt808.spec.session.Jt808SessionEventListener; +import io.github.hylexus.jt.jt808.support.codec.Jt808MsgDecoder; +import io.github.hylexus.jt.jt808.support.codec.Jt808RequestRouteExceptionHandler; +import io.github.hylexus.jt.jt808.support.dispatcher.Jt808DispatcherHandler; +import io.github.hylexus.jt.jt808.support.extension.attachment.*; +import io.github.hylexus.jt.jt808.support.extension.attachment.impl.DefaultAttachmentJt808SessionManager; +import io.github.hylexus.jt.jt808.support.extension.attachment.impl.SimpleAttachmentJt808RequestProcessor; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.concurrent.RejectedExecutionHandlers; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; + +import java.util.Comparator; + +import static io.github.hylexus.jt.jt808.JtProtocolConstant.BEAN_NAME_JT808_ATTACHMENT_MSG_PROCESSOR_EVENT_EXECUTOR_GROUP; + +@Slf4j +@ConditionalOnProperty(prefix = "jt808.attachment-server", name = "enabled", havingValue = "true", matchIfMissing = false) +public class Jt808AttachmentServerAutoConfiguration { + private final Jt808ServerProps instructionServerProps; + + public Jt808AttachmentServerAutoConfiguration(Jt808ServerProps instructionServerProps) { + this.instructionServerProps = instructionServerProps; + } + + @Bean + @ConditionalOnMissingBean + public AttachmentJt808SessionManager attachmentJt808SessionManager(ObjectProvider listeners, Jt808FlowIdGeneratorFactory factory) { + final AttachmentJt808SessionManager manager = new DefaultAttachmentJt808SessionManager(factory); + listeners.stream().sorted(Comparator.comparing(OrderedComponent::getOrder)).forEach(manager::addListener); + return manager; + } + + @Bean + @ConditionalOnMissingBean(SimpleAttachmentJt808RequestProcessor.class) + public SimpleAttachmentJt808RequestProcessor simpleAttachmentJt808RequestProcessor( + Jt808MsgDecoder jt808MsgDecoder, + AttachmentJt808SessionManager sessionManager, + Jt808RequestRouteExceptionHandler routeExceptionHandler, + Jt808DispatcherHandler dispatcherHandler) { + return new SimpleAttachmentJt808RequestProcessor(jt808MsgDecoder, sessionManager, routeExceptionHandler, dispatcherHandler); + } + + @Bean(name = BEAN_NAME_JT808_ATTACHMENT_MSG_PROCESSOR_EVENT_EXECUTOR_GROUP) + @ConditionalOnMissingBean(name = BEAN_NAME_JT808_ATTACHMENT_MSG_PROCESSOR_EVENT_EXECUTOR_GROUP) + public EventExecutorGroup eventExecutorGroup() { + + final MsgProcessorExecutorGroupProps poolProps = instructionServerProps.getAttachmentServer().getMsgProcessor().getExecutorGroup(); + final DefaultThreadFactory threadFactory = new DefaultThreadFactory(poolProps.getPoolName()); + + log.info("MsgProcessorConfig = {}", poolProps); + return new DefaultEventExecutorGroup( + poolProps.getThreadCount(), + threadFactory, + poolProps.getMaxPendingTasks(), + RejectedExecutionHandlers.reject() + ); + } + + @Bean + @ConditionalOnMissingBean + public AttachmentJt808DispatchChannelHandlerAdapter attachmentJt808DispatchChannelHandlerAdapter( + AttachmentJt808RequestProcessor requestProcessor, + AttachmentJt808SessionManager sessionManager) { + + return new AttachmentJt808DispatchChannelHandlerAdapter(requestProcessor, sessionManager); + } + + @Bean + @ConditionalOnMissingBean + public Jt808AttachmentServerNettyConfigure jt808AttachmentServerNettyConfigure( + AttachmentJt808DispatchChannelHandlerAdapter attachmentJt808DispatchChannelHandlerAdapter, + @Qualifier(BEAN_NAME_JT808_ATTACHMENT_MSG_PROCESSOR_EVENT_EXECUTOR_GROUP) EventExecutorGroup executors) { + + final AttachmentServerProps attachmentServerProps = instructionServerProps.getAttachmentServer(); + return new Jt808AttachmentServerNettyConfigure.DefaultJt808AttachmentServerNettyConfigure( + new Jt808AttachmentServerNettyConfigure.DefaultJt808AttachmentServerNettyConfigure.BuiltInServerBootstrapProps( + instructionServerProps.getProtocol().getMaxFrameLength(), + attachmentServerProps.getMaxFrameLength() + ), + attachmentJt808DispatchChannelHandlerAdapter, + executors + ); + } + + @Bean(initMethod = "doStart", destroyMethod = "doStop") + @ConditionalOnMissingBean + public Jt808NettyTcpAttachmentServer jt808NettyTcpAttachmentServer(Jt808AttachmentServerNettyConfigure configure) { + final Jt808NettyTcpAttachmentServer server = new Jt808NettyTcpAttachmentServer( + "808-tcp-attachment-server", + configure + ); + + final AttachmentServerProps nettyProps = instructionServerProps.getAttachmentServer(); + server.setPort(nettyProps.getPort()); + server.setBossThreadCount(nettyProps.getBossThreadCount()); + server.setWorkThreadCount(nettyProps.getWorkerThreadCount()); + return server; + } +} diff --git a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808NettyServerAutoConfiguration.java b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808NettyServerAutoConfiguration.java index d68d8a97..b4f5c480 100644 --- a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808NettyServerAutoConfiguration.java +++ b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/config/configuration/Jt808NettyServerAutoConfiguration.java @@ -52,7 +52,8 @@ */ @Slf4j @Import({ - Jt808NettyServerAutoConfiguration.Jt808ServerParamPrinterConfig.class + Jt808NettyServerAutoConfiguration.Jt808ServerParamPrinterConfig.class, + Jt808AttachmentServerAutoConfiguration.class, }) public class Jt808NettyServerAutoConfiguration { diff --git a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/Jt808ServerProps.java b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/Jt808ServerProps.java index b147d746..56b02586 100644 --- a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/Jt808ServerProps.java +++ b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/Jt808ServerProps.java @@ -1,10 +1,11 @@ package io.github.hylexus.jt.jt808.boot.props; +import io.github.hylexus.jt.jt808.boot.props.attachment.AttachmentServerProps; import io.github.hylexus.jt.jt808.boot.props.builtin.BuiltComponentsProps; import io.github.hylexus.jt.jt808.boot.props.builtin.RequestSubPackageStorageProps; import io.github.hylexus.jt.jt808.boot.props.builtin.ResponseSubPackageStorageProps; -import io.github.hylexus.jt.jt808.boot.props.msg.processor.MsgProcessorProps; import io.github.hylexus.jt.jt808.boot.props.feature.Jt808FeatureProps; +import io.github.hylexus.jt.jt808.boot.props.msg.processor.MsgProcessorProps; import io.github.hylexus.jt.jt808.boot.props.protocol.ProtocolConfig; import io.github.hylexus.jt.jt808.boot.props.server.Jt808NettyTcpServerProps; import lombok.Getter; @@ -56,4 +57,7 @@ public class Jt808ServerProps { @NestedConfigurationProperty private Jt808FeatureProps features = new Jt808FeatureProps(); + + @NestedConfigurationProperty + private AttachmentServerProps attachmentServer = new AttachmentServerProps(); } diff --git a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/attachment/AttachmentServerProps.java b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/attachment/AttachmentServerProps.java new file mode 100644 index 00000000..d0c79336 --- /dev/null +++ b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/attachment/AttachmentServerProps.java @@ -0,0 +1,26 @@ +package io.github.hylexus.jt.jt808.boot.props.attachment; + +import io.github.hylexus.jt.jt808.boot.props.msg.processor.MsgProcessorProps; +import lombok.Data; +import lombok.ToString; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + +import javax.validation.constraints.Min; + +@Data +@ToString +public class AttachmentServerProps { + private boolean enabled = false; + private int port = 6809; + + @Min(value = 0, message = "bossThreadCount >= 0, 0 means that Netty's default logical") + private int bossThreadCount = 0; + + @Min(value = 0, message = "workerThreadCount >= 0, 0 means that Netty's default logical") + private int workerThreadCount = 0; + + private int maxFrameLength = 1024 * 65; + + @NestedConfigurationProperty + private MsgProcessorProps msgProcessor = new MsgProcessorProps(); +} diff --git a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/msg/processor/MsgProcessorExecutorGroupProps.java b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/msg/processor/MsgProcessorExecutorGroupProps.java index e40ca686..ee5b5a61 100644 --- a/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/msg/processor/MsgProcessorExecutorGroupProps.java +++ b/jt-808-server-spring-boot-autoconfigure/src/main/java/io/github/hylexus/jt/jt808/boot/props/msg/processor/MsgProcessorExecutorGroupProps.java @@ -20,13 +20,13 @@ public class MsgProcessorExecutorGroupProps { @NotEmpty(message = "poolName is null or empty. defaultValue = '808-msg-processor' ") - private String poolName = "808-msg-processor"; + private String poolName = "808-attachment-processor"; /** * 默认: 128。 */ @Min(value = 1, message = "threadCount >= 1, defaultValue = 128") - private int threadCount = 128; + private int threadCount = 32; @Min(value = 16, message = "maxPendingTasks >= 16, defaultValue = 128") private int maxPendingTasks = 128; diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/JtProtocolConstant.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/JtProtocolConstant.java index 0027d2f8..1a100f98 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/JtProtocolConstant.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/JtProtocolConstant.java @@ -15,6 +15,7 @@ public interface JtProtocolConstant { String BEAN_NAME_JT808_INTERCEPTORS = "jt808Interceptors"; String BEAN_NAME_JT808_MSG_PROCESSOR_EVENT_EXECUTOR_GROUP = "jt808MsgProcessorEventExecutorGroup"; + String BEAN_NAME_JT808_ATTACHMENT_MSG_PROCESSOR_EVENT_EXECUTOR_GROUP = "jt808AttachmentMsgProcessorEventExecutorGroup"; String BEAN_NAME_NETTY_HANDLER_NAME_808_HEART_BEAT = "Jt808NettyHeartBeatHandler"; String NETTY_HANDLER_NAME_808_IDLE_STATE = "Jt808NettyIdleStateHandler"; diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808Request.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808Request.java index 4053c2eb..7caa2ae6 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808Request.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808Request.java @@ -130,6 +130,10 @@ default Jt808Request copy() { // <== Shortcut Methods End // ++++++++++++++++++++++++++++++++++++++++++++ + static Jt808RequestBuilder newBuilder(MsgType msgType) { + return new DefaultJt808Request.DefaultJt808RequestBuilder(msgType); + } + interface Jt808RequestBuilder { Jt808RequestBuilder header(Jt808RequestHeader header); diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808RequestHeader.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808RequestHeader.java index d9b64421..fdb44597 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808RequestHeader.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/Jt808RequestHeader.java @@ -44,6 +44,10 @@ default int msgBodyLength() { String toString(); + static Jt808MsgHeaderBuilder newBuilder() { + return new DefaultJt808RequestHeader.DefaultJt808MsgHeaderBuilder(); + } + default Jt808MsgHeaderBuilder mutate() { return new DefaultJt808RequestHeader.DefaultJt808MsgHeaderBuilder(this); } diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1210Alias.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1210Alias.java new file mode 100644 index 00000000..abd8bf9f --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1210Alias.java @@ -0,0 +1,60 @@ +package io.github.hylexus.jt.jt808.spec.builtin.msg.req; + +import io.github.hylexus.jt.annotation.BuiltinComponent; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.Jt808RequestBody; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.RequestField; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.RequestFieldAlias; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.Accessors; + +import java.util.List; + +import static io.github.hylexus.jt.jt808.support.data.MsgDataType.LIST; + +/** + * @author hylexus + */ +@Data +@Accessors(chain = true) +@Jt808RequestBody +@BuiltinComponent +public class BuiltinMsg1210Alias { + + // byte[0,7) + @RequestFieldAlias.Bytes(order = 10, length = 7) + private String terminalId; + + // byte[7,23) + @RequestFieldAlias.Bytes(order = 20, length = 16) + private String alarmIdentifier; + + // byte[23,55) + @RequestFieldAlias.Bytes(order = 30, length = 32) + private String alarmNo; + + // byte[55] + @RequestFieldAlias.Byte(order = 40) + private short messageType; + + // byte[56] + @RequestFieldAlias.Byte(order = 50) + private short attachmentCount; + + // byte[57,...) + @RequestField(order = 60, dataType = LIST, lengthExpression = "#ctx.msgBodyLength() - 57") + private List attachmentItemList; + + @Data + @NoArgsConstructor + public static class AttachmentItem { + @RequestFieldAlias.Byte(order = 10) + private int length; + + @RequestFieldAlias.String(order = 20, lengthExpression = "#this.length") + private String fileName; + + @RequestFieldAlias.Dword(order = 30) + private long fileSize; + } +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1211Alias.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1211Alias.java new file mode 100644 index 00000000..28b48f2b --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1211Alias.java @@ -0,0 +1,30 @@ +package io.github.hylexus.jt.jt808.spec.builtin.msg.req; + +import io.github.hylexus.jt.annotation.BuiltinComponent; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.Jt808RequestBody; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.RequestFieldAlias; +import lombok.Data; +import lombok.experimental.Accessors; + +/** + * @author hylexus + */ +@Data +@Accessors(chain = true) +@Jt808RequestBody +@BuiltinComponent +public class BuiltinMsg1211Alias { + + // byte[0] + @RequestFieldAlias.Byte(order = 10) + private short fileNameLength; + + @RequestFieldAlias.String(order = 20, lengthExpression = "#this.fileNameLength") + private String fileName; + + @RequestFieldAlias.Byte(order = 30) + private short fileType; + + @RequestFieldAlias.Dword(order = 40) + private long fileSize; +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1212Alias.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1212Alias.java new file mode 100644 index 00000000..5420b8c8 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg1212Alias.java @@ -0,0 +1,30 @@ +package io.github.hylexus.jt.jt808.spec.builtin.msg.req; + +import io.github.hylexus.jt.annotation.BuiltinComponent; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.Jt808RequestBody; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.RequestFieldAlias; +import lombok.Data; +import lombok.experimental.Accessors; + +/** + * @author hylexus + */ +@Data +@Accessors(chain = true) +@Jt808RequestBody +@BuiltinComponent +public class BuiltinMsg1212Alias { + + // byte[0] + @RequestFieldAlias.Byte(order = 10) + private short fileNameLength; + + @RequestFieldAlias.String(order = 20, lengthExpression = "#this.fileNameLength") + private String fileName; + + @RequestFieldAlias.Byte(order = 30) + private short fileType; + + @RequestFieldAlias.Dword(order = 40) + private long fileSize; +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg30316364Alias.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg30316364Alias.java new file mode 100644 index 00000000..e6928772 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/req/BuiltinMsg30316364Alias.java @@ -0,0 +1,46 @@ +package io.github.hylexus.jt.jt808.spec.builtin.msg.req; + +import io.github.hylexus.jt.annotation.BuiltinComponent; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.Jt808RequestBody; +import io.github.hylexus.jt.jt808.support.annotation.msg.req.RequestFieldAlias; +import lombok.Data; +import lombok.experimental.Accessors; + +/** + * TJSATL12—2017道路运输车辆主动安全智能防控系统 $4.6.4--文件数据上传 + *

+ * 该消息不需要回复客户端 + * + * @author hylexus + */ +@Data +@Accessors(chain = true) +@Jt808RequestBody +@BuiltinComponent +public class BuiltinMsg30316364Alias { + + // BYTE[50] 文件名称 + @RequestFieldAlias.Bytes(order = 10, lengthExpression = "50") + private String fileName; + + // DWORD 数据偏移量 + @RequestFieldAlias.Dword(order = 20) + private long dataOffset; + + // DWORD 数据长度 + @RequestFieldAlias.Dword(order = 30) + private long dataLength; + + // BYTE[n] 数据体 默认长度64K,文件小于64K则为实际长度 + @RequestFieldAlias.Bytes(order = 40, lengthExpression = "#this.dataLength") + private byte[] data; + + @Override + public String toString() { + return "BuiltinMsg30316364Alias{" + + "fileName='" + fileName + '\'' + + ", dataOffset=" + dataOffset + + ", dataLength=" + dataLength + + '}'; + } +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9208Alias.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9208Alias.java new file mode 100644 index 00000000..1cae4400 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9208Alias.java @@ -0,0 +1,43 @@ +package io.github.hylexus.jt.jt808.spec.builtin.msg.resp; + +import io.github.hylexus.jt.jt808.support.annotation.msg.resp.Jt808ResponseBody; +import io.github.hylexus.jt.jt808.support.annotation.msg.resp.ResponseFieldAlias; +import lombok.Data; +import lombok.experimental.Accessors; + +/** + * @author hylexus + */ +@Data +@Accessors(chain = true) +@Jt808ResponseBody(msgId = 0x9208) +public class BuiltinMsg9208Alias { + + // BYTE + @ResponseFieldAlias.Byte(order = 10) + private short attachmentServerIpLength; + + // STRING + @ResponseFieldAlias.String(order = 20) + private String attachmentServerIp; + + // WORD + @ResponseFieldAlias.Word(order = 30) + private int attachmentServerPortTcp; + + // WORD + @ResponseFieldAlias.Word(order = 40) + private int attachmentServerPortUdp; + + // BYTE[16] + @ResponseFieldAlias.Bytes(order = 50) + private String alarmIdentifier; + + // BYTE[32] + @ResponseFieldAlias.Bytes(order = 60) + private String alarmNo; + + // BYTE[16] + @ResponseFieldAlias.Bytes(order = 70) + private String reservedByte16; +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9212Alias.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9212Alias.java new file mode 100644 index 00000000..08259b35 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/builtin/msg/resp/BuiltinMsg9212Alias.java @@ -0,0 +1,54 @@ +package io.github.hylexus.jt.jt808.spec.builtin.msg.resp; + +import io.github.hylexus.jt.jt808.support.annotation.msg.resp.Jt808ResponseBody; +import io.github.hylexus.jt.jt808.support.annotation.msg.resp.ResponseFieldAlias; +import lombok.Data; +import lombok.experimental.Accessors; + +import java.util.List; + +/** + * @author hylexus + */ +@Data +@Accessors(chain = true) +@Jt808ResponseBody(msgId = 0x9212) +public class BuiltinMsg9212Alias { + + // BYTE + @ResponseFieldAlias.Byte(order = 10) + private short fileNameLength; + + // 文件名称 + @ResponseFieldAlias.String(order = 20) + private String fileName; + + // 0x00:图片 + // 0x01:音频 + // 0x02:视频 + // 0x03:文本 + // 0x04:其它 + @ResponseFieldAlias.Byte(order = 30) + private short fileType; + + // 0x00:完成 + // 0x01:需要补传 + @ResponseFieldAlias.Byte(order = 40) + private byte uploadResult; + + // 补传数据包数量 需要补传的数据包数量,无补传时该值为0 + @ResponseFieldAlias.Byte(order = 50) + private short packageCountToReTransmit; + + // 补传数据包列表 + @ResponseFieldAlias.List(order = 60) + private List retransmitItemList; + + @Data + public static class RetransmitItem { + @ResponseFieldAlias.Dword(order = 10) + private long dataOffset; + @ResponseFieldAlias.Dword(order = 20) + private long dataLength; + } +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/BuiltinJt808MsgType.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/BuiltinJt808MsgType.java index cc34f8dd..82734657 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/BuiltinJt808MsgType.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/BuiltinJt808MsgType.java @@ -51,6 +51,14 @@ public enum BuiltinJt808MsgType implements MsgType { // 1078 SERVER_RTM_REQUEST(0x9101, "实时音视频传输请求"), SERVER_RTM_CONTROL(0x9102, "实时音视频传输控制"), + + // 苏标 + CLIENT_MSG_1210(0x1210, "报警附件信息消息"), + CLIENT_MSG_1211(0x1211, "文件信息上传"), + CLIENT_MSG_1212(0x1212, "信令数据报文"), + CLIENT_MSG_30316364(0x30316364, "附件上传消息"), + SERVER_MSG_9208(0x9208, "报警附件上传指令"), + SERVER_MSG_9212(0x9212, "文件上传完成消息应答"), ; private static final Map mapping = new HashMap<>(BuiltinJt808MsgType.values().length); diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/request/DefaultJt808Request.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/request/DefaultJt808Request.java index 72ceb4db..c68c2f24 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/request/DefaultJt808Request.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/impl/request/DefaultJt808Request.java @@ -88,6 +88,10 @@ public static class DefaultJt808RequestBuilder implements Jt808RequestBuilder { private byte calculatedCheckSum; private final MsgType msgType; + public DefaultJt808RequestBuilder(MsgType msgType) { + this.msgType = msgType; + } + public DefaultJt808RequestBuilder(Jt808Request request) { this.header = request.header(); this.rawByteBuf = request.rawByteBuf(); diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808Session.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808Session.java index f19f634d..eebb0138 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808Session.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808Session.java @@ -28,6 +28,10 @@ public DefaultJt808Session(Jt808FlowIdGenerator delegateFlowIdGenerator) { this.attributes = new ConcurrentHashMap<>(); } + @Getter + @Setter + private Role role; + @Getter @Setter private String id; diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808SessionManager.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808SessionManager.java index 6ac8c57f..ed637111 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808SessionManager.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/DefaultJt808SessionManager.java @@ -35,7 +35,7 @@ public class DefaultJt808SessionManager implements Jt808SessionManager { private final List listeners = new ArrayList<>(); - private DefaultJt808SessionManager(Jt808FlowIdGeneratorFactory flowIdGeneratorFactory) { + protected DefaultJt808SessionManager(Jt808FlowIdGeneratorFactory flowIdGeneratorFactory) { this.flowIdGeneratorFactory = flowIdGeneratorFactory; } diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/InternalJt808SessionManager.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/InternalJt808SessionManager.java new file mode 100644 index 00000000..47e9f62b --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/InternalJt808SessionManager.java @@ -0,0 +1,164 @@ +package io.github.hylexus.jt.jt808.spec.session; + +import io.github.hylexus.jt.jt808.Jt808ProtocolVersion; +import io.netty.channel.Channel; + +import javax.annotation.Nullable; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public interface InternalJt808SessionManager { + /** + * @return 以 {@link Stream} 的形式返回当前 {@link Jt808Session} 列表 + */ + Stream list(); + + default Stream list(int page, int pageSize, Predicate filter, Comparator sorter) { + return list().filter(filter).sorted(sorter).skip((long) (page - 1) * pageSize).limit(pageSize); + } + + /** + * 以分页的形式获取 {@link Jt808SessionManager} 中的 {@link Jt808Session} + * + * @param page 当前页码,从1开始 + * @param pageSize 每页大小 + * @return 当前页的 {@link Jt808Session} 信息 + * @see #list() + * @see #list(int, int, Predicate, Comparator, Function) + */ + default List list(int page, int pageSize) { + return list(page, pageSize, session -> true); + } + + /** + * 以分页的形式获取 {@link Jt808SessionManager} 中的 {@link Jt808Session} + * + * @param page 当前页码,从1开始 + * @param pageSize 每页大小 + * @param filter 过滤器 + * @return 当前页的 {@link Jt808Session} 信息 + * @see #list() + * @see #list(int, int, Predicate, Comparator, Function) + */ + default List list(int page, int pageSize, Predicate filter) { + return list(page, pageSize, filter, Comparator.comparing(Jt808Session::terminalId), Function.identity()); + } + + /** + * 以分页的形式获取 {@link Jt808SessionManager} 中的 {@link Jt808Session} + * + * @param page 当前页码,从1开始 + * @param pageSize 每页大小 + * @param converter {@link Jt808Session} 到 {@link T} 的转换器 + * @param filter 过滤器 + * @param sorter 排序器 + * @param 结果类型 + * @return 当前页的 {@link Jt808Session} 信息 + * @see #list() + * @see #list(int, int) + */ + default List list(int page, int pageSize, Predicate filter, Comparator sorter, Function converter) { + return list(page, pageSize, filter, sorter).map(converter).collect(Collectors.toList()); + } + + /** + * @return 当前活跃的 {@link Jt808Session} 总数。 + */ + default long count() { + return this.count(session -> true); + } + + long count(Predicate filter); + + /** + * @param channel Channel + * @return sessionId + */ + default String generateSessionId(Channel channel) { + return channel.id().asLongText(); + } + + /** + * @param terminalId 终端号(手机号) + * @param version 协议版本号 + * @param channel Channel + * @return 生成的新Session + */ + Jt808Session generateSession(String terminalId, Jt808ProtocolVersion version, Channel channel); + + /** + * @param terminalId 终端号(手机号) + * @param version 协议版本号 + * @param channel Channel + * @return 如果已经存在,则刷新上次通信时间并返回Session。否则,生成新的Session并返回。 + * @see #persistenceIfNecessary(String, Jt808ProtocolVersion, Channel, boolean) + */ + default Jt808Session persistenceIfNecessary(String terminalId, Jt808ProtocolVersion version, Channel channel) { + return persistenceIfNecessary(terminalId, version, channel, true); + } + + /** + * @param terminalId 终端号(手机号) + * @param channel Channel + * @param updateLastCommunicateTime 是否更新上次通信时间 + * @return 如果已经存在,则刷新上次通信时间并返回Session。否则,生成新的Session并返回。 + */ + default Jt808Session persistenceIfNecessary(String terminalId, Jt808ProtocolVersion version, Channel channel, boolean updateLastCommunicateTime) { + final Optional session = findByTerminalId(terminalId, updateLastCommunicateTime); + if (session.isPresent()) { + return session.get(); + } + final Jt808Session newSession = generateSession(terminalId, version, channel); + persistence(newSession); + return newSession; + } + + /** + * @param session 持久化session + */ + void persistence(Jt808Session session); + + @Nullable + Jt808Session removeBySessionId(String sessionId); + + /** + * @param sessionId sessionId + * @param reason 关闭原因 + */ + void removeBySessionIdAndClose(String sessionId, SessionCloseReason reason); + + /** + * @param sessionId sessionId + * @return 当前 {@link Jt808Session} + */ + Optional findBySessionId(String sessionId); + + /** + * @param terminalId 终端号(手机号) + * @param updateLastCommunicateTime 是否更新上次通信时间 + * @return 当前 {@link Jt808Session} + */ + Optional findByTerminalId(String terminalId, boolean updateLastCommunicateTime); + + /** + * @param terminalId 终端号(手机号) + * @return 当前 {@link Jt808Session} + * @see Jt808SessionManager#findByTerminalId(String, boolean) + */ + default Optional findByTerminalId(String terminalId) { + return findByTerminalId(terminalId, false); + } + + default void sendBytesToClient(Jt808Session session, byte[] bytes) throws InterruptedException { + session.sendMsgToClient(bytes); + } + + Jt808SessionManager addListener(Jt808SessionEventListener listener); + + List getListeners(); +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808Session.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808Session.java index fdd0692b..086bbc14 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808Session.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808Session.java @@ -19,6 +19,24 @@ @BuiltinComponent public interface Jt808Session extends Jt808FlowIdGenerator { + enum Role { + /** + * 指令服务器对应的会话 + */ + INSTRUCTION, + /** + * 附件服务器对应的会话 + */ + ATTACHMENT, + } + + /** + * @since 2.1.4 + */ + default Role role() { + return Role.INSTRUCTION; + } + /** * @param bytes 待发送给客户端的数据 * @see #sendMsgToClient(ByteBuf) diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808SessionManager.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808SessionManager.java index f7255919..49b4af85 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808SessionManager.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/spec/session/Jt808SessionManager.java @@ -21,7 +21,7 @@ * * @author hylexus */ -public interface Jt808SessionManager { +public interface Jt808SessionManager extends InternalJt808SessionManager { /** * @return 以 {@link Stream} 的形式返回当前 {@link Jt808Session} 列表 diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808DispatchChannelHandlerAdapter.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808DispatchChannelHandlerAdapter.java new file mode 100644 index 00000000..0c18f35e --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808DispatchChannelHandlerAdapter.java @@ -0,0 +1,72 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment; + +import io.github.hylexus.jt.jt808.spec.Jt808RequestLifecycleListener; +import io.github.hylexus.jt.jt808.spec.Jt808RequestLifecycleListenerAware; +import io.github.hylexus.jt.jt808.support.utils.JtProtocolUtils; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nonnull; + +import static io.github.hylexus.jt.jt808.spec.session.DefaultSessionCloseReason.CHANNEL_INACTIVE; +import static io.github.hylexus.jt.jt808.spec.session.DefaultSessionCloseReason.SERVER_EXCEPTION_OCCURRED; + +/** + * @author hylexus + **/ +@Slf4j +@ChannelHandler.Sharable +public class AttachmentJt808DispatchChannelHandlerAdapter extends ChannelInboundHandlerAdapter implements Jt808RequestLifecycleListenerAware { + + private final AttachmentJt808SessionManager sessionManager; + private final AttachmentJt808RequestProcessor requestProcessor; + private Jt808RequestLifecycleListener requestLifecycleListener; + + public AttachmentJt808DispatchChannelHandlerAdapter(AttachmentJt808RequestProcessor requestProcessor, AttachmentJt808SessionManager sessionManager) { + this.requestProcessor = requestProcessor; + this.sessionManager = sessionManager; + } + + @Override + public void channelRead(@Nonnull ChannelHandlerContext ctx, @Nonnull Object msg) throws Exception { + if (msg instanceof ByteBuf) { + final ByteBuf buf = (ByteBuf) msg; + try { + if (buf.readableBytes() <= 0) { + return; + } + final boolean continueProcess = this.requestLifecycleListener.beforeDecode(buf, ctx.channel()); + if (!continueProcess) { + return; + } + this.requestProcessor.processJt808Request(buf, ctx.channel()); + } finally { + JtProtocolUtils.release(buf); + } + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + sessionManager.removeBySessionIdAndClose(sessionManager.generateSessionId(ctx.channel()), SERVER_EXCEPTION_OCCURRED); + log.error("[exceptionCaught]", cause); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + if (log.isDebugEnabled()) { + log.warn("channelInactive, address={} ", ctx.channel().remoteAddress()); + } + sessionManager.removeBySessionIdAndClose(sessionManager.generateSessionId(ctx.channel()), CHANNEL_INACTIVE); + } + + @Override + public void setRequestLifecycleListener(Jt808RequestLifecycleListener requestLifecycleListener) { + this.requestLifecycleListener = requestLifecycleListener; + } +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808RequestProcessor.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808RequestProcessor.java new file mode 100644 index 00000000..09324178 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808RequestProcessor.java @@ -0,0 +1,8 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; + +public interface AttachmentJt808RequestProcessor { + void processJt808Request(ByteBuf byteBuf, Channel channel) throws Exception; +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808SessionManager.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808SessionManager.java new file mode 100644 index 00000000..bd88547d --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/AttachmentJt808SessionManager.java @@ -0,0 +1,6 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment; + +import io.github.hylexus.jt.jt808.spec.session.InternalJt808SessionManager; + +public interface AttachmentJt808SessionManager extends InternalJt808SessionManager { +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/DelimiterAndLengthFieldBasedByteToMessageDecoder.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/DelimiterAndLengthFieldBasedByteToMessageDecoder.java new file mode 100644 index 00000000..c9eec924 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/DelimiterAndLengthFieldBasedByteToMessageDecoder.java @@ -0,0 +1,74 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment; + + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import java.util.List; + +public class DelimiterAndLengthFieldBasedByteToMessageDecoder extends ByteToMessageDecoder { + + static class InternalLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder { + + public InternalLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip); + } + + // 将父类的 protected final void decode(ChannelHandlerContext, ByteBuf, List) 改成 public 的 + public final void doDecode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + super.decode(ctx, in, out); + } + + } + + static class InternalDelimiterBasedFrameDecoder extends DelimiterBasedFrameDecoder { + + public InternalDelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) { + super(maxFrameLength, delimiter); + } + + // 将父类的 protected final void decode(ChannelHandlerContext, ByteBuf, List) 改成 public 的 + public final void doDecode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + super.decode(ctx, in, out); + } + + } + + private final ByteBuf prefix; + private final InternalLengthFieldBasedFrameDecoder lengthFieldDecoder; + private final InternalDelimiterBasedFrameDecoder delimiterDecoder; + + public DelimiterAndLengthFieldBasedByteToMessageDecoder(int delimiterBasedFrameMaxFrameLength, int lengthFieldBasedFrameMaxFrameLength) { + this.prefix = Unpooled.copiedBuffer(new byte[]{0x30, 0x31, 0x63, 0x64}); + final ByteBuf delimiter = Unpooled.copiedBuffer(new byte[]{0x7e}); + this.lengthFieldDecoder = new InternalLengthFieldBasedFrameDecoder(lengthFieldBasedFrameMaxFrameLength, 58, 4, 0, 0); + this.delimiterDecoder = new InternalDelimiterBasedFrameDecoder(delimiterBasedFrameMaxFrameLength, delimiter); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if (this.startWith(in, prefix)) { + this.lengthFieldDecoder.doDecode(ctx, in, out); + } else { + this.delimiterDecoder.doDecode(ctx, in, out); + } + } + + boolean startWith(ByteBuf buf, ByteBuf prefix) { + final int readableBytes = prefix.readableBytes(); + if (readableBytes > buf.readableBytes()) { + return false; + } + + for (int i = 0, j = buf.readerIndex(); i < readableBytes; i++, j++) { + if (prefix.getByte(i) != buf.getByte(j)) { + return false; + } + } + return true; + } +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808AttachmentServerNettyConfigure.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808AttachmentServerNettyConfigure.java new file mode 100644 index 00000000..7a08181d --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808AttachmentServerNettyConfigure.java @@ -0,0 +1,54 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment; + +import io.github.hylexus.jt.annotation.BuiltinComponent; +import io.github.hylexus.jt.netty.JtServerNettyConfigure; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.EventExecutorGroup; +import lombok.Data; + +import static io.github.hylexus.jt.jt808.JtProtocolConstant.NETTY_HANDLER_NAME_808_FRAME; +import static io.github.hylexus.jt.jt808.JtProtocolConstant.NETTY_HANDLER_NAME_808_MSG_DISPATCHER_ADAPTER; + +public interface Jt808AttachmentServerNettyConfigure extends JtServerNettyConfigure { + + @BuiltinComponent + class DefaultJt808AttachmentServerNettyConfigure implements Jt808AttachmentServerNettyConfigure { + private final DefaultJt808AttachmentServerNettyConfigure.BuiltInServerBootstrapProps serverBootstrapProps; + private final ChannelInboundHandlerAdapter jt808DispatchChannelHandlerAdapter; + private final EventExecutorGroup eventExecutorGroup; + + public DefaultJt808AttachmentServerNettyConfigure( + DefaultJt808AttachmentServerNettyConfigure.BuiltInServerBootstrapProps serverBootstrapProps, + ChannelInboundHandlerAdapter channelHandlerAdapter, + EventExecutorGroup eventExecutorGroup) { + + this.serverBootstrapProps = serverBootstrapProps; + this.jt808DispatchChannelHandlerAdapter = channelHandlerAdapter; + this.eventExecutorGroup = eventExecutorGroup; + } + + @Override + public void configureSocketChannel(SocketChannel ch) { + ch.pipeline().addLast( + NETTY_HANDLER_NAME_808_FRAME, + new DelimiterAndLengthFieldBasedByteToMessageDecoder( + serverBootstrapProps.getDelimiterBasedFrameMaxFrameLength(), + serverBootstrapProps.getLengthFieldBasedFrameMaxFrameLength() + ) + ); + ch.pipeline().addLast(this.eventExecutorGroup, NETTY_HANDLER_NAME_808_MSG_DISPATCHER_ADAPTER, jt808DispatchChannelHandlerAdapter); + } + + @Data + public static class BuiltInServerBootstrapProps { + final int delimiterBasedFrameMaxFrameLength; + final int lengthFieldBasedFrameMaxFrameLength; + + public BuiltInServerBootstrapProps(int delimiterBasedFrameMaxFrameLength, int lengthFieldBasedFrameMaxFrameLength) { + this.delimiterBasedFrameMaxFrameLength = delimiterBasedFrameMaxFrameLength; + this.lengthFieldBasedFrameMaxFrameLength = lengthFieldBasedFrameMaxFrameLength; + } + } + } +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808NettyTcpAttachmentServer.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808NettyTcpAttachmentServer.java new file mode 100644 index 00000000..73032550 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/Jt808NettyTcpAttachmentServer.java @@ -0,0 +1,79 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment; + +import io.github.hylexus.jt.utils.AbstractRunner; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.Future; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.lang.NonNull; + +@Slf4j +@Setter +@Getter +public class Jt808NettyTcpAttachmentServer extends AbstractRunner { + + private final Jt808AttachmentServerNettyConfigure serverConfigure; + private EventLoopGroup bossGroup = null; + private EventLoopGroup workerGroup = null; + private Integer port; + private Integer workThreadCount; + private Integer bossThreadCount; + + public Jt808NettyTcpAttachmentServer(String name, Jt808AttachmentServerNettyConfigure serverConfigure) { + super(name); + this.serverConfigure = serverConfigure; + } + + private void bind() throws Exception { + this.bossGroup = new NioEventLoopGroup(bossThreadCount); + this.workerGroup = new NioEventLoopGroup(workThreadCount); + ServerBootstrap serverBootstrap = new ServerBootstrap(); + + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(@NonNull SocketChannel ch) { + serverConfigure.configureSocketChannel(ch); + } + }); + + serverConfigure.configureServerBootstrap(serverBootstrap); + + log.info("----> netty tcp attachment server started, port = {}", this.port); + ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); + + channelFuture.channel().closeFuture().sync(); + } + + @Override + public void doProcessBlocked() throws Exception { + this.bind(); + } + + @Override + public void onDestroy() throws Exception { + this.stopServer(); + } + + public synchronized void stopServer() throws Exception { + + Future future = this.workerGroup.shutdownGracefully().await(); + if (!future.isSuccess()) { + log.error("<---- netty workerGroup cannot be stopped", future.cause()); + } + + future = this.bossGroup.shutdownGracefully().await(); + if (!future.isSuccess()) { + log.error("<---- netty bossGroup cannot be stopped", future.cause()); + } + } + +} \ No newline at end of file diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/DefaultAttachmentJt808SessionManager.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/DefaultAttachmentJt808SessionManager.java new file mode 100644 index 00000000..7b4722cd --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/DefaultAttachmentJt808SessionManager.java @@ -0,0 +1,22 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment.impl; + +import io.github.hylexus.jt.jt808.Jt808ProtocolVersion; +import io.github.hylexus.jt.jt808.spec.session.DefaultJt808Session; +import io.github.hylexus.jt.jt808.spec.session.DefaultJt808SessionManager; +import io.github.hylexus.jt.jt808.spec.session.Jt808FlowIdGeneratorFactory; +import io.github.hylexus.jt.jt808.spec.session.Jt808Session; +import io.github.hylexus.jt.jt808.support.extension.attachment.AttachmentJt808SessionManager; +import io.netty.channel.Channel; + +public class DefaultAttachmentJt808SessionManager extends DefaultJt808SessionManager implements AttachmentJt808SessionManager { + public DefaultAttachmentJt808SessionManager(Jt808FlowIdGeneratorFactory flowIdGeneratorFactory) { + super(flowIdGeneratorFactory); + } + + @Override + public Jt808Session generateSession(String terminalId, Jt808ProtocolVersion version, Channel channel) { + final DefaultJt808Session session = (DefaultJt808Session) super.generateSession(terminalId, version, channel); + session.role(Jt808Session.Role.ATTACHMENT); + return session; + } +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/SimpleAttachmentJt808RequestProcessor.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/SimpleAttachmentJt808RequestProcessor.java new file mode 100644 index 00000000..8fa8c839 --- /dev/null +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/extension/attachment/impl/SimpleAttachmentJt808RequestProcessor.java @@ -0,0 +1,156 @@ +package io.github.hylexus.jt.jt808.support.extension.attachment.impl; + +import io.github.hylexus.jt.jt808.spec.Jt808Request; +import io.github.hylexus.jt.jt808.spec.Jt808RequestHeader; +import io.github.hylexus.jt.jt808.spec.Jt808Response; +import io.github.hylexus.jt.jt808.spec.Jt808ServerExchange; +import io.github.hylexus.jt.jt808.spec.impl.BuiltinJt808MsgType; +import io.github.hylexus.jt.jt808.spec.impl.DefaultJt808MsgBodyProps; +import io.github.hylexus.jt.jt808.spec.impl.DefaultJt808ServerExchange; +import io.github.hylexus.jt.jt808.spec.impl.response.DefaultJt808Response; +import io.github.hylexus.jt.jt808.spec.session.Jt808Session; +import io.github.hylexus.jt.jt808.support.codec.Jt808MsgDecoder; +import io.github.hylexus.jt.jt808.support.codec.Jt808RequestRouteExceptionHandler; +import io.github.hylexus.jt.jt808.support.dispatcher.Jt808DispatcherHandler; +import io.github.hylexus.jt.jt808.support.exception.Jt808UnknownMsgException; +import io.github.hylexus.jt.jt808.support.extension.attachment.AttachmentJt808RequestProcessor; +import io.github.hylexus.jt.jt808.support.extension.attachment.AttachmentJt808SessionManager; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.util.AttributeKey; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SimpleAttachmentJt808RequestProcessor implements AttachmentJt808RequestProcessor { + public static final AttributeKey SESSION_ATTR_KEY = AttributeKey.newInstance("attachment-" + Jt808Session.class.getName()); + public static final byte[] ATTACHMENT_REQUEST_PREFIX = {0x30, 0x31, 0x63, 0x64}; + private final Jt808RequestRouteExceptionHandler routeExceptionHandler; + private final Jt808MsgDecoder decoder; + private final Jt808DispatcherHandler dispatcherHandler; + private final AttachmentJt808SessionManager sessionManager; + + public SimpleAttachmentJt808RequestProcessor( + Jt808MsgDecoder decoder, + AttachmentJt808SessionManager sessionManager, + Jt808RequestRouteExceptionHandler routeExceptionHandler, + Jt808DispatcherHandler dispatcherHandler) { + this.decoder = decoder; + this.routeExceptionHandler = routeExceptionHandler; + this.dispatcherHandler = dispatcherHandler; + this.sessionManager = sessionManager; + } + + @Override + public void processJt808Request(ByteBuf buf, Channel channel) { + if (startWith(buf, ATTACHMENT_REQUEST_PREFIX)) { + final Jt808Session session = this.getSession(channel); + final Jt808RequestHeader requestHeader; + if (session == null) { + requestHeader = Jt808RequestHeader.newBuilder() + .version(null) + .msgType(BuiltinJt808MsgType.CLIENT_MSG_30316364.getMsgId()) + .msgBodyProps(new DefaultJt808MsgBodyProps(0)) + .terminalId(null) + .flowId(0) + .subPackageProps(null) + .build(); + } else { + requestHeader = Jt808RequestHeader.newBuilder() + .version(session.protocolVersion()) + .msgType(BuiltinJt808MsgType.CLIENT_MSG_30316364.getMsgId()) + .msgBodyProps(new DefaultJt808MsgBodyProps(0)) + .terminalId(session.terminalId()) + .flowId(0) + .subPackageProps(null) + .build(); + } + final Jt808Request request = Jt808Request.newBuilder(BuiltinJt808MsgType.CLIENT_MSG_30316364) + .header(requestHeader) + .rawByteBuf(null) + // 跳过 0x30316364 4字节 + .body(buf.readerIndex(buf.readerIndex() + 4)) + .originalCheckSum((byte) 0) + .calculatedCheckSum((byte) 0) + .build(); + this.doDispatch(request, session); + } else { + this.doProcessAttachmentJt808Request(buf, channel); + } + } + + private void doProcessAttachmentJt808Request(ByteBuf buf, Channel channel) { + Jt808Session jt808Session = null; + Jt808Request request = null; + try { + try { + request = decoder.decode(buf); + final Jt808RequestHeader header = request.header(); + final String terminalId = header.terminalId(); + jt808Session = this.persistenceSessionIfNecessary(channel, request); + // jt808Session = sessionManager.persistenceIfNecessary(terminalId, header.version(), channel); + + if (log.isDebugEnabled()) { + log.debug("[decode] : {}, terminalId={}, msg = {}", request.msgType(), terminalId, request); + } + } catch (Jt808UnknownMsgException unknownMsgException) { + this.routeExceptionHandler.onReceiveUnknownMsg(unknownMsgException.getMsgId(), unknownMsgException.getPayload()); + return; + } catch (Exception e) { + if (request != null) { + request.release(); + } + throw e; + } + doDispatch(request, jt808Session); + // this.msgDispatcher.doDispatch(request); + } catch (Throwable throwable) { + try { + log.error("", throwable); + // commonExceptionHandler.handleException(null, ArgumentContext.of(request, jt808Session, new Jt808NettyException(throwable))); + throw throwable; + } catch (Throwable e) { + log.error("An error occurred while invoke ExceptionHandler", e); + } + } + } + + // 这个 session 对应的是上传文件的连接,而不是指令服务器对应的普通 808 连接 + private Jt808Session persistenceSessionIfNecessary(Channel channel, Jt808Request request) { + final Jt808Session session = this.getSession(channel); + if (session != null) { + return session; + } + final Jt808Session attachmentSession = sessionManager.generateSession(request.terminalId(), request.version(), channel); + channel.attr(SESSION_ATTR_KEY).set(attachmentSession); + return attachmentSession; + } + + private Jt808Session getSession(Channel channel) { + return channel.attr(SESSION_ATTR_KEY).get(); + } + + private void doDispatch(Jt808Request originalRequest, Jt808Session jt808Session) { + Jt808ServerExchange exchange = null; + Jt808Response originalResponse = null; + try { + originalResponse = DefaultJt808Response.init(originalRequest.version(), originalRequest.terminalId()); + exchange = new DefaultJt808ServerExchange(originalRequest, originalResponse, jt808Session); + dispatcherHandler.handleRequest(exchange); + } finally { + if (exchange != null && exchange.response() != originalResponse) { + originalResponse.release(); + } + } + } + + + private boolean startWith(ByteBuf buf, byte[] bytes) { + for (int i = 0, j = buf.readerIndex(); i < bytes.length; i++, j++) { + if (buf.getByte(j) != bytes[i]) { + return false; + } + } + return true; + } + +} diff --git a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/netty/Jt808NettyTcpServer.java b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/netty/Jt808NettyTcpServer.java index 53abb13a..ac35c2cf 100644 --- a/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/netty/Jt808NettyTcpServer.java +++ b/jt-808-server-support/src/main/java/io/github/hylexus/jt/jt808/support/netty/Jt808NettyTcpServer.java @@ -43,7 +43,7 @@ private void bind() throws Exception { serverConfigure.configureServerBootstrap(serverBootstrap); - log.info("----> netty tcp server started, port = {}", this.port); + log.info("----> netty tcp instruction server started, port = {}", this.port); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channelFuture.channel().closeFuture().sync(); diff --git a/jt-core/src/main/java/io/github/hylexus/jt/netty/JtServerNettyConfigure.java b/jt-core/src/main/java/io/github/hylexus/jt/netty/JtServerNettyConfigure.java index 7a4600c8..4083c766 100644 --- a/jt-core/src/main/java/io/github/hylexus/jt/netty/JtServerNettyConfigure.java +++ b/jt-core/src/main/java/io/github/hylexus/jt/netty/JtServerNettyConfigure.java @@ -1,6 +1,7 @@ package io.github.hylexus.jt.netty; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; /** @@ -8,7 +9,12 @@ */ public interface JtServerNettyConfigure { - void configureServerBootstrap(ServerBootstrap serverBootstrap); + default void configureServerBootstrap(ServerBootstrap serverBootstrap) { + serverBootstrap + .option(ChannelOption.SO_BACKLOG, 2048) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true); + } void configureSocketChannel(SocketChannel ch); diff --git a/samples/jt-808-server-sample-debug/src/main/java/io/github/hylexus/jt/jt808/samples/debug/DebugAnnotationBasedExceptionHandler.java b/samples/jt-808-server-sample-debug/src/main/java/io/github/hylexus/jt/jt808/samples/debug/DebugAnnotationBasedExceptionHandler.java index 85d987be..becdd542 100644 --- a/samples/jt-808-server-sample-debug/src/main/java/io/github/hylexus/jt/jt808/samples/debug/DebugAnnotationBasedExceptionHandler.java +++ b/samples/jt-808-server-sample-debug/src/main/java/io/github/hylexus/jt/jt808/samples/debug/DebugAnnotationBasedExceptionHandler.java @@ -3,6 +3,7 @@ import io.github.hylexus.jt.jt808.support.annotation.handler.Jt808ExceptionHandler; import io.github.hylexus.jt.jt808.support.annotation.handler.Jt808RequestHandlerAdvice; import io.github.hylexus.jt.jt808.support.exception.Jt808HandlerNotFoundException; +import io.github.hylexus.jt.utils.FormatUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -26,6 +27,6 @@ public void processIllegalStateException(IllegalStateException illegalStateExcep @Jt808ExceptionHandler(value = Jt808HandlerNotFoundException.class) public void processJt808HandlerNotFoundException(Jt808HandlerNotFoundException exception) { - log.error("没有处理器来处理 msgId={} 的消息", exception.getRequest().header().msgId()); + log.error("没有处理器来处理 msgId={}(0x{}) 的消息", exception.getRequest().header().msgId(), FormatUtils.toHexString(exception.getRequest().header().msgId())); } } diff --git a/samples/jt-808-server-sample-debug/src/main/resources/application.yml b/samples/jt-808-server-sample-debug/src/main/resources/application.yml index 60e3cd2c..23f8ff0f 100644 --- a/samples/jt-808-server-sample-debug/src/main/resources/application.yml +++ b/samples/jt-808-server-sample-debug/src/main/resources/application.yml @@ -23,6 +23,21 @@ jt808: caffeine: maximum-size: 100 ttl: 3m + ## 苏标附件服务器 + attachment-server: + enabled: true + # 附件服务器端口(TCP) + port: 6809 + boss-thread-count: 2 + worker-thread-count: 4 + ## 0x30316364附件上传报文的最大长度(66560 = 1024 * 65) + max-frame-length: 66560 + # 处理附件相关指令(0x1210,0x1211,0x1212,0x6364)的线程池线配置 + msg-processor: + executor-group: + max-pending-tasks: 64 + pool-name: 808-attachment-processor + thread-count: 16 logging: level: