`

python

 
阅读更多
#coding=utf-8
'''
Created on 2015-12-24

@author: Administrator
'''
from selenium.webdriver.common.action_chains import ActionChains
from keywords import keywords
from utils import csvread
from utils import ftp
from utils import MML
from cspAutoTest.utils import oracle
import time
class pm(keywords.commonkeywords):
    #运行mml命令
    def run_mml(self,emsip,startTime,endTime,indexid='300248'):
        mml = MML.MML()
        #mmlstr='QUERY PMDATA:NETYPEID="UROP.RNC-MO",MEASOBJTYPEID="wPm.UtranCell",QUERYITEMLIST="INDEX"-"300248"--,MEASOBJMOI="urop:OMMOID=ih48vwn2-2@sbn=233@me=233"-,STARTDATE="2016-1-4 12:00:00",ENDDATE="2016-1-4 13:00:00",QUERYGRAN=HOURSUM,FILETYPE=CSV;'
        mmlstr='QUERY PMDATA:NETYPEID="UROP.RNC-MO",MEASOBJTYPEID="wPm.UtranCell",QUERYITEMLIST="INDEX"-"300248"--,MEASOBJMOI="urop:OMMOID=ih48vwn2-2@sbn=233@me=233"-,STARTDATE="'+startTime+'",ENDDATE="'+endTime+'",QUERYGRAN=HOURSUM,FILETYPE=CSV;'
        tmp = mml.login(emsip, '21123', 'admin', '')
        if tmp == False:
            raise AssertionError("The mml login failed")
            return
        filename = mml.execute_mml(mmlstr)
        mml.logout()
        print filename
        return filename
       
    #从ems下载csv到本地
    def download_csvfile(self,emsip,filename,indexid='300248'):      
        ftp.downloadfile(emsip,filename)
   
    #解析csv获取指标值
    def read_index(self):
        tmp = csvread.readcsv()
        return tmp
   
    #新建性能监控项
    def newMonitorItem(self,monitorName,indexid):
        self.driver.switch_to_frame("page-mainIframesystem")
        #单击性能监控项列表
        self.driver.implicitly_wait(2)
        self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
        #单击新建
        self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/ul/li[1]/a').click()
        #输入监控项名称
        self.driver.find_element_by_xpath('/html/body/div/div/div[1]/input').send_keys(monitorName)
        time.sleep(3)
        #过滤指标
        self.driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[1]/div[1]/div[2]/label/input').send_keys(indexid)
        self.driver.implicitly_wait(2)
        #选中关联指标
        table=self.driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[1]/table')
        self.driver.implicitly_wait(2)
        tbody=table.find_element_by_xpath("/html/body/div/div/div[2]/div/div/div/div[1]/table/tbody")
        self.driver.implicitly_wait(2)
        trs=tbody.find_elements_by_tag_name('tr')
        for tr in trs:
            self.driver.implicitly_wait(2)
            tds = tr.find_elements_by_tag_name("td")
            if tds[1].text==indexid:
                tds[0].find_elements_by_tag_name("input")[0].click()
                break
        #self.driver.find_element_by_id(indexid).click()
        time.sleep(3)
        #确认提交
        self.driver.find_element_by_xpath('/html/body/div/div/div[2]/div/div/div/div[2]/button[1]').click()
        #确认
        time.sleep(3)
        self.driver.switch_to_alert().accept()
   
    #确认监控项是否已入库
    def confirmMonitorItem(self,monitorName,indexid,dbpath):
        sql='select t.indexid from GUTLCSP_RM4X.INDEXBASEINFO t where t.indexname=:indexname'
        para={'indexname':monitorName}
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        result = oratool.query(sql,para)
        print result[0][0]

        sql2='select * from CSPPM.PM_SUPERVIEW_ITEM_TABLE t where t.indexid=:indexid and t.privateindexid=:privateindexid'
        para2={'indexid':result[0][0],'privateindexid': indexid }
        result = oratool.query(sql2,para2)
        oratool.close()
        return len(result)>0
   
    #查看监控项详细信息
    def scanMonitorInfo(self,monitorName,product):
        self.driver.switch_to_frame("page-mainIframesystem")
        #单击性能监控项列表
        self.driver.implicitly_wait(2)
        self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
        #获取性能监控项列表table
        self.driver.implicitly_wait(2)
        tbody=self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/div/div/div/div/table/tbody')
        self.driver.implicitly_wait(2)
        trs = tbody.find_elements_by_tag_name("tr")
        for i in range(len(trs)):
            tds = trs[i].find_elements_by_tag_name("td")
            print tds[1].text
            if tds[1].text==monitorName:
                tds[1].find_elements_by_tag_name("a")[0].click()
                div=self.driver.find_element_by_xpath('/html/body/div[3]/div[2]')
                lis=div.find_elements_by_tag_name("li")
                for i in range(len(lis)):
                    if lis[i].text==product:
                        lis[i].find_elements_by_tag_name("a")[0].click()
                        break
        #获取tag关闭按钮
        self.driver.implicitly_wait(2)
        closetag=self.driver.find_element_by_xpath('/html/body/div[3]/div[1]/div/a')
        self.driver.implicitly_wait(2)
        table=self.driver.find_element_by_id('table-3')
        tbody=table.find_elements_by_tag_name('tbody')
        trs=tbody[0].find_elements_by_tag_name('tr')
        #指标名称
        tds=trs[0].find_elements_by_tag_name('td')
        indexname=tds[1].text
        #指标类型
        tds=trs[1].find_elements_by_tag_name('td')
        mocname=tds[1].text
        #严重阈值
        tds=trs[2].find_elements_by_tag_name('td')
        seriousThreshold=tds[1].text
        #重要阈值
        tds=trs[3].find_elements_by_tag_name('td')
        majorThreshold=tds[1].text
        #公式
        tds=trs[4].find_elements_by_tag_name('td')
        formula=tds[1].text
        #关闭tag页
        closetag.click()
        self.driver.switch_to_window(self.mainPage)
        self.driver.implicitly_wait(4)
        about = self.driver.find_element_by_xpath('/html/body/div[1]/div/div/ul/li[2]/a')
        about.click()
        #self.driver.implicitly_wait(4)
        #ActionChains(self.driver).move_to_element(about).perform()
        return (indexname,mocname,seriousThreshold,majorThreshold,formula)
   
    #查询监控项详细信息
    def queryMonitorInfo(self,monitorName,indexid,dbpath):
        sql='select t1.indexname,t2.mocname,t1.criticalmax,t1.majormax,t1.indexcount from CSPPM.PM_INDEX_TABLE t1,CSPPM.PM_MOTYPE_TABLE t2 where t1.mocid=t2.mocid and t1.indexid=:indexid'
        para={'indexid':indexid}
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        result = oratool.query(sql,para)
        #print result[0]
        oratool.close()
        return self.decodegb2312(result[0])  


    #删除监控项
    def deleteMonitorItem(self,monitorName):
        self.driver.switch_to_frame("page-mainIframesystem")
        #单击性能监控项列表
        self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/p/span').click()
        #获取性能监控项列表table
        tbody=self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/div/div/div/div/table/tbody')
        trs = tbody.find_elements_by_tag_name("tr")
        for i in range(len(trs)):
            tds = trs[i].find_elements_by_tag_name("td")
            print tds[1].text
            if tds[1].text==monitorName:
                tds[0].find_elements_by_tag_name("input")[0].click()
                time.sleep(3)
                #确认删除提交
                self.driver.find_element_by_xpath('/html/body/div[1]/div[5]/div/ul/li[2]/a').click()
                #确认
                time.sleep(3)
                self.driver.switch_to_alert().accept()
                break
       
    def getNetworkSummary2G(self,url):
        self.driver.get(url)
        time.sleep(5)
        rncNum2g = self.driver.find_element_by_id("rncNum2g")
        remoteSpNum2g = self.driver.find_element_by_id("remoteSpNum2g")
        alarmLinkNum2g = self.driver.find_element_by_id("alarmLinkNum2g")
        return (int(rncNum2g.text),int(remoteSpNum2g.text),int(alarmLinkNum2g.text))
   
    def getNetworkSummary3G(self,url):
        self.driver.get(url)
        time.sleep(5)
        rncNum3g = self.driver.find_element_by_id("rncNum3g")
        remoteSpNum3g = self.driver.find_element_by_id("remoteSpNum3g")
        alarmLinkNum3g = self.driver.find_element_by_id("alarmLinkNum3g")
        return (int(rncNum3g.text),int(remoteSpNum3g.text),int(alarmLinkNum3g.text))
   
    def queryNetworkSummary2G(self,dbpath):
        sql1="select t1.neid from GUTLCSP_RM4X.GV3_BSC_OFFICESEL t1 where t1.dpc is not null union select t2.neid from GUTLCSP_RM4X.GV3_BSC_SGSNSEL t2 where t2.sgsncnid is not null union select t3.neid from GUTLCSP_RM4X.GV4_BSC_OFFICESEL t3 where t3.dpc is not null union select t4.neid from GUTLCSP_RM4X.GV4_BSC_SGSNSEL t4 where t4.sgsncnid is not null"
        sql21="select t1.oid from GUTLCSP_RM4X.GV3_BSC_OFFICESEL t1 where t1.dpc is not null union select t2.oid from GUTLCSP_RM4X.GV4_BSC_OFFICESEL t2 where t2.dpc is not null"
        sql22="select t3.oid from GUTLCSP_RM4X.GV3_BSC_SGSNSEL t3 where t3.sgsncnid is not null union select t4.oid from GUTLCSP_RM4X.GV4_BSC_SGSNSEL t4 where t4.sgsncnid is not null"
        sql3="select distinct t.linkoid from CSPFM.CSPALARM t where t.alarmtype = 10004100 and t.linkoid is not null and t.dispproduct = 'gv3'"
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        rncNum2g = oratool.queryWithNoPara(sql1)
        remoteSpNum2g1 = oratool.queryWithNoPara(sql21)
        remoteSpNum2g2 = oratool.queryWithNoPara(sql22)
        alarmLink = oratool.queryWithNoPara(sql3)
        return (len(rncNum2g),len(remoteSpNum2g1)+len(remoteSpNum2g2),len(alarmLink))
        oratool.close()
       
    def queryNetworkSummary3G(self,dbpath):
        sql1="select t1.neid from GUTLCSP_RM4X.WCDMA_IUCSLINK t1 where t1.dpc is not null union select t2.neid from GUTLCSP_RM4X.WCDMA_IUPSLINK t2 where t2.dpc is not null union select t3.neid from GUTLCSP_RM4X.WV4_IUCSLINK t3 where t3.dpc is not null union select t4.neid from GUTLCSP_RM4X.WV4_IUPSLINK t4 where t4.dpc is not null"
        sql2="select t1.oid from GUTLCSP_RM4X.WCDMA_IUCSLINK t1 where t1.dpc is not null union select t2.oid from GUTLCSP_RM4X.WCDMA_IUPSLINK t2 where t2.dpc is not null union select t3.oid from GUTLCSP_RM4X.WV4_IUCSLINK t3 where t3.dpc is not null union select t4.oid from GUTLCSP_RM4X.WV4_IUPSLINK t4 where t4.dpc is not null"
        sql3="select distinct t.linkoid from CSPFM.CSPALARM t where t.alarmtype = 10004100 and t.linkoid is not null and t.dispproduct = 'wcdma'"
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        rncNum3g = oratool.queryWithNoPara(sql1)
        remoteSpNum3g = oratool.queryWithNoPara(sql2)
        alarmLink = oratool.queryWithNoPara(sql3)
        return (len(rncNum3g),len(remoteSpNum3g),len(alarmLink))
        oratool.close()
  
    def verifyNetworkSummary2G(self,url,dbpath):
        temp1 = self.queryNetworkSummary2G(dbpath)
        temp2 = self.getNetworkSummary2G(url)
        print temp1
        print temp2
        if(temp1[0]!=temp2[0] or temp1[1]!=temp2[1] or temp1[2]!=temp2[2]):
            raise Exception("not equal")
        self.driver.back()
        return True
   
    def querySiteNum(self,dbpath,monitorTaskID,product):
        if(product.lower()=="all"):
            product=''
        sql = 'select t.* from GUTLCSP_RM4X.SCENENE t,GUTLCSP_RM4X.MONITORTASK t1 where t.groupid=t1.groupid and t1.monitorid=:monitorTaskID and t.modeidlist like '+"'%"+product+"%'"
        para={'monitorTaskID':monitorTaskID}
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        result = oratool.query(sql,para)
        #print result[0]
        oratool.close()
        return len(result)
   
    def getSiteNumInPage(self,url):
        self.driver.get(url)
        time.sleep(5)
        sceneSiteNum = self.driver.find_element_by_id("sceneSiteNum")
        return int(sceneSiteNum.text)


        #场景管理 --> 查看
   #url = "http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneManagementTest.html"
    def checkScene(self,dbpath,url):
        #跳转到sceneManagementTest.html页面
        print "into method"
        self.driver.get(url)
        time.sleep(1)
        #得到第一个场景的name和id
        print "start find_elements_by_tag_name"
        firstLi = self.driver.find_elements_by_tag_name('li')[0]
        print "finish find_element_by_tag_name"
        contentStr = firstLi.text
        print contentStr
        strlist = contentStr.split(',')
        sceneId = strlist[0]
        sceneName=strlist[1]
        #点击“查看”
        self.driver.find_element_by_id("tagA").click()
        #从数据库中查询当前场景的网元信息
        sql = "with t1 as (select ne.*,t2.rncid,t2.product,t2.displayname,t2.isbs,t2.modename,t6.area from GUTLCSP_RM4X.scenene ne left join (select me.oid,me.rncid,me.product,me.displayname,me.isbs,me.modename from GUTLCSP_RM4X.sdrmgr_nodeme me  union select v3.oid,v3.bssfunctionid as rncid,v3.product,v3.displayname,v3.isbs,v3.modename from GUTLCSP_RM4X.gv3_bts v3  union select v4.oid,v4.bssfunctionid as rncid,v4.product,v4.displayname,v4.isbs,v4.modename from GUTLCSP_RM4X.gv4_bts v4 union select w.oid,'---' as rncid,w.product,w.displayname,'false' as isbs,w.modename from GUTLCSP_RM4X.wcdma_me w union select bsc.oid,'---' as rncid,bsc.product,bsc.displayname,'false' as isbs,bsc.modename from GUTLCSP_RM4X.gv3_bsc bsc union select rnc.oid,'---' as rncid,rnc.product,rnc.displayname,'false' as isbs,rnc.modename from GUTLCSP_RM4X.urop_rncmanagedelement rnc)t2 on t2.oid=ne.neoid left join (select t4.oid,t5.name as area from uep4x.res_treenode_table t4 left join GUTLCSP_RM4X.RES_GROUP t5 on t4.parentoid=t5.oid where t5.name is not null) t6 on t6.oid=ne.neoid where 1=1) select t7.* from ( select row_number() over(order by rncid asc) rn,t1.* from t1 where t1.groupid="+sceneId+") t7 where rn>0 and rn<=10"
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        results = oratool.queryWithNoPara(sql)
        groupId = results[0][1]
        neoid = results[0][2]
        modeIdList = results[0][3]
        gutneoid = results[0][4]
        rncid = results[0][10]
        nename = results[0][12]
        #从当前页面抓取数据为了比较
        allLi = self.driver.find_elements_by_tag_name('li')
        #groupId_=allLi[0].text.split('-')[1]
        #print groupId
        #print groupId_
        #if(groupId==groupId_):
            #print "the groupId is same"
        #else :
            #raise Exception("the groupID is not same ")

        gutneoid_=allLi[1].text.split('_')[1]
        print gutneoid
        print gutneoid_
        if(gutneoid==gutneoid_):
            print "the gutneoid is same"
        else :
            raise Exception("the gutneoid is not same ")

        #modeidlist_=allLi[2].text.split('_')[1]
        #print modeIdList
        #print modeidlist_
        #if(modeIdList==modeidlist_):
            #print "the modeidlist is same"
        #else :
            #raise Exception("the modeidlist is not same ")


        nename_=allLi[3].text.split('_')[1]
        print nename
        print nename_
        if(nename_==nename):
           print "the nename is same"
        else :
            raise Exception("the nename is not same ")


        neoid_ = allLi[4].text.split('_')[1]
        print neoid
        print neoid_
        if(neoid==neoid_):
            print "the neoid is same"
        else :
            raise Exception("the neoid is not same ")


        rncid_ = allLi[6].text.split('_')[1]
        print rncid
        print rncid_
        if(rncid !=rncid_):
            raise Exception("the rncid  is not same ")

        oratool.close()


        #删除场景
    def deleteScene(self,dbpath,url):
        #跳转到sceneManagementTest.html页面
        print "into method"
        self.driver.get(url)
        time.sleep(1)
        #得到第一个场景的name和id
        print "start find_elements_by_tag_name"
        firstLi = self.driver.find_elements_by_tag_name('li')[0]
        print "finish find_element_by_tag_name"
        contentStr = firstLi.text
        print contentStr
        strlist = contentStr.split(',')
        sceneId = strlist[0]
        sceneName=strlist[1]
        #点击“删除”
        self.driver.find_element_by_id("delete").click()
        #抓取删除响应的结果
        print "start find_elements_by_tag_name"
        pElement = self.driver.find_elements_by_tag_name('p')[0]
        print "finish find_element_by_tag_name"
        result = pElement.text
        print result
        #根据响应的结果与数据库记录比较
        sql = "select count(*) as countNum from  GUTLCSP_RM4X.sceneinfo where groupid = " +sceneId
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        results = oratool.queryWithNoPara(sql)
        print results
        countNum = results[0][0]
        print "countNum > 0 , scene is not be deleted"
        if((result=="succes")&(countNum==0)):
            print "delete scene  success"
        if((result=="forbidden")&(countNum!=0)):
            print "can't  delete scene because there are monitorTasks are running"
        if(result=="fail"):
            raise Exception("delete scene fail")
        oratool.close()


       

        #查询监控页面监控任务的名称与状态
    def queryMonitorTaskInfo(self,dbpath,url):
        #抓取网页的数据
        self.driver.get(url)
        time.sleep(5)
        print "start find_elements_by_tag_name"
        firstLi = self.driver.find_elements_by_tag_name('li')[0]
        print "finish find_element_by_tag_name"
        contentStr = firstLi.text
        print contentStr
        #根据监控任务Id和name跳转到趋势图界面
        strlist = contentStr.split(',') # 用逗号分割str字符串,并保存到列表
        id=strlist[0]
        taskName=strlist[1]
        #触发"--停止--"按钮
        self.driver.find_element_by_id("stop").click()
        #从数据库查询该监控任务的状态
        sql =' select t.monitorid,t.monitortaskname,t.taskstate from GUTLCSP_RM4X.monitortask t where monitorid = '+id
        oratool = oracle.oracle()
        oratool.connect('system','oracle',dbpath)
        results = oratool.queryWithNoPara(sql)
        stateFromDB = results[0][2]
        #判断是否当前监控任务的状态是否为停止
        if(stateFromDB!=2):
           raise Exception("the state is :"+str(stateFromDB)+"  so stop operation error ")
        #触发"--暂停--"按钮
        self.driver.find_element_by_id("pause").click()  
        #从数据库查询该监控任务的状态
        sql =' select t.monitorid,t.monitortaskname,t.taskstate from GUTLCSP_RM4X.monitortask t where monitorid = '+id
        results = oratool.queryWithNoPara(sql)
        stateFromDB = results[0][2]
        #判断是否当前监控任务的状态是否为暂停
        if(stateFromDB!=1):
           raise Exception("the state is :"+str(stateFromDB)+"  so pause operation error ")
        #触发"--开始--"按钮
        self.driver.find_element_by_id("start").click()
        #从数据库查询该监控任务的状态
        sql =' select t.monitorid,t.monitortaskname,t.taskstate from GUTLCSP_RM4X.monitortask t where monitorid = '+id
        results = oratool.queryWithNoPara(sql)
        stateFromDB = results[0][2]
        #判断是否当前监控任务的状态是否为开始
        if(stateFromDB!=0):
           raise Exception("the state is :"+str(stateFromDB)+"  so start operation error ")
        oratool.close()
        self.driver.back()

   
    def verifySiteNumInMap(self,url,dbpath,monitorTaskID,product):
        temp1 = self.querySiteNum(dbpath,monitorTaskID,product)
        temp2 = self.getSiteNumInPage(url)
        #print temp1
        #print temp2
        if(temp1!=temp2):
            raise Exception("site number is not equal!")
        self.driver.back()
        return True
   
    def verifyNetworkSummary3G(self,url,dbpath):
        temp1 = self.queryNetworkSummary3G(dbpath)
        temp2 = self.getNetworkSummary3G(url)
        print temp1
        print temp2
        if(temp1[0]!=temp2[0] or temp1[1]!=temp2[1] or temp1[2]!=temp2[2]):
            raise Exception("not equal")
        self.driver.back()
        return True



p = pm()
p.open2Browser("http://10.62.45.141:21180")
p.getMainPage("admin")
t="http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneManagementTest.html"
#t="http://10.62.45.141:21180/web/res/web-csp-scene/test/sceneMonitorTaskAutoTest.html"


#p.checkScene("10.62.45.169/CSP",t)


#p.queryMonitorTaskInfo("10.62.45.169/CSP",t)


p.deleteScene("10.62.45.169/CSP",t)
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics