`

Thread_跨节点集合查询

 
阅读更多

项目中数据库进行了水平切分,为了处理跨节点集合查询,采用了多线程并发操作的方式来处理,并且对各线程执行的结果进行操作,如果是返回结果集,则合并排序;如果是聚合操作,则求和。个人觉得该实现方式很有代表性,所以抽取了原型实现备忘。

 

集合处理:

 

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

/**
 * 集合查询
 * 
 * 场景:	数据库水平切分后,集合查询要对所有数据源进行操作;
 * 例如:	select count(*) from tablename,这个sql要广播到所有节点上执行;
 * 关键点:	对从各数据源查询得到的记录数进行求和.
 * 
 * @author charles
 *
 */
public class Service {
	private static ExecutorService exec;
	private CompletionService<Object> completionService;
	
	
	public String invoke(String name){
		//init fixedpool
		FixedConnectionPool fixedPool = new FixedConnectionPool();
		exec = fixedPool.getExec();
		completionService = fixedPool.getCompletionService();
		
		for(int i = 0; i < 10; i++){
			Task task = new Task(name);
			if (!exec.isShutdown()) {
				completionService.submit(task);
			}
		}
		
		return getResult();
	}
	
	
	private String getResult(){
		Object result = null;
		Object obj = null;
		
		for(int i = 0; i < 10; i++){
			try {
				obj = completionService.take().get();
				result = merge(obj,result);
				
			} catch (InterruptedException e) {
				System.out.println(">>>> Service InterruptedException : "+ e.getMessage());
				e.printStackTrace();
				obj = null;
				
			} catch (ExecutionException e) {
				System.out.println(">>>> Service ExecutionException : "+ e.getMessage());
				e.printStackTrace();
				obj = null;
				
			} catch(Exception e){
				System.out.println(">>>> Service Exception : "+ e.getMessage());
				e.printStackTrace();
				obj = null;
			}
		}
		
		
		if(result != null)
			return (String)result;
		else
			return "";
	}
	
	
	public String merge(Object obj, Object result) throws Exception {
		if(obj == null)
			obj = "";
		
		if(result == null)
			result = "";
		
		//System.out.println(">>>> merge : "+ obj +"\n"+ result);
		return obj +"\n"+ result;
	}
	
	
	public static void main(String[] args){
		System.out.println(">>>> Interface : "+ new Service().invoke("冯天魁"));
		
		if (!exec.isShutdown())
			exec.shutdown();
	}
}

 

 

任务:

/**
 * 任务
 * @author charles
 *
 */
public class Task implements Callable<Object> {
	private String name;
	private static int i = 0;
	
	/**
	 * i是多线程共享变量
	 */
	public synchronized Object call() throws Exception {
		i++;
		return "Hello "+ name + i +", you are in a fixedpool!";
	}
	
	public Task(String name){
		this.name = name;
	}
}

 

 

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class FixedConnectionPool {
	private int cpuCoreNumber = 2;
	private ExecutorService exec;
	private CompletionService<Object> completionService;
	
	
	public FixedConnectionPool(){
		this.cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		//this.exec = Executors.newCachedThreadPool();
		this.exec = Executors.newFixedThreadPool(cpuCoreNumber);
		this.completionService = new ExecutorCompletionService<Object>(exec);
	}
	
	
	public int getCpuCoreNumber() {
		return cpuCoreNumber;
	}


	public void setCpuCoreNumber(int cpuCoreNumber) {
		this.cpuCoreNumber = cpuCoreNumber;
	}


	public ExecutorService getExec() {
		return exec;
	}


	public void setExec(ExecutorService exec) {
		this.exec = exec;
	}


	public CompletionService<Object> getCompletionService() {
		return completionService;
	}

	
	public void setCompletionService(CompletionService<Object> completionService) {
		this.completionService = completionService;
	}
	
}


 

0
0
分享到:
评论
5 楼 charles751 2012-09-20  
dacoolbaby 写道
觉得用Java进行排序效率很低。
一般是不是应该在数据库前面再建一个像memcache这样的东西?



怎么得到的java排序效率低?

你打算用memcached做数据缓存还是什么?

4 楼 dacoolbaby 2012-09-20  
觉得用Java进行排序效率很低。
一般是不是应该在数据库前面再建一个像memcache这样的东西?
3 楼 charles751 2012-09-19  
一般都是分页显示,每页显示的记录数不会很大,所以一般情况下结果集不会很大。
2 楼 charles751 2012-09-19  
zean 写道
这个问题很久以前就想过,如果多个节点返回的结果集都很大,怎么排序?


结果集大也没办法,汇总后排序才能保证排序的正确性。
1 楼 zean 2012-09-19  
这个问题很久以前就想过,如果多个节点返回的结果集都很大,怎么排序?

相关推荐

    Java8之lambda最佳实践_动力节点Java学院整理

    通过链式调用 map(), filter(), reduce() 等方法,开发者可以实现对集合的复杂操作,而无需显式使用循环。以下是一个简单的示例,展示了如何使用 Lambda 和 Stream API 打印列表中的元素: ```java // Prior Java 8...

    动力节点Java基础301集_史上最全的Java基础教程

    动力节点的《Java基础301集》是一个全面且深入的Java入门教程,涵盖了从基础知识到高级特性的广泛内容,适合Java初学者和寻求巩固基础的学习者。以下是一些主要的知识点详解: 1. **数据结构**:教程详细介绍了单项...

    动力节点Java基础301集

    根据提供的文件信息,“动力节点Java基础301集”是一套非常全面的Java基础教程,旨在为初学者和有一定基础的学习者提供系统化的学习资源。接下来,我们将从以下几个方面对这套教程涉及的重要知识点进行总结: ### ...

    Java多线程学习-动力节点共22页.pdf.zip

    本资料“Java多线程学习-动力节点共22页.pdf.zip”提供了对Java多线程的深入学习,旨在帮助开发者掌握这一关键技术。 1. **线程基础**:Java中的线程是通过`Thread`类或者实现`Runnable`接口来创建的。通过继承`...

    动力节点JavaSE培训日志

    11. **多线程**:理解并发编程,创建线程的方式(Thread类和Runnable接口),线程同步(synchronized关键字、wait()、notify()、notifyAll())和死锁。 12. **枚举与注解**:枚举类型提供了一种安全的常量表示方式...

    java基础(多线程,IO,集合,网络编程,泛型)

    - **创建线程**:Java提供两种方式创建线程,通过实现Runnable接口或继承Thread类。 - **线程状态**:线程有新建、就绪、运行、阻塞和死亡五种状态。 - **线程同步**:包括synchronized关键字、wait()、notify()...

    并行计算机架构与编程上机实验程序MPI.zip

    并行计算机架构与编程是计算机科学中的一个重要领域,它涉及到如何利用多个处理器或者计算节点同时处理任务,以提高计算效率。MPI(Message Passing Interface)是一种广泛使用的并行编程模型,尤其在高性能计算和大...

    java面试题集合

    - `LinkedList`使用双向链表实现,其特点是按索引访问相对较慢,但插入和删除操作非常高效,因为只需要修改相邻节点的引用。它也是非线程安全的,适合于需要频繁插入或删除元素的场景。 多线程在Java中主要有两种...

    二叉树模版

    二叉树是由n(n≥0)个有限节点组成的一个有穷集合,这些节点按照特定的规则进行排列,每个节点最多有两个子节点,分别称为左子节点和右子节点。二叉树常用于表示层次关系、搜索、排序等问题。 1. **普通二叉树...

    数据结构--二叉树--线索化二叉树

    二叉树是由n(n≥0)个有限节点组成一个具有层次关系的集合,通常表现为一个有根的层次结构。每个节点最多有两个子节点,分别称为左子节点和右子节点。二叉树的特性使其在解决某些问题时展现出较高的效率。 本文...

    数据结构课件:第4章 树1.ppt

    其他节点分为若干个不相交的集合,每个集合都是以根节点为起点的子树。在树中,节点之间的关系是有序的,即一个节点可以有多个子节点,但只有一个父节点(除了根节点)。节点的子节点数量称为节点的度。 2. **...

    P2P_vb.rar_VBP_p2p Visual Basic_p2p vb_vb p2p

    而"P2P代码"很可能就是实际的P2P网络程序的源代码文件,可能是多个VB文件的集合,包含了主程序、网络通信逻辑、数据处理等功能模块。 深入研究这个P2P项目,我们可以学习到以下知识点: 1. **P2P网络架构**:了解...

    清华殷人昆数据结构笔记(c++)6

    线索化二叉树是一种增强的二叉树,通过在节点中添加线索(thread)来指示前驱和后继节点,使得在非递归情况下也能进行遍历,提高查找效率。 ### 堆 堆是一种特殊的树形数据结构,通常为完全二叉树。分为大顶堆和小...

    CAJ1939.rar_1939常用ID_CAN_CAN PGN_J1939 PGN_J1939 collection

    标题中的"CAJ1939.rar_1939常用ID_CAN_CAN PGN_J1939 PGN_J1939 collection"表明这是一个关于J1939协议的资源集合,包含了一些常用的ID和PGN(Parameter Group Number)信息。 J1939协议的核心是PGN,它是数据帧的...

    env2_cpp.rar_berkley

    8. **common.h**:公共函数和常量的集合,可能包含了跨文件使用的通用代码。 通过分析这些文件,我们可以预期这个项目会涵盖Berkley DB的基本使用,如数据库的创建和操作,事务处理,以及复杂的特性,如多点数据...

    node-threadpool:使用工作线程的节点线程池

    使用节点的新worker_thread API,池中的线程可以相互传递消息,以及对共享内存进行读写。 用法 完整的API文档可以在这里找到: : 。 如果您熟悉Java的线程池API,则应该非常熟悉: import { Executors } from ...

    JAVA面试题和面试技巧集合

    理解流的方向(输入/输出)、层次(节点流/处理流)以及NIO(非阻塞I/O)框架。 8. **反射与动态代理**:反射允许在运行时检查和操作类、接口、字段和方法。动态代理则可以在运行时创建代理类,用于实现AOP(面向切...

    Java面试题,集合多种面试,跟Java有关的面试

    5. **多线程**:掌握线程的创建方式(通过Thread类和实现Runnable接口),线程同步机制(synchronized,wait(),notify(),notifyAll()),死锁的概念和避免方法,以及线程池的使用(ExecutorService,...

    GenerateIdBatch

    我们希望能够将总订单应用于跨节点创建的实体以供以后审计。 用法: 根据您对批处理大小、线程等的要求设置 config.properties 文件。运行 GenerateIds 将创建 ConsumerThread 的实例,或者在外部编写您自己的...

    Java_mianshi_timu.rar_java 试题_java 面试_应聘

    4. **多线程**:线程的创建方式(Thread类和Runnable接口),线程同步机制(synchronized关键字、wait/notify、Lock接口),死锁的概念及避免策略。 5. **异常处理**:异常的分类,try-catch-finally语句块,自定义...

Global site tag (gtag.js) - Google Analytics