`

试用CountDownLatch的副成品,多线程群发邮件小程序

 
阅读更多
简要说明
1、基于javamail所做的批量发邮件小程序,SingleSender是普通发送,ThreadSender是多线程发送,MysqlSender是从数据库获取数据再调用ThreadSender来发送。
2、为啥写这个小程序呢?没啥特别的,只因最近想起thinking in java里面介绍的CountDownLatch类,为了练手多线程相关的技术而写了这个小程序,整个程序于我而言核心在ThreadSender里面的CountDownLatch。
3、jar依赖情况
1) 必须:activation.jar, mail.jar, commons-logging.jar, log4j-xx.jar, zTool-xx.jar(我写的另外一些小工具类,源码在 https://z-tool.googlecode.com/svn/trunk/zTool)
2) 可能需要:mysql-connector-java-xx.jar,velocity-xx-dep.jar
(注:上面版本号用xx代替了,依据实际情况找个合适的版本即可)

下面贴出SingleSender和ThreadSender的源码。详细源码在 https://github.com/auzll/zBatchSender


public final class SingleSender extends AbstractSender {
	private static final Log LOG = LogFactory.getLog(SingleSender.class);
	
	/**
	 * 发送一批邮件
	 * @param entities 邮件实体列表
	 */
	public final List<MailEntity> send(List<MailEntity> entities) {
		int entitiesSize = null != entities ? entities.size() : 0;
		if (LOG.isInfoEnabled()) {
			LOG.info("method:send,entitiesSize:" + entitiesSize
					+ ",senderParam:[" + this.toSimpleLog() + "]"
					+ ",thread:" + Thread.currentThread().toString());
		}
		
		if (entitiesSize < 1) {
			return entities;
		}
		
		/** TODO Test code begin */
		// 模拟测试一下好了,不做实际发信
		for (MailEntity entity : entities) {
			try {
				TimeUnit.SECONDS.sleep(new Random().nextInt(3));
			} catch (InterruptedException e) {
			}
			
			if (LOG.isInfoEnabled()) {
				LOG.info("method:send,result:success,entity:[" 
						+ entity.toSimpleLog() + "],thread:" 
						+ Thread.currentThread().toString());
			}
		}
		if (true) return entities;
		
		/** TODO Test code end */
		
		Transport transport = null;
		Session session = null;
		int curTimes = 0;
		Properties sessionProperties = newProperties();
		for (MailEntity entity : entities) {
			try {
				if (null == transport || !transport.isConnected() || curTimes == transportUsingTimes) {
					
					if (null != transport && transport.isConnected()) {
						transport.close();
					}
					
					session = Session.getInstance(sessionProperties);
					transport = session.getTransport(new URLName("smtp",
							smtpHost, smtpPort, null, 
							from.getAddress(), password)); // 获取新的transport
					curTimes = 0;
					transport.connect(); 
					
					if (LOG.isDebugEnabled()) {
						LOG.debug("method:send,desc:transport connect,thread:" 
								+ Thread.currentThread().toString());
					}
				}
				
				MimeMessage message = new MimeMessage(session);
				
				if (null != from) {
					message.setFrom(from);
				}
				
				if (null != entity.getTo()) {
					message.addRecipient(RecipientType.TO, entity.getTo());
				}
				
				if (null != entity.getCcTo()) {
					message.addRecipient(RecipientType.CC, entity.getCcTo());
				}
				
				if (null != entity.getBccTo()) {
					message.addRecipient(RecipientType.BCC, entity.getBccTo());
				}

				if (null != subject) {
					message.setSubject(subject);
				}
				
				if (null != entity.getContent()) {
					message.setText(entity.getContent(), charset);
				} else if (null != content) {
					message.setText(content, charset);
				} 
				
				// 发送邮件
				transport.sendMessage(message, message.getAllRecipients());
				
				// 成功发送
				entity.setSuccess(true);
				
				if (LOG.isInfoEnabled()) {
					LOG.info("method:send,result:success,entity:[" 
							+ entity.toSimpleLog() + "],thread:" 
							+ Thread.currentThread().toString());
				}
				
			} catch (Exception e) {
				// 发送失败
				entity.setSuccess(false);
				
				if (LOG.isDebugEnabled()) {
					LOG.debug("method:send,result:fail,entity:[" + entity.toSimpleLog()
							+ "],thread:" + Thread.currentThread().toString(), e);
				} else {
					LOG.info("method:send,result:fail,entity:[" + entity.toSimpleLog() 
							+ "],thread:" + Thread.currentThread().toString() 
							+ ",e:" + e.getMessage());
				}
				
			} finally {
				curTimes++;
			}
		}
		
		if (null != transport && transport.isConnected()) {
			try {
				transport.close();
			} catch (Exception e) {
				LOG.info("method:send,desc:close transport,thread:" 
						+ Thread.currentThread().toString() , e);
			}
		}
		
		return entities;
		
	}
	
	/**
	 * 发送一封邮件
	 * @param entity 邮件实体
	 */
	public final MailEntity send(MailEntity entity) {
		List<MailEntity> entities = new ArrayList<MailEntity>();
		entities.add(entity);
		send(entities);
		return entity;
	}
	
	private Properties newProperties() {
		String address = from.getAddress();
		String host = address;
		
		int atIndex = address.indexOf('@');
		if (-1 != atIndex) {
			host = address.substring(atIndex + 1);
		}
		
		Properties props = new Properties();
		props.put("mail.smtp.localhost", host);
		props.put("mail.from", address);
		
		props.put("mail.debug", mailDebug);
		
		if (null != password) {
			props.put("mail.smtp.auth", true);
		}
		
		return props;
	}
}


public final class ThreadSender extends AbstractSender {
	private static final Log LOG = LogFactory.getLog(ThreadSender.class);
	
	private class Worker implements Runnable {
		private CountDownLatch latch;
		private List<MailEntity> entities;
		
		public Worker(CountDownLatch latch, List<MailEntity> entities) {
			this.latch = latch;
			this.entities = entities;
		}
		
		public void run() {
			new SingleSender()
				.charset(charset)
				.smtpHost(smtpHost)
				.smtpPort(smtpPort)
				.transportUsingTimes(transportUsingTimes)
				.from(from)
				.password(password)
				.subject(subject)
				.content(content)
				.send(entities);
			
			if (LOG.isDebugEnabled()) {
				LOG.debug("method:Worker$run,desc:latch count down,thread:" 
						+ Thread.currentThread().toString());
			}
			
			latch.countDown();
		}
		
	}
	
	/** 默认的最大线程数量:Runtime.getRuntime().availableProcessors() * 2 */
	public static final int DEFAULT_THREAD_SIZE = Runtime.getRuntime().availableProcessors() * 2;
	
	/** 每个线程单次任务默认的发送邮件量 */
	public static final int DEFAULT_TASK_OF_EACH_THREAD = 10;
	
	/** 线程数量 */
	private int threadSize = DEFAULT_THREAD_SIZE;
	
	/** 每个线程单次任务最大发送邮件量 */
	private int taskOfEachThread = DEFAULT_TASK_OF_EACH_THREAD;
	
	private ExecutorService executorService;
	
	/** 是否在发送邮件后关闭executorService,若executorService由外界传入就不关闭,否则关闭 */
	private boolean shutdownExecutor = false;
	
	public ThreadSender() {
		this(Executors.newCachedThreadPool());
		this.shutdownExecutor = true;
	}

	public ThreadSender(ExecutorService executorService) {
		this.executorService = executorService;
	}

	/**
	 * 发送一批邮件
	 * @param entities 邮件实体列表
	 */
	public final List<MailEntity> send(List<MailEntity> entities) {
		int entitiesSize = null != entities ? entities.size() : 0;
		if (LOG.isInfoEnabled()) {
			LOG.info("method:send,entitiesSize:" + entitiesSize
					+ ",thread:" + Thread.currentThread().toString());
		}
		
		if (entitiesSize < 1) {
			return entities;
		}
		
		int maxTask = threadSize * taskOfEachThread;
		if (entities.size() > maxTask) {
			throw new BatchSendException("Too much task, max is " + maxTask 
					+ ", current is " + entities.size());
		}
		
		int i = 0, toIndex = 0, len = entities.size();
		int count = len / taskOfEachThread;
		if (len % taskOfEachThread > 0) {
			count++;
		}
		CountDownLatch latch = new CountDownLatch(count);
		int realCount = 0;
		while (i < len) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("method:send,desc:split task,taskIndex:" + realCount);
			}
			toIndex = i + taskOfEachThread;
			if (toIndex > len) {
				toIndex = len;
			}
			executorService.execute(new Worker(latch, entities.subList(i, toIndex)));
			i = toIndex;
			
			realCount++;
		}
		
		if (count != realCount) {
			throw new BatchSendException("Unexpected error[count != realCount], count is " 
					+ count + ", realCount is " + realCount);
		}
		
		try {
			if (LOG.isDebugEnabled()) {
				LOG.debug("method:send,desc:begin await,thread:" 
						+ Thread.currentThread().toString());
			}
			latch.await();
			if (LOG.isDebugEnabled()) {
				LOG.debug("method:send,desc:finish await,thread:" 
						+ Thread.currentThread().toString());
			}
		} catch (InterruptedException e) {
			throw new BatchSendException(e);
		}
		
		if (shutdownExecutor) {
			if (LOG.isDebugEnabled()) {
				LOG.debug("method:send,desc:try shutdown executorService,thread:" 
						+ Thread.currentThread().toString());
			}
			executorService.shutdownNow();
		}
		
		return entities;
	}
	
	public ThreadSender taskOfEachThread(int taskOfEachThread) {
		this.taskOfEachThread = taskOfEachThread;
		return this;
	}
	
	public ThreadSender threadSize(int threadSize) {
		this.threadSize = threadSize;
		return this;
	}
}
分享到:
评论

相关推荐

    mybaits 多线程 实现数据批量插入 (运用CountDownLatch实现闭锁)

    本文将详细介绍如何利用MyBatis结合多线程和CountDownLatch闭锁来实现数据的批量插入。 首先,我们来看`mybatis批处理`。MyBatis的批处理功能允许我们在一次数据库连接中执行多条SQL语句,从而减少了数据库连接的...

    多线程countDownLatch方法介绍

    在Java多线程编程中,CountDownLatch是一个非常重要的同步工具类,它可以帮助我们协调多个线程之间的交互。本文将深入探讨CountDownLatch的工作原理、使用场景以及相关源码分析。 CountDownLatch是一个计数器,初始...

    如何使用CountDownLatch同步java多线程

    如何使用CountDownLatch同步java多线程 CountDownLatch 是 Java 并发编程中的一种常用工具,用于控制多个线程的并发执行。它可以实现多个线程之间的同步操作,使得线程之间可以协调工作,避免了线程之间的互相干扰...

    利用 CountDownLatch 类实现线程同步

    `CountDownLatch` 是一个计数器,可以用于协调多个线程间的活动,等待所有线程完成各自的任务后,主线程或其他线程才能继续执行。 首先,让我们深入理解 `CountDownLatch` 的工作原理。`CountDownLatch` 在构造时会...

    java使用CountDownLatch等待多线程全部执行完成

    Java 使用 CountDownLatch 等待多线程全部执行完成 CountDownLatch 是 Java 中的...CountDownLatch 是 Java 中一个非常强大且灵活的同步工具类,使用它可以轻松地实现多线程的同步操作,提高程序的执行效率和可靠性。

    多线程下载器用java写的小程序

    这个小程序的主要目的是通过分割大文件并同时从服务器下载多个部分,来提高文件下载速度。下面将详细介绍多线程下载器的设计和实现原理,以及如何利用Java进行编程。 首先,我们需要理解多线程的概念。在Java中,...

    JAVA多线程CountDownLatch使用详解

    JAVA多线程CountDownLatch使用详解 JAVA多线程CountDownLatch是JAVA多线程编程中的一种同步工具,主要用来让某个线程等待其他线程执行完毕后再继续执行。下面我们将详细介绍JAVA多线程CountDownLatch的使用和原理。...

    多线程终极案例程序(多线程进阶)

    这个“多线程终极案例程序”旨在帮助Java开发者深化对多线程的理解,通过一个具体的“多兵种联合攻击防御塔”游戏案例来实践。下面将详细解释该案例中的关键知识点。 首先,多线程是并发执行多个任务的能力,可以...

    多线程并行执行,汇总结果

    "CountDownLatch" 和 "Thread" 是Java编程语言中实现多线程并行执行的关键工具,它们有助于提高程序的运行效率和响应速度。 **多线程并行执行** 多线程并行执行是指在同一个程序中同时运行多个线程,每个线程负责...

    java小程序中动画及音乐多线程的应用源码下载

    - 使用`Platform.runLater()`方法,可以确保在JavaFX的应用程序线程(也称为事件调度线程)中执行UI相关的更新,防止出现线程安全问题。 2. **音乐播放多线程**: - Java提供了`javax.sound.sampled`包来处理音频...

    多线程demo程序-轻松掌握多线程技术

    在编程领域,多线程是实现并发执行任务的关键技术,特别是在服务器端开发和高并发应用中,多线程能够充分利用CPU资源,提高程序的运行效率。这个名为"多线程demo程序-轻松掌握多线程技术"的项目,旨在帮助开发者理解...

    java多线程文件传输

    - **CountDownLatch**:允许一个线程等待其他线程完成操作,常用于多线程并发测试。 - **CyclicBarrier**:允许多个线程等待直到达到某个屏障点再一起继续执行。 - **Semaphore**:信号量,控制同时访问特定资源...

    多线程执行完后主程序再执行(包括需要子线程返回结果)

    标题提到的“多线程执行完后主程序再执行(包括需要子线程返回结果)”是一个典型的多线程同步问题。在这个场景中,主程序会启动多个子线程去执行不同的任务,然后等待所有子线程执行完毕,最后处理子线程返回的结果...

    JAVAJAVA多线程教学演示系统论文

    1. **多线程基础**:论文可能会首先介绍多线程的基本概念,解释为什么在JAVA中需要使用多线程,以及多线程如何提升程序的执行效率。这部分内容可能会涉及到线程的创建、启动、同步和通信等基础知识。 2. **JAVA多...

    Java中CountDownLatch进行多线程同步详解及实例代码

    Java中CountDownLatch进行多线程同步详解及实例代码 CountDownLatch是Java中的一种多线程同步...CountDownLatch是一种非常有用的多线程同步辅助类,通过它可以轻松地同步多个任务的执行,从而提高程序的效率和可靠性。

    Java多线程详解(超详细)_狂神说笔记完整版_项目代码_适合小白随课程学习

    - `CyclicBarrier`和`CountDownLatch`:用于线程同步,协调多个线程同时开始或结束操作。 了解并熟练掌握这些知识点,你就能在实际开发中灵活运用Java的多线程特性,编写出高效、稳定的并发程序。通过实践和不断...

    多线程demo/java多线程练习

    在Java编程中,多线程是一项关键技能,它能让程序同时执行多个任务,提升系统效率。本项目"多线程demo/java多线程练习"旨在通过实际操作来深入理解和掌握多线程技术,同时模拟数据库操作,这在现代应用程序开发中至...

    汪文君JAVA多线程编程实战(完整不加密)

    Java多线程允许程序同时执行多个独立的代码段,这在处理大数据、网络通信、用户界面更新等场景中尤其有用。书中详细介绍了Java多线程的核心概念,如线程的创建、启动、同步、协作以及生命周期管理。读者将学习如何...

    多线程程序小例子实现

    在编程领域,多线程是实现并发执行任务的重要机制,特别是在现代计算机系统中,它能够充分利用多核处理器的计算能力,提高程序的运行效率。本文将深入探讨如何在实际编程中实现多线程,以及如何管理和监控线程状态,...

    Java_多线程与并发编程总结.doc

    Java多线程与并发编程是Java开发中至关重要的一部分,它涉及到如何高效地利用CPU资源,以实现程序的并行执行。在操作系统层面,多任务和多进程是通过分配不同的内存空间来实现的,而线程则共享同一进程的内存,这...

Global site tag (gtag.js) - Google Analytics