#! /usr/bin/env python
#coding=utf-8
import threading,sys
import random
import time
from Queue import Queue
import MySQLdb,time,datetime
isFinish=False
count_num_sql='''select count(1) t from fact_user_msg '''
#con_sql='''where ( d_o_lastordertime>='2012-06-19' and s_usr_level>60 and i_u_verified=1 ) '''
con_sql='''where ( i_u_sex=1 and s_o_usermob_type=2 and i_o_ordesnum_3m=4 and i_u_verified=1 ) '''
#con_sq3='''where ( i_u_sex=1 and s_o_usermob_type=2 and i_o_ordesnum_3m=4 and i_u_verified=1 and f_o_total_spend_3m<200 and f_o_kdj_3m>300 and i_o_sendNumber_3m<10) '''
fetch_data_sql='''select s_o_usermob from fact_user_msg '''
pageSize=50000
condition = threading.Condition()
class Producer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
#建立和数据库系统的连接
conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3307)
#conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3306)
#获取操作游标
cursor_src = conn_src.cursor()
#cursor_src.execute("set autocommit = 1")
startTime=datetime.datetime.now()
print "----start time:"+startTime.strftime("%a, %d %b %Y %H:%M:%S +0000")
cursor_src.execute(count_num_sql+con_sql)
result = cursor_src.fetchone()
endTime=datetime.datetime.now()
print "----end time:"+endTime.strftime("%a, %d %b %Y %H:%M:%S +0000")
print "count cost time(s):"+str((endTime-startTime).seconds)
total=result[0]
print "data total lines:"+str(total)
maxNum=500000
if(total>maxNum):
total=maxNum
for i in range(0,total,pageSize):
#print self.getName()+'add to queue'+str(i)
self.sharedata.put(i)
#print self.getName()+'add end to queue'+str(i)
#time.sleep(random.randrange(10)/10.0)
#time.sleep(8)
for j in range(1,5):
#print 'add None to queue'+str(j)
self.sharedata.put(None)
#print '======== NEW ==========='
isFinish=True
conn_src.close();
if condition.acquire():
condition.notify()
condition.release()
print self.getName()+'Finished'
# Consumer thread
class Consumer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3307,charset='utf8')
#conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3306,charset='utf8')
#获取操作游标
cursor_src = conn_src.cursor()
while True:
try:
item=self.sharedata.get()
if item==None:
#print self.getName()+"is empty2"
self.sharedata.task_done()
break
#print self.getName()+'got a value:'+str(item)
print "exec fetcch start"+str(item)+datetime.datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0000")
cursor_src.execute(fetch_data_sql+con_sql+" limit "+str(item)+","+str(pageSize))
cursor_src.fetchone()
endTime1=datetime.datetime.now()
print "fetcch----end time:"+endTime1.strftime("%a, %d %b %Y %H:%M:%S +0000")
self.sharedata.task_done()
except Queue.empty:
print sys.exc_info()[:2]
break
except :
print "over"
break
#time.sleep(random.randrange(10)/10.0)
print self.getName()+'Finished'
#self.sharedata.task_done()
conn_src.close();
return
# Main thread
def main():
queue = Queue()
producer = Producer('Producer', queue)
consumer1 = Consumer('Consumer1', queue)
consumer2 = Consumer('Consumer2', queue)
consumer3 = Consumer('Consumer3', queue)
consumer4 = Consumer('Consumer4', queue)
print 'Starting threads ...'
producer.start()
startTime1=datetime.datetime.now()
print "----start1 time:"+startTime1.strftime("%a, %d %b %Y %H:%M:%S +0000")
consumer1.start()
consumer2.start()
consumer3.start()
consumer4.start()
if condition.acquire():
condition.wait()
queue.join()
endTime1=datetime.datetime.now()
print "----end1 time:"+endTime1.strftime("%a, %d %b %Y %H:%M:%S +0000")
print "fetcch data cost time(s):"+str((endTime1-startTime1).seconds)
print 'All threads have terminated.'
if __name__ == '__main__':
main()
分享到:
相关推荐
infobright-4.0.7,32位系统,32位。
然而,对于学习和测试数据分析解决方案,以及小规模的项目,Infobright社区版是一个非常有价值的选择。 总的来说,Infobright社区版提供了一个强大且高效的列式数据仓库平台,适合对大量数据进行快速分析。通过使用...
5. 测试迁移后的数据,确保数据的完整性和一致性。 6. 配置和优化Infobright实例以适应查询负载和性能需求。 总之,“mysql.rar_infobright”是一个关于MySQL数据库到Infobright数据仓库迁移的资源包,通过这个...
Infobright是一款开源的数据仓库系统,专为大数据分析设计,具有高效能、高并发和低存储成本的特点。本文将详细解析Infobright的核心技术、安装过程以及如何利用Infobright-4.0.7-0-x86_64-ice.rpm安装包进行部署。 ...
Infobright是一款高性能的列式数据库系统,尤其适合大数据分析场景。其核心优势在于其独特的数据存储和查询优化机制,这使得它在处理大规模数据分析时能展现出极高的性能。 Infobright的架构基于MySQL,但采用了...
9. **扩展性**:随着数据量的增长,Infobright可以通过添加更多节点实现水平扩展,构建大规模并行处理集群,以应对日益增长的存储和计算需求。 10. **维护与备份**:Infobright提供了便捷的数据导入导出功能,以及...
Infobright是一款高效、轻量级的开源数据仓库解决方案,主要设计用于大数据分析。这款数据库管理系统以其出色的查询性能和极低的存储需求而受到关注。标题中的"infobright4.0.7-win.zip"表明这是一个针对Windows操作...
**Ubuntu安装Infobright及主从同步** Infobright是一款开源的企业级数据仓库系统,以其高效的数据压缩和快速查询性能而受到关注。在Ubuntu操作系统上安装Infobright并配置主从同步是数据库管理员常见的任务,这有助...
5. **分布式处理**:通过分布式架构,Infobright可以在多节点环境中扩展,以处理PB级别的大数据。 6. **低维护成本**:Infobright设计时考虑了低维护性,减少了数据库管理员的日常工作。 在使用Infobright构建数据...
在架构方面,Infobright 给我展示了不少新想法,算是受益颇多吧。首先是按列存储,然后把列数据切成小块(Data Pack),进行压缩和统计(DPN, Data Pack Node),然后再对多块数据之间进行知识关联(Knowledge Node...
Infobright是一款高效的数据仓库解决方案,专为大数据分析设计。这款产品以其优秀的性能、低存储成本和快速查询能力而闻名。在你提供的信息中,“infobright-4.0.7-0-x86_64-ice.rpm”是Infobright的一个特定版本,...
infobright-4.0.7-0-x86_64-ice.zip数据库rpm包文件(infobright)
3. **测试框架(Testing Framework)**:包含单元测试和集成测试,确保代码质量并验证功能的正确性。 4. **文档(Documentation)**:可能包括API参考、开发者指南、用户手册等内容,帮助开发者理解和使用...
综上所述,"infobright.7z"包含了Infobright数据库的相关资源,使用这些资源需要了解列式存储的优势、数据库的安装配置、查询优化、安全设置、监控维护以及备份恢复等多个方面的知识。对于需要处理大数据分析的企业...
6. **扩展性**:Infobright支持水平扩展,通过添加更多的服务器节点来提升处理能力。这种集群架构使得系统能够随着数据量的增长而扩展,无需重构整个基础设施。 7. **社区支持**:Infobright有一个活跃的开发者和...
Infobright是一款高效、开源的数据仓库系统,专为大规模数据集的分析而设计。RPM(Red Hat Package Manager)是Linux系统中广泛使用的软件包管理器,用于安装、升级、查询和卸载软件。在本例中,“infobright-rpm....
列式存储的优势在于,对于查询操作,尤其是涉及多列的聚合查询,它能显著提高查询速度,因为只需读取所需列的数据,而非整个行。这种设计在大数据分析领域非常受欢迎,因为它可以减少I/O操作,提升查询效率。 安装...
greenplum 简介及数据库对比 。 greenplum hive infobright 对比。