Topic: a URL load-testing script written with pycurl
Posted: 2010-01-20
Last modified: 2010-02-02
This is for load-testing an ordinary URL. Example:

python testtype1.py www.tudou.com 3 2 result.log

The first argument is the URL. It is best to avoid the & character; if the URL must contain &, write ~amp; instead (the script also splits this argument on @, so several URLs can be passed at once).
The second argument is how many seconds to wait before starting.
The third argument is the concurrency, i.e. the number of threads. (From what I saw while testing the live-streaming servers back then, the practical maximum is somewhere between 1500 and 2500; push it higher and the script crashes by itself, so more concurrent threads can only be reached by running the script on several machines at once, which is what the distributed test tool does.)
The fourth argument is the path of the log file.

Normally, if you do not kill the process, the script exits by itself after 200 seconds; to run longer, change the run_time value in the script. You can open two terminals in the script's directory: one to start the script, the other to run tail -f result.log (the log file) and watch the data. The command above creates 2 threads, waits 3 seconds, then keeps requesting tudou.com, and a new summary line is appended to result.log roughly every 6 seconds.

Sample log lines:

Transactions:0,Availability:0.0,Elapsed_time:2.001,Data_transferred:0.0,Response_time:0.0,Transaction_rate:0.0,Throughput:0.0,Concurrency:0.0,Successful_transactions:0,Failed_transactions:0,Longest_transaction:0.0,Shortest_transaction:100000.0
Transactions:49246,Availability:95.2,Elapsed_time:8.017,Data_transferred:11.532,Response_time:6.635,Transaction_rate:150.727,Throughput:0.04,Concurrency:200.0,Successful_transactions:46875,Failed_transactions:0,Longest_transaction:0.19,Shortest_transaction:0.36
Transactions:98630,Availability:93.7,Elapsed_time:14.025,Data_transferred:23.303,Response_time:7.036,Transaction_rate:142.126,Throughput:0.04,Concurrency:200.0,Successful_transactions:92410,Failed_transactions:0,Longest_transaction:0.2,Shortest_transaction:0.36

Meaning of the fields:

Transactions: number of requests executed
Availability: success rate (%)
Elapsed_time: total elapsed time (s)
Data_transferred: data transferred (MB)
Response_time: response time (ms)
Transaction_rate: transaction rate (t/s)
Throughput: throughput (MB/s)
Concurrency: number of concurrent threads
Successful_transactions: successful requests
Failed_transactions: failed requests
Longest_transaction: longest response time
Shortest_transaction: shortest response time

The script only needs pycurl installed and runs on both Linux and Windows. (Two short sketches follow the script: one isolates the single-request measurement with pycurl, the other parses the log lines above.)

#! /usr/bin/env python
# -*- coding: UTF-8 -*-
# Author: zeal
import pycurl,time,threading,random,StringIO
import sys,socket

socket.setdefaulttimeout(60)

Longest_transaction = 0
Shortest_transaction = 100.0
error_amount = 0
pass_amount = 0
total_amount = 0
total_time = 0
downloaded = 0
threads_dict = {}
threads_amount = 0
run_time = 200          # total running time in seconds
begin_time = time.time()
last_line = "Transactions:%s,Availability:%s,Elapsed_time:%s,Data_transferred:%s,Response_time:%s,Transaction_rate:%s,Throughput:%s,Concurrency:%s,Successful_transactions:%s,Failed_transactions:%s,Longest_transaction:%s,Shortest_transaction:%s"
print last_line
#print 'last_linelen',len(last_line.split(',')),last_line.split(',')

def write_data(filehandle):
    """ Aggregate the global counters and append one line of statistics to the log file. """
    Concurrency = threading.activeCount()-2
    haveruntime = time.time() - begin_time
    try:
        Availability = pass_amount*1.0/total_amount
        Response_time = total_time/pass_amount
        Transaction_rate = pass_amount/haveruntime
        Throughput = (downloaded/total_time)/(1024*1024)
    except Exception, e:
        print 'write data error %s'%e
        Availability = 0
        Response_time = 0
        Transaction_rate = 0
        Throughput = 0
    Concurrency = threading.activeCount()-2
    haveruntime = time.time() - begin_time
    print 'haveruntime=',haveruntime
    data = (int(total_amount),                   # transactions
            round(Availability*100.0,1),         # availability (%)
            round(haveruntime,3),                # elapsed time (s)
            round(downloaded/1048576.0,3),       # data transferred (MB)
            round(Response_time*1000,3),         # response time (ms)
            round(Transaction_rate,3),           # transaction rate (t/s)
            round(Throughput,2),                 # throughput (MB/s)
            round(Concurrency,3),                # concurrency
            int(pass_amount),                    # successful transactions
            int(total_amount-pass_amount),       # failed transactions
            round(Longest_transaction,2),        # longest transaction
            round(Shortest_transaction*1000,2))  # shortest transaction
    print 'format data =',data
    result = last_line%data
    tmp = [e.split(":")[1] for e in result.strip().split(",")]
    print 'tmp=',tmp
    print time.ctime(),'result =',result
    filehandle.write(result+'\n')
    filehandle.flush()

class Test(threading.Thread):
    def __init__(self,name):
        threading.Thread.__init__(self)
        self.name = 'thread_'+str(name)
        self.data = StringIO.StringIO()
        self.curl = pycurl.Curl()
        self.total_time = 0.0
        self.downloaded = 0.0
        self.pass_amount = 0.0
        self.total_amount = 0.0
        self.curl.setopt(pycurl.NOPROGRESS, 1)
        self.curl.setopt(pycurl.WRITEFUNCTION, self.data.write)
        self.curl.setopt(pycurl.MAXREDIRS, 5)
        self.curl.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) ; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; InfoPath.2)")
        self.curl.setopt(pycurl.NOSIGNAL, 1)
        self.info = {
            'Transactions':0,
            'Availability':0.0,
            'Elapsed_time':0,
            'Data_transferred':0,
            'Response_time':0,
            'Transaction_rate':0,
            'Throughput':0,
            'Concurrency':1,
        }

    def run(self):
        """ Worker loop: keep requesting the target URLs until run_time expires, then exit. """
        while True:
            try:
                global error_amount,Longest_transaction,Shortest_transaction,pass_amount,total_amount,downloaded,total_time
                # stop this thread once run_time has elapsed
                if time.time()-begin_time > run_time:
                    break
                url = random.choice(urllist)
                print url,'name',self.name
                self.curl.setopt(pycurl.URL, url)
                self.curl.perform()
                self.total_time += self.curl.getinfo(pycurl.TOTAL_TIME)
                total_time += self.curl.getinfo(pycurl.TOTAL_TIME)
                self.downloaded += self.curl.getinfo(pycurl.SIZE_DOWNLOAD)
                downloaded += self.curl.getinfo(pycurl.SIZE_DOWNLOAD)
                # keep the global longest and shortest response times up to date
                if self.curl.getinfo(pycurl.TOTAL_TIME) > Longest_transaction:
                    Longest_transaction = self.curl.getinfo(pycurl.TOTAL_TIME)
                if self.curl.getinfo(pycurl.TOTAL_TIME) < Shortest_transaction:
                    Shortest_transaction = self.curl.getinfo(pycurl.TOTAL_TIME)
                # count successful and total requests
                self.pass_amount += 1
                pass_amount = pass_amount + 1
                self.total_amount += 1
                total_amount = total_amount + 1
                # per-thread statistics
                if self.total_time and self.pass_amount:
                    self.info = {
                        'Transactions':self.total_amount,
                        'Concurrency':1,
                        'Successful_transactions':self.pass_amount,
                        'Elapsed_time':self.total_time,
                        'Data_transferred':self.downloaded,
                        'Availability':self.pass_amount/self.total_amount,
                        'Response_time':self.total_time/self.pass_amount,
                        'Transaction_rate':self.pass_amount/self.total_time,
                        'Throughput':self.downloaded/self.total_time,
                    }
            except Exception, e:
                error_amount += 1
                self.total_amount += 1
                total_amount = total_amount + 1
                self.curl = pycurl.Curl()    # recreate the handle after a failed transfer
                print "self.curl.perform() Exception : %s"%e
        self.curl.close()

    def progress(self,download_t, downloaded, upload_total, uploaded):
        #print 'progress=',download_t,downloaded,upload_total,uploaded
        pass

class Collector(threading.Thread):
    """ Collects the global counters and writes a summary line to the log file every 6 seconds. """
    def __init__(self,path='result.log'):
        threading.Thread.__init__(self)
        self.log_path = path
        self.name = 'collector'

    def run(self):
        log = open(self.log_path,'a')
        while True:
            write_data(log)
            time.sleep(6)
            if time.time()-begin_time > run_time+4:
                write_data(log)
                time.sleep(3)
                break
        write_data(log)
        log.close()

if __name__=='__main__':
    # parse the command-line arguments
    if len(sys.argv) > 4:    # script name plus four arguments
        print 'sys.argv=',sys.argv
        cmd = str(sys.argv[-4]).replace('~amp;','&')    # URL(s); '~amp;' stands for '&'
        sleep_time = int(sys.argv[-3])    # delay in seconds before starting; mainly useful when several machines run the script together
        threads_amount = int(sys.argv[-2])    # concurrency: number of worker threads to create
        log_path = str(sys.argv[-1])    # path of the log file
    else:
        # defaults when no arguments are given
        cmd = 'www.baidu.com'
        sleep_time = 3
        threads_amount = 2
        log_path = 'result.log'
    r_file = open(log_path,'a')
    init = last_line%(0,0,0,0,0,0,0,0,0,0,0,0)
    r_file.write(init+'\n')
    r_file.close()
    if sleep_time > 0:
        print 'will sleep %d s'%sleep_time
        time.sleep(sleep_time)
    print 'start work~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~'
    urls = cmd.split('@')    # several URLs can be joined with '@'
    c = Collector(log_path)
    collector = [c]
    c.setDaemon(1)
    c.start()
    urllist = urls
    print 'urllist=',urllist
    # create the worker threads
    for i in range(threads_amount):
        t = Test(i)
        threads_dict[t.name] = t
    print 'threads_dict',threads_dict
    for t in threads_dict.values():
        t.setDaemon(1)
        t.start()
        print t.name,'have started'
    # wait for every worker to finish, then print its per-thread statistics
    for th in threads_dict.values():
        th.join()
        print 'name=',th.name,'info=',th.info
        print th.name,'have dead'
    for th in collector:
        th.join()
    f = open(log_path,'a')
    # final summary on stdout
    print 'Longest_transaction=',Longest_transaction,
    print 'Shortest_transaction=',Shortest_transaction,
    print 'error_amount=',error_amount
    print 'pass_amount=',pass_amount
    print 'total_amount=',total_amount
    print 'Availability=',(pass_amount*1.0/total_amount)*100.0
    print 'total_time=',total_time
    print 'over'
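For readers who have not used pycurl before, the core of each Test thread is a single perform() call followed by two getinfo() calls. The snippet below is not part of the original script; it is a minimal sketch of that one measurement step under the same pycurl options the script sets, and the URL is only an example.

# Minimal sketch of the per-request measurement done inside Test.run():
# perform() issues the request, getinfo() then returns libcurl's own measurements.
import pycurl, StringIO

buf = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.URL, 'http://www.baidu.com')   # any reachable URL
c.setopt(pycurl.WRITEFUNCTION, buf.write)      # collect the response body in a memory buffer
c.setopt(pycurl.NOSIGNAL, 1)
c.perform()

print 'total time (s):', c.getinfo(pycurl.TOTAL_TIME)        # feeds Response_time / Longest / Shortest
print 'bytes downloaded:', c.getinfo(pycurl.SIZE_DOWNLOAD)   # feeds Data_transferred / Throughput
c.close()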
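And since every line in result.log is a comma-separated list of key:value pairs, post-processing takes only a few lines. The parse_result_line helper below is hypothetical, not something the script provides, and it assumes the result.log file name used in the examples above.

# A small sketch (not part of the original script) that turns each log line
# into a dict, e.g. to track Transaction_rate and Availability over time.
def parse_result_line(line):
    """Split 'Transactions:0,Availability:0.0,...' into a {field: value} dict."""
    fields = {}
    for pair in line.strip().split(','):
        key, value = pair.split(':', 1)
        fields[key] = float(value)
    return fields

if __name__ == '__main__':
    for line in open('result.log'):    # the log file passed as the fourth argument
        if not line.strip():
            continue
        stats = parse_result_line(line)
        print('rate=%s t/s, availability=%s%%' %
              (stats['Transaction_rate'], stats['Availability']))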