`
zhangzhenjj
  • 浏览: 27859 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

求优化-俩线程,一个读,一个写

阅读更多

 

 有锁实现方案:

package com.boco.sfmhandler.bolts.sender.sort;

import java.io.Serializable;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import com.boco.sfmhandler.model.message.SMessage;

/**
 * 只能单线程调用next
 * 
 * @author boy
 */
public class WaitSortBlockingQueue implements SortQueue<SMessage> {
	private static final long serialVersionUID = -4793564853425866287L;

	private WaitSortBlockingQueue() {
		super();
	}

	private final ReentrantLock lock = new ReentrantLock(false);
	private final Condition empty = lock.newCondition();
	private final Condition full = lock.newCondition();

	private final AtomicLong atomicLong = new AtomicLong(0); // 初始时拿不到最小的sm的id,所以让其超时出去,并获取该id,以后进行累加
	private final TreeMap<Long, SMessage> map = new TreeMap<Long, SMessage>();
	private final int maxBlockSize = 10000;
	private final TimeUnit delayNanoUnit = TimeUnit.NANOSECONDS;
	private final long delayNanoTime = delayNanoUnit.convert(1500L,
			TimeUnit.MILLISECONDS); // 1.5s
	/**
	 * 线程私有
	 */
	private ThreadLocal<Long> preOutputNanoTime = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return System.nanoTime();
		}
	};

	public SMessage next() {
		long curNanoTime = System.nanoTime();
		final ReentrantLock lock = this.lock;
		SMessage sm = null;
		try {
			lock.lockInterruptibly();
			emptyAwait();
			/**
			 * map 中保存的最小的sm的key
			 */
			long firstKey;
			while ((firstKey = map.firstKey()) != atomicLong.get()
					&& ((curNanoTime = System.nanoTime()) - preOutputNanoTime
							.get()) < delayNanoTime) {
				/**
				 * 如果firstkey < 当前累计位置,进行错误备份 这里对等待的时间没做任何修改
				 * 只当该sm是个过客,不影响其它sm的等待时间
				 */
				if (firstKey < atomicLong.get()) {
					sm = map.remove(firstKey);
					full.signal();
					failBackUp(sm);
					emptyAwait();
					continue;
				}
				long awitTime = delayNanoTime
						- (curNanoTime - preOutputNanoTime.get());
				empty.await(awitTime, delayNanoUnit);
			}
			sm = map.remove(firstKey);
			full.signal();
			atomicLong.set(firstKey + 1);
			setPreOutputNanoTime(curNanoTime);
		} catch (InterruptedException e) {
			empty.signal();
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
		return sm;
	}

	/**
	 * 如果是空,直接等待,put时会signal
	 * 
	 * @throws InterruptedException
	 */
	private void emptyAwait() throws InterruptedException {
		while (map.isEmpty()) {
			empty.await();
		}
	}

	/**
	 * 错误sm备份
	 * 
	 * @param sm
	 */
	private void failBackUp(SMessage sm) {

	}

	private void setPreOutputNanoTime(long nanoTime) {
		preOutputNanoTime.set(nanoTime);
	}

	@Override
	public SMessage next(long timeout, TimeUnit unit) {
		return null;
	}

	@Override
	public void put(long key, SMessage sm) {
		final ReentrantLock lock = this.lock;
		try {
			lock.lockInterruptibly();
			while (map.size() >= maxBlockSize)
				full.await();
			map.put(key, sm);
			empty.signal(); // 唤醒在next上等待的线程
		} catch (InterruptedException e) {
			full.signal();
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}

	@Override
	public void put(SMessage sm) {
		put(sm.getSortId(), sm);
	}

	private static class proxy implements Serializable {
		private static final long serialVersionUID = -3999942172367616131L;
		private final static WaitSortBlockingQueue waitSortBlockingQueue = new WaitSortBlockingQueue();
	}

	public static WaitSortBlockingQueue newInstance() {
		return proxy.waitSortBlockingQueue;
	}
}

 无所实现方案:(这个跳跃表貌似频繁的插入、移除有点慢,有木有更好的非阻塞、排序链表实现?)

package com.boco.sfmhandler.bolts.sender.sort;

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import com.boco.sfmhandler.model.message.SMessage;

public class NoAwaitSortQueue implements SortQueue<SMessage> {
	private static final long serialVersionUID = -6596506016522782898L;
	private static final int maxSize = 10000;
	private static Logger logger = Logger.getLogger(NoAwaitSortQueue.class);

	/**
	 * 线程私有
	 */
	private ThreadLocal<Long> preOutputNanoTime = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return System.nanoTime();
		}
	};
	private ThreadLocal<Long> index = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return 0L;
		}
	};
	private ThreadLocal<Long> delayNanoTime = new ThreadLocal<Long>() {
		@Override
		protected Long initialValue() {
			return TimeUnit.NANOSECONDS.convert(1500L, TimeUnit.MILLISECONDS); // 1.5s
		}
	};

	private final ConcurrentSkipListSet<SMessage> concurrentSkipListSet = new ConcurrentSkipListSet<SMessage>(
			new Comparator<SMessage>() {
				@Override
				public int compare(SMessage o1, SMessage o2) {
					return Long.signum(o1.getSortId() - o2.getSortId());
				}
			});

	@Override
	public void put(SMessage value) {
		while (concurrentSkipListSet.size() >= maxSize) {
			//nothing
		}
		concurrentSkipListSet.add(value);
	}

	@Override
	public void put(long key, SMessage value) {
		put(value);
	}

	@Override
	public SMessage next() {
		do {
			if (concurrentSkipListSet.isEmpty()) {
				continue;
			}
			SMessage sm = concurrentSkipListSet.first();
			long sortId = sm.getSortId();
			long curNanoTime = System.nanoTime();
			final int k = Long.signum(sortId - index.get());
			switch (k) {
			case 0:
				if (checkSm(sm, curNanoTime, sortId)) {
					return sm;
				}
				break;
			case -1:
				if (concurrentSkipListSet.remove(sm)) {
					failBackUp(sm);
				}
				break;
			case 1:
				if ((curNanoTime - preOutputNanoTime.get()) < delayNanoTime
						.get()) {
					continue;
				} else {
					if (checkSm(sm, curNanoTime, sortId)) {
						return sm;
					}
				}
			}
		} while (true);
	}

	private boolean checkSm(SMessage sm, long curNanoTime, long sortId) {
		boolean b = concurrentSkipListSet.remove(sm);
		if (b) {
			preOutputNanoTime.set(curNanoTime);
			index.set(sortId + 1);
		}
		return b;
	}

	private void failBackUp(SMessage smessage) {

	}

	@Override
	public SMessage next(long timeout, TimeUnit unit) {
		return null;
	}

	private static class proxy {
		private static NoAwaitSortQueue noAwaitSortQueue = new NoAwaitSortQueue();
	}

	public static NoAwaitSortQueue newInstance() {
		return proxy.noAwaitSortQueue;
	}

	public static void main(String[] args) {
		new Thread(new Runnable() {

			@Override
			public void run() {
				for (long i = 0; true; i++) {
					SMessage sm = new SMessage();
					sm.setSortId(i);
					newInstance().put(sm);
				}
			}
		}).start();

		new Thread(new Runnable() {

			@Override
			public void run() {
				while (true) {
					SMessage sm = newInstance().next();
					logger.info(sm.getSortId());
				}
			}
		}).start();
	}

}

 

3
3
分享到:
评论
1 楼 zhangzhenjj 2013-09-12  
先自己顶一个!!!!!        

相关推荐

    一个线程写,一个线程读的copy工具

    "一个线程写,一个线程读的copy工具"是一个巧妙的设计,它利用了多线程的优势来提高文件复制的效率,而不是采用传统的单线程复制方式。下面我们将深入探讨这个话题,了解其工作原理、优势以及如何实现。 首先,我们...

    ICP算法加速优化-多线程和GPU

    ICP算法加速优化--多线程和GPU 已成功编译配置: Windows10下环境配置:cmake3.23.3+VS2019+CUDA11.1+PCL1.12.1 对应博文https://blog.csdn.net/taifyang/article/details/128042532

    实现一个数据单元,包括学号和姓名两部分。编写两个线程,一个线程往数据单元中写,另一个线程往出读。要求每写一次就往出读一次。

    数据单元应保持一个全局状态,表示当前是否可读或可写: ```python from threading import Thread, Lock class DataUnit: def __init__(self): self.info = StudentInfo() self.read_lock = Lock() self.write...

    RT-Thread线程-创建线程

    在RT-Thread中,创建线程意味着定义一个任务,分配一定的内存空间和优先级,使得这个任务能够在特定的时间片内执行。 STM32H750VBT6是STMicroelectronics推出的一款高性能MCU,拥有强大的Cortex-M7核心,适用于需要...

    操作系统-创建多线程-读者写者

    例如,可以使用一个互斥锁来保护对共享资源的访问,确保任何时候只有一个写操作正在进行,同时可以有多个读操作发生。 #### 小结 本文通过对一个简单的多线程程序的分析,介绍了如何在Windows环境中使用`...

    大恒-双相机开发-C#-多线程-项目开源

    本项目"大恒-双相机开发-C#-多线程"正是这样的一个实例,它利用C#语言进行编程,实现了对两台大恒相机的同时控制和数据采集,同时运用了多线程技术,极大地提高了系统的响应速度和处理能力。下面我们将详细探讨该...

    人工智能-项目实践-多线程-tonado的multi-thread 多线程封装.zip

    人工智能-项目实践-多线程-tonado的multi-thread 多线程封装 Quick Start 1.在“biz”目录中创建一个py文件,文件名任意但最好不要跟第三方库冲突 2.使用 "Router.route" 装饰器注册函数到路由表中,仿造示例即可 ...

    计算机网络课程设计----多线程Web服务器

    在Web服务器中,每个线程可以独立地处理一个客户端的请求,这样就避免了单线程模型中因处理一个请求而阻塞其他请求的情况。多线程使得服务器能同时处理多个连接,提高了服务的响应速度和并发能力。 设计一个多线程...

    delphi 多线程读写测试

    - **读写锁**:在读写测试中,通常会使用读写锁(TReadWriteLock 或 TReaderWriterLock),它允许多个读线程同时访问资源,但仅允许一个写线程进行修改,确保数据的一致性。 3. **多线程读写测试**: - **读写...

    javaSocket的Tcp通信方式两个线程,一个线程接收数据一个线程发送数据

    在"javaSocket的Tcp通信方式两个线程,一个线程接收数据一个线程发送数据"这个场景下,我们将探讨如何通过多线程来优化TCP通信的效率和响应性。 首先,TCP(Transmission Control Protocol)是一种面向连接的、可靠...

    MFC多线程编程实例----多线程画线源码

    MFC是一个面向对象的C++库,它为Windows应用程序开发提供了一套丰富的类,包括对多线程的支持。 在标题中提到的“MFC多线程编程实例——多线程画线源码”,我们主要关注的是如何在同一个窗口或图形界面上,通过多个...

    一个多线程同步读写的小程序

    在这个“一个多线程同步读写的小程序”中,我们看到开发者尝试通过创建读线程和写线程来同时进行数据的读取和写入,以优化程序的执行流程。 首先,让我们深入理解多线程的概念。线程是操作系统分配处理器时间的基本...

    线程异步工作,当一个线程结束时异步通知另一线程

    在多线程编程中,线程间的协作是关键任务之一,尤其当需要一个线程在完成特定工作后通知另一个线程继续执行时。这个过程通常涉及到线程同步和异步的概念。本文将深入探讨线程异步工作以及如何在C++中实现一个线程在...

    C#多线程读写sqlite

    例如,使用`lock`语句可以创建一个临界区,只允许一个线程在任何时候进入该区域执行特定代码。 5. **性能测试**:为了评估多线程读写SQLite的性能,通常会进行计时测试。这可以通过`Stopwatch`类来实现,它可以精确...

    cpp-多线程版的Twemproxy

    **cpp-多线程版的Twemproxy** `Twemproxy`,又称为`nutcracker`,是由Twitter开发的一款轻量级的代理服务,...此外,对于想要深入理解C++多线程编程、分布式缓存架构以及系统优化的人来说,这是一个宝贵的实践案例。

    MFC进度条样例-多线程+定时器+模式对话框

    本示例项目“MFC进度条样例-多线程+定时器+模式对话框”展示了如何在多线程环境中使用MFC来创建一个模式对话框,实时更新进度条以反映后台操作的进度。 首先,我们要理解什么是模式对话框(Modal Dialog)。模式...

    用多线程实现串口读写数据以及文件的读写

    2. 使用CreateThread函数创建一个新的线程,该线程将负责串口的读写操作。 3. 在新线程中,使用CAsyncSocket的成员函数如Connect、Accept、Send和Receive进行串口通信。 4. 实现消息机制,比如使用OnReceive和...

    操作系统实验-----MFC线程--购票系统演示

    操作系统实验中的“MFC线程--购票系统演示”是一个典型的多线程编程示例,它主要涉及了C++编程语言中的Microsoft Foundation Classes (MFC)库,以及操作系统层面的线程管理和线程同步概念。MFC是微软提供的一套面向...

    行业分类-设备装置-创建线程化多媒体对话.zip

    在IT行业中,线程化多媒体对话是一个...总之,创建线程化多媒体对话是一个涉及多方面技术的复杂任务,包括线程管理、多媒体处理、同步通信和性能优化等。理解和掌握这些知识点对于开发高效、稳定的设备装置至关重要。

    Qthread-多线程航拍图像序列拼接

    例如,一个线程可以负责读取图像,另一个线程可以负责图像预处理,再有一个线程负责图像拼接,它们可以并行执行,互不干扰。 Qt是一个流行的跨平台应用程序开发框架,提供了丰富的API供开发者使用,其中包括对多...

Global site tag (gtag.js) - Google Analytics