一、入口类是Main方法,读取zoo.cfg文件
org.apache.zookeeper.server.quorum.QuorumPeerMain
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
二、读取zoo.cfg配置文件,初始化参数
protected void initializeAndRun(String[] args){
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
//读取zoo.cfg文件,默认通过args传递进来的
config.parse(args[0]);
}
// 启动定时任务,清理ZK的数据目录,防止文件过大
// Start and schedule the the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
三、调用Properties 方法load配置文件
/**
* Parse a ZooKeeper configuration file
* @param path the patch of the configuration file
* @throws ConfigException error processing configuration
*/
public void parse(String path) throws ConfigException {
File configFile = new File(path);
LOG.info("Reading configuration from: " + configFile);
try {
if (!configFile.exists()) {
throw new IllegalArgumentException(configFile.toString()
+ " file is missing");
}
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
cfg.load(in);
} finally {
in.close();
}
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
}
四、遍历Properties 获取属性值
/**
* Parse config from a Properties.
* @param zkProp Properties to parse from.
* @throws IOException
* @throws ConfigException
*/
public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
int clientPort = 0;
String clientPortAddress = null;
for (Entry<Object, Object> entry : zkProp.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (key.equals("dataDir")) {
dataDir = value;
} else if (key.equals("dataLogDir")) {
dataLogDir = value;
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
} else if (key.equals("tickTime")) {
tickTime = Integer.parseInt(value);
} else if (key.equals("maxClientCnxns")) {
maxClientCnxns = Integer.parseInt(value);
} else if (key.equals("minSessionTimeout")) {
minSessionTimeout = Integer.parseInt(value);
} else if (key.equals("maxSessionTimeout")) {
maxSessionTimeout = Integer.parseInt(value);
} else if (key.equals("initLimit")) {
initLimit = Integer.parseInt(value);
} else if (key.equals("syncLimit")) {
syncLimit = Integer.parseInt(value);
} else if (key.equals("electionAlg")) {
electionAlg = Integer.parseInt(value);
} else if (key.equals("quorumListenOnAllIPs")) {
quorumListenOnAllIPs = Boolean.parseBoolean(value);
} else if (key.equals("peerType")) {
if (value.toLowerCase().equals("observer")) {
peerType = LearnerType.OBSERVER;
} else if (value.toLowerCase().equals("participant")) {
peerType = LearnerType.PARTICIPANT;
} else
{
throw new ConfigException("Unrecognised peertype: " + value);
}
} else if (key.equals( "syncEnabled" )) {
syncEnabled = Boolean.parseBoolean(value);
} else if (key.equals("autopurge.snapRetainCount")) {
snapRetainCount = Integer.parseInt(value);
} else if (key.equals("autopurge.purgeInterval")) {
purgeInterval = Integer.parseInt(value);
} else if (key.startsWith("server.")) {
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
String parts[] = value.split(":");
if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {
//Zookeeper 配置集群的三种方法
LOG.error(value
+ " does not have the form host:port or host:port:port " +
" or host:port:port:type");
}
InetSocketAddress addr = new InetSocketAddress(parts[0],
Integer.parseInt(parts[1]));
if (parts.length == 2) {
servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));
} else if (parts.length == 3) {
InetSocketAddress electionAddr = new InetSocketAddress(
parts[0], Integer.parseInt(parts[2]));
servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
electionAddr));
} else if (parts.length == 4) {
InetSocketAddress electionAddr = new InetSocketAddress(
parts[0], Integer.parseInt(parts[2]));
LearnerType type = LearnerType.PARTICIPANT;
if (parts[3].toLowerCase().equals("observer")) {
type = LearnerType.OBSERVER;
observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
electionAddr,type));
} else if (parts[3].toLowerCase().equals("participant")) {
type = LearnerType.PARTICIPANT;
servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
electionAddr,type));
} else {
throw new ConfigException("Unrecognised peertype: " + value);
}
}
} else if (key.startsWith("group")) {
int dot = key.indexOf('.');
long gid = Long.parseLong(key.substring(dot + 1));
numGroups++;
String parts[] = value.split(":");
for(String s : parts){
long sid = Long.parseLong(s);
if(serverGroup.containsKey(sid))
throw new ConfigException("Server " + sid + "is in multiple groups");
else
serverGroup.put(sid, gid);
}
} else if(key.startsWith("weight")) {
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
serverWeight.put(sid, Long.parseLong(value));
} else {
System.setProperty("zookeeper." + key, value);
}
}
// Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
// PurgeTxnLog.purge(File, File, int) will not allow to purge less
// than 3.
if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {
LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount
+ ". Defaulting to " + MIN_SNAP_RETAIN_COUNT);
snapRetainCount = MIN_SNAP_RETAIN_COUNT;
}
if (dataDir == null) {
throw new IllegalArgumentException("dataDir is not set");
}
if (dataLogDir == null) {
dataLogDir = dataDir;
} else {
if (!new File(dataLogDir).isDirectory()) {
throw new IllegalArgumentException("dataLogDir " + dataLogDir
+ " is missing.");
}
}
if (clientPort == 0) {
throw new IllegalArgumentException("clientPort is not set");
}
if (clientPortAddress != null) {
this.clientPortAddress = new InetSocketAddress(
InetAddress.getByName(clientPortAddress), clientPort);
} else {
this.clientPortAddress = new InetSocketAddress(clientPort);
}
if (tickTime == 0) {
throw new IllegalArgumentException("tickTime is not set");
}
if (minSessionTimeout > maxSessionTimeout) {
throw new IllegalArgumentException(
"minSessionTimeout must not be larger than maxSessionTimeout");
}
if (servers.size() == 0) {
if (observers.size() > 0) {
throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
}
// Not a quorum configuration so return immediately - not an error
// case (for b/w compatibility), server will default to standalone
// mode.
return;
} else if (servers.size() == 1) {
if (observers.size() > 0) {
throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
}
// HBase currently adds a single server line to the config, for
// b/w compatibility reasons we need to keep this here.
LOG.error("Invalid configuration, only one server specified (ignoring)");
servers.clear();
} else if (servers.size() > 1) {
if (servers.size() == 2) {
LOG.warn("No server failure will be tolerated. " +
"You need at least 3 servers.");
} else if (servers.size() % 2 == 0) {
LOG.warn("Non-optimial configuration, consider an odd number of servers.");
}
if (initLimit == 0) {
throw new IllegalArgumentException("initLimit is not set");
}
if (syncLimit == 0) {
throw new IllegalArgumentException("syncLimit is not set");
}
/*
* If using FLE, then every server requires a separate election
* port.
*/
if (electionAlg != 0) {
for (QuorumServer s : servers.values()) {
if (s.electionAddr == null)
throw new IllegalArgumentException(
"Missing election port for server: " + s.id);
}
}
/*
* Default of quorum config is majority
*/
if(serverGroup.size() > 0){
if(servers.size() != serverGroup.size())
throw new ConfigException("Every server must be in exactly one group");
/*
* The deafult weight of a server is 1
*/
for(QuorumServer s : servers.values()){
if(!serverWeight.containsKey(s.id))
serverWeight.put(s.id, (long) 1);
}
/*
* Set the quorumVerifier to be QuorumHierarchical
*/
quorumVerifier = new QuorumHierarchical(numGroups,
serverWeight, serverGroup);
} else {
/*
* The default QuorumVerifier is QuorumMaj
*/
LOG.info("Defaulting to majority quorums");
quorumVerifier = new QuorumMaj(servers.size());
}
// Now add observers to servers, once the quorums have been
// figured out
servers.putAll(observers);
File myIdFile = new File(dataDir, "myid");
if (!myIdFile.exists()) {
throw new IllegalArgumentException(myIdFile.toString()
+ " file is missing");
}
BufferedReader br = new BufferedReader(new FileReader(myIdFile));
String myIdString;
try {
myIdString = br.readLine();
} finally {
br.close();
}
try {
serverId = Long.parseLong(myIdString);
MDC.put("myid", myIdString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("serverid " + myIdString
+ " is not a number");
}
// Warn about inconsistent peer type
LearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER
: LearnerType.PARTICIPANT;
if (roleByServersList != peerType) {
LOG.warn("Peer type from servers list (" + roleByServersList
+ ") doesn't match peerType (" + peerType
+ "). Defaulting to servers list.");
peerType = roleByServersList;
}
}
}
五、启动清理数据文件的进程
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running.");
return;
}
// Don't schedule the purge task with zero or negative purge interval.
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}
timer = new Timer("PurgeTask", true);
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
purgeTaskStatus = PurgeTaskStatus.STARTED;
}
六、启动线程进行清理
static class PurgeTask extends TimerTask {
private String logsDir;
private String snapsDir;
private int snapRetainCount;
public PurgeTask(String dataDir, String snapDir, int count) {
logsDir = dataDir;
snapsDir = snapDir;
snapRetainCount = count;
}
@Override
public void run() {
LOG.info("Purge task started.");
try {
PurgeTxnLog.purge(new File(logsDir), new File(snapsDir), snapRetainCount);
} catch (Exception e) {
LOG.error("Error occured while purging.", e);
}
LOG.info("Purge task completed.");
}
}
七、真正的过滤删除文件
/**
* purges the snapshot and logs keeping the last num snapshots
* and the corresponding logs.
* @param dataDir the dir that has the logs
* @param snapDir the dir that has the snapshots
* @param num the number of snapshots to keep
* @throws IOException
*/
public static void purge(File dataDir, File snapDir, int num) throws IOException {
if (num < 3) {
throw new IllegalArgumentException("count should be greater than 3");
}
FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);
// found any valid recent snapshots?
// files to exclude from deletion
Set<File> exc=new HashSet<File>();
List<File> snaps = txnLog.findNRecentSnapshots(num);
if (snaps.size() == 0)
return;
File snapShot = snaps.get(snaps.size() -1);
for (File f: snaps) {
exc.add(f);
}
long zxid = Util.getZxidFromName(snapShot.getName(),"snapshot");
exc.addAll(Arrays.asList(txnLog.getSnapshotLogs(zxid)));
final Set<File> exclude=exc;
class MyFileFilter implements FileFilter{
private final String prefix;
MyFileFilter(String prefix){
this.prefix=prefix;
}
public boolean accept(File f){
if(!f.getName().startsWith(prefix) || exclude.contains(f))
return false;
return true;
}
}
// add all non-excluded log files
List<File> files=new ArrayList<File>(
Arrays.asList(txnLog.getDataDir().listFiles(new MyFileFilter("log."))));
// add all non-excluded snapshot files to the deletion list
files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(new MyFileFilter("snapshot."))));
// remove the old files
for(File f: files)
{
System.out.println("Removing file: "+
DateFormat.getDateTimeInstance().format(f.lastModified())+
"\t"+f.getPath());
if(!f.delete()){
System.err.println("Failed to remove "+f.getPath());
}
}
}
八、这个函数功能自己研究
public void runFromConfig(QuorumPeerConfig config) throws IOException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
相关推荐
《Zookeeper 3.4.6:超稳定版详解与应用》 Zookeeper,作为Apache的一个顶级项目,是分布式协调服务的基石,广泛应用于大数据、云计算等领域的分布式系统中。Zookeeper 3.4.6是其历史上的一个重要版本,以其高度的...
在标题“zookeeper-3.4.6_zookeeper_”中,我们可以看到这是关于Zookeeper的一个特定版本——3.4.6的讨论。这个版本的发布对于理解和使用Zookeeper至关重要,因为它包含了该框架的稳定性和功能增强。 在描述“注册...
《Zookeeper 3.4.6:分布式协调服务详解》 Apache ZooKeeper 是一个开源的分布式协调服务,它为分布式应用程序提供一致性服务。在本文中,我们将深入探讨Zookeeper 3.4.6版本,了解其核心概念、功能以及如何进行...
《Apache ZooKeeper 3.4.6:分布式协调服务详解》 Apache ZooKeeper 是一个开源的分布式协调服务,它为分布式应用提供了一个高效且可靠的命名服务、配置管理、集群同步和分布式锁等基础功能。在Zookeeper 3.4.6版本...
zookeeper-3.4.6下载 zookeeper-3.4.6下载zookeeper-3.4.6下载zookeeper-3.4.6下载zookeeper-3.4.6下载zookeeper-3.4.6下载zookeeper-3.4.6下载zookeeper-3.4.6下载
1. **源代码**:提供了完整的Zookeeper服务器端和客户端的源代码,开发者可以通过阅读源码了解其内部工作原理,或者进行定制化开发。 2. **文档**:包括用户指南、管理员手册和开发者指南,帮助用户快速上手,理解...
在了解Zookeeper3.4.6的安装过程之前,我们首先需要理解Zookeeper的基本概念和架构。Zookeeper采用的是基于Paxos算法的ZAB协议,保证了数据的一致性和可靠性。它的架构是由一个或多个Server节点组成的集群,每个...
Zookeeper是Apache Hadoop项目下的一个子项目,它是一个分布式的,开放源码的分布式应用程序协调服务。作为一款高效、可靠的分布式协调框架,Zookeeper在分布式环境中扮演着至关重要的角色,尤其在解决一致性问题上...
首先,你需要从Apache官方网站获取ZooKeeper 3.4.6的安装包,文件名为`zookeeper-3.4.6.tar.gz`。下载完成后,将其解压到你的服务器或本地机器的合适目录,例如 `/usr/local`: ```bash wget ...
1. 下载Zookeeper 3.4.6源码:从Apache官方网站下载Zookeeper 3.4.6的源代码压缩包,解压到本地目录。 2. 创建Eclipse项目:在Eclipse中,选择File > New > Java Project,输入项目名(例如“Zookeeper_3.4.6”),...
总的来说,ZooKeeper 3.4.6在Windows上的使用为开发者提供了便捷的本地环境,便于学习、测试和调试分布式系统,尤其是对于使用Dubbo构建的应用,它扮演了关键的角色,帮助实现服务治理和分布式一致性。了解并熟练...
以下是关于"zookeeper3.4.6和jdk1.7(linux)"的详细知识点: 1. **Zookeeper**: - **定义**:Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态,并根据...
Zookeeper 3.4.6版本是该系统的一个稳定版本,提供了许多关键特性和改进。 一、Zookeeper的核心概念 1. 会话(Session):在Zookeeper中,客户端与服务器之间的连接称为会话。会话具有一定的超时时间,如果在超时...
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将简单易用的接口和性能高效、功能稳定的系统提供给用户。...
《Zookeeper 3.4.6:分布式协调服务的核心与实践》 Zookeeper,作为Apache的一个顶级项目,是分布式系统中的关键组件,尤其在大型分布式环境中的服务协调、配置管理、集群状态管理等方面发挥着重要作用。本文将深入...
zookeeper3.4.6.rar"所涵盖的是一个整合了三个关键组件的压缩包:Tomcat服务器、Dubbo服务治理框架以及ZooKeeper分布式协调服务。这个组合在企业级Java应用开发中非常常见,主要用于构建高效、可扩展的微服务架构。 ...
在本文中,我们将详细介绍如何在Linux环境下安装和配置Zookeeper 3.4.6版本。 首先,安装Zookeeper的前提条件是需要JDK 1.6或更高版本。确保已正确安装并设置了Java环境变量。可以通过`java -version`命令来检查...
《Curator与Zookeeper在3.4.6与2.9.1版本中的协同工作》 Apache Curator和Zookeeper是两个在分布式系统管理中至关重要的组件。Zookeeper作为一个开源的分布式协调服务,广泛用于配置维护、命名服务、分布式同步等...