最近使用JT809协议进行数据对接,遇到了不少问题,度娘谷歌都不好使,找不到很好的客户端实现代码的例子,只能苦逼的自己闷头弄,现在特意写篇帖子,希望能帮助一些人
说正经的:
背景:跟某公司做数据对接,将本公司的一些信息推送到接收端
要求:建立tcp链接,使用接收端提供的用户名密码等信息 先登录,登录成功后推送数据,数据采用JT809标准协议
实现语言:java
下面介绍具体实现,包涵完整代码
在这之前,最好先下载jt809协议,研究研究,网上就有,我找到的里面还有一些错误别,一度让我觉得是个盗版货
首先说下整体结构:一个tcp客户端,一个Decoder,一个心跳Handler,一个处理反馈的handler,一个消息对象
通过tcp客户端链接制定ip地址和端口,进行登陆,心跳handler负责在发送数据空闲时向目标服务器发送请求保持包;decoder负责解码组装对象,反馈handler负责处理收到的反馈信息,消息对象我就不多说了
下面看代码:
TCPclient:
由于时间紧迫,代码结构有些臃肿,我使用的tcpclient为两个,一个是基类,一个子类,其实这两个可以合并到一起,这里有心的朋友自己去和,下面来看下tcpclient基类的代码,其中有一些模块的引用因为涉及到公司信息,所以我删除掉了,大家参考的时候,引用自己编写的就可以了,
//包路径,自己填 import java.net.InetSocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.logging.LoggingHandler; import org.jboss.netty.handler.timeout.IdleStateHandler; import org.jboss.netty.logging.InternalLogLevel; import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TcpClient{ private static final Logger LOG = LoggerFactory.getLogger(TcpClient.class); private static final int DEFAULT_PORT = 9000; private long connectTimeoutMillis = 3000; private int port = DEFAULT_PORT; private boolean tcpNoDelay = false; private boolean reuseAddress = true; private boolean keepAlive = true; private int workerCount = 4; private ClientBootstrap bootstrap = null; private static Channel channel = null; private Executor bossExecutor = Executors.newCachedThreadPool(); private Executor workerExecutor = Executors.newCachedThreadPool(); private static TcpClient instance = new TcpClient(); private TcpClient() { init(); } public static TcpClient getInstence(){ return instance; } public void init() { bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( bossExecutor, workerExecutor, workerCount)); bootstrap.setOption("tcpNoDelay", tcpNoDelay); bootstrap.setOption("connectTimeoutMillis", connectTimeoutMillis); bootstrap.setOption("reuseAddress", reuseAddress); bootstrap.setOption("keepAlive", keepAlive); } public Channel getChannel(String address, int port) { if (null == channel || !channel.isOpen()) { bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024); bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024); bootstrap.setPipelineFactory(new ChannelPipelineFactory(){ @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); // pipeline.addLast("loging", new LoggingHandler(InternalLogLevel.ERROR)); 打印日志信息,上线稳定后可去掉 pipeline.addLast("timeout", new IdleStateHandler(new HashedWheelTimer(), 10, 60, 0));//设置空闲心跳机制 pipeline.addLast("heartbeat", new HeartBeatHandler());//心跳发送包处理handler pipeline.addLast("decode", new Decoder());//解码 pipeline.addLast("loginHandler", new RecevieHandler());//反馈数据处理 return pipeline; } }); ChannelFuture future = bootstrap.connect(new InetSocketAddress( address, port)); future.awaitUninterruptibly(); if (future.isSuccess()) { channel = future.getChannel(); } else { throw new FrameworkRuntimeException(future.getCause()); } } return channel; } public long getConnectTimeoutMillis() { return connectTimeoutMillis; } public void setConnectTimeoutMillis(long connectTimeoutMillis) { this.connectTimeoutMillis = connectTimeoutMillis; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public boolean isTcpNoDelay() { return tcpNoDelay; } public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; } public boolean isReuseAddress() { return reuseAddress; } public void setReuseAddress(boolean reuseAddress) { this.reuseAddress = reuseAddress; } public boolean isKeepAlive() { return keepAlive; } public void setKeepAlive(boolean keepAlive) { this.keepAlive = keepAlive; } public int getWorkerCount() { return workerCount; } public void setWorkerCount(int workerCount) { this.workerCount = workerCount; } }
上面的配置,用过netty都不会陌生,就不多说了,下面看下子类的实现
<pre name="code" class="java">//包路径,自己填
import java.nio.charset.Charset;import java.text.NumberFormat;import java.util.Calendar;import java.util.Date;import java.util.Properties;import org.apache.commons.lang3.StringUtils;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.buffer.ChannelBuffers;import org.jboss.netty.channel.Channel;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class TcpClientFuJian{private static Logger LOG = LoggerFactory.getLogger(TcpClientFuJian.class);public static int PLANT_CODE;//公司介入码public static int ZUCHE_ID_FUJIAN;//公司用户名public static String ZUCHE_PWD_FUJIAN;//公司密码public static String LONGINSTATUS = "";public static String LOGINING = "logining";private static int LOGIN_FLAG = 0;private static String DOWN_LINK_IP = "127.0.0.1";//初始化基类private static TcpClient tcpClient = TcpClient.getInstence();//初始化private static TcpClientFuJian tcpClientFujian = new TcpClientFuJian();//初始化配置,这里时读取配置文件,使用的是公司内部的工具,各位自己实现就好,并不复杂private static Properties property = PropertiesReader.getProperties("aws2xiamen", true);//初始化channel,以便心跳机制及时登录private Channel channel = tcpClient.getChannel(Constants.TCP_ADDRESS, Constants.TCP_PORT);public static TcpClientFuJian getInstance(){//加在车辆数据;由于我们推送的车辆位置信息,所以这里初始化一部分制定的数据InitVehicleData.initVehicleMap();//获取本机IP对应的用户名密码,使用的是公司内部的工具,各位自己实现就好,并不复杂String localIp = IpUtils.getLocalIp();if(StringUtils.isNotBlank(localIp)){String properties = property.getProperty(localIp);if(StringUtils.isNotBlank(localIp)){String[] pros = properties.split(":");PLANT_CODE = Integer.parseInt(pros[0]);ZUCHE_ID = Integer.parseInt(pros[1]);ZUCHE_PWD = pros[2];}}else{LOG.error("获取本机IP异常");}return tcpClientFujian;}/** * 判断是否登录 * boolean * @return */public boolean isLogined(){
return Constants.LOGIN_SUCCESS.equals(LONGINSTATUS); //Constants常量类,自己随便定义就好,LOGIN_STATAUS="0x00"
}/** * 登录接入平台 * boolean * @return */public boolean login2FuJianGov(){boolean success = false;if(!Constants.LOGIN_SUCCESS.equals(LONGINSTATUS) && !LOGINING.equals(LONGINSTATUS)){//开始登录 Message为数据对象,代码稍后给出Message msg = new Message(JT809Constants.UP_CONNECT_REQ);ChannelBuffer buffer = ChannelBuffers.buffer(46);buffer.writeInt(ZUCHE_ID);//4byte[] pwd = getBytesWithLengthAfter(8, ZUCHE_PWD.getBytes());buffer.writeBytes(pwd);//8byte[] ip = getBytesWithLengthAfter(32, DOWN_LINK_IP.getBytes());buffer.writeBytes(ip);//32buffer.writeShort((short)Constants.TCP_RESULT_PORT);//2msg.setMsgBody(buffer);channel = tcpClient.getChannel(Constants.TCP_ADDRESS, Constants.TCP_PORT);channel.write(buildMessage(msg));LONGINSTATUS = LOGINING;}return success;}public static ChannelBuffer buildMessage(Message msg){int bodyLength = 0 ;if(null != msg.getMsgBody()){bodyLength = msg.getMsgBody().readableBytes();}msg.setMsgGesscenterId(PLANT_CODE);ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(bodyLength + Message.MSG_FIX_LENGTH);ChannelBuffer headBuffer = ChannelBuffers.buffer(22);//---数据头headBuffer.writeInt(buffer.capacity() - 1);//4headBuffer.writeInt(msg.getMsgSn());//4headBuffer.writeShort((short)msg.getMsgId());//2headBuffer.writeInt(msg.getMsgGesscenterId());//4headBuffer.writeBytes(msg.getVersionFlag());//3headBuffer.writeByte(0);//1headBuffer.writeInt(10);//4buffer.writeBytes(headBuffer);//---数据体if(null != msg.getMsgBody()){buffer.writeBytes(msg.getMsgBody());}ChannelBuffer finalBuffer = ChannelBuffers.copiedBuffer(buffer);//--crc校验码byte[] b = ChannelBuffers.buffer(finalBuffer.readableBytes()).array();finalBuffer.getBytes(0, b);int crcValue = CRC16CCITT.crc16(b);finalBuffer.writeShort((short)crcValue);//2//转义byte[] bytes = ChannelBuffers.copiedBuffer(finalBuffer).array();ChannelBuffer headFormatedBuffer = ChannelBuffers.dynamicBuffer(finalBuffer.readableBytes());formatBuffer(bytes, headFormatedBuffer);ChannelBuffer buffera = ChannelBuffers.buffer(headFormatedBuffer.readableBytes() + 2);buffera.writeByte(Message.MSG_HEAD);buffera.writeBytes(headFormatedBuffer);buffera.writeByte(Message.MSG_TALL);return ChannelBuffers.copiedBuffer(buffera);}/** * 发送数据到接入平台 * boolean * @param awsVo 是上层程序得到的带发送的数据对象,可以看自己的需求,替换 * @return */public boolean sendMsg2FuJianGov(Idc2AwsGpsVo awsVo){boolean success = false;if(isLogined()){//已经登录成功,开始发送数据channel = tcpClient.getChannel(Constants.TCP_ADDRESS, Constants.TCP_PORT);if(null != channel && channel.isWritable()){Message msg = buildSendVO(awsVo);ChannelBuffer msgBuffer = buildMessage(msg);channel.write(msgBuffer);}else{LONGINSTATUS = "";}}else if(LOGIN_FLAG == 0){LOGIN_FLAG ++;login2FuJianGov();LOG.error("--------------第一次登录");}else{LOG.error("--------------等待登录");}return success;}/** * 转换VO * void * @param awsVo */private Message buildSendVO(Idc2AwsGpsVo awsVo){Message msg = new Message(JT809Constants.UP_EXG_MSG);ChannelBuffer buffer = ChannelBuffers.buffer(36);//是否加密buffer.writeByte((byte)0);//0未加密 // 1//日月年dmyyCalendar cal = Calendar.getInstance();cal.setTime(new Date());buffer.writeByte((byte)cal.get(Calendar.DATE));buffer.writeByte((byte)(cal.get(Calendar.MONTH) + 1));String hexYear = "0" + Integer.toHexString(cal.get(Calendar.YEAR));buffer.writeBytes(hexStringToByte(hexYear));//4//时分秒buffer.writeByte((byte)cal.get(Calendar.HOUR));buffer.writeByte((byte)cal.get(Calendar.MINUTE));buffer.writeByte((byte)cal.get(Calendar.SECOND));//3//经度,纬度buffer.writeInt(formatLonLat(awsVo.getLon()));//4buffer.writeInt(formatLonLat(awsVo.getLat()));//4//速度buffer.writeShort(awsVo.getSpeed().shortValue());//2//行驶记录速度buffer.writeShort(awsVo.getSpeed().shortValue());//2//车辆当前总里程数buffer.writeInt(awsVo.getMileage().intValue());//4//方向buffer.writeShort(awsVo.getDirection().shortValue());//2//海拔buffer.writeShort((short)0);//2//车辆状态buffer.writeInt(1);//4//报警状态buffer.writeInt(0);//0表示正常;1表示报警//4ChannelBuffer headBuffer = ChannelBuffers.buffer(buffer.capacity() + 28);headBuffer.writeBytes(getBytesWithLengthAfter(21 , awsVo.getVehicleNo().getBytes(Charset.forName("GBK"))));//21 车牌号headBuffer.writeByte((byte)1);//1 车牌颜色:注意不是车身颜色headBuffer.writeShort(JT809Constants.UP_EXG_MSG_REAL_LOCATION);//2 子业务码headBuffer.writeInt(buffer.capacity());//4headBuffer.writeBytes(buffer);msg.setMsgBody(headBuffer);return msg;}/** * 报文转义 * void * @param bytes * @param formatBuffer */private static void formatBuffer(byte[] bytes, ChannelBuffer formatBuffer){for (byte b : bytes) {switch(b){case 0x5b:byte[] formatByte0x5b = new byte[2];formatByte0x5b[0] = 0x5a;formatByte0x5b[1] = 0x01;formatBuffer.writeBytes(formatByte0x5b);break;case 0x5a:byte[] formatByte0x5a = new byte[2];formatByte0x5a[0] = 0x5a;formatByte0x5a[1] = 0x02;formatBuffer.writeBytes(formatByte0x5a);break;case 0x5d:byte[] formatByte0x5d = new byte[2];formatByte0x5d[0] = 0x5e;formatByte0x5d[1] = 0x01;formatBuffer.writeBytes(formatByte0x5d);break;case 0x5e:byte[] formatByte0x5e = new byte[2];formatByte0x5e[0] = 0x5e;formatByte0x5e[1] = 0x02;formatBuffer.writeBytes(formatByte0x5e);break;default: formatBuffer.writeByte(b);break;}}}/** * 16进制字符串转换成byte数组 * byte[] * @param hex */ public static byte[] hexStringToByte(String hex) { hex = hex.toUpperCase(); int len = (hex.length() / 2); byte[] result = new byte[len]; char[] achar = hex.toCharArray(); for (int i = 0; i < len; i++) { int pos = i * 2; result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1])); } return result; } private static byte toByte(char c) { byte b = (byte) "0123456789ABCDEF".indexOf(c); return b; } /** * 格式化经纬度,保留六位小数 * int * @param needFormat * @return */ private int formatLonLat(Double needFormat){ NumberFormat numFormat = NumberFormat.getInstance();numFormat.setMaximumFractionDigits(6);numFormat.setGroupingUsed(false);String fristFromat = numFormat.format(needFormat);Double formatedDouble = Double.parseDouble(fristFromat);numFormat.setMaximumFractionDigits(0);String formatedValue = numFormat.format(formatedDouble * 1000000);return Integer.parseInt(formatedValue); } /** * 补全位数不够的定长参数 有些定长参数,实际值长度不够,在后面补0x00 * byte[] * @param length * @param pwdByte * @return */private byte[] getBytesWithLengthAfter(int length, byte[] pwdByte){byte[] lengthByte = new byte[length];for(int i = 0; i < pwdByte.length; i ++){lengthByte[i] = pwdByte[i];}for (int i = 0; i < (length- pwdByte.length); i++) {lengthByte[pwdByte.length + i] = 0x00;}return lengthByte;}public static void main(String[] args) {TcpClientFuJian s = TcpClientFuJian.getInstance();Idc2AwsGpsVo awsVo = new Idc2AwsGpsVo();awsVo.setDirection(120);awsVo.setLon(117.2900911);awsVo.setLat(39.56362);awsVo.setSpeed(45D);awsVo.setMileage(10001D);awsVo.setVehicleNo("幽123D32");s.sendMsg2FuJianGov(awsVo);try {Thread.sleep(20*1000);} catch (InterruptedException e) {e.printStackTrace();}s.sendMsg2FuJianGov(awsVo);}}这里其实也没有太多好说的,整体说一下,先按照协议拼接数据,拼接完成后,计算CRC,进行转义,由于协议规定开始标记为0x5b,结束标记为0x5d,数据中可能包涵这些标记,所以必须转移,具体转换规则,在协议中就能找到,有一点要注意,一定是先计算crc,再转一,将否则无法通过crc校验,我就是在这里吃了亏,耗了一天,再别人的指点下才解决,协议中crc计算使用的是crc16-ccitt标准,这里找了好多实现,每个计算结果都不一样,我也没有仔细研究,我用的是没问题的,代码如下
public class JT809Constants { public static int UP_CONNECT_REQ = 0x1001;//主链路登录请求消息 public static int UP_CONNECT_RSP = 0x1002;//主链路登录应答消息 public static int UP_CONNECT_RSP_SUCCESS = 0x00;//登录成功 public static int UP_CONNECT_RSP_ERROR_01 = 0x01;//IP 地址不正确 public static int UP_CONNECT_RSP_ERROR_02 = 0x02;//接入码不正确 public static int UP_CONNECT_RSP_ERROR_03 = 0x03;//用户没注册 public static int UP_CONNECT_RSP_ERROR_04 = 0x04;//密码错误 public static int UP_CONNECT_RSP_ERROR_05 = 0x05;//资源紧张,稍后再连接(已经占 用); public static int UP_CONNECT_RSP_ERROR_06 = 0x06;//其他 public static int UP_DICONNECE_REQ = 0x1003;//主链路注销请求消息 public static int UP_DISCONNECT_RSP = 0x1004;//主链路注销应答消息 public static int UP_LINKETEST_REQ = 0x1005;//主链路连接保持请求消息 public static int UP_LINKTEST_RSP = 0x1006;//主链路连接保持应答消息 public static int UP_DISCONNECT_INFORM = 0x1007;//主链路断开通知消息 public static int UP_CLOSELINK_INFORM = 0x1008;//下级平台主动关闭链路通 知消息 public static int DOWN_CONNECT_REQ = 0x9001;//从链路连接请求消息 public static int DOWN_CONNECT_RSP = 0x9002;//从链路连接应答消息 public static int DOWN_DISCONNECT_REQ = 0x9003;//从链路注销请求消息 public static int UP_EXG_MSG = 0x1200;//主链路动态信息交换消息 public static int UP_EXG_MSG_REAL_LOCATION = 0x1202;//实时上传车辆定位信息 public static int UP_EXG_MSG_HISTORY_LOCATION = 0x1203;//车辆定位信息自动补报 public static int DOWN_EXG_MSG = 0x9200;//从链路动态信息交换消息 } import java.io.Serializable; import java.util.Arrays; import org.jboss.netty.buffer.ChannelBuffer; public class Message implements Serializable{ private static final long serialVersionUID = 4398559115325723920L; public static final int MSG_HEAD = 0x5b; public static final int MSG_TALL = 0x5d; //报文中除数据体外,固定的数据长度 public static final int MSG_FIX_LENGTH = 26; private static int internalMsgNo = 0; private long msgLength; private long encryptFlag=1; private int msgGesscenterId; private long encryptKey; private int crcCode; private int msgId; private int msgSn; private ChannelBuffer msgBody; private byte[] versionFlag = {0,0,1}; //下行报文标识,值为1时,代表发送的数据;默认为0,代表接收的报文 // private int downFlag = 0; public Message(){} public Message(int msgId){ //下行报文需要填充报文序列号 synchronized((Integer)internalMsgNo) { if(internalMsgNo == Integer.MAX_VALUE){ internalMsgNo = 0; } } this.msgSn = ++internalMsgNo; this.msgId = msgId; //this.downFlag = 1; } public static int getInternalMsgNo() { return internalMsgNo; } public static void setInternalMsgNo(int internalMsgNo) { Message.internalMsgNo = internalMsgNo; } public long getMsgLength() { return msgLength; } public void setMsgLength(long msgLength) { this.msgLength = msgLength; } public long getEncryptFlag() { return encryptFlag; } public void setEncryptFlag(long encryptFlag) { this.encryptFlag = encryptFlag; } public int getMsgGesscenterId() { return msgGesscenterId; } public void setMsgGesscenterId(int msgGesscenterId) { this.msgGesscenterId = msgGesscenterId; } public void setMsgGesscenterId(long msgGesscenterId) { this.msgGesscenterId = (int)msgGesscenterId; } public long getEncryptKey() { return encryptKey; } public void setEncryptKey(long encryptKey) { this.encryptKey = encryptKey; } public int getCrcCode() { return crcCode; } public void setCrcCode(int crcCode) { this.crcCode = crcCode; } public int getMsgId() { return msgId; } public void setMsgId(int msgId) { this.msgId = msgId; } public int getMsgSn() { return msgSn; } public void setMsgSn(int msgSn) { this.msgSn = msgSn; } public ChannelBuffer getMsgBody() { return msgBody; } public void setMsgBody(ChannelBuffer msgBody) { this.msgBody = msgBody; } public byte[] getVersionFlag() { return versionFlag; } public void setVersionFlag(byte[] versionFlag) { this.versionFlag = versionFlag; } public static int getMsgHead() { return MSG_HEAD; } public static int getMsgTall() { return MSG_TALL; } public static int getMsgFixLength() { return MSG_FIX_LENGTH; } @Override public String toString() { return "Message [msgLength=" + msgLength + ", encryptFlag=" + encryptFlag + ", msgGesscenterId=" + msgGesscenterId + ", encryptKey=" + encryptKey + ", crcCode=" + crcCode + ", msgId=" + msgId + ", msgSn=" + msgSn + ", msgBody=" + msgBody + ", versionFlag=" + Arrays.toString(versionFlag) + "]"; } }
下面是Decoder
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; public class Decoder extends FrameDecoder { @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { int head = buffer.getByte(0); int tail = buffer.getByte(buffer.capacity() - 1); if( !(head == Message.MSG_HEAD && tail == Message.MSG_TALL)){ return null; } buffer.skipBytes(1); Message msg = this.buildMessage(buffer); return msg; } private Message buildMessage(ChannelBuffer buffer){ Message msg = new Message(); msg.setMsgLength(buffer.readUnsignedInt()); msg.setMsgSn(buffer.readInt());//4byte msg.setMsgId(buffer.readUnsignedShort());//2byte msg.setMsgGesscenterId(buffer.readUnsignedInt());//4byte msg.setVersionFlag(buffer.readBytes(3).array());//3byte msg.setEncryptFlag(buffer.readUnsignedByte());//1byte msg.setEncryptKey(buffer.readUnsignedInt());//4byte if(buffer.readableBytes() >= 9){ buffer.skipBytes(buffer.readableBytes() - 8); } ChannelBuffer bodyBytes = buffer.readBytes(buffer.readableBytes() -3); msg.setMsgBody(bodyBytes); msg.setCrcCode(buffer.readUnsignedShort());//2byte buffer.skipBytes(1); return msg; } }
这个decoder值得说一下,有一些朋友应该能看出来
if(buffer.readableBytes() >= 9){ buffer.skipBytes(buffer.readableBytes() - 8); }
这段代码很突兀,其实我也很郁闷,登陆的时候收到的反馈信息,信息长度竟然不是一样的,有的时候长度是32,有的时候长度是31,长度是32的时候,如果不跳过一位,那么解析出来的反馈状态码就是错误的
下面是recevieHandler,
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RecevieHandler extends SimpleChannelHandler{ private static Logger LOG = LoggerFactory.getLogger(RecevieHandler.class); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e){ Message msg = (Message) e.getMessage(); LOG.error("应答----------------" + "0x" + Integer.toHexString(msg.getMsgId())); if(msg.getMsgId() == JT809Constants.UP_CONNECT_RSP){ ChannelBuffer msgBody = msg.getMsgBody(); int result = msgBody.readByte(); if(result == JT809Constants.UP_CONNECT_RSP_SUCCESS){ TcpClientFuJian.LONGINSTATUS = Constants.LOGIN_SUCCESS; LOG.error("------------------登录成功"); }else{ LOG.error("------------------登录异常,请检查" + "0x0" + Integer.toHexString(result)); } } } }
根据JT809协议,应该是一个主链路,一个从链路,从链路用来接收下发的消息,实际上不需要的,推送数据的,在主链路登录时会有反馈,从链路可以忽略,但是从链路的ip和端口,是必填参数,随便给就可以了,
上面经过decoder之后得到Message,receiveHandler就是来判断是否登陆,登陆成功的话,就可以发送数据
这里涉及登陆时机的问题,先看下心跳handler,我们再说登陆问题
HeartBeatHandler如下:
import org.apache.commons.lang3.StringUtils; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.timeout.IdleState; import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler; import org.jboss.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.zuche.us.gpsidc2aws.constant.JT809Constants; import com.zuche.us.gpsidc2aws.tcpclient.vo.Message; public class HeartBeatHandler extends IdleStateAwareChannelHandler { private static Logger LOG = LoggerFactory.getLogger(HeartBeatHandler.class); @Override public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception{ if(StringUtils.isBlank(TcpClientFuJian.LONGINSTATUS) || TcpClientFuJian.LOGINING.equals(TcpClientFuJian.LONGINSTATUS)){ TcpClientFuJian.getInstance().login2FuJianGov(); LOG.error("利用空闲心跳去登录------ 开始登录"); } if(e.getState() == IdleState.WRITER_IDLE){ LOG.error("链路空闲,发送心跳!"); Message msg = new Message(JT809Constants.UP_LINKETEST_REQ); e.getChannel().write(TcpClientFuJian.buildMessage(msg)); super.channelIdle(ctx, e); } } }
心跳包中包含了登陆,这个怎么说呢,没想到什么好办法进行登陆失败重登陆情况,因为一个不小心,可能就会一直在登陆
所以在发送数据时如果发现没登陆,那么进行登陆,通过flag来判断,如果登陆过了,那就不管了,成功就发数据,失败就等着,等到心跳包发送心跳时去检测是否登陆成功了,没成功的话让心跳包去再登陆一次,这样省了不少代码,就是肯能比较混乱,这个看个人想法了
只要能实现就行,
最后时crc16工具类
public class CRC16CCITT { public static int crc16(byte[] bytes){ int crc = 0xFFFF; for (int j = 0; j < bytes.length ; j++) { crc = ((crc >>> 8) | (crc << 8) )& 0xffff; crc ^= (bytes[j] & 0xff);//byte to int, trunc sign crc ^= ((crc & 0xff) >> 4); crc ^= (crc << 12) & 0xffff; crc ^= ((crc & 0xFF) << 5) & 0xffff; } crc &= 0xffff; return crc; } }
我用的是直接计算的方式,还有一种查表法,这里就不说了,百度下应该能找到
到此,客户端实现就完成了
不过由于我们是多台服务器推送数据,所以必须建立多个tcp链接,接收端之前没这么收过数据,中间经历了一点儿小波折,后台接收端根据我们的服务器数量给开通了多个账号,
多个账号就涉及到哪台服务器用哪个账号的问题,所以就有了上面代码中的配置文件
文件内容很简单
ip=用户名:介入码:密码
例如:
127.0.0.1=101:101:666666
为每台服务器制定好所使用的账号;
另外车牌颜色,颜色的状态需要使用JT/T415-2006标准协议,这个挺难找的,我直接把颜色和状态码贴出来,方便大家参考
JT/T415-2006
1:蓝色
2:黄色
3:黑色
4:白色
9:其他
至此整个客户端完满,经过测试,完美上线,没有出现问题,有一点稍微注意下,代码中发送数据的时候,时间的小时的12进制的,看接收端的要求,可以改成24小时制的
修改方法很简单,将Calendar.HOUR 改成Calendar.HOUR_OF_DAY 就可以
本地开发的时候,可能不知道发送的数据对不对,这个对应的接收端应该都有解析工具,可以要来自己去校验
差不多了再进行联调
也可以自己模拟一个简单的接收端,来进行测试,这个代码晚上有现成的,这里就不多说了。
希望能帮助大家,如果又问题,可以随时联系我
相关推荐
针对JT809-2019 协议java版本 针对JT809-2019 协议java版本 针对JT809-2019 协议java版本
本项目使用JAVA + MINA + ServiceLoader实现交通部809协议服务端代码。代码中完成了主链路部分,包括对客户端登录验证及应答,注销及应答,保持连接及应答,接收实时定位数据及历史定位数据等。如果扩充新业务只需要...
JT809协议 java 加解密
交通部809协议解析,开发语言版本为java版本.本次代码作为上级平台开发,主要是接入下级平台车辆定位数据
java解析jt809协议客户端 交通部JT809协议
部标809协议,服务端对接代码,可以对接gps信息
交通部809协议源码(java开发,基于apache-mina框架)。未全部实现功能,仅实现server端主链路,从链路未做,默认链路不加密,可满足常规接入实时车辆GPS定位等功能,需实现更多业务逻辑请自行丰富MsgCallBack类。 ...
为了测试JT808协议各种数据格式,以及自定义数据上报至服务端,服务端是否能正确识别并接收,返回指定的数据格式,从而验证数据格式的正确性,以及通信的是否能成功,附加操作手册以及概要的程序设计文档,需要的可以下载...
JT808协议是中国的一种通用的车载终端通信协议,主要用于车辆监控、GPS定位、行驶数据记录以及车辆安全预警等功能。这个模拟器允许用户在无需实际车载设备的情况下,模拟出各种车辆状态和行驶数据,以便于开发和测试...
JT_1078音视频传输协议是一种专用于监控系统中的实时音视频数据传输标准,主要应用于安防监控、智能交通等领域。本指南将深入探讨该协议的原理、结构及其在实际开发中的应用。 首先,我们要了解JT_1078协议的基础...
最近在开发809-2019协议的客户端及服务端,发现原交通部协议文档PDF不能复制,不能搜索,要查找协议比较麻烦。于是我添加了书签,非常方便查找,跳转。 首页 目次 前言 1 范围 2 规范性引用文件 3 属于和定义...
以上这些是针对“JT808车载定位器|定位器|宠物定位器二次开发协议对接”主题的主要技术点和相关知识。通过深入理解和掌握这些内容,开发者能够构建出稳定、高效且功能丰富的定位系统。而"gpstrackdemo"可能是项目的...
在Java中,通过Socket类和ServerSocket类,开发者可以实现基于TCP的网络通信。 TCP协议在处理客户-服务器事务时存在两个主要问题。一是三次握手过程会增加一个往返时间(RTT),导致事务响应时间增加。二是由于...
基于Netty,实现JT808 JT/T808部标协议的消息处理,与编码解码; 使用SpringBoot + MyBatis提供数据入库、Web接口服务; 协议部分不依赖Spring,可移除Spring独立运行(支持Android客户端); 最简洁、清爽、易用的...
2. **数据传输协议支持**:物联网设备可能使用不同的通信协议,如MQTT、CoAP、HTTP等,网关需要解析并转换这些协议,以实现不同设备间的互通。 3. **数据过滤与处理**:网关可能需要对收集到的数据进行预处理,比如...
Maven管理整个项目的构建和依赖,最后,处理过的数据通过RTMP协议发送到流媒体服务器,实现推流功能。由于代码可能存在一定的混乱,建议开发者在维护时进行代码重构和文档编写,以提高代码的可读性和可维护性。