`
jamie.wang
  • 浏览: 347632 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Sensu Server源码剖析

阅读更多

Sensu Server是Sensu的核心组件,是Sensu业务逻辑实现的核心组件。

Server的启动

Sensu Server的入口在sensu/bin/sensu-server
options = Sensu::CLI.read
Sensu::Server::Process.run(options)
和这里Client几乎一样,解析命令行参数,调用Process.run
run方法也是,创建一个Server的服务实例,启动服务:server.start,然后处理信号
再看start方法

def start
  setup_redis
  setup_transport
  bootstrap
end

 
setup_redis:连接redis;
setup_transport:连接RabbitMQ;
bootstrap:启动;
接下来看bootstrap方法

def bootstrap
  setup_keepalives
  setup_results
  setup_master_monitor
  @state = :running
end

 
setup_keepalives:保持和客户端的联系:订阅客户端的注册的消息,当客户端注册过来(客户端每过20秒会注册自己),则将客户端添加到客户端队列里;
setup_results:订阅check的结果并处理结果;
setup_master_monitor:为保证server的HA定时选举master;
最后将该server的状态设置为running
这时候server就启动起来了

处理check结果

重点关注:process_check_result方法,该方法主要对check的结果存储,并对相同错误累计,并处理结果

def process_check_result(result)
  @logger.debug("processing result", :result => result)
  @redis.get("client:#{result[:client]}") do |client_json| # 找到result json里对应的client
    unless client_json.nil?
      client = MultiJson.load(client_json)
      check = case
      when @settings.check_exists?(result[:check][:name]) && !result[:check][:standalone] # 如果check不在配置里,且不为standalone check
        @settings[:checks][result[:check][:name]].merge(result[:check]) # 则添加一个check
      else
        result[:check]
      end
      aggregate_check_result(result) if check[:aggregate] # 如果需要聚合,聚合check结果,稍后再看
      store_check_result(client, check) do # 存储check结果
        check_history(client, check) do |history, total_state_change| # 计算状态改变百分比
          check[:history] = history
          check[:total_state_change] = total_state_change
          # 创建一个事件,如果该次check失败,则增加该事件的失败次数,并记录,如果之前失败,这次成功,则删除该事件的失败记录
          update_event_registry(client, check) do |event|
            process_event(event) # 调用handler处理事件
          end
        end
      end
    else
      @logger.warn("client not in registry", :client => result[:client])
    end
  end
end

 
存储结果的方法:store_check_result

def store_check_result(client, check, &callback)
  @redis.sadd("history:#{client[:name]}", check[:name]) # 添加该check到该client的history里,如果已存在则忽略,history:#{client[:name]}包含了该client的所有check
  result_key = "#{client[:name]}:#{check[:name]}"
  history_key = "history:#{result_key}"
  @redis.rpush(history_key, check[:status]) do # 将check的结果push到历史队列里
  @redis.set("execution:#{result_key}", check[:executed])
    @redis.ltrim(history_key, -21, -1) # 只保存21条历史记录
    callback.call
  end
end

 
该方法将check的结果添加到历史队列里,即:redis的队列里。并且只保存21条历史记录。
处理事件方法:process_event,该方法获取事件里的handlers,并循环处理,在处理前会过滤和修改。

def process_event(event)
  log_level = event[:check][:type] == "metric" ? :debug : :info # 如果是metric,日志级别为debug
  @logger.send(log_level, "processing event", :event => event)
  event_bridges(event)
  handler_list = Array((event[:check][:handlers] || event[:check][:handler]) || "default") # 找到处理的handler(s)
  handlers = derive_handlers(handler_list) # 根据handler名字找到handler定义
  handlers.each do |handler| # 循环处理
    @handling_event_count += 1
    filter_event(handler, event) do |event| # 过滤事件
      mutate_event(handler, event) do |event_data| # 修改事件
        handle_event(handler, event_data) # 处理事件
      end
    end
  end
end

 
handler的handle_event方法,记录日志,调用handler_type_router,该方法根据不同的处理类型调用不同的处理方法

def handler_type_router(handler, event_data)
  case handler[:type]
  when "pipe"
    pipe_handler(handler, event_data)
  when "tcp"
    tcp_handler(handler, event_data)
  when "udp"
    udp_handler(handler, event_data)
  when "transport"
    transport_handler(handler, event_data)
  when "extension"
    handler_extension(handler, event_data)
  end
end

 
pipe_handler:执行配置的脚本,并日志记录结果;
 tcp_handler:发送数据到handler配置的TCP主机和端口;
udp_handler:和tcp_handler一样,只是发送的时UDP包;
transport_handler:发布一个事件到RabbitMQ;
handler_extension:处理扩展;

 

 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics