关于Sphinx的介绍,请移步到
http://www.iteye.com/topic/122696这阅读
PYTHON对多线程的支持相当好,虽然很多人诟病他的GIL,首先来说说threading,它是基于thread模块的,它有两种方式,前一种方式如下。
def myfunction(args):
do_someting
def main(num):
threads = []
# 创建一个锁
mutex = threading.Lock()
# 创建线程对象
for x in range(0, num):
threads.append(threading.Thread(target=myfunction, args=(,)))
# 启动所有线程
for t in threads:
t.start()
# 等待所有子线程退出
for t in threads:
t.join()
if __name__ == "__main__":
main(10)
当然了,我自己更偏爱与第二种方式,他更像JAVA多线程的写法。
class Mythread(threading.Thread):
#
def __init__(self,parameter):
self.parameter = parameter
def run(self):
#加锁
mutex.acquire()
do_something
#解锁
mutex.release()
def main(num):
global mutex
threads = []
mutex = threading.Lock()
for r in range(0,num):
threads.append(Mythread(args))
for t in threads:
#设置为TRUE,主线程结束,子线程强制结束
t.setDaemon(True)
t.start()
for t in threads:
t.join()
说完基本知识之后,来说说我的程序了,程序的主要功能如下流程图所示,
详解:
PYTHON脚本从数据库获取文本,然后交给Sphinx查询获取该词在数据库中的数量,图中没有提到的一个小细节是获取的文本需要用PYTHON来构建KEYWORD,所以源代码中的函数complex_words就是干这个事的,由于数据库的数据量特别大,最多的时候可以达到上千万条,测试的结果是用1000条数据,单进程花了56分钟,太恐怖了,最后只好用多线程来实现,由于数据量特别多,我能够想到的办法就是用QUEUE来装数据,一边装,一边取(无敌的FIFO,这时候才发现数据结构不是徒有虚名的)。
QUEUE很简单,申明一个QUEUE()对象,然后用PUT方法装入数据,然后线程再用GET方法去取。本以为所有问题已经可以解决了,但是新的问题又来了,当线程数大于20个的时候,PYTHON会提示一个“CONNECT RESET BY PEER”,而出错的数据肯定又必须重新装进QUEUE,麻烦来了,线程开的越多,出错的数据就越多,重新装入队列的数据就越多,当线程为50是,已经快成死循环了,PYTHON提供GIL机制,即有PYTHON来控制一个全局锁,而不是如JAVA般全有程序员控制。无赖之下,只好把线程设置为2个,发现所耗费的时间和10个线程相差无几。下面是我的源代码。
# -*- encoding:utf8 -*-
import re
import MySQLdb
import sys
import threading
from datetime import datetime
from Queue import Queue
from sphinxapi import *
#部分法语介词列表
spacewords = ['\xc3\xa0', 'de', 'de la', 'du', 'des', 'le', 'la', 'les', 'ce', 'ci', 'car',
'avec', 'bon', 'bonne', 'bien', 'dans', 'en', 'un', 'que', 'qui','pour', 'chacun',
'entre', 'il faut', 'jamais', 'pas', 'pas de', 'quand','ou','ch','et','sa','par','a','A']
#索引列表
indexs = ['wakazaka_fr_ebay','wakazaka_fr_kelkoo','wakazaka_fr_article','wakazaka_fr_priceminister']
host = ''
name = ''
password =''
db = 'wakazaka_fr_innodb'
#Sphinx端口
port = 9312
#匹配模式
mode = SPH_MATCH_PHRASE
#索引
INDEXS= "wakazaka_fr;wakazaka_fr_ebay;wakazaka_fr_kelkoo;wakazaka_fr_article;wakazaka_fr_priceminister"
class Mythread(threading.Thread):
def __init__(self,queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
global mutex,sc
while True:
#获取线程的名字
#name = threading.currentThread().getName()
item = self.queue.get()
if self.queue.empty():
break
else:
try:
process(item[0],item[1],sc,mutex)
except:
self.queue.put(item)
self.queue.task_done()
print 'Program is over'
#构建KEYWORD,不能含有介词和非数字和字母,KEYWORD的长度不能大于3
def complex_words(title):
li = title.split(' ')
words = []
i = 0
#print li
length = len(li)
count = length
while count > 1:
j = i + 1
#print 'j',j
if j + 1 < length:
if re.search('\W',li[i]) or li[i] in spacewords:
i = j
count -= 1
continue
elif re.search('\W',li[j]) or li[j] in spacewords:
i = j + 1
count -= 2
continue
elif li[i] not in spacewords and li[j] not in spacewords:
tmp = li[i] + ' ' + li[j]
words.append(tmp)
#print '1',words
if re.search('\W',li[j+1]) or li[j+1] in spacewords:
i = j + 2
count -= 3
continue
elif li[j+1] not in spacewords:
words.append(tmp + ' ' + li[j+1])
i += 1
count -= 1
#print '2',words
else:
if re.search('\W',li[i]) or li[i] in spacewords:
count -= 1
break
elif re.search('\W',li[j]) or li[j] in spacewords:
count -= 1
break
elif li[i] not in spacewords and li[j] not in spacewords:
words.append(li[i] + ' ' + li[j])
count -= 1
i += 1
#print '3',words
#print words
return words
#获取文本
def get_item(i,j):
conn = MySQLdb.connect(host,name,password,db)
if conn:
#print "Connect to the database"
conn.set_character_set("utf8")
cursor = conn.cursor()
strsql = "SELECT title,category_id FROM articles ORDER BY articles.id ASC LIMIT %s,%s"%(i,j)
#print strsql
cursor.execute(strsql)
data = cursor.fetchall()
conn.close()
return data
#插入数据
def write_item(keyword,cid,length,hits):
conn = MySQLdb.connect(host,name,password,db)
if conn:
#print "Connect to the database"
conn.set_character_set("utf8")
cursor = conn.cursor()
strsql = "INSERT INTO test_new (keyword,category_id,length,hits) VALUES ('%s','%s','%s','%s')\
"%(keyword,cid,length,hits)
print strsql
cursor.execute(strsql)
conn.close()
#判断数据是否重复
def check(keyword,cid):
conn = MySQLdb.connect(host,name,password,db)
if conn:
#print "Connect to the database"
conn.set_character_set("utf8")
cursor = conn.cursor()
strsql = "SELECT id FROM test_new WHERE keyword = '%s' AND category_id = '%s'"%(keyword,cid)
#print strsql
cursor.execute(strsql)
if cursor.rowcount == 0:
conn.close()
return None
else:
conn.close()
return cursor.fetchone()
#查询KEYWORD,返回其个数
def sphinx_result(keyword,sc):
sc.SetServer(host,port)
sc.SetMatchMode(mode)
li = keyword.split(' ')
length = len(li)
sc.SetGroupBy('category_id',SPH_GROUPBY_ATTR,"@count desc")
result = sc.Query(keyword,INDEXS)
maxcount = 0
mylist = []
if result.has_key('matches'):
for num,match in enumerate(result["matches"]):
cid = match["attrs"]["category_id"]
count = match["attrs"]["@count"]
maxcount += count
if num > 2:
break
mylist.append([keyword,cid,length,count])
return mylist
#Sphinx的BuildKeywords方法会将输入的文本拆分,并得出在数据库中的个数
def single_words(title,sc,index):
global mutex
#这一段请查看Sphinx API,有详细说明
sc.SetServer(host,port)
sc.SetMatchMode(mode)
mutex.acquire()
results = sc.BuildKeywords(title,index,True)
mutex.release()
return results
def get_count(map,sc,title):
for index in indexs:
results = single_words(title,sc,index)
for result in results:
name = result['tokenized']
hits = result['hits']
for key in map.keys():
if name == key:
map[name] = map[name] + hits
return map
def process(title,cid,sc,mutex):
map = {}
dict = {}
results1 = single_words(title,sc,'wakazaka_fr')
for result in results1:
name = result['tokenized']
if re.search('\W+',name):
continue
elif name not in spacewords and len(name) >= 3:
try:
int(name)
except:
if check(name,cid):
continue
else:
hits = result['hits']
map[name] = hits
dict = get_count(map,sc,title)
if dict:
for key in dict.keys():
try:
mutex.acquire()
write_item(key.lower(),cid,1,dict[key])
mutex.release()
except:
mutex.release()
print 'Exist'
continue
results2 = complex_words(title)
for result in results2:
result = result.strip()
if len(result) >= 3 and result not in spacewords:
if check(result,cid):
continue
else:
li = sphinx_result(result,sc)
for l in li:
try:
int(l[0])
except:
temp = l[0].split(' ')[0]
if len(temp) > 1:
try:
int(l[0])
except:
try:
mutex.acquire()
write_item(l[0].lower(),l[1],l[2],l[3])
mutex.release()
except:
mutex.release()
print 'Exist'
continue
def main():
start_time = datetime.now()
global mutex,sc
threads = []
q = Queue()
mutex = threading.Lock()
#建立Sphinx对象
sc = SphinxClient()
for r in range(0,50):
threads.append(Mythread(q))
for t in threads:
t.setDaemon(True)
t.start()
i = 0
j = 1000
#获取数据,每次取1000条,直至取完为止
items = get_item(i,j)
temp = 1
while len(items) > 0:
for item in items:
q.put(item)
i = temp + j
items = get_item(i,j)
temp = i
for t in threads:
t.join()
q.join()
end_time = datetime.now()
print '###TIME:',end_time - start_time
if __name__ == "__main__":
main()
Threading模块:
http://docs.python.org/library/threading.html
QUEUE模块
http://docs.python.org/library/queue.html#module-Queue
Sphinx
http://sphinxsearch.com/docs/
如果你想测试这些代码,首先你必须有MYSQL,MYSQLDB模块,以及Sphinx,还要有Sphinx建立的索引,附件是我的源代码
最后一点,网络是我们最好的老师,有什么问题,只管GOOGLE,一切都可以解决
- 大小: 37.5 KB
分享到:
相关推荐
在实际编程中,多线程计时器需要考虑线程安全、性能优化以及异常处理等问题。合理的多线程设计可以极大地提升用户体验,使程序更加高效且响应迅速。通过深入理解和熟练运用多线程技术,开发者能够创建出更加复杂和...
6. **多线程**:为了保证用户界面的响应速度,TaskBoardStudio可能会使用多线程来处理耗时的操作,如数据同步、网络请求等。QThread类可以帮助我们在PyQt5中实现并发。 7. **网络通信**:如果TaskBoardStudio支持云...
- **多线程注意事项**:提醒开发者在使用多线程时需要注意的问题,以避免常见的陷阱。 - **多进程与多线程对比**:比较了多进程和多线程两种并发模型的优缺点,并给出选择建议。 - **异步与事件驱动架构**:探讨了...
Python提供了多线程、多进程、异步I/O等机制来提高性能。例如,使用gevent或asyncio库实现异步编程,以并发处理多个请求。 6. 错误处理与日志记录: 系统应具备良好的错误处理机制,以确保稳定运行。Python的...
- **优化**:采用C扩展、多线程等方式提高执行效率。 **3.11 科学计算** - **库**:使用NumPy、SciPy进行数值计算。 - **可视化**:使用Matplotlib绘制图表。 **3.12 图像处理** - **库**:使用OpenCV、PIL进行...
* Python 并发编程:多线程、多进程、协程、异步编程等 * Python 并发编程框架:threading、multiprocessing、asyncio 等 Python 网络编程 * Python 网络编程: socket 编程、TCP/IP 协议、网络协议等 * Python ...
4. **并发与异步编程**:Python的多线程和多进程模型,以及现代的asyncio库,用于实现高效的并发和异步操作,是Python开发中的重要部分,特别是对于网络编程和IO密集型应用。 5. **标准库的深度探索**:Python的...
了解Python的性能瓶颈,如全局解释器锁(GIL),以及如何利用多线程、多进程或者异步I/O(如asyncio库)来提高程序的并发能力。 12. **代码规范**: 遵循PEP 8编码规范,使代码易于阅读和维护。了解代码审查和...
11. **并发与异步**:Python的threading和multiprocessing模块支持多线程和多进程,asyncio库提供了异步I/O编程的能力,以应对高并发场景。 12. **测试与调试**:Python有unittest、pytest等测试框架,用于编写单元...
7. **多线程/异步I/O**:Python的`threading`和`asyncio`模块可能被用来实现并发处理,提高程序响应速度,比如在播放音乐时同时处理用户请求。 8. **音乐播放功能**:Python的`pygame`库可能被用于处理音频播放,它...
7. **并发与多线程**:Python提供了threading模块来支持多线程,还有asyncio库用于异步编程,这在处理大量并发任务时非常重要。 8. **调试和测试**:掌握pdb调试器的使用,以及unittest测试框架,以确保代码的质量...
1. **Python 版本选择 (Python versions)**:选择合适的Python版本是至关重要的,因为不同的版本具有不同的特性和兼容性问题。 2. **项目结构 (Project layout)**:一个清晰的项目结构可以提高开发效率,并使得维护...
3. **异常处理**:使用try-except语句进行错误处理,确保程序在遇到问题时能够优雅地处理,而不是突然崩溃。 4. **标准库**:Python拥有丰富的标准库,如os模块用于操作系统交互,sys模块用于系统相关操作,time...
5. 多线程与并发:为了实现后台数据处理和用户交互的并行,系统可能使用Python的`threading`库来管理多个并发任务。 6. 日志管理:利用`logging`库记录系统运行日志,便于调试和问题排查。 7. Web接口:如果系统...
总的来说,这款基于Python的录屏软件源码涵盖了Python GUI编程、图像和视频处理、音频录制、多线程处理、文件I/O等多个核心知识点,是学习Python编程和多媒体处理的一个良好实践案例。通过研究这个项目,开发者可以...
6. **多线程/异步处理**:为了提高效率,ProxyPool可能会采用多线程或异步IO技术处理请求。Python的`threading`库支持多线程,而`asyncio`库则提供了基于事件循环的异步编程模型。 7. **日志记录**:在开发过程中,...
12. **并发和异步**:Python的多线程、多进程和asyncio模块提供了处理并发和异步操作的方式,以优化性能。 13. **测试和调试**:Python的unittest模块提供了单元测试框架,学会编写测试用例和调试代码能确保程序的...