接上一篇,通过命令行或执行kafka.tools.ConsumerOffsetChecker的main方法,都只能把结果显示在标准输出流中,如果我想实时展示这些数据咋办呢? 这时就就需要把这些信息读出来。代码如下:
package com.wxj.kafka.monitor.jmx; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.HashMap; import java.util.Map; /** * 在java程序中,调用kafka-run-class.sh的中kafka.tools.ConsumerOffsetChecker类,查看各个topic及分区的消费情况 * 如果要调用scala.kafka.tools类,只需要简单修改一个main里面的cmd * @author root * */ public class SHRunClass { public static void main(String[] args) { if(args == null || args.length == 0) { System.out.println("zkconnect,group are required"); System.exit(1); } String cmd = "/home/hadoop/cdh44/kafka_2.10-0.8.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker"; System.out.println("输入参数:"); for(int i = 0 ; i < args.length ; i ++) { System.out.println(args[i]); cmd += " " + args[i]; } //cmd = "java -version"; Map<String, String> resultMap = execCmd(cmd); //printResult(resultMap); if("0".equals(resultMap.get("returnCode"))) { System.out.println("执行成功"); //System.out.println(resultMap.get("inStream")); //linux下用\n String[] lines = resultMap.get("inStream").split("@@"); String[] headers = lines[0].split("\t"); System.out.println("header:"); for(String header : headers) { System.out.println(header); } System.out.println("value:"); for(int i = 1 ; i < lines.length ; i ++) { String[] values = lines[i].split("\t"); for(String value : values) { System.out.print(value + "\t"); } System.out.println(); } } else { System.out.println("执行失败:"); System.out.println(resultMap.get("errStream").replaceAll("@@", "\n")); } } private static void printResult(Map<String, String> resultMap) { System.out.println("result:\n\n"); for(Map.Entry<String, String> entry : resultMap.entrySet()) { System.out.println(entry.getKey() + "," + entry.getValue()); } } /** * 执行命令 * @param cmd * @return */ private static Map<String, String> execCmd(String cmd) { Map<String, String> resultMap = new HashMap<String, String>(); try { Runtime rt = Runtime.getRuntime(); Process proc = rt.exec(cmd); ReadInputStreamThread inputStreamThread = new ReadInputStreamThread(proc); inputStreamThread.start(); inputStreamThread.join(); ReadErrInputStreamThread errInputStreamThread = new ReadErrInputStreamThread(proc); errInputStreamThread.start(); errInputStreamThread.join(); resultMap.put("returnCode", proc.waitFor() + ""); resultMap.put("inStream", inputStreamThread.getResult().toString()); resultMap.put("errStream", errInputStreamThread.getResult().toString()); } catch (Throwable t) { t.printStackTrace(); } return resultMap; } private static class ReadInputStreamThread extends Thread { private Process proc; private StringBuffer result = new StringBuffer(); public StringBuffer getResult() { return result; } private BufferedReader reader = null; public ReadInputStreamThread(Process proc) throws Exception { this.proc = proc; reader = new BufferedReader(new InputStreamReader(proc.getInputStream(), "utf-8")); } public void run() { String line = null; try { while((line = reader.readLine()) != null) { result.append(line); result.append("@@"); } } catch (Exception e) { e.printStackTrace(); } finally { if(reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } } } private static class ReadErrInputStreamThread extends Thread { private Process proc; private StringBuffer result = new StringBuffer(); public StringBuffer getResult() { return result; } private BufferedReader reader = null; public ReadErrInputStreamThread(Process proc) throws Exception { this.proc = proc; reader = new BufferedReader(new InputStreamReader(proc.getErrorStream(), "utf-8")); } public void run() { String line = null; try { while((line = reader.readLine()) != null) { result.append(line); result.append("@@"); } } catch (Exception e) { e.printStackTrace(); } finally { if(reader != null) { try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
由于只是测试,所以在main方法中,把路径硬编码了。
在linux下的执行结果如下:
相关推荐
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...
说明:kafka-manager 自己下载编译速度巨慢,此资源是编译好的 kafka-manager,版本是:kafka-manager-1.3.3.7(适用于较新的版本,kafka版本是kafka_2.11-2.0.1)。 安装配置说明: 1. 里头有个自己写的启动脚本,...
4. 启动:执行`bin/kafka-eagle-start.sh`启动服务,使用`bin/kafka-eagle-stop.sh`停止服务。 5. 访问:在Web浏览器中输入`http://服务器IP:端口`,按照提示进行登录和操作。 ### 使用注意事项 1. 确保运行环境已...
已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 [root@hadoop102 module]$ unzip kafka-manager-1.3.3.22.zip 4)进入到/opt/module/kafka-manager-1.3.3.22/...
标题中的“KAFKA-3.1.1-1.3.1.1.p0.2-el7.parcel”指的是Apache Kafka的一个特定版本的软件包,适用于Red Hat Enterprise Linux 7(RHEL 7)系统。这个版本是3.1.1,并且可能是某个特定发行版或修补版的1.3.1.1.p0.2...
kafka-manager.jar 配置application.conf中的zk地址后可直接启动 bin/kafka-manager -Dconfig.file=/kafka-manager-2.0.0.2/conf/application.conf -Dhttp.port=8888
赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 包含翻译后的API文档:kafka-clients-0.10.0.1-javadoc-API...
《Python中的Kafka库:kafka-python-2.0.2深入解析》 在Python编程环境中,Kafka作为分布式消息系统的接口,对于实时数据处理和流处理应用至关重要。`kafka-python`是Python社区开发的一个非常流行的Kafka客户端库...
Java开发案例-springboot-08-整合Kafka-源代码+文档.rar Java开发案例-springboot-08-整合Kafka-源代码+文档.rar Java开发案例-springboot-08-整合Kafka-源代码+文档.rar Java开发案例-springboot-08-整合Kafka-源...
- **启动服务**:执行bin/kafka-eagle-start.sh脚本启动服务,然后在浏览器中输入http://服务器IP:8080访问Web界面。 3. **使用指南** - **登录界面**:首次访问会提示设置默认管理员账号。 - **监控页面**:...
3. **启动**:执行启动脚本,通常为`bin/kafka-manager`,Kafka Manager服务即开始运行。 4. **访问Web界面**:默认情况下,Kafka Manager的Web界面会在本地8080端口提供服务,打开浏览器输入`...
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...
标题 "KAFKA-3.0.0-1.3.0.0.p0.40-el7" 暗示我们正在讨论的是 Apache Kafka 的一个特定版本,这里是3.0.0,针对的是CDH(Cloudera Data Hub)环境,版本号为1.3.0.0.p0.40,适配的是EL7(CentOS 7)操作系统。...
- 启动应用,执行 `./bin/kafka-manager` 或通过 `sbt run` 命令启动。 - 访问预设的 Web 端口(默认为 9000),即可看到 Kafka-Manager 的界面。 3. **使用指南** - **添加集群**:在管理界面,点击“添加集群...
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
《PyPI上的Confluent-Kafka-Amine 1.4.2.1:Python库的高效消息传递解决方案》 在Python编程中,有许多强大的库帮助开发者实现各种功能,其中之一便是Confluent-Kafka-Amine。这个库是Confluent Kafka的Python绑定...
赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...
在部署Kafka Eagle 2.0.8时,需要注意的是,用户需要首先解压`kafka-eagle-bin-2.0.8`压缩包,然后按照官方提供的文档进行配置和启动服务。通常,这包括设置环境变量、配置服务器连接信息以及启动Web服务等步骤。 ...