`
gaojingsong
  • 浏览: 1217594 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【kafka源码改写(一)】

阅读更多

1、启动类改写

package kafka;

 

import java.util.Properties;

 

import kafka.metrics.KafkaMetricsReporter1;

import kafka.server.KafkaConfig;

import kafka.server.KafkaServerStartable;

import kafka.utils.Utils;

import kafka.utils.VerifiableProperties;

 

public class MyKafka {

 

public static void main(String[] args) {

try {

System.out.println("---gaojingsong-------QQ525354786--");

Properties props = Utils.loadProps("../config/server.properties");

System.out.println(props);

KafkaConfig serverConfig = new KafkaConfig(props);

VerifiableProperties verifiableProperties = new VerifiableProperties(props);

KafkaMetricsReporter1.startReporters(verifiableProperties);

     final KafkaServerStartable kafkaServerStartable = new  KafkaServerStartable(serverConfig);

System.out.println("--gaojingsong----kafkaServerStartable---");

// attach shutdown handler to catch control-c

Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {

kafkaServerStartable.shutdown();

}

});

 

kafkaServerStartable.startup();

kafkaServerStartable.awaitShutdown();

} catch (Exception e) {

e.printStackTrace();

}

System.exit(0);

 

}

 

}



 

 报错原因是因为ZK链接超时,我压根就没有启动ZK,至少说明我的程序没有编译时候的语法错误。

 

二、改写KafkaMetricsReporter类

package kafka.metrics;

 

import java.util.concurrent.atomic.AtomicBoolean;

 

import scala.collection.Iterator;

import scala.collection.Seq;

import scala.collection.immutable.List;

import kafka.utils.Utils;

import kafka.utils.VerifiableProperties;

 

public class KafkaMetricsReporter1 {

static AtomicBoolean ReporterStarted = new AtomicBoolean(false);

 

public static void startReporters(VerifiableProperties verifiableProps) {

synchronized (ReporterStarted) {

if (ReporterStarted.get() == false) {

KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(verifiableProps);

Seq<String> seq = Utils.parseCsvList(verifiableProps.getString("kafka.metrics.reporters", ""));

List<String> metrics = seq.toList();

if (metrics.isEmpty()) {

List<String> reporters = metricsConfig.reporters().toList();

Iterator<String> iters = reporters.iterator();

while (iters.hasNext()) {

String con = iters.next();

System.out.println(con);

 

KafkaMetricsReporter reporter = null;

try {

Class clazz = Class.forName(con);

reporter = (KafkaMetricsReporter) clazz

.newInstance();

} catch (Exception e) {

e.printStackTrace();

}

// KafkaMetricsReporter reporter =

// Utils.createObject[KafkaMetricsReporter](con);

reporter.init(verifiableProps);

if (reporter instanceof KafkaMetricsReporterMBean) {

KafkaMetricsReporterMBean reportermb = (KafkaMetricsReporterMBean) reporter;

Utils.registerMBean(reporter,

reportermb.getMBeanName());

}

}

 

}

ReporterStarted.set(true);

}

}

 

}

}

 

  • 大小: 100.5 KB
0
0
分享到:
评论

相关推荐

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    Kafka技术内幕:图文详解Kafka源码设计与实现 PD

    Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载

    基于Kafka的管理系统源码.zip

    基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...

    apache kafka源码剖析高清part2

    apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社,分为part1和part2,一起下载解压

    Kafka源码解析与实战

    ### Kafka源码解析与实战 #### 一、Kafka简介 Kafka是由Apache软件基金会开发的一款开源流处理平台,主要用于构建实时数据管道以及基于流的数据处理应用。它以分布式的方式运行,具有高吞吐量、低延迟的特点,适用...

    kafka源码解析新手版本(修正版)

    ### Kafka源码解析新手版本(修正版)知识点详解 #### 一、Kafka诞生背景及其在LinkedIn的应用 **1.1 Apache Kafka项目简介** - **诞生背景:** Apache Kafka最初由LinkedIn开发,随后于2011年初开源,并在2012年...

    apache kafka源码剖析高清part1

    apache kafka源码剖析高清 带索引书签目录_徐郡明(著) 电子工业出版社

    Kafka源码解析与实战.zip

    Kafka源码解析与实战.zip

    kafka源码分析

    kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala

    《Apache Kafka源码剖析》.part5.rar

    Apache Kafka源码剖析 PDF较大,分5份上传!一起解压即可。

    Kafka源码解析及实战

    《Kafka源码解析及实战》是一本专为深度学习Apache Kafka的读者设计的教材,旨在帮助读者深入了解Kafka的工作原理及其内部机制。通过源码级别的解析,读者可以更好地掌握Kafka在分布式消息系统中的核心功能和设计...

    Kafka源码解析与实战-高清-完整目录-2018年1月

    标题中提到的“Kafka源码解析与实战”指向了关于Apache Kafka这个开源流处理平台的深入分析和应用实践。Apache Kafka是一个分布式流处理平台,最初是由LinkedIn公司开发,并于2011年开源。它主要用于构建实时数据...

    kafka需要的源码包

    **Kafka 源码分析概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给 Apache 软件基金会。它被设计为高吞吐量、低延迟的消息传递系统,支持实时数据流处理。Kafka 主要用于构建实时数据管道和流应用,...

    kafka 技术内幕 图文详解Kafka源码设计与实现

    《Kafka技术内幕》这本书深入剖析了Apache Kafka这一分布式流处理平台的设计原理和实现细节,旨在帮助读者理解Kafka的核心机制,并能有效地运用到实际项目中。以下是对Kafka源码设计与实现的一些关键知识点的详细...

    《Apache Kafka源码剖析》.part2.rar

    Apache Kafka源码剖析 PDF较大,分6份上传!一起解压即可。

    《Apache Kafka源码剖析》.part3.rar

    Apache Kafka源码剖析 PDF较大,分6份上传!一起解压即可。

    Kafka技术内幕-图文详解Kafka源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

    Kafka源码解析与实战.pdf,超清无水印,有书签导航,精品

    《Kafka源码解析与实战》是一本深入探讨Apache Kafka技术的专业书籍,旨在帮助读者理解Kafka的核心原理、源码实现及其在大数据处理中的实际应用。这本书以清晰无水印的PDF格式呈现,包含了详尽的书签导航,便于读者...

    Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌(2017.11).pdf

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是郑奇煌在2017年11月出版的一本深入解析Apache Kafka的技术专著。这本书详细介绍了Kafka的核心概念、工作原理以及源码分析,旨在帮助读者理解并掌握这个分布式流...

    《Apache Kafka源码剖析》.part1.rar

    Apache Kafka源码剖析 PDF较大,分6份上传!一起解压即可。

Global site tag (gtag.js) - Google Analytics