`

读取kafka-run-class.sh 执行的结果

阅读更多

接上一篇,通过命令行或执行kafka.tools.ConsumerOffsetChecker的main方法,都只能把结果显示在标准输出流中,如果我想实时展示这些数据咋办呢? 这时就就需要把这些信息读出来。代码如下:

 

package com.wxj.kafka.monitor.jmx;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

/**
 * 在java程序中,调用kafka-run-class.sh的中kafka.tools.ConsumerOffsetChecker类,查看各个topic及分区的消费情况
 * 如果要调用scala.kafka.tools类,只需要简单修改一个main里面的cmd
 * @author root
 *
 */
public class SHRunClass
{

	public static void main(String[] args)
	{
		if(args == null || args.length == 0)
		{
			System.out.println("zkconnect,group are required");
			System.exit(1);
		}
		String cmd = "/home/hadoop/cdh44/kafka_2.10-0.8.1/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker";
		System.out.println("输入参数:");
		for(int i = 0 ; i < args.length ; i ++)
		{
			System.out.println(args[i]);
			cmd += " " + args[i];
		}

		//cmd = "java -version";
		
		Map<String, String> resultMap = execCmd(cmd);
		//printResult(resultMap);
		
		if("0".equals(resultMap.get("returnCode")))
		{
			System.out.println("执行成功");
			//System.out.println(resultMap.get("inStream"));
			//linux下用\n
			String[] lines = resultMap.get("inStream").split("@@");
			String[] headers = lines[0].split("\t");
			System.out.println("header:");
			for(String header : headers)
			{
				System.out.println(header);
			}
			
			System.out.println("value:");
			for(int i = 1 ; i < lines.length ; i ++)
			{
				String[] values = lines[i].split("\t");
				for(String value : values)
				{
					System.out.print(value + "\t");
				}
				System.out.println();
			}
		}
		else
		{
			System.out.println("执行失败:");
			System.out.println(resultMap.get("errStream").replaceAll("@@", "\n"));
		}
		
	}

	private static void printResult(Map<String, String> resultMap)
	{
		System.out.println("result:\n\n");
		
		for(Map.Entry<String, String> entry : resultMap.entrySet())
		{
			System.out.println(entry.getKey() + "," + entry.getValue());
		}
	}

	/**
	 * 执行命令
	 * @param cmd
	 * @return
	 */
	private static Map<String, String> execCmd(String cmd)
	{
		Map<String, String> resultMap = new HashMap<String, String>();
		try
		{
			Runtime rt = Runtime.getRuntime();
			Process proc = rt.exec(cmd);
		
			ReadInputStreamThread inputStreamThread = new ReadInputStreamThread(proc);
			inputStreamThread.start();
			inputStreamThread.join();
			
			ReadErrInputStreamThread errInputStreamThread = new ReadErrInputStreamThread(proc);
			errInputStreamThread.start();
			errInputStreamThread.join();
			
			resultMap.put("returnCode", proc.waitFor() + "");
			resultMap.put("inStream", inputStreamThread.getResult().toString());
			resultMap.put("errStream", errInputStreamThread.getResult().toString());
			
		} catch (Throwable t)
		{
			t.printStackTrace();
		}
		return resultMap;
	}
	
	private static class ReadInputStreamThread extends Thread
	{
		private Process proc;
		private StringBuffer result = new StringBuffer();
		
		public StringBuffer getResult()
		{
			return result;
		}

		private BufferedReader reader = null;
		
		public ReadInputStreamThread(Process proc) throws Exception
		{
			this.proc = proc;
			reader = new BufferedReader(new InputStreamReader(proc.getInputStream(), "utf-8"));
		}
		
		public void run()
		{
			String line = null;
			try
			{
				while((line = reader.readLine()) != null)
				{
					result.append(line);
					result.append("@@");
				}
			} catch (Exception e)
			{
				e.printStackTrace();
			}
			finally
			{
				if(reader != null)
				{
					try
					{
						reader.close();
					} catch (IOException e)
					{
						e.printStackTrace();
					}
				}
			}
		}
	}
	
	private static class ReadErrInputStreamThread extends Thread
	{
		private Process proc;
		private StringBuffer result = new StringBuffer();
		
		public StringBuffer getResult()
		{
			return result;
		}

		private BufferedReader reader = null;
		
		public ReadErrInputStreamThread(Process proc) throws Exception
		{
			this.proc = proc;
			reader = new BufferedReader(new InputStreamReader(proc.getErrorStream(), "utf-8"));
		}
		
		public void run()
		{
			String line = null;
			try
			{
				while((line = reader.readLine()) != null)
				{
					result.append(line);
					result.append("@@");
				}
			} catch (Exception e)
			{
				e.printStackTrace();
			}
			finally
			{
				if(reader != null)
				{
					try
					{
						reader.close();
					} catch (IOException e)
					{
						e.printStackTrace();
					}
				}
			}
		}
	}
	

}

 

由于只是测试,所以在main方法中,把路径硬编码了。

在linux下的执行结果如下:



 

 

 

 

 

  • 大小: 191 KB
分享到:
评论

相关推荐

    kafka-clients-0.10.0.1-API文档-中文版.zip

    赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.10.0.1.pom; 包含...

    kafka-manager-1.3.3.7.zip

    说明:kafka-manager 自己下载编译速度巨慢,此资源是编译好的 kafka-manager,版本是:kafka-manager-1.3.3.7(适用于较新的版本,kafka版本是kafka_2.11-2.0.1)。 安装配置说明: 1. 里头有个自己写的启动脚本,...

    kafka-eagle-bin-2.1.0.tar.gz

    4. 启动:执行`bin/kafka-eagle-start.sh`启动服务,使用`bin/kafka-eagle-stop.sh`停止服务。 5. 访问:在Web浏览器中输入`http://服务器IP:端口`,按照提示进行登录和操作。 ### 使用注意事项 1. 确保运行环境已...

    kafka-manager-1.3.3.22.zip

    已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 [root@hadoop102 module]$ unzip kafka-manager-1.3.3.22.zip 4)进入到/opt/module/kafka-manager-1.3.3.22/...

    KAFKA-3.1.1-1.3.1.1.p0.2-el7.parcel

    标题中的“KAFKA-3.1.1-1.3.1.1.p0.2-el7.parcel”指的是Apache Kafka的一个特定版本的软件包,适用于Red Hat Enterprise Linux 7(RHEL 7)系统。这个版本是3.1.1,并且可能是某个特定发行版或修补版的1.3.1.1.p0.2...

    kafka-manager-2.0.0.2.zip

    kafka-manager.jar 配置application.conf中的zk地址后可直接启动 bin/kafka-manager -Dconfig.file=/kafka-manager-2.0.0.2/conf/application.conf -Dhttp.port=8888

    kafka-clients-0.10.0.1-API文档-中英对照版.zip

    赠送jar包:kafka-clients-0.10.0.1.jar; 赠送原API文档:kafka-clients-0.10.0.1-javadoc.jar; 赠送源代码:kafka-clients-0.10.0.1-sources.jar; 包含翻译后的API文档:kafka-clients-0.10.0.1-javadoc-API...

    kafka-python-2.0.2.tar.gz

    《Python中的Kafka库:kafka-python-2.0.2深入解析》 在Python编程环境中,Kafka作为分布式消息系统的接口,对于实时数据处理和流处理应用至关重要。`kafka-python`是Python社区开发的一个非常流行的Kafka客户端库...

    Java开发案例-springboot-08-整合Kafka-源代码+文档.rar

    Java开发案例-springboot-08-整合Kafka-源代码+文档.rar Java开发案例-springboot-08-整合Kafka-源代码+文档.rar Java开发案例-springboot-08-整合Kafka-源代码+文档.rar Java开发案例-springboot-08-整合Kafka-源...

    kafka-eagle-bin-2.0.5.tar.gz

    - **启动服务**:执行bin/kafka-eagle-start.sh脚本启动服务,然后在浏览器中输入http://服务器IP:8080访问Web界面。 3. **使用指南** - **登录界面**:首次访问会提示设置默认管理员账号。 - **监控页面**:...

    kafka-manager-1.3.3.16.zip(已编译)

    3. **启动**:执行启动脚本,通常为`bin/kafka-manager`,Kafka Manager服务即开始运行。 4. **访问Web界面**:默认情况下,Kafka Manager的Web界面会在本地8080端口提供服务,打开浏览器输入`...

    kafka-clients-2.0.0-API文档-中文版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-clients-2.4.1-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    kafka-clients-0.9.0.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-0.9.0.0.jar; 赠送原API文档:kafka-clients-0.9.0.0-javadoc.jar; 赠送源代码:kafka-clients-0.9.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-0.9.0.0.pom; 包含翻译后...

    KAFKA-3.0.0-1.3.0.0.p0.40-el7

    标题 "KAFKA-3.0.0-1.3.0.0.p0.40-el7" 暗示我们正在讨论的是 Apache Kafka 的一个特定版本,这里是3.0.0,针对的是CDH(Cloudera Data Hub)环境,版本号为1.3.0.0.p0.40,适配的是EL7(CentOS 7)操作系统。...

    kafka-manager-1.3.3.21

    - 启动应用,执行 `./bin/kafka-manager` 或通过 `sbt run` 命令启动。 - 访问预设的 Web 端口(默认为 9000),即可看到 Kafka-Manager 的界面。 3. **使用指南** - **添加集群**:在管理界面,点击“添加集群...

    kafka-clients-2.4.1-API文档-中文版.zip

    赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...

    PyPI 官网下载 | confluent-kafka-amine-1.4.2.1.tar.gz

    《PyPI上的Confluent-Kafka-Amine 1.4.2.1:Python库的高效消息传递解决方案》 在Python编程中,有许多强大的库帮助开发者实现各种功能,其中之一便是Confluent-Kafka-Amine。这个库是Confluent Kafka的Python绑定...

    kafka-clients-2.0.0-API文档-中英对照版.zip

    赠送jar包:kafka-clients-2.0.0.jar; 赠送原API文档:kafka-clients-2.0.0-javadoc.jar; 赠送源代码:kafka-clients-2.0.0-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.0.0.pom; 包含翻译后的API文档...

    kafka-eagle-2.0.8.tar.gz

    在部署Kafka Eagle 2.0.8时,需要注意的是,用户需要首先解压`kafka-eagle-bin-2.0.8`压缩包,然后按照官方提供的文档进行配置和启动服务。通常,这包括设置环境变量、配置服务器连接信息以及启动Web服务等步骤。 ...

Global site tag (gtag.js) - Google Analytics