`
edison0951
  • 浏览: 71732 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

用PYTHON多线程处理Sphinx遇到的问题

阅读更多
  关于Sphinx的介绍,请移步到http://www.iteye.com/topic/122696这阅读
  PYTHON对多线程的支持相当好,虽然很多人诟病他的GIL,首先来说说threading,它是基于thread模块的,它有两种方式,前一种方式如下。
 
  def myfunction(args):
    do_someting
  
  def main(num):
    threads = []
    # 创建一个锁
    mutex = threading.Lock()
    # 创建线程对象
    for x in range(0, num):
        threads.append(threading.Thread(target=myfunction, args=(,)))
    # 启动所有线程
    for t in threads:
        t.start()
    # 等待所有子线程退出
    for t in threads:
        t.join() 
  if __name__ == "__main__":
    main(10)
  

  当然了,我自己更偏爱与第二种方式,他更像JAVA多线程的写法。
 
  class Mythread(threading.Thread):
    #
    def __init__(self,parameter):
      self.parameter = parameter
    def run(self):
      #加锁
      mutex.acquire()
      do_something
      #解锁
      mutex.release()
  def main(num):
    global mutex
    threads = []
    mutex = threading.Lock()
    for r in range(0,num):
      threads.append(Mythread(args))
    for t in threads:
     #设置为TRUE,主线程结束,子线程强制结束
      t.setDaemon(True)
      t.start()
    for t in threads:
      t.join()    
  

  说完基本知识之后,来说说我的程序了,程序的主要功能如下流程图所示,

  详解:
    PYTHON脚本从数据库获取文本,然后交给Sphinx查询获取该词在数据库中的数量,图中没有提到的一个小细节是获取的文本需要用PYTHON来构建KEYWORD,所以源代码中的函数complex_words就是干这个事的,由于数据库的数据量特别大,最多的时候可以达到上千万条,测试的结果是用1000条数据,单进程花了56分钟,太恐怖了,最后只好用多线程来实现,由于数据量特别多,我能够想到的办法就是用QUEUE来装数据,一边装,一边取(无敌的FIFO,这时候才发现数据结构不是徒有虚名的)。
    QUEUE很简单,申明一个QUEUE()对象,然后用PUT方法装入数据,然后线程再用GET方法去取。本以为所有问题已经可以解决了,但是新的问题又来了,当线程数大于20个的时候,PYTHON会提示一个“CONNECT RESET BY PEER”,而出错的数据肯定又必须重新装进QUEUE,麻烦来了,线程开的越多,出错的数据就越多,重新装入队列的数据就越多,当线程为50是,已经快成死循环了,PYTHON提供GIL机制,即有PYTHON来控制一个全局锁,而不是如JAVA般全有程序员控制。无赖之下,只好把线程设置为2个,发现所耗费的时间和10个线程相差无几。下面是我的源代码。
 
# -*- encoding:utf8 -*-
import re
import MySQLdb
import sys
import threading
from datetime import datetime
from Queue import Queue
from sphinxapi import *
#部分法语介词列表
spacewords = ['\xc3\xa0', 'de', 'de la', 'du', 'des', 'le', 'la', 'les', 'ce', 'ci', 'car', 
        'avec', 'bon', 'bonne', 'bien', 'dans', 'en', 'un', 'que', 'qui','pour', 'chacun',
        'entre', 'il faut', 'jamais', 'pas', 'pas de', 'quand','ou','ch','et','sa','par','a','A']
#索引列表
indexs = ['wakazaka_fr_ebay','wakazaka_fr_kelkoo','wakazaka_fr_article','wakazaka_fr_priceminister']
host = ''
name = ''
password =''
db = 'wakazaka_fr_innodb'
#Sphinx端口
port = 9312
#匹配模式
mode = SPH_MATCH_PHRASE
#索引
INDEXS= "wakazaka_fr;wakazaka_fr_ebay;wakazaka_fr_kelkoo;wakazaka_fr_article;wakazaka_fr_priceminister"


class Mythread(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        global mutex,sc
        while True:
        #获取线程的名字
        #name = threading.currentThread().getName()
            item = self.queue.get()
            if self.queue.empty():
              break
            else:
              try:
                process(item[0],item[1],sc,mutex)
              except:
                self.queue.put(item)
              self.queue.task_done()
        print 'Program is over'
        

#构建KEYWORD,不能含有介词和非数字和字母,KEYWORD的长度不能大于3
def complex_words(title):
    li = title.split(' ')
    words = []
    i = 0
    #print li
    length = len(li)
    count = length
    while count > 1:
        j = i + 1
        #print 'j',j
        if j + 1 < length:
            if re.search('\W',li[i]) or li[i] in spacewords:
                i = j
                count -= 1
                continue
            elif re.search('\W',li[j]) or li[j] in spacewords:
                i = j + 1
                count -= 2
                continue
            elif li[i] not in spacewords and li[j] not in spacewords:
                tmp = li[i] + ' ' + li[j]
                words.append(tmp)
                #print '1',words
            if re.search('\W',li[j+1]) or li[j+1] in spacewords:
                i = j + 2
                count -= 3
                continue
            elif li[j+1] not in spacewords:
                words.append(tmp + ' ' + li[j+1])
            i += 1
            count -= 1
            #print '2',words
        else:
            if re.search('\W',li[i]) or li[i] in spacewords:
                count -= 1
                break

            elif re.search('\W',li[j]) or li[j] in spacewords:
                count -= 1
                break
            elif li[i] not in spacewords and li[j] not in spacewords:
                words.append(li[i] + ' ' + li[j])
            count -= 1
            i += 1
            #print '3',words
    #print words
    return words
#获取文本
def get_item(i,j):
    conn = MySQLdb.connect(host,name,password,db)
    if conn:
        
        #print "Connect to the database"
        conn.set_character_set("utf8")
        cursor = conn.cursor()
        strsql = "SELECT title,category_id FROM articles ORDER BY articles.id ASC LIMIT %s,%s"%(i,j)
        #print strsql
        cursor.execute(strsql)
        data = cursor.fetchall()
        conn.close()
        return data
#插入数据
def write_item(keyword,cid,length,hits):
    conn = MySQLdb.connect(host,name,password,db)
    if conn:
        #print "Connect to the database"
        conn.set_character_set("utf8")
        cursor = conn.cursor()
        strsql = "INSERT INTO test_new (keyword,category_id,length,hits) VALUES ('%s','%s','%s','%s')\
            "%(keyword,cid,length,hits)
        print strsql
        cursor.execute(strsql)
        conn.close()
#判断数据是否重复
def check(keyword,cid):
    conn = MySQLdb.connect(host,name,password,db)
    if conn:
        #print "Connect to the database"
        conn.set_character_set("utf8")
        cursor = conn.cursor()
        strsql = "SELECT id FROM test_new WHERE keyword = '%s' AND category_id = '%s'"%(keyword,cid)
        #print strsql
        cursor.execute(strsql)
        if cursor.rowcount == 0:
            conn.close()
            return None
        else:
            conn.close()
            return cursor.fetchone()
        
#查询KEYWORD,返回其个数
def sphinx_result(keyword,sc):
    sc.SetServer(host,port)
    sc.SetMatchMode(mode)
    li = keyword.split(' ')
    length = len(li)
    sc.SetGroupBy('category_id',SPH_GROUPBY_ATTR,"@count desc")
    
    result = sc.Query(keyword,INDEXS)
    maxcount = 0
    mylist = []
    
    if result.has_key('matches'):
        for num,match in enumerate(result["matches"]):
            cid = match["attrs"]["category_id"]
            count = match["attrs"]["@count"]
            maxcount += count
            
            if num > 2:
                break
            mylist.append([keyword,cid,length,count])
    return mylist
#Sphinx的BuildKeywords方法会将输入的文本拆分,并得出在数据库中的个数
def single_words(title,sc,index):
   global mutex
   #这一段请查看Sphinx API,有详细说明
   sc.SetServer(host,port)
   sc.SetMatchMode(mode)
   mutex.acquire()
   results = sc.BuildKeywords(title,index,True)
   mutex.release()
   return results

def get_count(map,sc,title):
    for index in indexs:
        results = single_words(title,sc,index)
        for result in results:
            name = result['tokenized']
            hits = result['hits']
            for key in map.keys():
                if name == key:
                    map[name] = map[name] + hits
    return map

def process(title,cid,sc,mutex):
    map = {}
    dict = {}
    results1 = single_words(title,sc,'wakazaka_fr')
    for result in results1:
        name = result['tokenized']
        if re.search('\W+',name):
            continue
        elif name not in spacewords and len(name) >= 3:
            try:
                int(name)
            except:
                if check(name,cid):
                    continue
                else:
                    hits = result['hits']
                    map[name] = hits
                    dict = get_count(map,sc,title)
        if dict:
            for key in dict.keys():
                try:
                    mutex.acquire()
                    write_item(key.lower(),cid,1,dict[key])
                    mutex.release()
                except:
                    mutex.release()
                    print 'Exist'
                    continue
    
    results2 = complex_words(title)
    for result in results2:
        result = result.strip()
        if len(result) >= 3 and result not in spacewords: 
            if check(result,cid):
                continue
            else:
                li = sphinx_result(result,sc)
                for l in li:
                    try:
                        int(l[0])
                    except:
                        temp = l[0].split(' ')[0]
                        if len(temp) > 1:
                            try:
                                int(l[0])
                            except:
                                try:
                                    mutex.acquire()
                                    write_item(l[0].lower(),l[1],l[2],l[3])
                                    mutex.release()
                                except:
                                    mutex.release()
                                    print 'Exist'
                                    continue
                                  
def main():
  start_time = datetime.now()
  global mutex,sc
  threads = []
  q = Queue()
  mutex = threading.Lock()
  #建立Sphinx对象
  sc = SphinxClient()
  for r in range(0,50):
    threads.append(Mythread(q))
    
  for t in threads:
    t.setDaemon(True)
    t.start()
  i = 0
  j = 1000
  #获取数据,每次取1000条,直至取完为止
  items = get_item(i,j)
  temp = 1
  while len(items) > 0:
    for item in items:
        q.put(item)
    i = temp + j
    items = get_item(i,j)
    temp = i
    
    for t in threads:
        t.join()
    q.join()
    end_time = datetime.now()
    print '###TIME:',end_time - start_time

if __name__ == "__main__":
    main()
  

Threading模块:http://docs.python.org/library/threading.html
QUEUE模块http://docs.python.org/library/queue.html#module-Queue
Sphinxhttp://sphinxsearch.com/docs/
如果你想测试这些代码,首先你必须有MYSQL,MYSQLDB模块,以及Sphinx,还要有Sphinx建立的索引,附件是我的源代码
  最后一点,网络是我们最好的老师,有什么问题,只管GOOGLE,一切都可以解决
  • 大小: 37.5 KB
1
0
分享到:
评论

相关推荐

    多线程计时器

    在实际编程中,多线程计时器需要考虑线程安全、性能优化以及异常处理等问题。合理的多线程设计可以极大地提升用户体验,使程序更加高效且响应迅速。通过深入理解和熟练运用多线程技术,开发者能够创建出更加复杂和...

    Python-TaskBoardStudio使用Python3Pyqt5开发的任务看板

    6. **多线程**:为了保证用户界面的响应速度,TaskBoardStudio可能会使用多线程来处理耗时的操作,如数据同步、网络请求等。QThread类可以帮助我们在PyQt5中实现并发。 7. **网络通信**:如果TaskBoardStudio支持云...

    The Hacker's Guide to Python

    - **多线程注意事项**:提醒开发者在使用多线程时需要注意的问题,以避免常见的陷阱。 - **多进程与多线程对比**:比较了多进程和多线程两种并发模型的优缺点,并给出选择建议。 - **异步与事件驱动架构**:探讨了...

    python绩效管理系统-后端

    Python提供了多线程、多进程、异步I/O等机制来提高性能。例如,使用gevent或asyncio库实现异步编程,以并发处理多个请求。 6. 错误处理与日志记录: 系统应具备良好的错误处理机制,以确保稳定运行。Python的...

    python-guide python引言

    - **优化**:采用C扩展、多线程等方式提高执行效率。 **3.11 科学计算** - **库**:使用NumPy、SciPy进行数值计算。 - **可视化**:使用Matplotlib绘制图表。 **3.12 图像处理** - **库**:使用OpenCV、PIL进行...

    哔哩哔哩Python课程列表.pdf

    * Python 并发编程:多线程、多进程、协程、异步编程等 * Python 并发编程框架:threading、multiprocessing、asyncio 等 Python 网络编程 * Python 网络编程: socket 编程、TCP/IP 协议、网络协议等 * Python ...

    python-distilled-developers-library.rar

    4. **并发与异步编程**:Python的多线程和多进程模型,以及现代的asyncio库,用于实现高效的并发和异步操作,是Python开发中的重要部分,特别是对于网络编程和IO密集型应用。 5. **标准库的深度探索**:Python的...

    Python-PythonTips一些初学者到中级用户的Python技巧

    了解Python的性能瓶颈,如全局解释器锁(GIL),以及如何利用多线程、多进程或者异步I/O(如asyncio库)来提高程序的并发能力。 12. **代码规范**: 遵循PEP 8编码规范,使代码易于阅读和维护。了解代码审查和...

    python知识结构

    11. **并发与异步**:Python的threading和multiprocessing模块支持多线程和多进程,asyncio库提供了异步I/O编程的能力,以应对高并发场景。 12. **测试与调试**:Python有unittest、pytest等测试框架,用于编写单元...

    甜橙音乐网程序Python源代码

    7. **多线程/异步I/O**:Python的`threading`和`asyncio`模块可能被用来实现并发处理,提高程序响应速度,比如在播放音乐时同时处理用户请求。 8. **音乐播放功能**:Python的`pygame`库可能被用于处理音频播放,它...

    Python开发工程师料.zip

    7. **并发与多线程**:Python提供了threading模块来支持多线程,还有asyncio库用于异步编程,这在处理大量并发任务时非常重要。 8. **调试和测试**:掌握pdb调试器的使用,以及unittest测试框架,以确保代码的质量...

    the hacker guide to python

    1. **Python 版本选择 (Python versions)**:选择合适的Python版本是至关重要的,因为不同的版本具有不同的特性和兼容性问题。 2. **项目结构 (Project layout)**:一个清晰的项目结构可以提高开发效率,并使得维护...

    python编写好用程序

    3. **异常处理**:使用try-except语句进行错误处理,确保程序在遇到问题时能够优雅地处理,而不是突然崩溃。 4. **标准库**:Python拥有丰富的标准库,如os模块用于操作系统交互,sys模块用于系统相关操作,time...

    Python仪器管理预警系统源码.zip

    5. 多线程与并发:为了实现后台数据处理和用户交互的并行,系统可能使用Python的`threading`库来管理多个并发任务。 6. 日志管理:利用`logging`库记录系统运行日志,便于调试和问题排查。 7. Web接口:如果系统...

    一款基于Python开发的录屏软件源码.zip

    总的来说,这款基于Python的录屏软件源码涵盖了Python GUI编程、图像和视频处理、音频录制、多线程处理、文件I/O等多个核心知识点,是学习Python编程和多媒体处理的一个良好实践案例。通过研究这个项目,开发者可以...

    用于网络蜘蛛的Python ProxyPool.zip

    6. **多线程/异步处理**:为了提高效率,ProxyPool可能会采用多线程或异步IO技术处理请求。Python的`threading`库支持多线程,而`asyncio`库则提供了基于事件循环的异步编程模型。 7. **日志记录**:在开发过程中,...

    python谷粒教育学习资料.zip

    12. **并发和异步**:Python的多线程、多进程和asyncio模块提供了处理并发和异步操作的方式,以优化性能。 13. **测试和调试**:Python的unittest模块提供了单元测试框架,学会编写测试用例和调试代码能确保程序的...

Global site tag (gtag.js) - Google Analytics