`

多线程合并查询

阅读更多
package com.anyec.zk;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;

public class AsyncQueFutherTask {
	Logger logger=LoggerFactory.getLogger(AsyncQueFutherTask.class);
	Random r = new Random();
	private static BlockingQueue<Long> bq = new LinkedBlockingQueue<Long>(20);
//	private final static ThreadPoolExecutor ses = new ThreadPoolExecutor(20, 40, 1L, TimeUnit.MINUTES,
//			new ArrayBlockingQueue<Runnable>(100), new ThreadPoolExecutor.DiscardPolicy());
//	ExecutorService ses =new ThreadPoolExecutor(20, 40, 1L, TimeUnit.MINUTES,
//			new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.DiscardOldestPolicy());
	ExecutorService ses=Executors.newCachedThreadPool();
//	ExecutorService ses=Executors.newFixedThreadPool(20);
//	ExecutorService ses=new ThreadPoolExecutor(0, 200,
//            60L, TimeUnit.SECONDS,
//            new SynchronousQueue<Runnable>());
	static ConcurrentMap<Long, FutureTask2<JSONObject>> orderFutureMap = new ConcurrentHashMap<Long, FutureTask2<JSONObject>>(
			64, 0.75f, 1);
	Timer timer = new Timer();
	static int count = 0;

	public void init() {
		timer.schedule(new TimerTask() {
			@Override
			public void run() {
				count++;
				// logger.info("第"+count+"次开始执行");
				Set<Long> orderIds = new HashSet<>();
//				Iterator<Long> it = bq.iterator();
//				while (!bq.isEmpty()) {
//					orderIds.add(bq.poll());
//				}
				bq.drainTo(orderIds,100);
				Map<Long, JSONObject> rmap = mapByIds(orderIds);
				for (Long orderId : orderIds) {
					FutureTask2<JSONObject> f = orderFutureMap.get(orderId);
					JSONObject jo = rmap.get(orderId);
					if (f != null) {
						f.set(jo);
					} else {
						logger.info(orderId + " FutureTask2 is null");
					}
				}
				rmap.clear();
				rmap = null;
				// logger.info("第"+count+"次执行结束"+orderIds.size());
			}
		}, 10 * 1000, 100);
	}

	public Map<Long, JSONObject> mapByIds(Set<Long> orderIds) {
		Map<Long, JSONObject> map = new HashMap<>();

		for (Long orderId : orderIds) {
			JSONObject jo = new JSONObject();
			jo.put("orderId", orderId);
			jo.put("orderNum", r.nextInt(200));
			map.put(orderId, jo);
		}
		return map;
	}

	public JSONObject queryOrder(Long orderId) {
		FutureTask2<JSONObject> ft = new FutureTask2<JSONObject>(new Callable<JSONObject>() {
			@Override
			public JSONObject call() throws Exception {
				JSONObject jo = orderFutureMap.get(orderId).get();
				orderFutureMap.remove(orderId);
				return jo;
			}
		});
		try {
			orderFutureMap.put(orderId, ft);
			bq.put(orderId);
			
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		}
		ft.setOrderId(orderId);
		Future<JSONObject> f = (Future<JSONObject>) ses.submit(ft);
		JSONObject jo = null;
		try {
			jo = ft.get();
			orderFutureMap.remove(orderId);
		} catch (InterruptedException | ExecutionException e) {
			e.printStackTrace();

		}
		if (jo == null) {
			jo = new JSONObject();
		}
		logger.info(orderId + " 执行结果 " + jo.toJSONString());
		return jo;
	}

	public static void main(String[] args) {
		int i = 1;
		AsyncQueFutherTask task = new AsyncQueFutherTask();
		task.init();
		try {
			Thread.sleep(10 * 1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		while (i < 100000) {
			int j = ++i;
			new Thread(new Runnable() {
				@Override
				public void run() {
					task.queryOrder(new Long(j));
				}
			}).start();
		}
	}
}

class FutureTask2<T> extends FutureTask<T> {
	private Long orderId;

	public FutureTask2(Callable<T> callable) {
		super(callable);
	}

	public FutureTask2(Runnable runnable, T result) {
		super(runnable, result);
	}

	public Long getOrderId() {
		return orderId;
	}

	public void setOrderId(Long orderId) {
		this.orderId = orderId;
	}

	public void set(T v) {
		super.set(v);
	}
}

class Param {

}

 

 

分享到:
评论

相关推荐

    java多线程查询数据库

    在Java编程中,多线程查询数据库是一种常见的优化策略,特别是在处理大数据量或者需要并行执行多个查询时。本文将详细探讨如何利用Java的多线程技术和线程池来实现并发查询数据库,以及相关的文件`BatchDataUtil....

    大数据量多线程执行分页查询

    多线程技术与分页查询相结合,可以有效地解决这一问题,提高系统性能并优化用户体验。以下是对标题和描述中涉及知识点的详细解释: 1. **大数据量处理**:当数据库中的数据达到百万甚至亿级时,单线程查询可能导致...

    (orc + snappy / zlib ) 多线程并行合并小文件工具类 (出自:flink自定义合并orc小文件处)

    为了解决这个问题,我们可以采用各种合并策略,其中一种是通过多线程并行合并小文件。本项目提供的工具类就基于orc格式,利用了snappy或zlib压缩,用于在Flink中自定义合并orc小文件。 orc是一种高效的列式存储格式...

    FileSplitter4Linux 多线程文件分割合并器

    FileSplitter4Linux_多线程文件分割合并器.7z =========================================== 多线程文件分割、合并器 v2.0.1 for Windows / Linux

    Powerbuilder中实现多线程同步查询数据 源程序

    4. **结果合并**:所有线程完成后,主线程收集并合并查询结果,呈现给用户。 **六、注意事项** 在实现多线程查询时,开发者需要注意以下几点: - **异常处理**:确保每个线程都有适当的错误处理机制,以防数据库...

    java并发(二十四)多线程结果组装

    在Java编程中,多线程是并发处理任务的关键技术,特别是在高性能、高并发的应用场景下。本篇将探讨“多线程结果组装”的主题,它涉及到如何在多个并发执行的任务完成后,有效地收集并整合这些任务的结果。这个过程...

    多线程并行执行,汇总结果

    在IT行业中,多线程并行执行是一种常见的优化策略,特别是在处理大数据量或者需要高性能计算的任务时。"CountDownLatch" 和 "Thread" 是Java编程语言中实现多线程并行执行的关键工具,它们有助于提高程序的运行效率...

    C#开发的多线程文件分割、合并工具

    本项目涉及的关键技术是“多线程文件分割与合并”,这在处理大量数据和大文件时非常实用。下面将详细介绍这些概念及其在C#中的实现。 首先,我们来看“文件分割”。文件分割是指将一个大文件分成多个小文件,以便于...

    python 实现多线程下载m3u8格式视频并使用fmmpeg合并

    多线程下载m3u8格式的视频能显著提高下载速度,特别是在网络条件不稳定或者视频文件较大的情况下。 本实例主要分为以下几个步骤: 1. **预下载**:首先,我们需要获取m3u8文件。这通常通过发送HTTP请求到m3u8链接...

    四线程合并同步下载程序_visualbasic_下载_多线程下载_多线程_

    本文将深入探讨Visual Basic(VB6)如何实现四线程合并同步下载程序,以及它在多线程下载中的应用。 首先,我们要理解什么是多线程。线程是操作系统分配CPU时间的基本单元,一个进程可以包含一个或多个线程。在单...

    poi多线程大数据导出excel文件.zip

    本项目“poi多线程大数据导出excel文件”提供了一个解决方案,利用多线程来提高Excel的大数据导出效率。 Apache POI 3.1版本是较早的版本,而项目中使用了更新的4.1版本,这意味着它可能利用了更多优化和新特性。在...

    多线程拆分,合并文件

    NULL 博文链接:https://wentise.iteye.com/blog/1472493

    四线程合并同步下载程序_visualbasic_下载_多线程下载_多线程_源码.zip

    这是一个基于Visual Basic开发的四线程合并同步下载程序的源码资源。Visual Basic是一种由Microsoft开发的面向对象的编程环境,常用于创建Windows应用程序。在这个项目中,开发者利用多线程技术来提高文件下载的效率...

    C#多线程排序例子

    在IT行业中,多线程是一种常见的编程技术,尤其在C#这样的高级编程语言中,它被广泛用于提高程序的执行效率和并发性。本示例“C#多线程排序例子”聚焦于如何利用多线程来提升排序操作的速度。 首先,让我们了解什么...

    java多线程导出excel(千万级别)优化

    Java多线程导出Excel是处理大数据量时的一种高效策略,尤其在面对千万级别的数据时。传统的Apache POI库在处理大规模数据时可能会遇到栈溢出(StackOverflowError)和内存溢出(OutOfMemoryError)等问题,因为这些...

    C# Winform 多线程下载

    在C# Winform应用中实现多线程下载是一项常见的任务,尤其在处理大文件或需要提高下载速度的情况下。本文将详细讲解如何利用C#的多线程技术来创建一个Winform应用程序,实现高效的文件下载功能。 首先,我们需要...

    多线程多任务下载软件.zip易语言项目例子源码下载

    在这个“多线程多任务下载软件.zip”压缩包中,包含的是一个易语言项目的源码,可以作为学习和参考的实例。下面将详细解释多线程和多任务下载的概念,以及如何在易语言中实现这些功能。 1. **多线程技术**:在...

    yolov5目标检测多线程C++部署

    ** yolov5目标检测多线程C++部署详解** 在计算机视觉领域,目标检测是一项关键技术,用于识别图像或视频帧中的特定对象。YOLO(You Only Look Once)是一种高效的目标检测算法,以其实时性能和相对较高的准确性而...

    lucene索引优化多线程多目录创建索引

    本教程主要探讨的是如何利用Lucene进行索引优化,特别是通过多线程和处理多个目录来提高索引创建效率。 首先,我们需要理解Lucene的索引原理。Lucene将文档分解为词项(tokens),并对每个词项创建倒排索引。倒排...

Global site tag (gtag.js) - Google Analytics