`
gaojingsong
  • 浏览: 1183125 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【kafka源码改写(一)】

阅读更多

一、启动类改写

package kafka;

 

import java.util.Properties;

 

import kafka.metrics.KafkaMetricsReporter1;

import kafka.server.KafkaConfig;

import kafka.server.KafkaServerStartable;

import kafka.utils.Utils;

import kafka.utils.VerifiableProperties;

 

public class MyKafka {

 

public static void main(String[] args) {

try {

System.out.println("---gaojingsong-------QQ525354786--");

Properties props = Utils.loadProps("../config/server.properties");

System.out.println(props);

KafkaConfig serverConfig = new KafkaConfig(props);

VerifiableProperties verifiableProperties = new VerifiableProperties(props);

KafkaMetricsReporter1.startReporters(verifiableProperties);

     final KafkaServerStartable kafkaServerStartable = new  KafkaServerStartable(serverConfig);

System.out.println("--gaojingsong----kafkaServerStartable---");

// attach shutdown handler to catch control-c

Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {

kafkaServerStartable.shutdown();

}

});

 

kafkaServerStartable.startup();

kafkaServerStartable.awaitShutdown();

} catch (Exception e) {

e.printStackTrace();

}

System.exit(0);

 

}

 

}



 

 报错原因是ZK连接超时:我压根就没有启动ZK。不过这至少说明我的程序没有编译期的语法错误。

 

二、改写KafkaMetricsReporter类

package kafka.metrics;

 

import java.util.concurrent.atomic.AtomicBoolean;

 

import scala.collection.Iterator;

import scala.collection.Seq;

import scala.collection.immutable.List;

import kafka.utils.Utils;

import kafka.utils.VerifiableProperties;

 

public class KafkaMetricsReporter1 {

static AtomicBoolean ReporterStarted = new AtomicBoolean(false);

 

public static void startReporters(VerifiableProperties verifiableProps) {

synchronized (ReporterStarted) {

if (ReporterStarted.get() == false) {

KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(verifiableProps);

Seq<String> seq = Utils.parseCsvList(verifiableProps.getString("kafka.metrics.reporters", ""));

List<String> metrics = seq.toList();

if (metrics.isEmpty()) {

List<String> reporters = metricsConfig.reporters().toList();

Iterator<String> iters = reporters.iterator();

while (iters.hasNext()) {

String con = iters.next();

System.out.println(con);

 

KafkaMetricsReporter reporter = null;

try {

Class clazz = Class.forName(con);

reporter = (KafkaMetricsReporter) clazz

.newInstance();

} catch (Exception e) {

e.printStackTrace();

}

// KafkaMetricsReporter reporter =

// Utils.createObject[KafkaMetricsReporter](con);

reporter.init(verifiableProps);

if (reporter instanceof KafkaMetricsReporterMBean) {

KafkaMetricsReporterMBean reportermb = (KafkaMetricsReporterMBean) reporter;

Utils.registerMBean(reporter,

reportermb.getMBeanName());

}

}

 

}

ReporterStarted.set(true);

}

}

 

}

}

 

  • 大小: 100.5 KB
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics