`
xly1981
  • 浏览: 146686 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

基于zookeeper动态扩展处理分类数据

阅读更多
背景:
日终处理分户账记录的转逾期,数据量越来越大,单机处理时间已经不能忍受,考虑重构批处理逻辑。
场景特性:分户账转逾期处理,每条记录和别的记录互不相干
因此可以考虑把分户账信息分类处理,该方案可以方便的让执行程序随着分户账数据的不断增加,任意扩展到多个虚拟机,或者在同一个JVM内使用多线程处理。
待完善部分:某个任务处理失败,需要在回调函数增加处理,记录失败的Id号,因为是跑批另外最后两个类里面还可以加上工作日信息,在处理逻辑中加一层校验;在后台修复数据后,增加添加任务接口,让剩余的这个ID的创建新的节点,并重新执行。

第一步;
数据预处理,基于分户账记录的主键,hash后对128(数值可以取大点)取模,把数据分成128份,在此字段建索引

第二步:
利用zookeeper,建立阻塞消息队列

第三部:
任务分发系统MissionMaker定时执行,从数据库取出128这个值和任务名称,在/Queue/operation_yuqi节点下,以此创建128个持久化排序队列,把i值放到节点取值里面

第四步:
任务执行系统TaskExecuter监听/Queue/operation_yuqi节点的队列变化情况,发生变更,就在里面取出一个节点,并读出节点的数据(0-127),然后执行自己管理的部分分户账数据




阻塞队列

public class DistributedBlockingQueue<T> {      
	protected final ZkClient zkClient;
	protected final String root;

	protected static final String Node_NAME = "n_";

	
    public DistributedBlockingQueue(ZkClient zkClient, String root,String taskName) {
    	this.zkClient = zkClient;
		this.root = root.concat("/").concat(taskName);
	}
    
    public boolean offer(T element) throws Exception{
    	String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
        try {
            zkClient.createPersistentSequential(nodeFullPath , element);
        }catch (ZkNoNodeException e) {
        	zkClient.createPersistent(root);
        	offer(element);
        } catch (Exception e) {
            throw ExceptionUtil.convertToRuntimeException(e);
        }
        return true;
    }
    
	public T poll(TaskCallBack back) throws Exception {
		while (true){
			
			final CountDownLatch    latch = new CountDownLatch(1);
			final IZkChildListener childListener = new IZkChildListener() {
				
				public void handleChildChange(String parentPath, List<String> currentChilds)
						throws Exception {
					System.out.println(Thread.currentThread().getName()+",发现任务队列长度发生变化!");
					latch.countDown();
				}
			};
			zkClient.subscribeChildChanges(root, childListener);
			try{
				List<String> list = zkClient.getChildren(root);
				T node = null;
				if (list.size() == 0) {
					
				}else{
					Collections.sort(list, new Comparator<String>() {
						public int compare(String lhs, String rhs) {
							return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
						}
					});
				}
				
				String nodeN = "";
				for ( String nodeName : list ){
					String nodeFullPath = root.concat("/").concat(nodeName);
					try {
						node = (T) zkClient.readData(nodeFullPath);
						Boolean bb = zkClient.delete(nodeFullPath);
						if(bb){
							nodeN = nodeName;
						}
					} catch (ZkNoNodeException e) {
						node=null;
						// ignore
					}
					break;
				}
	            if (node != null && null!=nodeN && !"".equals(nodeN)){
					back.doTask(node);
	            }else{
	            	latch.await();
	            }
			}finally{
				zkClient.unsubscribeChildChanges(root, childListener);
				
			}
			
		}
	}
    
	public T getPoll() throws Exception {
		
		try {

			List<String> list = zkClient.getChildren(root);
			if (list.size() == 0) {
				return null;
			}
			Collections.sort(list, new Comparator<String>() {
				public int compare(String lhs, String rhs) {
					return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
				}
			});
			
			for ( String nodeName : list ){
				String nodeFullPath = root.concat("/").concat(nodeName);	
				try {
					T node = (T) zkClient.readData(nodeFullPath);
					zkClient.delete(nodeFullPath);
					return node;
				} catch (ZkNoNodeException e) {
					// ignore
				}
			}
			return null;
		} catch (Exception e) {
			throw ExceptionUtil.convertToRuntimeException(e);
		}

	}



	private String getNodeNumber(String str, String nodeName) {
		int index = str.lastIndexOf(nodeName);
		if (index >= 0) {
			index += Node_NAME.length();
			return index <= str.length() ? str.substring(index) : "";
		}
		return str;
	}
	

}

//创建任务
public class MissionMaker implements Runnable{
	
	private String root;
	
	private ZkClient zkClient;
	
	public MissionMaker(String url,String root){
		this.zkClient =  new ZkClient(url, 5000, 5000, new SerializableSerializer());
		this.root = root;
	}
	
	public TaskBean getTaskBean(){
		TaskBean taskBean = new TaskBean();
		taskBean.setTaskName("operation_yuqi");//逾期
		taskBean.setHashNum(16);
		return taskBean;
	}
	
	//创建任务
	public void createTasks() throws Exception {
		TaskBean taskBean = getTaskBean();
		DistributedBlockingQueue queueMaker = new DistributedBlockingQueue<TaskBean>(zkClient,root,taskBean.getTaskName());
		System.out.println("create mission:"+taskBean.getHashNum());
		for(int i=0;i<taskBean.getHashNum();i++){
			OprBean oprBean = new OprBean();
			oprBean.setHashId(i+"");
			queueMaker.offer(oprBean);
		}
	}
	
	public void run(){
		try {
			while (true) {
				createTasks();
				Thread.sleep(30000);
			}
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

//执行任务
public class TaskExecuter implements Runnable{

	private String root;
	
	private ZkClient zkClient;
	
	private String taskName;
	
	private DistributedBlockingQueue queueMaker;
	
	public TaskExecuter(String url,String root,String taskName){
		this.zkClient =  new ZkClient(url, 5000, 5000, new SerializableSerializer());
		this.root = root;
		this.taskName = taskName;
		this.queueMaker = new DistributedBlockingQueue<TaskBean>(zkClient,root,taskName);
	}
	
	public void run(){
		try {
			queueMaker.poll(new TaskCallBack<OprBean>() {

				@Override
				public void doTask(OprBean oprBean) {
					// TODO Auto-generated method stub
					try {
						System.out.println("线程"+Thread.currentThread().getName()+"执行计算任务,taskName="+taskName+",hashId = "+oprBean.getHashId());
						Thread.sleep(2000);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}

				@Override
				public void afterTask(String hashId) {
					// TODO Auto-generated method stub
					System.out.println("更新数据库记录,taskName="+taskName+",hashId = "+hashId);
				}
				
			});
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	
	
}

//回调接口
public interface TaskCallBack<T> {

	void doTask(T oprBean);
	
	void afterTask(String hashId);
}


序列化到zookeeper的类
public class OprBean implements Serializable{

	/**
	 * 
	 */
	private static final long serialVersionUID = 2840329402832770757L;

	private String hashId;
	
	private String oprResult;

	public String getHashId() {
		return hashId;
	}

	public void setHashId(String hashId) {
		this.hashId = hashId;
	}

	public String getOprResult() {
		return oprResult;
	}

	public void setOprResult(String oprResult) {
		this.oprResult = oprResult;
	}
	
	
}


获取数据库信息的类
public class TaskBean {
	
	private String taskName;
	
	private Integer hashNum;

	public String getTaskName() {
		return taskName;
	}

	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}

	public Integer getHashNum() {
		return hashNum;
	}

	public void setHashNum(Integer hashNum) {
		this.hashNum = hashNum;
	}
	
	

}
分享到:
评论

相关推荐

    基于Zookeeper和guava动态限流 源码

    本文将深入探讨如何利用Zookeeper和Guava来实现动态限流,并基于提供的"基于Zookeeper和guava动态限流 源码"进行解析。我们将首先理解Zookeeper和Guava的基本概念,然后介绍它们如何协同工作以实现动态限流,最后会...

    基于ZooKeeper的分布式Session实现

    标题 "基于ZooKeeper的分布式Session实现" 涉及的是在分布式系统中如何利用Apache ZooKeeper来管理和共享Session信息。ZooKeeper是一款开源的分布式协调服务,它为分布式应用程序提供了一个简单一致的接口,用于处理...

    基于zookeeper和storm的车载流式计算框架

    为了实时处理这些数据并从中提取有价值的信息,一种基于Zookeeper和Storm的车载流式计算框架被提出。本文将详细介绍这一框架的设计原理、关键技术以及应用场景。 #### 二、Zookeeper简介 Apache Zookeeper是一个...

    基于zookeeper的hadoop ha集群安装过程

    基于ZooKeeper的Hadoop HA集群可以提供高可用性和可扩展性,满足企业对大数据处理的需求。在本文中,我们将详细介绍基于ZooKeeper的Hadoop HA集群的安装过程。 一、 安装环境介绍 在安装基于ZooKeeper的Hadoop HA...

    基于ZooKeeper的一种分布式通信服务中间件的研究.pdf

    本研究基于ZooKeeper,设计并实现了一种分布式通信服务中间件。ZooKeeper是一个开源的分布式协调服务,它能够提供高性能、可扩展的服务协调功能,如分布式锁、配置管理、命名服务等。在本研究中,利用ZooKeeper提供...

    基于ZooKeeper的一种分布式系统架构设计与实现.pdf

    本文介绍了一种基于ZooKeeper的分布式系统架构设计与实现,展示了如何通过分布式服务模型和异步通信来实现服务注册、服务监管、服务加载、通信服务等关键功能,并通过实际项目对所设计架构进行测试验证,证明了该...

    基于zookeeper注册发现服务的springbootDemo

    总的来说,这个"基于zookeeper注册发现服务的springbootDemo"是一个很好的学习资源,它涵盖了SpringBoot、Zookeeper、微服务、MyBatis和Durid等多个重要技术点,有助于理解并实践分布式系统中的服务治理。

    C#基于zookeeper分布式锁的实现源码

    总之,C#中基于ZooKeeper的分布式锁实现涉及对ZooKeeper的操作,包括创建临时顺序节点、监听节点变化以及正确释放锁。这样的实现方式保证了在分布式环境下的并发控制和数据一致性,同时具备良好的扩展性和容错性。...

    基于ZooKeeper的警用装备分布式数据交换系统的设计与实现.pdf

    7. 大数据技术在业务数据采集和交换中的应用:大数据技术的应用可以解决警用装备管理中业务数据采集和交换过程中的可靠性问题,通过对数据的集中采集、分析和同步,确保数据处理和分析结果的准确性和实时性,进而...

    maxwell 基于zookeeper的高可用方案

    Maxwell 是一个实时的数据同步工具,它可以从MySQL数据库中捕获更改,并将这些更改实时地发布到...这种架构允许在主节点故障时快速恢复服务,保证了数据的连续性和一致性,同时也提供了对集群动态扩展和管理的灵活性。

    基于ZooKeeper的配置信息存储方案的设计与实现

    在当今的IT领域,云计算和分布式计算已经成为研究和应用的热点,特别是在处理海量数据的背景下,分布式计算的重要性日益凸显。分布式计算的一个关键环节是分布式应用的配置信息管理,它对于系统的稳定运行和快速响应...

    基于Zookeeper搭建Kafka高可用集群

    以上就是基于Zookeeper搭建Kafka高可用集群的详细步骤和知识点,确保了服务的稳定性和扩展性。在实际生产环境中,还需要考虑网络环境、安全性、监控和日志管理等因素,以确保整个系统的高效运行。

    基于Zookeeper实现分布式锁实践教程

    分布式锁的实现基于Zookeeper的临时节点和监视器机制。通常,每个想要获取锁的客户端会在特定的zNode下创建一个临时节点。如果创建成功,说明获取了锁;如果失败,说明已有其他客户端持有锁。客户端还可以设置监视器...

    Storm+Zookeeper 流模式大数据处理部署手册

    - 扩展性:随着业务增长,可以动态添加更多服务器来扩展集群,提高处理能力。 - 安全性:考虑实施安全策略,如SSL加密、访问控制等,以保护数据安全。 总结,通过本方案,我们可以构建一个稳定、高效的大数据处理...

    基于ZooKeeper的分布式Session实现_已发布.docx

    【基于ZooKeeper的分布式Session实现】 ZooKeeper是一个分布式协调服务,源于Apache Hadoop项目,现已成为一个独立的子项目。它旨在提供高可用性、高性能的协调服务,适用于分布式环境中的命名、配置管理、同步和组...

    基于zookeeper+levelDB的ActiveMQ集群测试代码

    接下来,我们将详细讲解如何配置和运行这个基于ZooKeeper和LevelDB的ActiveMQ集群测试代码: 1. **安装与配置ZooKeeper**: 首先,你需要在集群的每台机器上安装ZooKeeper,配置`conf/zoo.cfg`文件,设置集群节点...

    基于Zookeeper 和 Dubbo框架的电商项目

    《基于Zookeeper和Dubbo框架的电商项目解析》 在当今的互联网时代,电商项目已经成为了商业运营的重要组成部分。为了实现高效、稳定的运行,开发者们通常会采用分布式架构来构建电商平台。本项目就是一个典型的例子...

    zookeeper查看工具

    3. **节点数据查看与编辑**:用户可以查看和修改Zookeeper中的任何节点数据,包括二进制数据和文本数据。 4. **权限管理**:支持查看和编辑节点的ACL(访问控制列表),实现对Zookeeper资源的细粒度访问控制。 5. ...

Global site tag (gtag.js) - Google Analytics