`
gaojingsong
  • 浏览: 1221808 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

【kafka源码改写(一)】

阅读更多

1、启动类改写

package kafka;

 

import java.util.Properties;

 

import kafka.metrics.KafkaMetricsReporter1;

import kafka.server.KafkaConfig;

import kafka.server.KafkaServerStartable;

import kafka.utils.Utils;

import kafka.utils.VerifiableProperties;

 

public class MyKafka {

 

public static void main(String[] args) {

try {

System.out.println("---gaojingsong-------QQ525354786--");

Properties props = Utils.loadProps("../config/server.properties");

System.out.println(props);

KafkaConfig serverConfig = new KafkaConfig(props);

VerifiableProperties verifiableProperties = new VerifiableProperties(props);

KafkaMetricsReporter1.startReporters(verifiableProperties);

     final KafkaServerStartable kafkaServerStartable = new  KafkaServerStartable(serverConfig);

System.out.println("--gaojingsong----kafkaServerStartable---");

// attach shutdown handler to catch control-c

Runtime.getRuntime().addShutdownHook(new Thread() {

public void run() {

kafkaServerStartable.shutdown();

}

});

 

kafkaServerStartable.startup();

kafkaServerStartable.awaitShutdown();

} catch (Exception e) {

e.printStackTrace();

}

System.exit(0);

 

}

 

}



 

 报错原因是因为ZK链接超时,我压根就没有启动ZK,至少说明我的程序没有编译时候的语法错误。

 

二、改写KafkaMetricsReporter类

package kafka.metrics;

 

import java.util.concurrent.atomic.AtomicBoolean;

 

import scala.collection.Iterator;

import scala.collection.Seq;

import scala.collection.immutable.List;

import kafka.utils.Utils;

import kafka.utils.VerifiableProperties;

 

public class KafkaMetricsReporter1 {

static AtomicBoolean ReporterStarted = new AtomicBoolean(false);

 

public static void startReporters(VerifiableProperties verifiableProps) {

synchronized (ReporterStarted) {

if (ReporterStarted.get() == false) {

KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(verifiableProps);

Seq<String> seq = Utils.parseCsvList(verifiableProps.getString("kafka.metrics.reporters", ""));

List<String> metrics = seq.toList();

if (metrics.isEmpty()) {

List<String> reporters = metricsConfig.reporters().toList();

Iterator<String> iters = reporters.iterator();

while (iters.hasNext()) {

String con = iters.next();

System.out.println(con);

 

KafkaMetricsReporter reporter = null;

try {

Class clazz = Class.forName(con);

reporter = (KafkaMetricsReporter) clazz

.newInstance();

} catch (Exception e) {

e.printStackTrace();

}

// KafkaMetricsReporter reporter =

// Utils.createObject[KafkaMetricsReporter](con);

reporter.init(verifiableProps);

if (reporter instanceof KafkaMetricsReporterMBean) {

KafkaMetricsReporterMBean reportermb = (KafkaMetricsReporterMBean) reporter;

Utils.registerMBean(reporter,

reportermb.getMBeanName());

}

}

 

}

ReporterStarted.set(true);

}

}

 

}

}

 

  • 大小: 100.5 KB
0
0
分享到:
评论

相关推荐

    Kafka技术内幕:图文详解Kafka源码设计与实现+书签.pdf+源码

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...

    Kafka技术内幕:图文详解Kafka源码设计与实现 PD

    Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载 Kafka技术内幕:图文详解Kafka源码设计与实现 PDF 下载

    基于Kafka的管理系统源码.zip

    基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...

    kafka源码解析新手版本(修正版)

    ### Kafka源码解析新手版本(修正版)知识点详解 #### 一、Kafka诞生背景及其在LinkedIn的应用 **1.1 Apache Kafka项目简介** - **诞生背景:** Apache Kafka最初由LinkedIn开发,随后于2011年初开源,并在2012年...

    Kafka源码解析与实战.zip

    Kafka源码解析与实战.zip

    kafka源码分析

    kafka源码分析, Introduction kafka-Intro kafka-Unix kafka-Producer kafka-Producer-Scala kafka-SocketServer kafka-LogAppend kafka-ISR kafka-Consumer-init-Scala

    Kafka源码解析及实战

    《Kafka源码解析及实战》是一本专为深度学习Apache Kafka的读者设计的教材,旨在帮助读者深入了解Kafka的工作原理及其内部机制。通过源码级别的解析,读者可以更好地掌握Kafka在分布式消息系统中的核心功能和设计...

    kafka 技术内幕 图文详解Kafka源码设计与实现

    《Kafka技术内幕》这本书深入剖析了Apache Kafka这一分布式流处理平台的设计原理和实现细节,旨在帮助读者理解Kafka的核心机制,并能有效地运用到实际项目中。以下是对Kafka源码设计与实现的一些关键知识点的详细...

    kafka需要的源码包

    **Kafka 源码分析概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给 Apache 软件基金会。它被设计为高吞吐量、低延迟的消息传递系统,支持实时数据流处理。Kafka 主要用于构建实时数据管道和流应用,...

    Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌(2017.11).pdf

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是郑奇煌在2017年11月出版的一本深入解析Apache Kafka的技术专著。这本书详细介绍了Kafka的核心概念、工作原理以及源码分析,旨在帮助读者理解并掌握这个分布式流...

    storm+kafka源码示例

    Storm是一个实时处理系统,而Kafka则是一个高吞吐量的消息队列。本示例将深入探讨如何结合这两个技术来构建一个实时数据流处理应用。 首先,让我们来了解Apache Storm。Storm是一个开源的分布式实时计算系统,它...

    Kafka技术内幕:图文详解Kafka源码设计与实现

    《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书籍覆盖了大数据处理领域的关键知识点,通过丰富的图表和...

    spring-kafka源代码

    Spring Kafka是Spring框架的一部分,专为集成Apache Kafka而设计,提供了一套轻量级且强大的API,使得在Java应用中使用Kafka变得更加简单。本文将围绕Spring Kafka的源代码进行深度解析,帮助开发者更好地理解和运用...

    SpringBoot集成kafka完整框架源码IDEA

    本文将详细解析SpringBoot如何与Kafka整合,并提供一个完整的框架源码解析。 首先,SpringBoot是一个简化Spring应用开发的框架,它通过自动配置、内嵌Web服务器等功能,使得创建和运行Spring应用程序变得极其简单。...

    kafka-clients源码.zip

    《深入理解Kafka-clients源码》 Kafka-clients是Apache Kafka的重要组成部分,它提供了与Kafka集群交互的API,使得开发者能够构建基于Kafka的应用程序。在2.*版本中,Kafka-clients进行了多方面的优化和改进,提升...

    kafka源码以及导入idea的环境需要安装jdk8.zip

    **Kafka源码分析与IDEA配置指南** Kafka是一个分布式流处理平台,由LinkedIn开发并贡献给Apache软件基金会,现已成为Apache顶级项目。它主要用Java编写,但同时也支持Scala,是大数据领域中用于实时数据流处理和...

    Kafka源码剖析试读文章

    Apache Kafka是一个分布式流处理平台,被广泛用于构建实时数据管道和流应用程序。它最初由LinkedIn公司开发,之后成为Apache软件基金会的一个开源项目。Kafka具有高性能、可扩展、持久性、可靠和高吞吐量的特点,它...

    扩展logback将日志输出到Kafka实例源码

    标题"扩展logback将日志输出到Kafka实例源码"涉及的技术点主要集中在如何将Logback与Kafka集成,使得日志可以被有效地发送到Kafka集群。这个过程通常涉及到以下几个步骤: 1. **添加依赖**:首先,你需要在项目的...

    kafka源码解析新手版本

    ### Kafka源码解析新手版本 #### 一、Kafka概览与诞生背景 Apache Kafka 是一个高度可扩展的分布式消息系统,由 LinkedIn 开发并开源,后成为 Apache 的顶级项目。Kafka 采用 Scala 编写,其核心设计旨在提供一个...

Global site tag (gtag.js) - Google Analytics