【背景】由于公司业务的需要,需要开发一个消息转发服务器,总体需求大致为:要求每秒的接入能力不低于1000,需要做数据统计(重点),数据不能缓存,尽可能快的转发,延时尽可能的低,可以允许丢失一些消息。
按照设计规格,每秒产生的统计数据不低于1000,这些数据需要入库,提供给报表做分析。因此,对统计数据t_statistics这表来说,并发写的鸭梨很大。由于要求尽可能的低延时,因此,每一个服务接入线程单独的存储统计数据也不现实:使用数据库线程池或者单线程直接存储都会对延时(需要对请求端有应答)有较大影响。 大致的数据流图如下
数据库使用的是mysql5.5 最终部署OS为linux2.6,对于mysql,经过测试,单线程单表每秒写入1300是比较稳定的速率,因此,基于要高于设计规格的思想,接入速率也要高于1000。那么问题来了-----对于每秒产生1000条统计数据应该如何插入数据库呢?每个服务线程中单独插入肯定不现实,因为这要维持一个庞大的数据库线程池,或者排队,带来的是延迟。
既然接入速率和存储速率能够匹配,因此,换个角度考虑的话,这就是个很基本的 生产者---消费者模型,上文中已经提到,单个线程的写入能力已经能达到要求,因此,重点是协调 N---1 模型的并发竞争。
基于N---1 的生产者---消费者模型,或者消费者比较少的模型,一个很重要的点在于如何通知消费者消费数据。太频繁的通知将带来消费低效,毕竟线程切换是需要付出代价的,因此,批次的概念在这里被很好的运用了。
基于一个批次的存储,也就是当生产者发现一个批次的数据达到一定的阀值时候,通知消费者来消费当前这个批次,问题又来了:当消费者消费某个批次的数据时候,生产者最新生产的数据应该怎么办呢,如果消费者---生产者基于共享队列(链表)的话,将会频繁的上锁、解锁和通知,这对编程来讲也带来复杂度。
实际上,上文中已经提到,当一个批次满的时候,就应该通知消费者写数据库,那么在消费者消费数据的空档期,新产生的数据仍然应该能够快速的消费掉,至少不能让消费者多等,那么是否能再开辟一个批次呢?
OK,N---1 的消费者--生产者模型基本出来了,基于多个链表的缓冲区,消费者集中向一个链表中写数据,当当前链表数据满时,通知消费者消费,后面的消费者开始切换链表,使用新的空的链表来存储数据。
在个人开发的消息转发服务器中,使用了6个List,阀值配置的是1w,也是就是当某个List的size达到1w的时候,就开始切换,聪明的你也许很快就想到了,瓶颈会再次出现在 List 切换的时候,实际上这个可以不算是问题,因为使用了多个List,在特定的时候哪些List是可用的,哪些List是消费者应该消费的,这些需要建立索引, 建立索引的好处是公平、避免单个队列数据过大和通知遗漏。
基于以上一些分析,个人写了如下的一点代码,使用了泛型,可通用。
package com.rockton.gps.router.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
*
* @author taoyu
*
* @param <T>
*/
public class ChunkManager<T> implements Runnable {
Log log = LogFactory.getLog(ChunkManager.class);
int chunkNum;
int chunkSize;
List<Chunk<T>> chunkList;// 数据块链表
ChunkConsumer<T> consumer;// 实际的消费者
Object lock = new Object();// 数据块索引切换锁
volatile boolean isStop;
final Queue<Integer> fullIndex = new ConcurrentLinkedQueue<Integer>();
final Queue<Integer> emptyIndexs = new ConcurrentLinkedQueue<Integer>();
public ChunkManager(int chunkNum, int chunkSize) {
this.chunkNum = chunkNum;
this.chunkSize = chunkSize;
chunkList = new ArrayList<Chunk<T>>(this.chunkNum);
for (int i = 0; i < this.chunkNum; i++) {
Chunk<T> c = new Chunk<T>();
c.setLock(false);
chunkList.add(c);
emptyIndexs.add(Integer.valueOf(i));
}
}
public void setChunkConsumer(ChunkConsumer<T> consumer) {
this.consumer = consumer;
}
public void start() {
if (null == consumer) {
throw new IllegalStateException("ChunkConsumer could not be null");
}
Thread t = new Thread(this);
t.setDaemon(true);
t.setName("T_ChunkManager");
t.start();
}
public void add(T e) {
Integer index = null;
while ((index = emptyIndexs.peek()) == null)
;
chunkList.get(index).add(e);// 线存储再检查,允许小范围的写入并发
if (chunkList.get(index).size() >= chunkSize) {
// 这一步很重要,在临界状态下可以避免很多的锁定操作
if (fullIndex.contains(index)) {
return;
}
synchronized (lock) {
if (fullIndex.contains(index)) {
return;
}
emptyIndexs.poll();
fullIndex.offer(index);
lock.notify();
}
}
}
@Override
public void run() {
while (!isStop) {
try {
doConsume();
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}
}
private void doConsume() throws Exception {
synchronized (lock) {
while (fullIndex.isEmpty()) {
lock.notify();
lock.wait();
if (isStop) {
return;
}
}
}
// 如下的代码应该在synchronized之外,很多人习惯于放在里面,这样会导致锁定时间过长
Integer index = null;
while ((index = fullIndex.peek()) != null) {
Chunk<T> c = chunkList.get(index);
log.info("Persistence the chunk , record is : " + c.size());
try {
consumer.consume(c.getList());
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
c.getList().clear();
fullIndex.poll();//
emptyIndexs.offer(index);
}
log.info("Persistence the chunk over");
}
}
public void exitAndFlush() {
synchronized (lock) {
isStop = Boolean.TRUE;
lock.notifyAll();
}
for (int i = 0; i < chunkList.size(); i++) {
List<T> datas = chunkList.get(i).getList();
synchronized (datas) {
consumer.consume(datas);
}
}
}
}
如下是spring的配置
<bean id="chunkManager" class="com.rockton.gps.router.concurrent.ChunkManager">
<constructor-arg index="0" value="6" />
<constructor-arg index="1" value="10000" />
</bean>
经过严密的测试,接入速率为 1.3K---1.4K 的时候,数据库操作比较稳定,如下图
上图是对转发数据的统计做的一个简单图表,实际上由于不能做到发送速率稳定,因此上图的数据会有一些波动,但是没有丢包。
也许,你会认为,如果请求数要高于1.4,那么数据库瓶颈就出来了呀,对,你可以考虑增加一个消费线程,但是这样做存储效率会提升但是不会翻番的。对于一个系统,是有最大负载的,都会有瓶颈的,超出部分,应该勇敢 say no.
欢迎拍砖!
- 大小: 7.4 KB
- 大小: 36.8 KB
分享到:
相关推荐
《关于Go性能优化的思考》这本书提供了许多关于如何编写高效Go代码的策略和技巧。本文将深入探讨这些最佳实践,帮助开发者提升程序性能。 1. **内存管理与垃圾回收** Go语言采用自动垃圾回收机制,但在某些场景下...
### 高性能高并发服务器架构的关键知识点 ...以上内容总结了构建高性能高并发服务器架构的关键知识点和技术方案,涵盖了从理论到实践的各个方面。希望这些信息能够帮助读者更好地理解和实施高并发网站的架构设计。
- **重要性**: 在当前互联网高速发展的背景下,越来越多的网站和服务需要面对大量的在线用户,因此构建高性能、高并发的服务器架构变得尤为重要。 #### 2. 关键技术与策略 - **负载均衡**: 使用负载均衡技术可以将...
除了上述几个重点外,文档还可能包含了对美团iOS客户端架构的具体技术实现的深入分析,如iOS客户端如何应对高并发请求,如何利用多线程技术优化数据处理,以及如何通过设计合理的数据结构和算法来提升应用性能。...
MySQL性能优化是数据库管理中的关键环节,对于提升整个系统运行效率具有重大意义。在PHP高级开发工程师的工作中,深入理解并掌握MySQL的性能优化技巧,能够显著提高应用程序的响应速度和用户体验。架构设计在此过程...
性能优化不是非黑即白的事情,很多时候需要通过试验和错误来找到最佳实践。以上提到的策略并非仅适用于JPA,许多原则也适用于其他持久化框架和数据库交互场景。通过密切关注这些方面,可以显著提升JPA应用的性能,...
【阿里JAVA性能优化实战】是面向Java开发人员和架构设计人员的专业课程,旨在提供系统优化的知识和技巧。本文将深入探讨Java性能优化的关键点,并基于给出的部分内容进行详细阐述。 性能调优对于任何互联网公司都是...
1. **高并发性**:支持大量用户同时访问。 2. **短事务**:事务通常涉及少量数据修改,执行速度快。 3. **强一致性**:保证数据的一致性和准确性。 4. **读写操作平衡**:既有大量的读取操作也有频繁的写入操作。 5....
App 研发效能提升和性能优化实践 App工厂架构设计及应用 ArcoDesign:企业设计系统实践与思考 TDSQL升级版架构和关键技术介绍 DDD、BFF 和API First 在企业应用服务的实践和思考 DevOps助力BIP数字化转型提能增效 ...
性能测试是IT行业中一个至关重要的环节,特别是在软件开发与运维过程中,它确保系统在高负载、高并发等情况下能够正常运行。"性能测试问题分析思路"这个主题涉及到多个关键领域,包括性能测试的设计、执行、监控以及...
现在,我们进入了并行计算的时代,这意味着软件设计者需要重新思考如何利用多核处理器来提升应用程序的性能。 **摩尔定律的转变**:摩尔定律原本预测每18个月芯片上的晶体管数量会翻一番,从而带来性能的提升。然而...
首先,【高并发系统设计】涉及到的是在大量用户同时访问时,如何保证系统性能和可用性。这不仅关乎到系统的吞吐量,还关系到响应速度和稳定性。在设计高并发系统时,常见的挑战包括资源的合理分配、负载均衡、数据...
作为一本专业领域的书籍,它深入浅出地介绍了ORACLE数据库的性能优化技巧,是DBA们提升技能的重要参考资料。 首先,我们需要了解什么是DBA。DBA,即Database Administrator,是负责管理和维护数据库系统的专业人士...
随着技术的不断发展,微服务架构已经成为构建大型、复杂、分布式系统的一种主流方式,尤其在处理高并发、高可用性以及快速迭代的需求时,其优势尤为显著。本文将主要围绕以下几个关键知识点展开: 1. 微服务概念:...
### Java企业版中性能调节的最佳实践 #### 一、引言 在当今高度竞争的商业环境中,企业级应用...通过对软件栈各个层面的深入理解和不断实践,我们能够有效提升Java企业版应用的性能表现,为用户提供更优质的服务体验。
6. **PHP框架与性能**:分析常见PHP框架(如Laravel, Symfony)在性能优化上的特点和策略,如何选择合适的框架,以及如何对其进行定制以提高性能。 7. **Web服务器与PHP集成**:讲解如何配置Nginx、Apache等Web...
Java性能调优是Java开发中不可或缺的一项技能,尤其在高并发、大数据量的互联网环境中,系统的性能优化至关重要。本文将从实战角度出发,探讨如何通过深入理解和应用Java底层源码来提升系统性能。 首先,要成为一名...