浏览 3824 次
锁定老帖子 主题:python 读log文件
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-02-07
最后修改:2010-02-22
部分源码: # -*- coding: cp936 -*- #操作系统:winxp #python版本:2.5 #匹配事务并计算 from datetime import datetime from DTime import DTime from DataConvert import DataConvert class log: def log(self): #放置未匹配完成的数据 temp=dict() #存放待匹配的数据 tempTime=dict() #存放已匹配完成的数据 avg=dict() dt=DTime() i=0 b=True dc=DataConvert() items=dc.getData() strs=items.values() datas=items.keys() while b: try: #判断数据在之前是否已经存在 order=strs[i][0] key=strs[i][5] dtime=strs[i][2] if temp.has_key(key): #调用时间转换函数 usetime=dt.convertTime(tempTime.get(key)) othertime=dt.convertTime(dtime) if usetime>othertime: usetime=usetime-othertime elif usetime<othertime: usetime=othertime-usetime else: usetime=usetime-usetime temp.pop(key) tempTime.pop(key) avg[order]=usetime else: tempTime[key]=dtime for k in datas: if items.get(k)==strs[i]: temp[key]=k i=i+1 except: b=False print '共处理',i,'条数据' dc.storeTemp(temp) return (avg,temp) #x=log() #t=x.log() # -*- coding: cp936 -*- #从日志文件中得到数据,并处理得到的数据 from File import File from filterData import filterData class DataConvert: def getData(self): i=0 b=True self.f=File() data=dict() ls=self.f.loadLog() fd=filterData() strs=fd.filterStr(ls) print '加载上次未匹配完的数据' t=self.f.loadTemp() if t!=None: if len(t)!=0: for value in t.values(): strs.append(value) print '成功添加上次未匹配完的数据',len(t),'条' #分解数据 while b: try: order=strs[i].split(' ',1) ip=order[1].split(' [',1) dtime=ip[1].split('] ',1) desc=dtime[1].split(': ') answer=desc[1] key=strs[i].split(' [') if data.get(strs[i])!=strs[i]: l=list() l.append(order[0]) l.append(ip[0]) l.append(dtime[0]) l.append(desc[0]) l.append(answer) l.append(key[0]) data[strs[i]]=l i=i+1 except: b=False print '数据转换处理完毕' return data def storeTemp(self,temp): self.f.storeTemp(temp) #dc=DataConvert() #dc.getData() # -*- coding: cp936 -*- #时间转换类 from datetime import datetime class DTime: #datetime(*(time.strptime(date_string, format)[0:6])) def convertTime(self,dtime): str=dtime.split(' ') d=str[0].split('-') t=str[1].split(':') yy=d[0] mm=d[1] dd=d[2] hh=t[0] m=t[1] ss=t[2] return datetime(int(yy),int(mm),int(dd),int(hh),int(m),int(ss)) # -*- coding: cp936 -*- #扫描日志文件 #log为新的要扫描的日志文件 #temp为上次扫描过程中未匹配完的的数据 import cPickle as p class File: def loadLog(self): try: f=open('log','r') strs=f.readlines() return strs except: print '加载文件失败,请查看文件是否存在' finally: f.close() def storeTemp(self,data): try: f=open('temp','w') p.dump(data, f) f.close() except: print '存储未匹配完成的数据失败' finally: f.close() def loadTemp(self): try: f=open('temp','r') data=p.load(f) return data except: print '文件中没有数据' finally: f.close() #f=File() #print f.loadTempTime() # -*- coding: cp936 -*- #过滤数据是否合法 class filterData: def filterStr(self,strs): i=0 print '一共扫描到',len(strs) ,'条数据' for spares in strs: spare=spares.split(' ') if len(spare)!=7: i=i+1 strs.remove(spares) if i!=0: print '丢弃了',i ,'条不合格的数据' return strs #fd=filterData() #fd.filterStr() # -*- coding: cp936 -*- #报表类 class Report: def report(self,args): avg=args[0] leave=args[1] values=avg.values() max=values[0] min=values[0] maxkeys=list() minkeys=list() #find the max value and min value for value in values: if max<value: max=value if min>value: min=value keys=avg.keys() print '--------------------------报告----------------------------' print '---完整事务---' print '完整的事务有:',len(avg),' 个' print ' 详情如下:' for key in keys: if avg.get(key)==max: maxkeys.append(key) if avg.get(key)==min: minkeys.append(key) #j=j+1 print ' 事务',key,' 的时间为:',avg.get(key) print ' 其中,最长时间相同的事务有:',len(maxkeys),'个' for maxkey in maxkeys: print ' 事务为',maxkey,' 的时间为:',avg.get(maxkey) print ' 其中,最短时间相同的事务有:',len(minkeys),'个' for minkey in minkeys: print ' 事务为',minkey,' 的时间为:',avg.get(minkey) print '----------------------------------------------------------' print '---不完整事务----' print '不完整的事务有:',len(leave),' 个' print ' 详情如下:' keys=leave.keys() for key in keys: print ' ',leave.get(key) print '--------------------------结束-----------------------------' # -*- coding: cp936 -*- #测试log类 import unittest from report import Report from log import log class TestLog(unittest.TestCase): def setUp(self): self.r=Report() self.Log=log() def tearDown(self): self.r=None self.Log=None def testLog(self): test=self.Log.log() assert test!=None,'测试通过' #打印报告 self.r.report(test) def main(self): suite = unittest.TestLoader().loadTestsFromTestCase(TestLog) unittest.TextTestRunner().run(suite) test=TestLog("testLog") test.main() 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |