- 浏览: 14891 次
- 性别:
- 来自: 我在
文章分类
- 全部博客 (45)
- java_Collection (2)
- java_String (0)
- sql_oracle (9)
- html_table (1)
- js/Jquery (1)
- Eclipse (2)
- python (1)
- jit (0)
- jms (0)
- Google AdSense (0)
- tools (1)
- ajax (2)
- 我勒个去 (10)
- java_package (2)
- jsm (1)
- jvm_heap (1)
- Notepad++_plugin (1)
- JSON (0)
- css (1)
- Jquery_datatable (0)
- AOP (1)
- java_Map (1)
- java_Vm (2)
- JDK_JRE (1)
- sonar (1)
- java_thread (1)
- linux (1)
- 租房公积金 (0)
- spark (1)
- bookmark (0)
- netty (0)
- ubuntu server (1)
- maven (0)
- java_annotation (1)
最新评论
#coding=utf-8
'''
Created on 2015-12-24
@author: Administrator
'''
from selenium.webdriver.common.action_chains import ActionChains
from keywords import keywords
from utils import csvread
from utils import ftp
from utils import MML
from cspAutoTest.utils import oracle
import time
class pm(keywords.commonkeywords):
    """Performance-monitoring (PM) test keywords.

    Three kinds of steps live here: MML/FTP/CSV helpers, Selenium actions on
    ``self.driver`` and Oracle look-ups used to cross-check what the pages
    show.  ``self.driver``, ``self.mainPage`` and ``self.decodegb2312`` are
    not defined in this class; they are expected to be provided by
    ``keywords.commonkeywords``.
    """

    def __open_db(self, dbpath):
        """Return an ``oracle.oracle`` helper connected to *dbpath*.

        The leading double underscore makes the name class-private, so it
        cannot clash with anything the base class defines.  Callers are
        responsible for calling ``close()`` (use ``try``/``finally``).
        """
        oratool = oracle.oracle()
        oratool.connect('system', 'oracle', dbpath)
        return oratool

    def __read_summary(self, url, suffix):
        """Load *url* and return the three summary counters as ints.

        The counters are read from the elements whose ids are "rncNum",
        "remoteSpNum" and "alarmLinkNum" followed by *suffix*.
        """
        self.driver.get(url)
        time.sleep(5)  # fixed pause before the counters are read
        counters = []
        for prefix in ("rncNum", "remoteSpNum", "alarmLinkNum"):
            element = self.driver.find_element_by_id(prefix + suffix)
            counters.append(int(element.text))
        return tuple(counters)

    # Run an MML command
    def run_mml(self, emsip, startTime, endTime, indexid='300248'):
        """Query hourly PM data through MML and return the result file name.

        emsip     -- host handed to ``MML.login`` (port '21123', user 'admin').
        startTime -- STARTDATE value, e.g. "2016-1-4 12:00:00".
        endTime   -- ENDDATE value in the same format.
        indexid   -- index put into QUERYITEMLIST.  It used to be accepted
                     but ignored in favour of a hard-coded "300248"; the
                     default keeps the old command text.

        Raises AssertionError when the login reports False.
        """
        mmlstr = ('QUERY PMDATA:NETYPEID="UROP.RNC-MO",MEASOBJTYPEID="wPm.UtranCell",'
                  'QUERYITEMLIST="INDEX"-"' + str(indexid) + '"--,'
                  'MEASOBJMOI="urop:OMMOID=ih48vwn2-2@sbn=233@me=233"-,'
                  'STARTDATE="' + startTime + '",ENDDATE="' + endTime + '",'
                  'QUERYGRAN=HOURSUM,FILETYPE=CSV;')
        mml = MML.MML()
        login_ok = mml.login(emsip, '21123', 'admin', '')
        # Explicit comparison on purpose: only a False result is treated as
        # a failure; other return values of login() are not visible here.
        if login_ok == False:
            raise AssertionError("The mml login failed")
        try:
            filename = mml.execute_mml(mmlstr)
        finally:
            # Log out even when the command fails so the session is released.
            mml.logout()
        print(filename)
        return filename

    # Download the csv from the EMS to the local machine
    def download_csvfile(self, emsip, filename, indexid='300248'):
        """Fetch *filename* from *emsip* via ``ftp.downloadfile``.

        *indexid* is accepted for signature compatibility and is not used.
        """
        ftp.downloadfile(emsip, filename)

    # Parse the csv and get the index value
    def read_index(self):
        """Return whatever ``csvread.readcsv()`` yields."""
        return csvread.readcsv()

    # Create a performance monitor item
    def newMonitorItem(self, monitorName, indexid):
        """Create a monitor item *monitorName* bound to indicator *indexid*.

        Works inside the "page-mainIframesystem" frame: opens the monitor
        item list, clicks "new", types the name, filters the indicator table
        by *indexid*, ticks the first row whose second cell equals
        *indexid*, submits and accepts the confirmation alert.
        """
        driver = self.driver
        driver.switch_to_frame("page-mainIframesystem")
        # The implicit wait is a session-wide setting, so a single call
        # covers every lookup below (it used to be repeated per lookup).
        driver.implicitly_wait(2)
        # Open the monitor item list
        driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
        # Click "new"
        driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/ul/li[1]/a').click()
        # Enter the monitor item name
        driver.find_element_by_xpath('/html/body/div/div/div[1]/input').send_keys(monitorName)
        time.sleep(3)
        # Filter the indicator table
        driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[1]/div[1]/div[2]/label/input').send_keys(indexid)
        # Tick the matching indicator row
        tbody = driver.find_element_by_xpath("/html/body/div/div/div[2]/div/div/div/div[1]/table/tbody")
        for row in tbody.find_elements_by_tag_name('tr'):
            cells = row.find_elements_by_tag_name("td")
            if cells[1].text == indexid:
                cells[0].find_elements_by_tag_name("input")[0].click()
                break
        time.sleep(3)
        # Submit
        driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[2]/button[1]').click()
        # Confirm
        time.sleep(3)
        driver.switch_to_alert().accept()

    # Check that the monitor item has been stored in the database
    def confirmMonitorItem(self, monitorName, indexid, dbpath):
        """Return True when the monitor item is present in the database.

        Looks up the index id of *monitorName* in INDEXBASEINFO, then checks
        PM_SUPERVIEW_ITEM_TABLE for a row with that index id and
        ``privateindexid`` equal to *indexid*.  Returns False when either
        lookup finds nothing (an unknown name used to raise IndexError).
        """
        sql = 'select t.indexid from GUTLCSP_RM4X.INDEXBASEINFO t where t.indexname=:indexname'
        sql2 = 'select * from CSPPM.PM_SUPERVIEW_ITEM_TABLE t where t.indexid=:indexid and t.privateindexid=:privateindexid'
        oratool = self.__open_db(dbpath)
        try:
            found = oratool.query(sql, {'indexname': monitorName})
            if len(found) == 0:
                return False
            print(found[0][0])
            items = oratool.query(sql2, {'indexid': found[0][0], 'privateindexid': indexid})
        finally:
            # Close the connection on every path, including failed queries.
            oratool.close()
        return len(items) > 0

    # View the details of a monitor item
    def scanMonitorInfo(self, monitorName, product):
        """Open the detail view of *monitorName* and return its fields.

        Returns ``(indexname, mocname, seriousThreshold, majorThreshold,
        formula)`` read from the first five rows of the table with id
        "table-3", after selecting the tab whose text equals *product*.
        Returns None when no list row is named *monitorName*.
        """
        # NOTE(review): the nesting below was reconstructed from a source
        # whose indentation was lost; confirm that everything up to the
        # return is meant to run inside the matching row.
        driver = self.driver
        driver.switch_to_frame("page-mainIframesystem")
        # One call is enough: the implicit wait is a session-wide setting.
        driver.implicitly_wait(2)
        # Open the monitor item list
        driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
        # Rows of the monitor item list table
        tbody = driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/div/div/div/div/table/tbody')
        for row in tbody.find_elements_by_tag_name("tr"):
            cells = row.find_elements_by_tag_name("td")
            print(cells[1].text)
            if cells[1].text != monitorName:
                continue
            cells[1].find_elements_by_tag_name("a")[0].click()
            tab_bar = driver.find_element_by_xpath('/html/body/div[3]/div[2]')
            for item in tab_bar.find_elements_by_tag_name("li"):
                if item.text == product:
                    item.find_elements_by_tag_name("a")[0].click()
                    break
            # Close button of the tab; located before the details are read
            closetag = driver.find_element_by_xpath('/html/body/div[3]/div[1]/div/a')
            table = driver.find_element_by_id('table-3')
            detail_rows = table.find_elements_by_tag_name('tbody')[0].find_elements_by_tag_name('tr')
            # Rows 0-4: index name, index type, critical threshold,
            # major threshold, formula; the value is in the second cell.
            details = tuple(
                detail_rows[n].find_elements_by_tag_name('td')[1].text
                for n in range(5)
            )
            # Close the tab and go back to the main window
            closetag.click()
            driver.switch_to_window(self.mainPage)
            driver.implicitly_wait(4)
            driver.find_element_by_xpath('/html/body/div[1]/div/div/ul/li[2]/a').click()
            return details

    # Query the details of a monitor item
    def queryMonitorInfo(self, monitorName, indexid, dbpath):
        """Return the database record of index *indexid*.

        The first row of (indexname, mocname, criticalmax, majormax,
        indexcount) is passed through ``self.decodegb2312`` and returned.
        *monitorName* is not used.
        """
        sql = 'select t1.indexname,t2.mocname,t1.criticalmax,t1.majormax,t1.indexcount from CSPPM.PM_INDEX_TABLE t1,CSPPM.PM_MOTYPE_TABLE t2 where t1.mocid=t2.mocid and t1.indexid=:indexid'
        oratool = self.__open_db(dbpath)
        try:
            result = oratool.query(sql, {'indexid': indexid})
        finally:
            oratool.close()
        return self.decodegb2312(result[0])

    # Delete a monitor item
    def deleteMonitorItem(self, monitorName):
        """Tick the list row named *monitorName*, click delete and confirm.

        Only the first matching row is handled; nothing happens when no row
        matches.
        """
        driver = self.driver
        driver.switch_to_frame("page-mainIframesystem")
        # Open the monitor item list
        driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
        # Rows of the monitor item list table
        tbody = driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/div/div/div/div/table/tbody')
        for row in tbody.find_elements_by_tag_name("tr"):
            cells = row.find_elements_by_tag_name("td")
            print(cells[1].text)
            if cells[1].text != monitorName:
                continue
            cells[0].find_elements_by_tag_name("input")[0].click()
            time.sleep(3)
            # Submit the deletion
            driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/ul/li[2]/a').click()
            # Confirm
            time.sleep(3)
            driver.switch_to_alert().accept()
            break

    def getNetworkSummary2G(self, url):
        """Return (rncNum2g, remoteSpNum2g, alarmLinkNum2g) shown at *url*."""
        return self.__read_summary(url, "2g")

    def getNetworkSummary3G(self, url):
        """Return (rncNum3g, remoteSpNum3g, alarmLinkNum3g) shown at *url*."""
        return self.__read_summary(url, "3g")

    def queryNetworkSummary2G(self, dbpath):
        """Return the 2G summary counts computed from the database.

        The tuple holds the row counts of: the neid query, the two oid
        queries added together, and the distinct alarm link query.
        """
        sql1 = "select t1.neid from GUTLCSP_RM4X.GV3_BSC_OFFICESEL t1 where t1.dpc is not null union select t2.neid from GUTLCSP_RM4X.GV3_BSC_SGSNSEL t2 where t2.sgsncnid is not null union select t3.neid from GUTLCSP_RM4X.GV4_BSC_OFFICESEL t3 where t3.dpc is not null union select t4.neid from GUTLCSP_RM4X.GV4_BSC_SGSNSEL t4 where t4.sgsncnid is not null"
        sql21 = "select t1.oid from GUTLCSP_RM4X.GV3_BSC_OFFICESEL t1 where t1.dpc is not null union select t2.oid from GUTLCSP_RM4X.GV4_BSC_OFFICESEL t2 where t2.dpc is not null"
        sql22 = "select t3.oid from GUTLCSP_RM4X.GV3_BSC_SGSNSEL t3 where t3.sgsncnid is not null union select t4.oid from GUTLCSP_RM4X.GV4_BSC_SGSNSEL t4 where t4.sgsncnid is not null"
        sql3 = "select distinct t.linkoid from CSPFM.CSPALARM t where t.alarmtype = 10004100 and t.linkoid is not null and t.dispproduct = 'gv3'"
        oratool = self.__open_db(dbpath)
        try:
            rncNum2g = oratool.queryWithNoPara(sql1)
            remoteSpNum2g1 = oratool.queryWithNoPara(sql21)
            remoteSpNum2g2 = oratool.queryWithNoPara(sql22)
            alarmLink = oratool.queryWithNoPara(sql3)
        finally:
            # close() used to sit after the return and was never reached.
            oratool.close()
        return (len(rncNum2g), len(remoteSpNum2g1) + len(remoteSpNum2g2), len(alarmLink))

    def queryNetworkSummary3G(self, dbpath):
        """Return the 3G summary counts computed from the database.

        The tuple holds the row counts of the neid query, the oid query and
        the distinct alarm link query.
        """
        sql1 = "select t1.neid from GUTLCSP_RM4X.WCDMA_IUCSLINK t1 where t1.dpc is not null union select t2.neid from GUTLCSP_RM4X.WCDMA_IUPSLINK t2 where t2.dpc is not null union select t3.neid from GUTLCSP_RM4X.WV4_IUCSLINK t3 where t3.dpc is not null union select t4.neid from GUTLCSP_RM4X.WV4_IUPSLINK t4 where t4.dpc is not null"
        sql2 = "select t1.oid from GUTLCSP_RM4X.WCDMA_IUCSLINK t1 where t1.dpc is not null union select t2.oid from GUTLCSP_RM4X.WCDMA_IUPSLINK t2 where t2.dpc is not null union select t3.oid from GUTLCSP_RM4X.WV4_IUCSLINK t3 where t3.dpc is not null union select t4.oid from GUTLCSP_RM4X.WV4_IUPSLINK t4 where t4.dpc is not null"
        sql3 = "select distinct t.linkoid from CSPFM.CSPALARM t where t.alarmtype = 10004100 and t.linkoid is not null and t.dispproduct = 'wcdma'"
        oratool = self.__open_db(dbpath)
        try:
            rncNum3g = oratool.queryWithNoPara(sql1)
            remoteSpNum3g = oratool.queryWithNoPara(sql2)
            alarmLink = oratool.queryWithNoPara(sql3)
        finally:
            # close() used to sit after the return and was never reached.
            oratool.close()
        return (len(rncNum3g), len(remoteSpNum3g), len(alarmLink))

    def verifyNetworkSummary2G(self, url, dbpath):
        """Compare the 2G counters from the database with those on the page.

        Returns True and navigates back when all three agree; raises
        Exception("not equal") otherwise.
        """
        expected = self.queryNetworkSummary2G(dbpath)
        shown = self.getNetworkSummary2G(url)
        print(expected)
        print(shown)
        if any(expected[k] != shown[k] for k in range(3)):
            raise Exception("not equal")
        self.driver.back()
        return True

    def querySiteNum(self, dbpath, monitorTaskID, product):
        """Return how many SCENENE rows belong to *monitorTaskID*.

        Rows are restricted to those whose ``modeidlist`` contains
        *product*; "all" (any letter case) disables that restriction.
        """
        if product.lower() == "all":
            product = ''
        # Both values are sent as bind variables; the product used to be
        # concatenated into the statement text.
        sql = ('select t.* from GUTLCSP_RM4X.SCENENE t,GUTLCSP_RM4X.MONITORTASK t1 '
               'where t.groupid=t1.groupid and t1.monitorid=:monitorTaskID '
               'and t.modeidlist like :modePattern')
        para = {'monitorTaskID': monitorTaskID, 'modePattern': '%' + product + '%'}
        oratool = self.__open_db(dbpath)
        try:
            result = oratool.query(sql, para)
        finally:
            oratool.close()
        return len(result)

    def getSiteNumInPage(self, url):
        """Load *url* and return the "sceneSiteNum" element text as an int."""
        self.driver.get(url)
        time.sleep(5)
        return int(self.driver.find_element_by_id("sceneSiteNum").text)

    # Scene management --> view
    # url = "http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneManagementTest.html"
    def checkScene(self, dbpath, url):
        """Compare the "view" output of the first scene with the database.

        The scene id is the text before the first comma of the first <li>.
        After clicking "tagA", the gutneoid, nename, neoid and rncid shown in
        <li> items 1, 3, 4 and 6 (the part after the first "_") must equal
        columns 4, 12, 2 and 10 of the first database row.

        Raises Exception naming the first field that differs.
        """
        print("into method")
        self.driver.get(url)
        time.sleep(1)
        # Id of the first scene
        print("start find_elements_by_tag_name")
        firstLi = self.driver.find_elements_by_tag_name('li')[0]
        print("finish find_element_by_tag_name")
        contentStr = firstLi.text
        print(contentStr)
        sceneId = contentStr.split(',')[0]
        # Click "view"
        self.driver.find_element_by_id("tagA").click()
        # Network elements of the scene according to the database
        sql = "with t1 as (select ne.*,t2.rncid,t2.product,t2.displayname,t2.isbs,t2.modename,t6.area from GUTLCSP_RM4X.scenene ne left join (select me.oid,me.rncid,me.product,me.displayname,me.isbs,me.modename from GUTLCSP_RM4X.sdrmgr_nodeme me union select v3.oid,v3.bssfunctionid as rncid,v3.product,v3.displayname,v3.isbs,v3.modename from GUTLCSP_RM4X.gv3_bts v3 union select v4.oid,v4.bssfunctionid as rncid,v4.product,v4.displayname,v4.isbs,v4.modename from GUTLCSP_RM4X.gv4_bts v4 union select w.oid,'---' as rncid,w.product,w.displayname,'false' as isbs,w.modename from GUTLCSP_RM4X.wcdma_me w union select bsc.oid,'---' as rncid,bsc.product,bsc.displayname,'false' as isbs,bsc.modename from GUTLCSP_RM4X.gv3_bsc bsc union select rnc.oid,'---' as rncid,rnc.product,rnc.displayname,'false' as isbs,rnc.modename from GUTLCSP_RM4X.urop_rncmanagedelement rnc)t2 on t2.oid=ne.neoid left join (select t4.oid,t5.name as area from uep4x.res_treenode_table t4 left join GUTLCSP_RM4X.RES_GROUP t5 on t4.parentoid=t5.oid where t5.name is not null) t6 on t6.oid=ne.neoid where 1=1) select t7.* from ( select row_number() over(order by rncid asc) rn,t1.* from t1 where t1.groupid="+sceneId+") t7 where rn>0 and rn<=10"
        oratool = self.__open_db(dbpath)
        try:
            results = oratool.queryWithNoPara(sql)
        finally:
            # Closed right after the query so a failed comparison below
            # can no longer leave the connection open.
            oratool.close()
        first_row = results[0]
        allLi = self.driver.find_elements_by_tag_name('li')
        # (field name, <li> position on the page, column in the db row)
        checks = (
            ("gutneoid", 1, 4),
            ("nename", 3, 12),
            ("neoid", 4, 2),
            ("rncid", 6, 10),
        )
        for field, li_pos, column in checks:
            from_db = first_row[column]
            from_page = allLi[li_pos].text.split('_')[1]
            print(from_db)
            print(from_page)
            if from_db != from_page:
                raise Exception("the " + field + " is not same ")
            print("the " + field + " is same")

    # Delete a scene
    def deleteScene(self, dbpath, url):
        """Delete the first scene on the page and compare with the database.

        The scene id is the text before the first comma of the first <li>.
        After clicking "delete", the text of the first <p> is the reported
        result and the number of ``sceneinfo`` rows for that id is read.

        Raises Exception("delete scene fail") when the page reports "fail".
        """
        print("into method")
        self.driver.get(url)
        time.sleep(1)
        # Id of the first scene
        print("start find_elements_by_tag_name")
        firstLi = self.driver.find_elements_by_tag_name('li')[0]
        print("finish find_element_by_tag_name")
        contentStr = firstLi.text
        print(contentStr)
        sceneId = contentStr.split(',')[0]
        # Click "delete"
        self.driver.find_element_by_id("delete").click()
        # Result reported by the page
        print("start find_elements_by_tag_name")
        pElement = self.driver.find_elements_by_tag_name('p')[0]
        print("finish find_element_by_tag_name")
        result = pElement.text
        print(result)
        # Compare the reported result with the database
        sql = "select count(*) as countNum from GUTLCSP_RM4X.sceneinfo where groupid = " + sceneId
        oratool = self.__open_db(dbpath)
        try:
            results = oratool.queryWithNoPara(sql)
        finally:
            oratool.close()
        print(results)
        countNum = results[0][0]
        # This message used to be printed unconditionally.
        if countNum > 0:
            print("countNum > 0 , scene is not be deleted")
        # "succes" is compared as-is against the text shown by the page.
        # NOTE(review): a reported success with rows still present is not
        # treated as a failure here -- confirm whether it should be.
        if result == "succes" and countNum == 0:
            print("delete scene success")
        if result == "forbidden" and countNum != 0:
            print("can't delete scene because there are monitorTasks are running")
        if result == "fail":
            raise Exception("delete scene fail")

    # Check name and state of a monitor task from the monitor page
    def queryMonitorTaskInfo(self, dbpath, url):
        """Press stop, pause and start and check the task state each time.

        The task id is the text before the first comma of the first <li>.
        After each button click the ``taskstate`` column of that task must
        be 2 (stop), 1 (pause) and 0 (start) respectively; otherwise an
        Exception naming the operation is raised.  Navigates back when all
        three checks pass.
        """
        self.driver.get(url)
        time.sleep(5)
        print("start find_elements_by_tag_name")
        firstLi = self.driver.find_elements_by_tag_name('li')[0]
        print("finish find_element_by_tag_name")
        contentStr = firstLi.text
        print(contentStr)
        task_id = contentStr.split(',')[0]
        sql = ' select t.monitorid,t.monitortaskname,t.taskstate from GUTLCSP_RM4X.monitortask t where monitorid = ' + task_id
        oratool = self.__open_db(dbpath)
        try:
            # (button id on the page, task state expected afterwards)
            for button, expected in (("stop", 2), ("pause", 1), ("start", 0)):
                self.driver.find_element_by_id(button).click()
                stateFromDB = oratool.queryWithNoPara(sql)[0][2]
                if stateFromDB != expected:
                    raise Exception("the state is :" + str(stateFromDB) + " so " + button + " operation error ")
        finally:
            oratool.close()
        self.driver.back()

    def verifySiteNumInMap(self, url, dbpath, monitorTaskID, product):
        """Compare the site count in the database with the one on the page.

        Returns True and navigates back when they agree; raises
        Exception("site number is not equal!") otherwise.
        """
        expected = self.querySiteNum(dbpath, monitorTaskID, product)
        shown = self.getSiteNumInPage(url)
        if expected != shown:
            raise Exception("site number is not equal!")
        self.driver.back()
        return True

    def verifyNetworkSummary3G(self, url, dbpath):
        """Compare the 3G counters from the database with those on the page.

        Returns True and navigates back when all three agree; raises
        Exception("not equal") otherwise.
        """
        expected = self.queryNetworkSummary3G(dbpath)
        shown = self.getNetworkSummary3G(url)
        print(expected)
        print(shown)
        if any(expected[k] != shown[k] for k in range(3)):
            raise Exception("not equal")
        self.driver.back()
        return True
# Ad-hoc manual run: these statements execute whenever the module is loaded.
# NOTE(review): hosts and the login name are hard-coded for one lab setup.
p = pm()
# open2Browser/getMainPage are not defined in this file; presumably they
# start the browser session and log in -- confirm against the base class.
p.open2Browser("http://10.62.45.141:21180")
p.getMainPage("admin")
# Test page used by the scene keywords below.
t="http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneManagementTest.html"
# Alternative page and keywords kept for manual switching:
#t="http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneMonitorTaskAutoTest.html"
#p.checkScene("10.62.45.169/CSP",t)
#p.queryMonitorTaskInfo("10.62.45.169/CSP",t)
# First argument is the dbpath handed to the Oracle helper's connect().
p.deleteScene("10.62.45.169/CSP",t)
'''
Created on 2015-12-24
@author: Administrator
'''
from selenium.webdriver.common.action_chains import ActionChains
from keywords import keywords
from utils import csvread
from utils import ftp
from utils import MML
from cspAutoTest.utils import oracle
import time
class pm(keywords.commonkeywords):
#运行mml命令
def run_mml(self,emsip,startTime,endTime,indexid='300248'):
mml = MML.MML()
#mmlstr='QUERY PMDATA:NETYPEID="UROP.RNC-MO",MEASOBJTYPEID="wPm.UtranCell",QUERYITEMLIST="INDEX"-"300248"--,MEASOBJMOI="urop:OMMOID=ih48vwn2-2@sbn=233@me=233"-,STARTDATE="2016-1-4 12:00:00",ENDDATE="2016-1-4 13:00:00",QUERYGRAN=HOURSUM,FILETYPE=CSV;'
mmlstr='QUERY PMDATA:NETYPEID="UROP.RNC-MO",MEASOBJTYPEID="wPm.UtranCell",QUERYITEMLIST="INDEX"-"300248"--,MEASOBJMOI="urop:OMMOID=ih48vwn2-2@sbn=233@me=233"-,STARTDATE="'+startTime+'",ENDDATE="'+endTime+'",QUERYGRAN=HOURSUM,FILETYPE=CSV;'
tmp = mml.login(emsip, '21123', 'admin', '')
if tmp == False:
raise AssertionError("The mml login failed")
return
filename = mml.execute_mml(mmlstr)
mml.logout()
print filename
return filename
#从ems下载csv到本地
def download_csvfile(self,emsip,filename,indexid='300248'):
ftp.downloadfile(emsip,filename)
#解析csv获取指标值
def read_index(self):
tmp = csvread.readcsv()
return tmp
#新建性能监控项
def newMonitorItem(self,monitorName,indexid):
self.driver.switch_to_frame("page-mainIframesystem")
#单击性能监控项列表
self.driver.implicitly_wait(2)
self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
#单击新建
self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/ul/li[1]/a').click()
#输入监控项名称
self.driver.find_element_by_xpath('/html/body/div/div/div[1]/input').send_keys(monitorName)
time.sleep(3)
#过滤指标
self.driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[1]/div[1]/div[2]/label/input').send_keys(indexid)
self.driver.implicitly_wait(2)
#选中关联指标
table=self.driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[1]/table')
self.driver.implicitly_wait(2)
tbody=table.find_element_by_xpath("/html/body/div/div/div[2]/div/div/div/div[1]/table/tbody")
self.driver.implicitly_wait(2)
trs=tbody.find_elements_by_tag_name('tr')
for tr in trs:
self.driver.implicitly_wait(2)
tds = tr.find_elements_by_tag_name("td")
if tds[1].text==indexid:
tds[0].find_elements_by_tag_name("input")[0].click()
break
#self.driver.find_element_by_id(indexid).click()
time.sleep(3)
#确认提交
self.driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[2]/button[1]').click()
#确认
time.sleep(3)
self.driver.switch_to_alert().accept()
#确认监控项是否已入库
def confirmMonitorItem(self,monitorName,indexid,dbpath):
sql='select t.indexid from GUTLCSP_RM4X.INDEXBASEINFO t where t.indexname=:indexname'
para={'indexname':monitorName}
oratool = oracle.oracle()
oratool.connect('system','oracle',dbpath)
result = oratool.query(sql,para)
print result[0][0]
sql2='select * from CSPPM.PM_SUPERVIEW_ITEM_TABLE t where t.indexid=:indexid and t.privateindexid=:privateindexid'
para2={'indexid':result[0][0],'privateindexid': indexid }
result = oratool.query(sql2,para2)
oratool.close()
return len(result)>0
#查看监控项详细信息
def scanMonitorInfo(self,monitorName,product):
self.driver.switch_to_frame("page-mainIframesystem")
#单击性能监控项列表
self.driver.implicitly_wait(2)
self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
#获取性能监控项列表table
self.driver.implicitly_wait(2)
tbody=self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/div/div/div/div/table/tbody')
self.driver.implicitly_wait(2)
trs = tbody.find_elements_by_tag_name("tr")
for i in range(len(trs)):
tds = trs[i].find_elements_by_tag_name("td")
print tds[1].text
if tds[1].text==monitorName:
tds[1].find_elements_by_tag_name("a")[0].click()
div=self.driver.find_element_by_xpath('/html/body/div[3]/div[2]')
lis=div.find_elements_by_tag_name("li")
for i in range(len(lis)):
if lis[i].text==product:
lis[i].find_elements_by_tag_name("a")[0].click()
break
#获取tag关闭按钮
self.driver.implicitly_wait(2)
closetag=self.driver.find_element_by_xpath('/html/body/div[3]/div[1]/div/a')
self.driver.implicitly_wait(2)
table=self.driver.find_element_by_id('table-3')
tbody=table.find_elements_by_tag_name('tbody')
trs=tbody[0].find_elements_by_tag_name('tr')
#指标名称
tds=trs[0].find_elements_by_tag_name('td')
indexname=tds[1].text
#指标类型
tds=trs[1].find_elements_by_tag_name('td')
mocname=tds[1].text
#严重阈值
tds=trs[2].find_elements_by_tag_name('td')
seriousThreshold=tds[1].text
#重要阈值
tds=trs[3].find_elements_by_tag_name('td')
majorThreshold=tds[1].text
#公式
tds=trs[4].find_elements_by_tag_name('td')
formula=tds[1].text
#关闭tag页
closetag.click()
self.driver.switch_to_window(self.mainPage)
self.driver.implicitly_wait(4)
about = self.driver.find_element_by_xpath('/html/body/div[1]/div/div/ul/li[2]/a')
about.click()
#self.driver.implicitly_wait(4)
#ActionChains(self.driver).move_to_element(about).perform()
return (indexname,mocname,seriousThreshold,majorThreshold,formula)
#查询监控项详细信息
def queryMonitorInfo(self,monitorName,indexid,dbpath):
sql='select t1.indexname,t2.mocname,t1.criticalmax,t1.majormax,t1.indexcount from CSPPM.PM_INDEX_TABLE t1,CSPPM.PM_MOTYPE_TABLE t2 where t1.mocid=t2.mocid and t1.indexid=:indexid'
para={'indexid':indexid}
oratool = oracle.oracle()
oratool.connect('system','oracle',dbpath)
result = oratool.query(sql,para)
#print result[0]
oratool.close()
return self.decodegb2312(result[0])
#删除监控项
def deleteMonitorItem(self,monitorName):
self.driver.switch_to_frame("page-mainIframesystem")
#单击性能监控项列表
self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
#获取性能监控项列表table
tbody=self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/div/div/div/div/table/tbody')
trs = tbody.find_elements_by_tag_name("tr")
for i in range(len(trs)):
tds = trs[i].find_elements_by_tag_name("td")
print tds[1].text
if tds[1].text==monitorName:
tds[0].find_elements_by_tag_name("input")[0].click()
time.sleep(3)
#确认删除提交
self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/ul/li[2]/a').click()
#确认
time.sleep(3)
self.driver.switch_to_alert().accept()
break
def getNetworkSummary2G(self,url):
self.driver.get(url)
time.sleep(5)
rncNum2g = self.driver.find_element_by_id("rncNum2g")
remoteSpNum2g = self.driver.find_element_by_id("remoteSpNum2g")
alarmLinkNum2g = self.driver.find_element_by_id("alarmLinkNum2g")
return (int(rncNum2g.text),int(remoteSpNum2g.text),int(alarmLinkNum2g.text))
def getNetworkSummary3G(self,url):
self.driver.get(url)
time.sleep(5)
rncNum3g = self.driver.find_element_by_id("rncNum3g")
remoteSpNum3g = self.driver.find_element_by_id("remoteSpNum3g")
alarmLinkNum3g = self.driver.find_element_by_id("alarmLinkNum3g")
return (int(rncNum3g.text),int(remoteSpNum3g.text),int(alarmLinkNum3g.text))
def queryNetworkSummary2G(self,dbpath):
sql1="select t1.neid from GUTLCSP_RM4X.GV3_BSC_OFFICESEL t1 where t1.dpc is not null union select t2.neid from GUTLCSP_RM4X.GV3_BSC_SGSNSEL t2 where t2.sgsncnid is not null union select t3.neid from GUTLCSP_RM4X.GV4_BSC_OFFICESEL t3 where t3.dpc is not null union select t4.neid from GUTLCSP_RM4X.GV4_BSC_SGSNSEL t4 where t4.sgsncnid is not null"
sql21="select t1.oid from GUTLCSP_RM4X.GV3_BSC_OFFICESEL t1 where t1.dpc is not null union select t2.oid from GUTLCSP_RM4X.GV4_BSC_OFFICESEL t2 where t2.dpc is not null"
sql22="select t3.oid from GUTLCSP_RM4X.GV3_BSC_SGSNSEL t3 where t3.sgsncnid is not null union select t4.oid from GUTLCSP_RM4X.GV4_BSC_SGSNSEL t4 where t4.sgsncnid is not null"
sql3="select distinct t.linkoid from CSPFM.CSPALARM t where t.alarmtype = 10004100 and t.linkoid is not null and t.dispproduct = 'gv3'"
oratool = oracle.oracle()
oratool.connect('system','oracle',dbpath)
rncNum2g = oratool.queryWithNoPara(sql1)
remoteSpNum2g1 = oratool.queryWithNoPara(sql21)
remoteSpNum2g2 = oratool.queryWithNoPara(sql22)
alarmLink = oratool.queryWithNoPara(sql3)
return (len(rncNum2g),len(remoteSpNum2g1)+len(remoteSpNum2g2),len(alarmLink))
oratool.close()
def queryNetworkSummary3G(self,dbpath):
sql1="select t1.neid from GUTLCSP_RM4X.WCDMA_IUCSLINK t1 where t1.dpc is not null union select t2.neid from GUTLCSP_RM4X.WCDMA_IUPSLINK t2 where t2.dpc is not null union select t3.neid from GUTLCSP_RM4X.WV4_IUCSLINK t3 where t3.dpc is not null union select t4.neid from GUTLCSP_RM4X.WV4_IUPSLINK t4 where t4.dpc is not null"
sql2="select t1.oid from GUTLCSP_RM4X.WCDMA_IUCSLINK t1 where t1.dpc is not null union select t2.oid from GUTLCSP_RM4X.WCDMA_IUPSLINK t2 where t2.dpc is not null union select t3.oid from GUTLCSP_RM4X.WV4_IUCSLINK t3 where t3.dpc is not null union select t4.oid from GUTLCSP_RM4X.WV4_IUPSLINK t4 where t4.dpc is not null"
sql3="select distinct t.linkoid from CSPFM.CSPALARM t where t.alarmtype = 10004100 and t.linkoid is not null and t.dispproduct = 'wcdma'"
oratool = oracle.oracle()
oratool.connect('system','oracle',dbpath)
rncNum3g = oratool.queryWithNoPara(sql1)
remoteSpNum3g = oratool.queryWithNoPara(sql2)
alarmLink = oratool.queryWithNoPara(sql3)
return (len(rncNum3g),len(remoteSpNum3g),len(alarmLink))
oratool.close()
def verifyNetworkSummary2G(self,url,dbpath):
temp1 = self.queryNetworkSummary2G(dbpath)
temp2 = self.getNetworkSummary2G(url)
print temp1
print temp2
if(temp1[0]!=temp2[0] or temp1[1]!=temp2[1] or temp1[2]!=temp2[2]):
raise Exception("not equal")
self.driver.back()
return True
def querySiteNum(self,dbpath,monitorTaskID,product):
if(product.lower()=="all"):
product=''
sql = 'select t.* from GUTLCSP_RM4X.SCENENE t,GUTLCSP_RM4X.MONITORTASK t1 where t.groupid=t1.groupid and t1.monitorid=:monitorTaskID and t.modeidlist like '+"'%"+product+"%'"
para={'monitorTaskID':monitorTaskID}
oratool = oracle.oracle()
oratool.connect('system','oracle',dbpath)
result = oratool.query(sql,para)
#print result[0]
oratool.close()
return len(result)
def getSiteNumInPage(self,url):
self.driver.get(url)
time.sleep(5)
sceneSiteNum = self.driver.find_element_by_id("sceneSiteNum")
return int(sceneSiteNum.text)
#场景管理 --> 查看
#url = "http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneManagementTest.html"
def checkScene(self,dbpath,url):
#跳转到sceneManagementTest.html页面
print "into method"
self.driver.get(url)
time.sleep(1)
#得到第一个场景的name和id
print "start find_elements_by_tag_name"
firstLi = self.driver.find_elements_by_tag_name('li')[0]
print "finish find_element_by_tag_name"
contentStr = firstLi.text
print contentStr
strlist = contentStr.split(',')
sceneId = strlist[0]
sceneName=strlist[1]
#点击“查看”
self.driver.find_element_by_id("tagA").click()
#从数据库中查询当前场景的网元信息
sql = "with t1 as (select ne.*,t2.rncid,t2.product,t2.displayname,t2.isbs,t2.modename,t6.area from GUTLCSP_RM4X.scenene ne left join (select me.oid,me.rncid,me.product,me.displayname,me.isbs,me.modename from GUTLCSP_RM4X.sdrmgr_nodeme me union select v3.oid,v3.bssfunctionid as rncid,v3.product,v3.displayname,v3.isbs,v3.modename from GUTLCSP_RM4X.gv3_bts v3 union select v4.oid,v4.bssfunctionid as rncid,v4.product,v4.displayname,v4.isbs,v4.modename from GUTLCSP_RM4X.gv4_bts v4 union select w.oid,'---' as rncid,w.product,w.displayname,'false' as isbs,w.modename from GUTLCSP_RM4X.wcdma_me w union select bsc.oid,'---' as rncid,bsc.product,bsc.displayname,'false' as isbs,bsc.modename from GUTLCSP_RM4X.gv3_bsc bsc union select rnc.oid,'---' as rncid,rnc.product,rnc.displayname,'false' as isbs,rnc.modename from GUTLCSP_RM4X.urop_rncmanagedelement rnc)t2 on t2.oid=ne.neoid left join (select t4.oid,t5.name as area from uep4x.res_treenode_table t4 left join GUTLCSP_RM4X.RES_GROUP t5 on t4.parentoid=t5.oid where t5.name is not null) t6 on t6.oid=ne.neoid where 1=1) select t7.* from ( select row_number() over(order by rncid asc) rn,t1.* from t1 where t1.groupid="+sceneId+") t7 where rn>0 and rn<=10"
oratool = oracle.oracle()
oratool.connect('system','oracle',dbpath)
results = oratool.queryWithNoPara(sql)
groupId = results[0][1]
neoid = results[0][2]
modeIdList = results[0][3]
gutneoid = results[0][4]
rncid = results[0][10]
nename = results[0][12]
#从当前页面抓取数据为了比较
allLi = self.driver.find_elements_by_tag_name('li')
#groupId_=allLi[0].text.split('-')[1]
#print groupId
#print groupId_
#if(groupId==groupId_):
#print "the groupId is same"
#else :
#raise Exception("the groupID is not same ")
gutneoid_=allLi[1].text.split('_')[1]
print gutneoid
print gutneoid_
if(gutneoid==gutneoid_):
print "the gutneoid is same"
else :
raise Exception("the gutneoid is not same ")
#modeidlist_=allLi[2].text.split('_')[1]
#print modeIdList
#print modeidlist_
#if(modeIdList==modeidlist_):
#print "the modeidlist is same"
#else :
#raise Exception("the modeidlist is not same ")
nename_=allLi[3].text.split('_')[1]
print nename
print nename_
if(nename_==nename):
print "the nename is same"
else :
raise Exception("the nename is not same ")
neoid_ = allLi[4].text.split('_')[1]
print neoid
print neoid_
if(neoid==neoid_):
print "the neoid is same"
else :
raise Exception("the neoid is not same ")
rncid_ = allLi[6].text.split('_')[1]
print rncid
print rncid_
if(rncid !=rncid_):
raise Exception("the rncid is not same ")
oratool.close()
#删除场景
def deleteScene(self,dbpath,url):
#跳转到sceneManagementTest.html页面
print "into method"
self.driver.get(url)
time.sleep(1)
#得到第一个场景的name和id
print "start find_elements_by_tag_name"
firstLi = self.driver.find_elements_by_tag_name('li')[0]
print "finish find_element_by_tag_name"
contentStr = firstLi.text
print contentStr
strlist = contentStr.split(',')
sceneId = strlist[0]
sceneName=strlist[1]
#点击“删除”
self.driver.find_element_by_id("delete").click()
#抓取删除响应的结果
print "start find_elements_by_tag_name"
pElement = self.driver.find_elements_by_tag_name('p')[0]
print "finish find_element_by_tag_name"
result = pElement.text
print result
#根据响应的结果与数据库记录比较
sql = "select count(*) as countNum from GUTLCSP_RM4X.sceneinfo where groupid = " +sceneId
oratool = oracle.oracle()
oratool.connect('system','oracle',dbpath)
results = oratool.queryWithNoPara(sql)
print results
countNum = results[0][0]
print "countNum > 0 , scene is not be deleted"
if((result=="succes")&(countNum==0)):
print "delete scene success"
if((result=="forbidden")&(countNum!=0)):
print "can't delete scene because there are monitorTasks are running"
if(result=="fail"):
raise Exception("delete scene fail")
oratool.close()
#查询监控页面监控任务的名称与状态
def queryMonitorTaskInfo(self,dbpath,url):
    """Click stop, pause and start for the first monitor task and verify each state.

    Loads ``url``, takes the text of the first ``<li>`` element (split on
    ``,``; the first field is used as the monitor id), then for each of the
    buttons ``stop``, ``pause`` and ``start`` clicks the element with that
    id and reads ``taskstate`` from ``GUTLCSP_RM4X.monitortask``. The states
    expected after each click are 2, 1 and 0 respectively. On success the
    browser navigates back.

    Args:
        dbpath: connect target handed to ``oratool.connect``.
        url: address of the monitor task test page.

    Raises:
        Exception: when the state read from the database after a click is
            not the expected one.
    """
    self.driver.get(url)
    time.sleep(5)

    print("start find_elements_by_tag_name")
    firstLi = self.driver.find_elements_by_tag_name('li')[0]
    print("finish find_element_by_tag_name")
    contentStr = firstLi.text
    print(contentStr)
    # The first comma-separated field is the monitor task id.
    monitorId = contentStr.split(',')[0]

    # The same statement is reused after every button click.
    sql =' select t.monitorid,t.monitortaskname,t.taskstate from GUTLCSP_RM4X.monitortask t where monitorid = '+monitorId
    # (button element id, taskstate expected in the database afterwards)
    steps = (("stop", 2), ("pause", 1), ("start", 0))

    oratool = oracle.oracle()
    oratool.connect('system','oracle',dbpath)
    try:
        for buttonId, expectedState in steps:
            self.driver.find_element_by_id(buttonId).click()
            results = oratool.queryWithNoPara(sql)
            stateFromDB = results[0][2]
            if stateFromDB != expectedState:
                raise Exception("the state is :"+str(stateFromDB)+" so "+buttonId+" operation error ")
    finally:
        # Release the connection even when one of the checks raises.
        oratool.close()
    self.driver.back()
def verifySiteNumInMap(self, url, dbpath, monitorTaskID, product):
    """Compare the site number from the database with the one on the page.

    The value returned by ``self.querySiteNum(dbpath, monitorTaskID,
    product)`` must equal the value returned by
    ``self.getSiteNumInPage(url)``.

    Returns:
        True after navigating the browser back, when both values are equal.

    Raises:
        Exception: when the two values differ (the browser is not
            navigated back in that case).
    """
    expectedCount = self.querySiteNum(dbpath, monitorTaskID, product)
    shownCount = self.getSiteNumInPage(url)
    if expectedCount != shownCount:
        raise Exception("site number is not equal!")
    self.driver.back()
    return True
def verifyNetworkSummary3G(self, url, dbpath):
    """Compare the 3G network summary from the database with the page.

    Both ``self.queryNetworkSummary3G(dbpath)`` and
    ``self.getNetworkSummary3G(url)`` are printed, then their elements at
    indexes 0, 1 and 2 are compared in that order.

    Returns:
        True after navigating the browser back, when all three match.

    Raises:
        Exception: on the first differing element (the browser is not
            navigated back in that case).
    """
    fromDb = self.queryNetworkSummary3G(dbpath)
    fromPage = self.getNetworkSummary3G(url)
    print(fromDb)
    print(fromPage)
    # Stops at the first mismatch, so later indexes are not touched.
    mismatch = any(fromDb[i] != fromPage[i] for i in range(3))
    if mismatch:
        raise Exception("not equal")
    self.driver.back()
    return True
# Ad-hoc driver for manual runs: creates a pm instance and runs one scenario.
# NOTE(review): these statements appear to be at module level, so they would
# also execute on import -- confirm how this file is loaded before relying on that.
p = pm()
# open2Browser / getMainPage are defined outside this excerpt; presumably they
# open the web UI at the given address and enter it as "admin" -- TODO confirm.
p.open2Browser("http://10.62.45.141:21180")
p.getMainPage("admin")
# Test page used by the scenario below (scene management).
t="http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneManagementTest.html"
# Alternative test page and scenarios, switched by (un)commenting:
#t="http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneMonitorTaskAutoTest.html"
#p.checkScene("10.62.45.169/CSP",t)
#p.queryMonitorTaskInfo("10.62.45.169/CSP",t)
# Active scenario: delete the first scene on the page and check the database.
p.deleteScene("10.62.45.169/CSP",t)
相关推荐
python3.7_python3.8_python3.9_python3.10对应的dlib安装包.whl.zippython3.7_python3.8_python3.9_python3.10对应的dlib安装包.whl.zippython3.7_python3.8_python3.9_python3.10对应的dlib安装包.whl.zippython...
Python零基础入门到精通Python零基础入门到精通学习教程--Python零基础入门到精通Python零基础入门到精通学习教程--Python零基础入门到精通Python零基础入门到精通学习教程--Python零基础入门到精通Python零基础入门...
python管理系统(python+mysql)代码.zipPython管理系统(python+mysql)代码.zipPython管理系统(python+mysql)代码.zipPython管理系统(python+mysql)代码.zipPython管理系统(python+mysql)代码.zipPython管理...
Python 根据用户输入编码批量生成EAN-13条形码 Python源码Python 根据用户输入编码批量生成EAN-13条形码 Python源码Python 根据用户输入编码批量生成EAN-13条形码 Python源码Python 根据用户输入编码批量生成EAN-13...
Python 如何批量替换Word文档中的指定内容(包括页眉)Python源码Python 如何批量替换Word文档中的指定内容(包括页眉)Python源码Python 如何批量替换Word文档中的指定内容(包括页眉)Python源码Python 如何批量...
Python 如何批量提取Word文档的页码(并计算总页码)Python源码Python 如何批量提取Word文档的页码(并计算总页码)Python源码Python 如何批量提取Word文档的页码(并计算总页码)Python源码Python 如何批量提取Word...
Linux下 源码安装 Python-2.7.18,解压Python-2.7.18.zip后 [root@RedHatEnterpriseLinux9 ~]# tar -zxvf Python-2.7.18.tgz [root@RedHatEnterpriseLinux9 ~]# cd Python-2.7.18 [root@RedHatEnterpriseLinux9 ~]#...
150套Python实战操作源码,其中包含了12章节(详见文件目录): 01python核心基础应用(13套) 02python字符串处理(9套) 03python文件操作(9套) 04pythonGUI界面开发(13套) 05python图形图像与多媒体(15套) ...
Python 禁止窗体显示最大化按钮及调整窗体大小 Python源码Python 禁止窗体显示最大化按钮及调整窗体大小 Python源码Python 禁止窗体显示最大化按钮及调整窗体大小 Python源码Python 禁止窗体显示最大化按钮及调整...
Python基础入门知识教程 Python是一个功能强大且广泛应用的高级编程语言。了解Python的发展历史、优缺点、应用场景、数据类型转换、编写第一个Python程序、注释的引入、中文支持、输入变量、类型标示符、关键字、...
source insight python Python.CLF 语言包 SourceInsight作如下配置: (1)选择Options > Preferences,单击Languages选项; (2)单击import按钮,装载并导入Python.CLF; (3)这时可以看到,左栏语言列表多...
Java到Python的转换工具,如标题“java2python”所示,是编程领域中的一种实用技术,旨在帮助开发者将已有的Java代码转换为Python语言。这种转换对于那些熟悉Java但希望进入Python生态系统,或者想要利用Python特定...
Python 解决调用Word2007时出现“尚未调用CoInitialize”错误 Python源码Python 解决调用Word2007时出现“尚未调用CoInitialize”错误 Python源码Python 解决调用Word2007时出现“尚未调用CoInitialize”错误 Python...
Python是一种高级编程语言,以其简洁明了的语法和丰富的标准库而受到广泛欢迎。"Python八股文"可能指的是对Python基础知识的一种系统性总结。在Python中,有以下几个关键概念值得深入探讨: 1. **内置电池...
首先,要实现C++调用Python,通常会用到Python的`Python.h`头文件,它是Python的C API,允许C/C++代码与Python解释器进行交互。但是,这种常规方法要求目标系统已经安装了Python环境。为了解决这个问题,项目可能...
Python聊天机器人,智能问答系统 Python聊天机器人,智能问答系统 Python聊天机器人,智能问答系统 Python聊天机器人,智能问答系统 Python聊天机器人,智能问答系统 Python聊天机器人,智能问答系统 Python...
Python实战游戏源码- 中国象棋Python实战游戏源码- 中国象棋Python实战游戏源码- 中国象棋Python实战游戏源码- 中国象棋Python实战游戏源码- 中国象棋Python实战游戏源码- 中国象棋Python实战游戏源码- 中国象棋...
python计算机二级题库(附带答案)python计算机二级题库(附带答案)python计算机二级题库(附带答案)python计算机二级题库(附带答案)python计算机二级题库(附带答案)python计算机二级题库(附带答案)python...
python开发基于Django的投票系统源代码。python开发基于Django的投票系统源代码python开发基于Django的投票系统源代码python开发基于Django的投票系统源代码python开发基于Django的投票系统源代码python开发基于...