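Environment
a) Kafka version: kafka_2.9.2-0.8.1
b) Three virtual machines, 10.3.63.5, 10.3.63.6 and 10.3.63.7, acting as broker nodes 1, 2 and 3 respectively.
Outage symptoms
a) The symptom observed was that Storm could neither write to nor query Kafka (at the time it was not determined which of the two was failing). Looking at the broker nodes in ZooKeeper showed only 3 and 1, instead of the usual 3, 2, 1.
b) Node 2 was missing. The usual remedy has been a restart, and this time was no exception. The broker was restarted at 2015-01-12 16:07:47.
The corresponding controller.log entries: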
[2015-01-12 16:07:47,054] INFO [ControllerEpochListener on 2]: Initialized controller epoch to 18 and zk version 17 (kafka.controller.ControllerEpochListener)
[2015-01-12 16:07:47,303] INFO [Controller 2]: Controller starting up (kafka.controller.KafkaController)
[2015-01-12 16:07:47,467] INFO [Controller 2]: Controller startup complete (kafka.controller.KafkaController)
The corresponding state-change.log entries:
[2015-01-12 16:07:47,682] TRACE Broker 2 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:2,ControllerEpoch:18),ReplicationFactor:2),AllReplicas:3,1) for partition [V1_A,6] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 13 (state.change.logger)
[2015-01-12 16:07:47,683] TRACE Broker 2 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:5,ControllerEpoch:18),ReplicationFactor:2),AllReplicas:3,1) for partition [V1_TEST,7] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 13 (state.change.logger)
[2015-01-12 16:07:47,683] TRACE Broker 2 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:5,ControllerEpoch:18),ReplicationFactor:2),AllReplicas:3,2) for partition [V1_TEST,4] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 13 (state.change.logger)
Log inspection results
a) controller.log entries from the period preceding the restart:
[2015-01-12 15:11:45,549] INFO [Controller-2-to-broker-1-send-thread], Controller 2 connected to id:1,host:10.3.63.5,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
[2015-01-12 15:11:45,549] INFO [Controller 2]: New broker startup callback for 1 (kafka.controller.KafkaController)
[2015-01-12 15:11:45,549] INFO [Controller-2-to-broker-1-send-thread], Starting (kafka.controller.RequestSendThread)
[2015-01-12 15:11:45,558] INFO [Replica state machine on controller 2]: Invoking state change to OnlineReplica for replicas [Topic=V1_B,Partition=8,Replica=1],[Topic=V1_B,Partition=7,Replica=1],[Topic=V1_TEST,Partition=5,Replica=1],[Topic=V1_B,Partition=3,Replica=1],[Topic=V1_B,Partition=9,Replica=1],[Topic=V1_B,Partition=5,Replica=1],[Topic=V1_B,Partition=1,Replica=1],[Topic=V1_TEST,Partition=9,Replica=1],[Topic=V1_TEST,Partition=2,Replica=1],[Topic=V1_A,Partition=6,Replica=1],[Topic=V1_A,Partition=0,Replica=1],[Topic=V1_B,Partition=2,Replica=1],[Topic=V1_A,Partition=1,Replica=1],[Topic=V1_TEST,Partition=7,Replica=1],[Topic=V1_TEST,Partition=8,Replica=1],[Topic=V1_TEST,Partition=1,Replica=1],[Topic=V1_A,Partition=5,Replica=1],[Topic=V1_A,Partition=4,Replica=1],[Topic=V1_TEST,Partition=3,Replica=1] (kafka.controller.ReplicaStateMachine)
[2015-01-12 15:50:44,292] INFO [SessionExpirationListener on 2], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2015-01-12 15:50:44,298] INFO [delete-topics-thread], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
b) Around 15:40, when the service died, state-change.log contains no entries at all. Its last entries are:
[2015-01-12 15:11:46,127] TRACE Controller 2 epoch 17 received response correlationId 14 for a request sent to broker id:1,host:10.3.63.5,port:9092 (state.change.logger)
[2015-01-12 15:11:46,172] TRACE Controller 2 epoch 17 received response correlationId 14 for a request sent to broker id:1,host:10.3.63.5,port:9092 (state.change.logger)
c) The corresponding server.log entries:
[2015-01-12 15:49:59,926] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:02,927] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:05,927] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:08,926] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:11,927] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:14,926] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:17,928] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:20,927] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:23,927] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:26,928] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:29,927] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:32,928] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:35,928] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:39,055] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:41,927] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:44,926] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:47,931] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:49,189] INFO Closing socket connection to /10.3.63.7. (kafka.network.Processor)
[2015-01-12 15:50:50,509] INFO Closing socket connection to /10.3.63.5. (kafka.network.Processor)
[2015-01-12 15:51:05,742] INFO Partition [V1_TEST,0] on broker 2: Shrinking ISR for partition [V1_TEST,0] from 2,3 to 2 (kafka.cluster.Partition)
[2015-01-12 15:51:06,041] ERROR Conditional update of path /brokers/topics/V1_TEST/partitions/0/state with data {"controller_epoch":16,"leader":2,"version":1,"leader_epoch":3,"isr":[2]} and expected version 4 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/V1_TEST/partitions/0/state (kafka.utils.ZkUtils$)
[2015-01-12 15:51:06,042] INFO Partition [V1_TEST,0] on broker 2: Cached zkVersion [4] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-01-12 15:51:06,042] INFO Partition [V1_A,3] on broker 2: Shrinking ISR for partition [V1_A,3] from 2,3 to 2 (kafka.cluster.Partition)
[2015-01-12 15:51:06,214] ERROR Conditional update of path /brokers/topics/V1_A/partitions/3/state with data {"controller_epoch":17,"leader":2,"version":1,"leader_epoch":1,"isr":[2]} and expected version 2 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/V1_A/partitions/3/state (kafka.utils.ZkUtils$)
[2015-01-12 15:51:06,214] INFO Partition [V1_A,3] on broker 2: Cached zkVersion [2] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-01-12 15:51:06,214] INFO Partition [V1_B,3] on broker 2: Shrinking ISR for partition [V1_B,3] from 2,1 to 2 (kafka.cluster.Partition)
[2015-01-12 15:51:06,482] ERROR Conditional update of path /brokers/topics/V1_B/partitions/3/state with data {"controller_epoch":16,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} and expected version 2 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/V1_B/partitions/3/state (kafka.utils.ZkUtils$)
[2015-01-12 15:51:06,482] INFO Partition [V1_B,3] on broker 2: Cached zkVersion [2] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-01-12 15:51:06,483] INFO Partition [V1_B,4] on broker 2: Shrinking ISR for partition [V1_B,4] from 2,3 to 2 (kafka.cluster.Partition)
[2015-01-12 15:51:06,488] ERROR Conditional update of path /brokers/topics/V1_B/partitions/4/state with data {"controller_epoch":17,"leader":2,"version":1,"leader_epoch":1,"isr":[2]} and expected version 2 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/V1_B/partitions/4/state (kafka.utils.ZkUtils$)
[2015-01-12 15:51:06,488] INFO Partition [V1_B,4] on broker 2: Cached zkVersion [2] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2015-01-12 15:51:06,488] INFO Partition [V1_A,1] on broker 2: Shrinking ISR for partition [V1_A,1] from 2,1 to 2 (kafka.cluster.Partition)
[2015-01-12 15:51:06,494] ERROR Conditional update of path /brokers/topics/V1_A/partitions/1/state with data {"controller_epoch":17,"leader":2,"version":1,"leader_epoch":1,"isr":[2]} and expected version 2 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/V1_A/partitions/1/state (kafka.utils.ZkUtils$)
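Analysis based on the logs
a) Starting from the controller.log output, I looked at the source code. Two versions of the source were at hand: 0.8.1.0 and 0.8.1.1. The first log line below can be found in both: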
[2015-01-12 15:50:44,292] INFO [SessionExpirationListener on 2], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
KafkaController.scala
/**
 * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
 * any ephemeral nodes here.
 *
 * @throws Exception
 *             On any error.
 */
@throws(classOf[Exception])
def handleNewSession() {
  info("ZK expired; shut down all controller components and try to re-elect")
  inLock(controllerContext.controllerLock) {
    onControllerResignation()
    controllerElector.elect
  }
}
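As the code shows, this happens because the ZooKeeper session expired and a new session was created. According to the log message, handling this case means shutting down all controller components and then attempting a re-election. The onControllerResignation() method is: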
/**
 * This callback is invoked by the zookeeper leader elector when the current broker resigns as the controller. This is
 * required to clean up internal controller data structures
 */
def onControllerResignation() {
  inLock(controllerContext.controllerLock) {
    if (config.autoLeaderRebalanceEnable)
      autoRebalanceScheduler.shutdown()
    deleteTopicManager.shutdown()
    Utils.unregisterMBean(KafkaController.MBeanName)
    partitionStateMachine.shutdown()
    replicaStateMachine.shutdown()
    if(controllerContext.controllerChannelManager != null) {
      controllerContext.controllerChannelManager.shutdown()
      controllerContext.controllerChannelManager = null
    }
  }
}
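The shutdown code above includes deleteTopicManager.shutdown(), which is what produces the delete-topics-thread "Shutting down" log entry. I previously assumed this shutdown was what made Kafka unable to serve requests. At this point in the analysis that does not seem to be the case, and the message is logged at INFO level rather than ERROR or WARN.
Re-election corresponds to the following code in ZookeeperLeaderElector: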
def elect: Boolean = {
  val timestamp = SystemTime.milliseconds.toString
  val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
  try {
    createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
      (controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
      controllerContext.zkSessionTimeout)
    info(brokerId + " successfully elected as leader")
    leaderId = brokerId
    onBecomingLeader()
  } catch {
    case e: ZkNodeExistsException =>
      // If someone else has written the path, then
      leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
        case Some(controller) => KafkaController.parseControllerId(controller)
        case None => {
          warn("A leader has been elected but just resigned, this will result in another round of election")
          -1
        }
      }
      if (leaderId != -1)
        debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
    case e2: Throwable =>
      error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
      resign()
  }
  amILeader
}
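b) server.log shows that at 2015-01-12 15:50:47 the broker was still communicating with the .138 machine (the Storm server). The next two entries are connections to .5 and .7, the other two Kafka nodes. After that come the "Shrinking ISR for partition" and "ERROR Conditional update of path" entries.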
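The "Conditional update ... expected version 4 failed" and "Cached zkVersion [4] not equal to that in zookeeper, skip updating ISR" entries describe a compare-version-then-write rule: the broker only writes the partition state if the version it cached is still the current one. The following is a minimal in-memory sketch of that rule, not the ZooKeeper or Kafka API. The class VersionedNodeSketch, its methods, and the -1 return value standing in for BadVersion are all hypothetical names chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// In-memory stand-in for a single znode: data plus a version that grows on every write.
// This is NOT the ZooKeeper client API; it only models the compare-version-then-write rule.
class VersionedNodeSketch {
    // Immutable snapshot of the node's content and version.
    static final class State {
        final String data;
        final int version;
        State(String data, int version) {
            this.data = data;
            this.version = version;
        }
    }

    private final AtomicReference<State> state = new AtomicReference<>(new State("", 0));

    // Unconditional write, as some other writer might perform. Returns the new version.
    int set(String data) {
        while (true) {
            State cur = state.get();
            State next = new State(data, cur.version + 1);
            if (state.compareAndSet(cur, next)) {
                return next.version;
            }
        }
    }

    // Conditional write: succeeds only if the caller's cached version is still current.
    // Returns the new version, or -1 to model a BadVersion failure.
    int conditionalUpdate(String data, int expectedVersion) {
        State cur = state.get();
        if (cur.version != expectedVersion) {
            return -1;
        }
        State next = new State(data, cur.version + 1);
        return state.compareAndSet(cur, next) ? next.version : -1;
    }

    String data() {
        return state.get().data;
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch();
        int cached = node.set("{\"leader\":2,\"isr\":[2,3]}"); // the broker caches this version
        node.set("{\"leader\":3,\"isr\":[3]}");                // another writer rewrites the node
        int result = node.conditionalUpdate("{\"leader\":2,\"isr\":[2]}", cached);
        System.out.println(result == -1 ? "stale version, update skipped" : "updated to version " + result);
    }
}
```

Read this way, the BadVersion errors on broker 2 suggest that the partition state znodes had already been rewritten by someone else while broker 2 was still acting on its cached view.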
Logs on the other nodes of the Kafka cluster
a) server.log on 10.3.63.5:
[2015-01-12 15:50:44,046] INFO re-registering broker info in ZK for broker 1 (kafka.server.KafkaHealthcheck)
[2015-01-12 15:50:44,315] INFO Registered broker 1 at path /brokers/ids/1 with address 10.3.63.5:9092. (kafka.utils.ZkUtils$)
[2015-01-12 15:50:44,316] INFO done re-registering broker (kafka.server.KafkaHealthcheck)
[2015-01-12 15:50:44,316] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck)
[2015-01-12 15:50:44,328] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-01-12 15:50:50,179] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [V1_B,3],[V1_TEST,9],[V1_TEST,8],[V1_B,9],[V1_B,8],[V1_A,5],[V1_TEST,2],[V1_A,1],[V1_B,2],[V1_TEST,3] (kafka.server.ReplicaFetcherManager)
[2015-01-12 15:50:50,193] INFO [ReplicaFetcherThread-0-2], Shutting down (kafka.server.ReplicaFetcherThread)
[2015-01-12 15:50:50,508] INFO [ReplicaFetcherThread-0-2], Stopped (kafka.server.ReplicaFetcherThread)
[2015-01-12 15:50:50,508] INFO [ReplicaFetcherThread-0-2], Shutdown completed (kafka.server.ReplicaFetcherThread)
[2015-01-12 15:50:50,992] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:53,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:56,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:59,934] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:02,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:05,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:08,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:11,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:14,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:17,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:20,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:23,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:26,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:29,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:32,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:33,418] INFO Closing socket connection to /10.3.63.7. (kafka.network.Processor)
[2015-01-12 15:51:35,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:38,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:39,709] INFO re-registering broker info in ZK for broker 1 (kafka.server.KafkaHealthcheck)
[2015-01-12 15:51:41,400] INFO Registered broker 1 at path /brokers/ids/1 with address 10.3.63.5:9092. (kafka.utils.ZkUtils$)
[2015-01-12 15:51:41,400] INFO done re-registering broker (kafka.server.KafkaHealthcheck)
[2015-01-12 15:51:41,401] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck)
[2015-01-12 15:51:41,933] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:43,381] INFO New leader is 3 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-01-12 15:51:44,932] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
server.log on 10.3.63.7:
[2015-01-12 15:50:44,776] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:47,776] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:50,776] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:53,776] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:56,777] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:50:59,777] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:02,777] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:05,777] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:08,777] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:11,777] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:14,779] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:17,778] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:20,778] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:23,778] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:26,778] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:29,778] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:32,779] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
[2015-01-12 15:51:35,779] INFO Closing socket connection to /10.3.63.138. (kafka.network.Processor)
b) controller.log on 10.3.63.5:
[2015-01-12 15:50:44,043] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2015-01-12 15:50:44,320] DEBUG [ControllerEpochListener on 1]: Controller epoch listener fired with new epoch 18 (kafka.controller.ControllerEpochListener)
[2015-01-12 15:50:44,321] INFO [ControllerEpochListener on 1]: Initialized controller epoch to 18 and zk version 17 (kafka.controller.ControllerEpochListener)
[2015-01-12 15:51:39,705] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2015-01-12 15:51:42,393] DEBUG [ControllerEpochListener on 1]: Controller epoch listener fired with new epoch 18 (kafka.controller.ControllerEpochListener)
[2015-01-12 15:51:42,693] INFO [ControllerEpochListener on 1]: Initialized controller epoch to 18 and zk version 17 (kafka.controller.ControllerEpochListener)
controller.log on 10.3.63.7:
[2015-01-12 15:52:25,093] INFO [Controller 3]: Broker 3 starting become controller state transition (kafka.controller.KafkaController)
[2015-01-12 15:52:25,995] INFO [Controller 3]: Controller 3 incremented epoch to 18 (kafka.controller.KafkaController)
[2015-01-12 15:52:29,901] DEBUG [Channel manager on controller 3]: Controller 3 trying to connect to broker 3 (kafka.controller.ControllerChannelManager)
[2015-01-12 15:52:29,905] INFO [Controller-3-to-broker-3-send-thread], Controller 3 connected to id:3,host:10.3.63.7,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
[2015-01-12 15:52:30,614] INFO [Controller-3-to-broker-3-send-thread], Starting (kafka.controller.RequestSendThread)
[2015-01-12 15:52:30,660] INFO [Controller 3]: Partitions undergoing preferred replica election: (kafka.controller.KafkaController)
[2015-01-12 15:52:30,662] INFO [Controller 3]: Partitions that completed preferred replica election: (kafka.controller.KafkaController)
[2015-01-12 15:52:30,663] INFO [Controller 3]: Resuming preferred replica election for partitions: (kafka.controller.KafkaController)
[2015-01-12 15:52:30,756] INFO [Controller 3]: Partitions being reassigned: Map() (kafka.controller.KafkaController)
[2015-01-12 15:52:30,756] INFO [Controller 3]: Partitions already reassigned: List() (kafka.controller.KafkaController)
[2015-01-12 15:52:30,757] INFO [Controller 3]: Resuming reassignment of partitions: Map() (kafka.controller.KafkaController)
[2015-01-12 15:52:31,002] INFO [Controller 3]: List of topics to be deleted: (kafka.controller.KafkaController)
[2015-01-12 15:52:31,003] INFO [Controller 3]: List of topics ineligible for deletion: V1_A,V1_TEST,V1_B (kafka.controller.KafkaController)
[2015-01-12 15:52:31,005] INFO [Controller 3]: Currently active brokers in the cluster: Set(3) (kafka.controller.KafkaController)
[2015-01-12 15:52:31,006] INFO [Controller 3]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
[2015-01-12 15:52:31,006] INFO [Controller 3]: Current list of topics in the cluster: Set(V1_TEST, V1_B, V1_A) (kafka.controller.KafkaController)
[2015-01-12 15:52:31,443] INFO [Replica state machine on controller 3]: Invoking state change to OnlineReplica for replicas [Topic=V1_B,Partition=4,Replica=3],[Topic=V1_A,Partition=6,Replica=3],[Topic=V1_B,Partition=6,Replica=3],[Topic=V1_A,Partition=2,Replica=3],[Topic=V1_B,Partition=1,Replica=3],[Topic=V1_TEST,Partition=6,Replica=3],[Topic=V1_B,Partition=5,Replica=3],[Topic=V1_B,Partition=0,Replica=3],[Topic=V1_TEST,Partition=5,Replica=3],[Topic=V1_TEST,Partition=1,Replica=3],[Topic=V1_TEST,Partition=4,Replica=3],[Topic=V1_A,Partition=3,Replica=3],[Topic=V1_A,Partition=4,Replica=3],[Topic=V1_TEST,Partition=0,Replica=3],[Topic=V1_A,Partition=0,Replica=3],[Topic=V1_TEST,Partition=7,Replica=3],[Topic=V1_B,Partition=7,Replica=3] (kafka.controller.ReplicaStateMachine)
state-change.log.2015-01-12-15 on 10.3.63.5:
[2015-01-12 15:11:46,172] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:17),ReplicationFactor:2),AllReplicas:2,1) for partition [V1_TEST,3] in response to UpdateMetadata request sent by controller 2 epoch 17 with correlation id 14 (state.change.logger)
[2015-01-12 15:50:48,759] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,1,LeaderEpoch:1,ControllerEpoch:17),ReplicationFactor:2),AllReplicas:3,1) for partition [V1_A,6] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 4 (state.change.logger)
[2015-01-12 15:50:48,760] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,1,LeaderEpoch:4,ControllerEpoch:17),ReplicationFactor:2),AllReplicas:3,1) for partition [V1_TEST,7] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 4 (state.change.logger)
[2015-01-12 15:50:48,760] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,LeaderEpoch:5,ControllerEpoch:18),ReplicationFactor:2),AllReplicas:3,2) for partition [V1_TEST,4] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 4 (state.change.logger)
[2015-01-12 15:50:48,760] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,1,LeaderEpoch:1,ControllerEpoch:17),ReplicationFactor:2),AllReplicas:1,3) for partition [V1_B,5] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 4 (state.change.logger)
[2015-01-12 15:50:48,760] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:3,ISR:3,1,LeaderEpoch:1,ControllerEpoch:17),ReplicationFactor:2),AllReplicas:3,1) for partition [V1_B,1] in response to UpdateMetadata request sent by controller 3 epoch 18 with correlation id 4 (state.change.logger)
state-change.log.2015-01-12-15 on 10.3.63.7:
[2015-01-12 15:13:30,483] TRACE Broker 3 cached leader info (LeaderAndIsrInfo:(Leader:2,ISR:2,LeaderEpoch:0,ControllerEpoch:17),ReplicationFactor:2),AllReplicas:2,1) for partition [V1_TEST,3] in response to UpdateMetadata request sent by controller 2 epoch 17 with correlation id 14 (state.change.logger)
[2015-01-12 15:52:31,475] TRACE Controller 3 epoch 18 changed state of replica 3 for partition [V1_B,4] from OnlineReplica to OnlineReplica (state.change.logger)
[2015-01-12 15:52:31,476] TRACE Controller 3 epoch 18 changed state of replica 3 for partition [V1_A,6] from OnlineReplica to OnlineReplica (state.change.logger)
[2015-01-12 15:52:31,477] TRACE Controller 3 epoch 18 changed state of replica 3 for partition [V1_B,6] from OnlineReplica to OnlineReplica (state.change.logger)
[2015-01-12 15:52:31,478] TRACE Controller 3 epoch 18 changed state of replica 3 for partition [V1_A,2] from OnlineReplica to OnlineReplica (state.change.logger)
[2015-01-12 15:52:31,479] TRACE Controller 3 epoch 18 changed state of replica 3 for partition [V1_B,1] from OnlineReplica to OnlineReplica (state.change.logger)
[2015-01-12 15:52:31,480] TRACE Controller 3 epoch 18 changed state of replica 3 for partition [V1_TEST,6] from OnlineReplica to OnlineReplica (state.change.logger)
Analysis of the other nodes' logs
a) In controller.log on node 5, at 15:50:44, nodes 5 and 6 log the same message:
ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
The entries that follow, however, differ considerably. What happened on node 6 was analyzed above; what follows is what happened on node 5.
b) Node 5's server.log shows that, besides KafkaController's handleNewSession, KafkaHealthcheck's handleNewSession was executed as well.
The method in KafkaHealthcheck is:
/**
 * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
 * any ephemeral nodes here.
 *
 * @throws Exception
 *             On any error.
 */
@throws(classOf[Exception])
def handleNewSession() {
  info("re-registering broker info in ZK for broker " + brokerId)
  register()
  info("done re-registering broker")
  info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
KafkaServer starts KafkaHealthcheck as follows:
/* tell everyone we are alive */
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck.startup()
KafkaServer also starts KafkaController:
kafkaController = new KafkaController(config, zkClient)
kafkaController.startup()
kafkaController.startup() also registers the SessionExpirationListener:
registerSessionExpirationListener()
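The question at this point: why did the service on node 5 reach this method while the one on node 6 did not?
In other words, under normal conditions, when the ZooKeeper session expires, handleNewSession is called on the SessionExpirationListener in KafkaController and also on the SessionExpireListener in KafkaHealthcheck. This is because zkclient, which invokes the IZkStateListener callbacks, wraps the automatic reconnect logic, as follows: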
private void processStateChanged(WatchedEvent event) {
    LOG.info("zookeeper state changed (" + event.getState() + ")");
    setCurrentState(event.getState());
    if (getShutdownTrigger()) {
        return;
    }
    try {
        fireStateChangedEvent(event.getState());
        if (event.getState() == KeeperState.Expired) {
            reconnect();
            fireNewSessionEvents();
        }
    } catch (final Exception e) {
        throw new RuntimeException("Exception while restarting zk client", e);
    }
}
This is why handleNewSession gets called.
KafkaServer analysis
a) Continuing the analysis with node 5's logs: they contain ControllerEpochListener entries, and the code that registers ControllerEpochListener can be found in the Kafka source as well:
private def registerControllerChangedListener() {
  zkClient.subscribeDataChanges(ZkUtils.ControllerEpochPath, new ControllerEpochListener(this))
}
My understanding is this: only one node at a time acts as the controller, and it stores a monotonically increasing id in the controller_epoch znode in ZooKeeper. This was inferred from the comment on KafkaController's startup method:
/**
 * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
 * is the controller. It merely registers the session expiration listener and starts the controller leader
 * elector
 */
def startup() = {
  inLock(controllerContext.controllerLock) {
    info("Controller starting up");
    registerSessionExpirationListener()
    isRunning = true
    controllerElector.startup
    info("Controller startup complete")
  }
}
Production environment
[zk: 10.3.63.204:2181(CONNECTED) 8] get /controller
{"version":1,"brokerid":0,"timestamp":"1407310302044"}
cZxid = 0x700000592
ctime = Wed Aug 06 15:32:01 CST 2014
mZxid = 0x700000592
mtime = Wed Aug 06 15:32:01 CST 2014
pZxid = 0x700000592
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x147aa389edd0001
dataLength = 54
numChildren = 0
[zk: 10.3.63.204:2181(CONNECTED) 9] get /controller_epoch
18
cZxid = 0x100000085
ctime = Thu Jul 24 14:05:16 CST 2014
mZxid = 0x700000593
mtime = Wed Aug 06 15:32:01 CST 2014
pZxid = 0x100000085
cversion = 0
dataVersion = 17
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
Test environment
[zk: 10.3.63.9:2181(CONNECTED) 5] get /controller
{"version":1,"brokerid":1,"timestamp":"1421163253934"}
cZxid = 0x9003c9fe0
ctime = Tue Jan 13 23:33:14 CST 2015
mZxid = 0x9003c9fe0
mtime = Tue Jan 13 23:33:14 CST 2015
pZxid = 0x9003c9fe0
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x14a95a9b0dc0039
dataLength = 54
numChildren = 0
[zk: 10.3.63.9:2181(CONNECTED) 4] get /controller_epoch
20
cZxid = 0x700c05814
ctime = Sat Oct 11 11:03:34 CST 2014
mZxid = 0x9003c9fe1
mtime = Tue Jan 13 23:33:14 CST 2015
pZxid = 0x700c05814
cversion = 0
dataVersion = 19
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
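The code above also starts a controllerElector, which, going by its name, starts the controller election. This elector is in fact a ZookeeperLeaderElector. Its logic is: every node that starts up watches the /controller path in ZooKeeper and tries to write to it, and whichever node succeeds becomes the controller (the leader). The leader then calls back into KafkaController's onControllerFailover method (which is passed in as a function parameter in Scala).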
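The "first writer of /controller wins" rule can be illustrated without a ZooKeeper ensemble. The sketch below replaces ZooKeeper with an in-memory map, so it only models the outcome of the election, not watches, sessions or the real client calls. ControllerElectionSketch, elect and sessionExpired are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

// Models "first writer of /controller wins" with an in-memory map instead of ZooKeeper.
class ControllerElectionSketch {
    static final String ELECTION_PATH = "/controller";

    private final ConcurrentHashMap<String, Integer> znodes = new ConcurrentHashMap<>();

    // Try to create the election node. Returns the id of whoever is leader afterwards:
    // the caller if the node did not exist yet, otherwise the existing owner.
    int elect(int brokerId) {
        Integer existing = znodes.putIfAbsent(ELECTION_PATH, brokerId);
        return existing == null ? brokerId : existing;
    }

    // Models the ephemeral node vanishing when its owner's session expires.
    // Only the current owner's expiry removes the node.
    void sessionExpired(int brokerId) {
        znodes.remove(ELECTION_PATH, brokerId);
    }

    public static void main(String[] args) {
        ControllerElectionSketch zk = new ControllerElectionSketch();
        System.out.println("broker 2 sees leader " + zk.elect(2));
        System.out.println("broker 3 sees leader " + zk.elect(3));
        zk.sessionExpired(2); // the controller's session expires
        System.out.println("broker 3 sees leader " + zk.elect(3));
    }
}
```

In the real elector the losing brokers learn about the vacancy through a watch on the path. Here the retry is simply called by hand after the simulated expiry.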
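Finding the cause with jstack
a) The logs differ between nodes. In particular, once the "[delete-topics-thread], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)" entry appears, the broker's node can no longer be found under /brokers/ids/. Only after handleNewSession in KafkaHealthcheck has been called can the node be registered again. The code is: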
/**
 * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
 * any ephemeral nodes here.
 *
 * @throws Exception
 *             On any error.
 */
@throws(classOf[Exception])
def handleNewSession() {
  info("re-registering broker info in ZK for broker " + brokerId)
  register()
  info("done re-registering broker")
  info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
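b) Thread dumps were taken on nodes 5, 6 and 7; see the attachment. (At this point only broker 1, i.e. the .5 server, had an entry under /brokers/ids/.)
On nodes 6 and 7, the thread below was found via (jstack -l pid > jstack.txt). It points to a deadlock: .Utils$.inLock appears twice in the stack.
The possible deadlock: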
"ZkClient-EventThread-12-10.3.63.8:2181,10.3.63.9:2181" daemon prio=10 tid=0x00007fb10038e800 nid=0x7d93 waiting on condition [0x00007fb0f544a000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000e4f4a760> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
at kafka.controller.TopicDeletionManager.shutdown(TopicDeletionManager.scala:93)
at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:340)
at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:337)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:337)
at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
at kafka.utils.Utils$.inLock(Utils.scala:538)
at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
On node 5:
"ZkClient-EventThread-12-10.3.63.8:2181,10.3.63.9:2181" daemon prio=10 tid=0x00007f527c386000 nid=0x5738 waiting on condition [0x00007f5272748000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000e4946a50> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)
Locked ownable synchronizers:
- None
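Why does the broker sometimes still register normally under /brokers/ids/? Possibly because, when the IZkStateListener callbacks are registered, KafkaHealthcheck happens to be registered first (unverified).
The hang arises when KafkaController's handleNewSession is called: it first acquires the lock via inLock(controllerContext.controllerLock), and then, while still holding it, onControllerResignation acquires inLock(controllerContext.controllerLock) again. Both are the same lock, controllerContext.controllerLock.
Note that if controllerLock is a java.util.concurrent.locks.ReentrantLock (as it appears to be in the 0.8.1 source), the second acquisition by the same thread does not block on its own. What the stack trace on the hung node actually shows is the ZkClient event thread parked in CountDownLatch.await inside ShutdownableThread.shutdown (called from TopicDeletionManager.shutdown) while still holding controllerLock. In other words, it is waiting for the delete-topics-thread to exit; if that thread in turn needs controllerLock before it can exit, neither side can make progress.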
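The locking pattern described above can be reproduced in isolation. The following is a minimal sketch, not Kafka's code: the class name ControllerLockDemo, both method names, and the thread name are hypothetical, the static controllerLock field only stands in for controllerContext.controllerLock, and a timed await replaces the untimed CountDownLatch.await seen in the stack trace so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class ControllerLockDemo {
    // Stand-in for controllerContext.controllerLock (assumption for this demo).
    static final ReentrantLock controllerLock = new ReentrantLock();

    // Case 1: the same thread takes the lock twice, as the nested inLock calls do.
    // A ReentrantLock lets its current owner acquire it again.
    static boolean nestedAcquireSucceeds() {
        controllerLock.lock();
        try {
            boolean acquiredAgain = controllerLock.tryLock();
            if (acquiredAgain) {
                controllerLock.unlock(); // release the nested hold
            }
            return acquiredAgain;
        } finally {
            controllerLock.unlock(); // release the outer hold
        }
    }

    // Case 2: the caller holds the lock and waits on a latch that a worker
    // thread can only count down after it has obtained the same lock.
    // Returns whether the worker finished while the caller still held the lock.
    static boolean shutdownCompletesWhileHoldingLock(long timeoutMs)
            throws InterruptedException {
        CountDownLatch workerStarted = new CountDownLatch(1);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            workerStarted.countDown();
            controllerLock.lock(); // blocks for as long as the caller holds the lock
            try {
                // the worker's remaining work would happen here
            } finally {
                controllerLock.unlock();
            }
            shutdownLatch.countDown(); // signals that the worker has exited
        }, "delete-topics-thread-demo");
        worker.setDaemon(true);

        boolean finished;
        controllerLock.lock();
        try {
            worker.start();
            workerStarted.await();
            // With an untimed await() this call would never return.
            finished = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            controllerLock.unlock(); // only now can the worker proceed
        }
        worker.join();
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("nested acquire succeeds: " + nestedAcquireSucceeds());
        System.out.println("shutdown completes while holding lock: "
                + shutdownCompletesWhileHoldingLock(200));
    }
}
```

In this sketch the nested acquisition succeeds, while the latch wait times out because the worker cannot obtain the lock until the caller releases it. That matches the shape of the stack trace: the blocking frame is the latch wait inside shutdown, not the second inLock call.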
Comparison against the production environment
[root@SJSWT43-204 logs]# grep "ZK expired" ./controller.log*
./controller.log.2014-07-31-19:[2014-07-31 19:51:30,400] INFO [SessionExpirationListener on 0], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
./controller.log.2014-08-01-10:[2014-08-01 10:08:08,368] INFO [SessionExpirationListener on 0], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
./controller.log.2014-08-01-12:[2014-08-01 12:00:16,036] INFO [SessionExpirationListener on 0], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
./controller.log.2014-08-06-14:[2014-08-06 14:33:55,994] INFO [SessionExpirationListener on 0], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[root@SJSWT43-205 logs]# grep "ZK expired" ./controller.log*
./controller.log.2014-07-31-19:[2014-07-31 19:59:40,634] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
./controller.log.2014-08-01-11:[2014-08-01 11:38:14,628] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
./controller.log.2014-08-01-12:[2014-08-01 12:25:16,387] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
./controller.log.2014-08-06-14:[2014-08-06 14:34:14,481] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
./controller.log.2014-08-06-14:[2014-08-06 14:35:31,266] INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
The production logs also contain the following entries:
[2014-08-06 14:33:55,994] INFO [SessionExpirationListener on 0], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2014-08-06 14:33:56,005] INFO [delete-topics-thread], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
However, the corresponding jstack output on 204 and 205 shows no thread deadlocked in Utils$.inLock(.
See the attached jstack logs for details.