1.netty调研记录
项目中准备用netty框架来实现socket接口,对于netty的性能做了个初步调研,大致过程如下:
1.1 调用socket接口的客户端
为了让客户端快速发送数据,我们已经提前将需要传输的数据通过java的ObjectInputStream写数据到了一个文件,主要是节省客户端接口的编码,然后让客户端不断循环发送数据一段时间(比如20分钟,时间是可以指定),客户端代码如下:
import java.io.FileNotFoundException; import java.io.IOException; import java.io.ObjectInputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; public class SocketTask implements Runnable { private long sentTime=0L; private long endTime=0L; private int packageIndex=-1; private String ip = "127.0.0.1"; private int port = 2005; private long sleep = 0L; private boolean sleepFlag = true; public SocketTask(String ip, int port, int packageType, long sentTime, long sleep) { super(); this.ip = ip; this.port = port; this.packageIndex = packageType; this.sentTime = sentTime; this.sleep = sleep; } public void run() { if (this.sentTime > 0L && this.port > 0) { if(this.sleep>0) { this.sleepFlag = true; }else{ this.sleepFlag = false; } this.endTime = System.currentTimeMillis() + this.sentTime; testSocket(); } } private void testSocket() { Socket socket = null; OutputStream os = null; try { // 1.建立客户端socket连接,指定服务器位置及端口 socket = new Socket(this.ip, this.port); // 2.得到socket读写流 os = socket.getOutputStream(); // InputStream is = socket.getInputStream(); // BufferedReader br = new BufferedReader(new // InputStreamReader(is)); // 3.socket账号验证 //MessageBind.txt已经存储了socket接口账号验证的包数据,直接读取发送 byte[] MessageBind = stream2Bytes("MessageBind.txt"); os.write(MessageBind); os.flush(); // 4.socket发送数据 byte[][] byteArray = new byte[6][]; //MessageSendRecord_*_*k.txt已经将要发送的数据包以二进制流写到文件,这里直接读取,避免了编码 byteArray[0] = stream2Bytes("MessageSendRecord_190_10k.txt");// 10K byteArray[1] = stream2Bytes("MessageSendRecord_380_20k.txt");// 20K byteArray[2] = stream2Bytes("MessageSendRecord_570_30k.txt");// 30K byteArray[3] = stream2Bytes("MessageSendRecord_760_40k.txt");// 40K byteArray[4] = stream2Bytes("MessageSendRecord_950_50k.txt");// 50K byteArray[5] = stream2Bytes("MessageSendRecord_1140_60k.txt");// 60K java.util.Random r = new java.util.Random(); int index = 0; boolean flag = (this.packageIndex >= 0 && this.packageIndex <= 5); while (true) { if (this.endTime < System.currentTimeMillis()) break; if (flag) { index = this.packageIndex; } else { //随机选取一个包发送数据 index = Math.abs(r.nextInt()) % 6; } os.write(byteArray[index]); os.flush(); //这里休眠只是为了模拟客户端在收集数据和编码所耗时间 if(this.sleepFlag){ Thread.currentThread().sleep(this.sleep); } } } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ if(null!=os){ try { os.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } os = null; } if(null!=socket){ try { socket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } socket = null; } } } /** * 读取文件中的二进制数据 * @param filename * @return */ private byte[] stream2Bytes(String filename) { byte[] encodeBytes = null; try { ObjectInputStream ois = new ObjectInputStream(this.getClass().getClassLoader().getResourceAsStream(filename)); encodeBytes = (byte[]) ois.readObject(); System.out.println(encodeBytes); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } return encodeBytes; } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class JavaSocketClient{ public static void main(String[] args) { //检查参数 checkParameters(args); //解析参数 String ip = args[0];//被测试socket接口的IP int port = Integer.parseInt(args[1]);//被测试socket接口的端口 int concurrentNumber =Integer.parseInt(args[2]);//多少并发去发送数据 int packageType = Integer.parseInt(args[3]);//发送哪个大小的数据包,10k 20k 30k 40k 50k 60k long sentTime = Long.parseLong(args[4]);//发送多长时间 long sleep = Long.parseLong(args[5]);//每发送一次休眠多长时间,休眠的时间可以理解为客户端在收集数据或者编码数据 ExecutorService executorService = Executors.newFixedThreadPool(concurrentNumber); for(int count=0;count<concurrentNumber;count++){ executorService.execute(new SocketTask(ip, port,packageType, sentTime,sleep)); } } private static void checkParameters(String[] args) { if (args.length != 6) { System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>"); System.exit(1); } if ("".equals(args[0])) { System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,ip is null"); System.exit(1); } if ("".equals(args[1])) { System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,port shoud be a number"); System.exit(1); } if ("".equals(args[2])) { System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,concurrentNumber shoud be a number"); System.exit(1); } if ("".equals(args[3])) { System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,packageType shoud be a number"); System.exit(1); } if ("".equals(args[4])) { System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,sentTime shoud be a number"); System.exit(1); } if ("".equals(args[5])) { System.err.println("Usage: Test <ip> <port> <concurrentNumber> <packageType(-1,0,1,2,3,4,5)> <sentTime> <sleep>,sleep shoud be a number"); System.exit(1); } } }
1.2 netty服务端大致代码如下
bossGroup = new NioEventLoopGroup(1); //主线程池 //netty里面默认通过Runtime.getRuntime().availableProcessors()获取机器逻辑CPU个数,建议NioEventLoopGroup值在(逻辑CPU个数~逻辑CPU个数*2)之间 // 逻辑CPU的个数=物理CPU个数*物理CPU中core的个数(即核数)*超线程数 //# 查看物理CPU个数:cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l //# 查看每个物理CPU中core的个数(即核数):cat /proc/cpuinfo| grep "cpu cores"| uniq //# 查看逻辑CPU的个数:cat /proc/cpuinfo| grep "processor"| wc -l workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors()*2); //工作线程池,默认为CPU个数的2倍,也即linux逻辑CPU的个数 try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) /** * ChannelOption.SO_BACKLOG: SO_BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时, 用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50 **/ .option(ChannelOption.SO_BACKLOG, 1024) //最大等待队列数 /** * ChannelOption.TCP_NODELAY: * 在TCP/IP协议中,无论发送多少数据,总是要在数据前面加上协议头,同时,对方接收到数据, * 也需要发送ACK表示确认。为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。 * 这里就涉及到一个名为Nagle的算法,该算法的目的就是为了尽可能发送大块数据,避免网络中充斥着许多小数据块。 * * TCP_NODELAY就是用于启用或关闭Nagle算法。如果要求高实时性,有数据发送时就马上发送, * 就将该选项设置为true关闭Nagle算法;如果要减少发送次数减少网络交互,就设置为false等累积一定大小后再发送。 * 默认为false。 **/ .option(ChannelOption.TCP_NODELAY, true) //Nagle算法 /** ChannelOption.SO_KEEPALIVE: 是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态) 并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。 **/ // .option(ChannelOption.SO_KEEPALIVE, true) //服务端主动检测客户端是否存活 // .option(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(false)) // true 直接内存/ false heap内存 /** * ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF, * ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF, * 这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据, * 直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。 */ .option(ChannelOption.SO_SNDBUF, 256 * 1024 ) //256K 双方协定 .option(ChannelOption.SO_RCVBUF, 256 * 1024 ) //256K 双方协定 //.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new Decoder()); pipeline.addLast("encoder", new Encoder()); pipeline.addLast("readTimeoutHandler",new ReadTimeoutHandler(Integer.parseInt(data.getString("server.readTimeout")))); pipeline.addLast("exception", new ExceptionHandler()); pipeline.addLast("auth",new BindCheckHandler()); // pipeline.addLast("sendRecord", new SendRecordHandler()); //这里单独设置SendRecordHandler使用的线程池,不设置的话默认使用的是workerGroup线程池 pipeline.addLast(businessGroup, "sendRecord", new SendRecordHandler()); pipeline.addLast("checkLink",new CheckLinkHandler()); pipeline.addLast("unbind",new UnbindHandler()); } }); ChannelFuture f = b.bind(PORT).sync(); logger.info("netty服务器已经启动"); isOpen = true; f.channel().closeFuture().sync(); }catch (Exception e) { e.printStackTrace(); }
1.3 单线程socket客户端测试的结果如下:
单线程socket客户端跑20分钟,分别测试休眠1毫秒和不休眠传输速度和占用带宽资源
发送间隔(休眠时间) | 发送包 | 速度(行/秒) | 均值(Bytes/秒) | 占用带宽 |
1毫秒 | 20K,380行记录 | 344054 | 18772555 | 143Mb |
0毫秒 | 20K,380行记录 | 1985689 | 107018186 | 816Mb |
基本上千兆带宽在20分钟测试过程中被消耗殆尽,但是netty服务端性能杠杠的,带宽是唯一的制约因素。
2.kafka调研结果记录
场景:两台kafka,新建topic=topictest,partitions=6,replication-factor=2,ACK=ALL(每个parttion的2个replication同时写完才算写完)
3.hdfs测试记录
场景:4台datanode
磁盘RAID对于写入速度的影响
4.hbase调研记录
4.1 发起测试的机器性能对于hbase写入速度的影响
4.2 发起测试的并发线程数对于hbase写入速度的影响
4.3 发起测试的批量大小对于hbase写入速度的影响(hbase.client.write.buffer=2MB)
4.4 拆分数据表影响测试
相关推荐
涵盖了Java基础、Linux基础、Hadoop生态、分布式数据库HBase、数据仓库Hive、数据迁移工具Sqoop、Flume分布式日志框架、Zookeeper分布式协调服务、Netty异步IO通信框架、Kafka消息队列、Storm实时计算框架等重要主题...
项目描述:Flume+Kafka+MapReduce+Storm+Spark+HDFS+Hbase+Redis+Solr,拉手有很多历史数据,包括用户数据、商品数据、商家数据、用户交易记录、用户位置轨迹信息。 职责描述: * 数据清洗,清除作弊、刷单、代购等...
- **Hbase调优(读、写、设计)**:提升HBase性能的方法。 - **数据仓库Hive**:介绍Hive的基本概念、集群架构、HiveQL与SQL的比较。 - **外部表和分区表**:学习Hive中外部表和分区表的区别及应用场景。 - **...
Netty是一个异步IO通信框架,可用于构建高性能的服务。Kafka和Storm分别用于消息发布订阅和实时数据处理。Storm提供了低延迟的数据处理能力,其编程模型包括Topology、Spout和Bolt,且可以与Kafka结合以增强消息可靠...
4. **分布式数据库与数据仓库**:Hbase是一个分布式的NoSQL数据库,与RDBMS进行对比,学习其系统架构、MapReduce在Hbase上的应用、Hbase的核心术语和操作,包括基本的Shell操作和设计优化。Hive则是一种基于Hadoop的...
Netty异步IO通信框架的学习,以及Zookeeper如何与Netty和Redis集成,展现了分布式系统中的协调与通信。Kafka和Storm的介绍,以及Scala和Spark的使用,为实时数据处理和流计算打下了基础。最后,Docker虚拟化技术和...
大数据相关的框架包括Storm(实时处理)、Flink(流处理和批处理)、Spark(快速大数据处理)、HBase(NoSQL数据库)、Doris(OLAP数据仓库)以及ClickHouse(高性能分析型数据库)。 日志收集与分析工具有Filebeat...
Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。 #### Redis 客户端 - **Jedis**: 工具。Jedis 是Redis的Java客户端,提供了丰富的命令接口。 - **Redisson**: 框架。...
lang-java【技术体系】/ JAVA核心: 多线程并发编程、网络...大数据相关: Zookeeper集群(协调服务)、Kafka(高速数据管道)、HBase(列式数据库-列簇 & rowkey)、Hadoop(HDFS分布式文件存储)app-samplesjust sa
在独立模式下,Hadoop的安装与测试涉及配置HDFS和MapReduce。Hadoop的启动与停止涉及NameNode、DataNode、ResourceManager、NodeManager等核心组件。YARN作为Hadoop的资源调度框架,负责任务分配和资源管理。事件...
8. **Netty与RPC**:Netty是高性能的异步网络通信框架,RPC则是远程过程调用,简化了分布式系统间的通信。 9. **一致性算法**:如Paxos、Raft等,用于保证分布式系统的一致性。 【其他相关知识点】 1. **数据库...
已经更新300+篇〜关注〜...RPC八,大数据框架基石之网路通信-Netty 第二部分:大数据框架学习篇本部分引用作者heibaiying,大佬写的文章非常好,欢迎大家关注他的博客。我个人会持续补充深度和实战性的文章〜一,Ha