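I've been learning Python recently, and I came across someone online asking a question about multithreading in Python. The task: read IP information from a file, use the Baidu Map Open API to look up the latitude and longitude of each IP's location, and finally sort the results into "request succeeded" and "request failed" and store them in two separate files. Since I'm still learning too, I gave it a try myself and implemented it in both Python 3 and Java.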
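A quick analysis of the requirements shows two performance bottlenecks:
1. querying the Baidu Map Open API;
2. writing the result files to disk.
So I decided to handle these two parts separately, which naturally led to the producer-consumer pattern: the IPs read from the file go onto one queue, query threads take them from it and put the results on a second queue, and file-writing threads consume that second queue.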
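A minimal sketch of that two-stage pipeline, using only the standard library queue and threading modules. The lookup() function is a hypothetical stand-in for the Baidu API call, and stopping the threads with one None "sentinel" per consumer is my own choice for this sketch (the full programs below use daemon threads and Queue.join / polling instead):

```python
import queue
import threading

SENTINEL = None  # marker telling a consumer thread to stop


def lookup(ip):
    # Hypothetical stand-in for the Baidu API call: returns (ip, status)
    return (ip, "FAILED" if ip.startswith("0.") else "SUCCESS")


def run_pipeline(ips, n_query_threads=4):
    query_q = queue.Queue()   # stage 1 input: IPs waiting to be queried
    result_q = queue.Queue()  # stage 2 input: results waiting to be saved
    success, failed = [], []

    def query_worker():
        while True:
            ip = query_q.get()
            if ip is SENTINEL:
                break
            result_q.put(lookup(ip))  # network-bound work happens here

    def writer():
        # Only this thread touches the output lists, so no lock is needed
        while True:
            item = result_q.get()
            if item is SENTINEL:
                break
            ip, status = item
            (success if status == "SUCCESS" else failed).append(ip)

    workers = [threading.Thread(target=query_worker) for _ in range(n_query_threads)]
    w = threading.Thread(target=writer)
    for t in workers:
        t.start()
    w.start()
    for ip in ips:
        query_q.put(ip)
    for _ in workers:
        query_q.put(SENTINEL)  # one stop marker per query thread
    for t in workers:
        t.join()
    result_q.put(SENTINEL)     # every result is queued now; stop the writer
    w.join()
    return success, failed


if __name__ == "__main__":
    ok, bad = run_pipeline(["61.0.0.0", "0.0.0.1", "61.157.0.0"])
    print(sorted(ok), bad)
```

The writer is stopped only after all query threads have been joined, so no result can arrive after its stop marker.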
The IP file has the following format (my test file has 41,255 lines):
61.0.0.0__61.255.255.255__亚洲____
61.157.0.0__61.157.255.255__四川____
61.139.0.0__61.139.127.255__四川____
61.130.93.0__61.130.254.255__浙江____
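Each record is split on the "__" separator: start IP, end IP, region, followed by empty trailing fields. A small parsing sketch that applies the same validity check as both implementations (at least three fields, and the first two contain a dot); the function name parse_ip_line is my own:

```python
def parse_ip_line(line):
    """Split one 'start__end__region____' record; return None for malformed lines."""
    fields = line.rstrip("\n").split("__")
    # Same loose check as the original code: >= 3 fields, first two look like IPs
    if len(fields) <= 2 or "." not in fields[0] or "." not in fields[1]:
        return None
    return fields[0], fields[1], fields[2]


print(parse_ip_line("61.157.0.0__61.157.255.255__四川____"))
```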
Here is the Python 3 implementation:
#coding:utf-8
import urllib.request
import json
import threading
import queue

FILE_ENCODING = "utf-8"
successList = []
failList = []
successPath = r"success.txt"
failPath = r"failed.txt"
ipPath = "ip.txt"
queryCount = 5     # IP records per work item
threadCount = 20   # number of query threads
lock = threading.RLock()
separator = "|"
saveCount = 100    # flush a result list to disk every saveCount records
successIndex = 0
failIndex = 0
theCount = 0
queryQueue = queue.Queue()      # batches of IPStruct waiting to be queried
processedQueue = queue.Queue()  # queried IPStruct waiting to be written


class IPStruct(object):
    def __init__(self):
        self.startIP = None
        self.endIP = None
        self.address = ""
        self.position = ""
        self.status = ""


class QueryIPLocation(object):
    _retry = 5

    def query(self, objList):
        for obj in objList:
            url = self.get_url(obj)
            # print("url:%s" % url)
            for j in range(0, self._retry + 1):
                try:
                    result = urllib.request.urlopen(url, timeout=5)
                except Exception:
                    if j == self._retry:
                        obj.status = "Failed"
                        break
                    continue
                if result.getcode() != 200:
                    if j == self._retry:
                        obj.status = "Failed"
                else:
                    jsonRet = json.loads(result.read().decode())
                    if jsonRet['status'] == 0:
                        obj.status = "SUCCESS"
                        self.proc_ok(jsonRet, obj)
                        break
                    elif jsonRet['status'] == 1:
                        obj.status = "SERVER_ERROR"
                        break
                    elif jsonRet['status'] == 2:
                        obj.status = "INVALID_PARAMETER"
                        break
                    elif jsonRet["status"] == 5:
                        obj.status = "INVALID_AK"
                        break
                    elif jsonRet["status"] == 101:
                        obj.status = "DISABLE_SERVICE"
                        break
                    elif jsonRet['status'] == 345:
                        # per-minute quota exceeded: retry, give up on the last attempt
                        if j == self._retry:
                            obj.status = "MINUTE_QUERY_LIMIT"
                    elif jsonRet["status"] == 355:
                        obj.status = "DAY_QUERY_LIMIT"
                        break

    def get_url(self, obj):
        base = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll"
        return base + "&ip=" + str(obj.startIP)

    def proc_ok(self, result, obj):
        point = result["content"]["point"]
        address = result["content"]["address"]
        obj.address = address
        obj.position = str(point['x']) + "," + str(point['y'])


class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queryQueue, processedQueue):
        threading.Thread.__init__(self)
        self.queryQueue = queryQueue
        self.processedQueue = processedQueue

    def run(self):
        while True:
            queryList = self.queryQueue.get()
            app = QueryIPLocation()
            try:
                app.query(queryList)
                for item in queryList:
                    self.processedQueue.put(item)
            except Exception:
                print("an error occurred while querying")
                raise
            finally:
                self.queryQueue.task_done()


class ThreadFile(threading.Thread):
    """Threaded save File"""
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        global successIndex, failIndex, theCount
        while True:
            item = self.queue.get()
            try:
                with lock:  # both file threads share the lists and counters
                    theCount += 1
                    print("theCount:", theCount)
                    if item.status == "SUCCESS":
                        successList.append(item)
                        successIndex += 1
                        if successIndex == saveCount:
                            save_result(successPath, resultSequence=successList)
                            successIndex = 0
                            del successList[:]
                    else:
                        failList.append(item)
                        failIndex += 1
                        if failIndex == saveCount:
                            save_result(failPath, resultSequence=failList)
                            failIndex = 0
                            del failList[:]
            finally:
                self.queue.task_done()


def save_result(filePath, resultSequence=()):
    """Append the records in resultSequence to filePath, one per line."""
    lines = []
    for item in resultSequence:
        tail = item.position if item.status == "SUCCESS" else item.status
        lines.append(separator.join([item.startIP, item.endIP, item.address, tail]) + "\n")
    # The address field contains Chinese text, so write with an explicit encoding
    with open(filePath, "a", encoding=FILE_ENCODING) as file:
        file.writelines(lines)


def get_ip_iter(file):
    """Parse IP records and yield them in batches of queryCount."""
    ipList = []
    for line in file:
        addr = line.split("__")
        if len(addr) <= 2 or "." not in addr[0] or "." not in addr[1]:
            continue
        obj = IPStruct()
        obj.startIP = addr[0]
        obj.endIP = addr[1]
        obj.address = addr[2]
        ipList.append(obj)
        if len(ipList) == queryCount:
            yield ipList
            ipList = []
    if ipList:
        yield ipList  # last, partial batch


def main():
    print("start read file")
    with open(ipPath, "r", encoding=FILE_ENCODING) as ipFile:
        for i in range(threadCount):
            t = ThreadUrl(queryQueue, processedQueue)
            t.daemon = True
            t.start()
        for queryList in get_ip_iter(ipFile):
            queryQueue.put(list(queryList))
    for i in range(2):
        dt = ThreadFile(processedQueue)
        dt.daemon = True
        dt.start()
    queryQueue.join()
    processedQueue.join()
    # flush whatever is left below the saveCount threshold
    save_result(successPath, resultSequence=successList)
    save_result(failPath, resultSequence=failList)


if __name__ == "__main__":
    main()
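For comparison, the hand-written queue-and-daemon-thread setup above can be expressed more compactly with the standard library's concurrent.futures.ThreadPoolExecutor. This is a different technique from the explicit producer-consumer queues, shown only as a sketch; lookup() is again a hypothetical stand-in for QueryIPLocation:

```python
from concurrent.futures import ThreadPoolExecutor


def lookup(ip):
    # Hypothetical stand-in for the API query: returns (ip, status, position)
    if ip.startswith("0."):
        return ip, "Failed", ""
    return ip, "SUCCESS", "0.0,0.0"


def query_all(ips, max_workers=20):
    success, failed = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() runs lookup() on the pool threads and yields results in input
        # order; this loop runs in the calling thread, so no lock is needed
        for ip, status, position in pool.map(lookup, ips):
            if status == "SUCCESS":
                success.append((ip, position))
            else:
                failed.append((ip, status))
    return success, failed


print(query_all(["61.0.0.0", "0.0.0.1"]))
```

The trade-off is that results are collected in one thread, so writing them to disk would happen after (or between) batches rather than in dedicated writer threads.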
Run it with time python3 ***.py:
real 15m46.962s
user 0m38.042s
sys 1m0.162s
The wall-clock (real) running time was 15 minutes 46 seconds.
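The time output also supports the bottleneck analysis at the start: user + sys is the CPU time used by all threads, real is wall-clock time, so a small CPU share means the process mostly waited (here, most likely on the network). A quick check of the arithmetic, with a hypothetical helper to_seconds() for the "XmY.Zs" format:

```python
import re


def to_seconds(s):
    """Convert a bash `time` field like '15m46.962s' to seconds."""
    m = re.fullmatch(r"(\d+)m([\d.]+)s", s)
    if m is None:
        raise ValueError(s)
    return int(m.group(1)) * 60 + float(m.group(2))


real = to_seconds("15m46.962s")
cpu = to_seconds("0m38.042s") + to_seconds("1m0.162s")  # user + sys
print(f"real={real:.3f}s cpu={cpu:.3f}s cpu/real={cpu / real:.1%}")
```

Roughly 1 min 38 s of CPU against almost 16 minutes of wall-clock time, i.e. about 10%.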
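Since I had mainly studied Java before, I got curious whether a multithreaded implementation would run faster in Python or in Java, so I implemented the same requirement in Java as well, using the same number of threads (20 query threads and 2 file-writing threads).
Here is the Java implementation: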
package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import net.sf.json.JSONObject;
import util.HttpUtil;
import util.JSONUtil;

public class IPQueryTest {
    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        BlockingQueue<IPStruct> fileQueue = new LinkedBlockingQueue<IPStruct>();
        BlockingQueue<IPStruct> ipQueue = new LinkedBlockingQueue<IPStruct>();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new ThreadReadFile(fileQueue));
        for (int i = 0; i < 20; i++) {
            exec.execute(new ThreadQueryIPLocation(fileQueue, ipQueue));
        }
        for (int i = 0; i < 2; i++) {
            exec.execute(new ThreadWriteFile(ipQueue));
        }
        // Two empty queues are not enough to stop: a record can still be inside a query
        // or writer thread. Stop once the reader is done and every record it produced was saved.
        while (true) {
            Thread.sleep(10);
            if (ThreadReadFile.finished
                    && ThreadWriteFile.counter.get() == ThreadReadFile.produced.get()) {
                ThreadWriteFile.saveResult(ThreadWriteFile.SUCC_PATH, ThreadWriteFile.succIPStructs);
                ThreadWriteFile.saveResult(ThreadWriteFile.FAIL_PATH, ThreadWriteFile.failIPStructs);
                long stend = System.currentTimeMillis();
                System.out.println("total cost:" + (stend - start)); // in milliseconds
                break;
            }
        }
        // The workers block forever in take(); interrupt them so the JVM can exit.
        exec.shutdownNow();
    }
}

class IPStruct {
    private String startIP;
    private String endIP;
    private String address;
    private String position;
    private String status;

    public String getStartIP() { return startIP; }
    public void setStartIP(String startIP) { this.startIP = startIP; }
    public String getEndIP() { return endIP; }
    public void setEndIP(String endIP) { this.endIP = endIP; }
    public String getAddress() { return address; }
    public void setAddress(String address) { this.address = address; }
    public String getPosition() { return position; }
    public void setPosition(String position) { this.position = position; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
}

class ThreadReadFile implements Runnable {
    private final BlockingQueue<IPStruct> queue;
    private static final String FILE_PATH = "D:\\ip.txt";
    static final AtomicInteger produced = new AtomicInteger(0); // records put on the queue
    static volatile boolean finished = false;                    // set once reading has ended

    ThreadReadFile(BlockingQueue<IPStruct> q) {
        this.queue = q;
    }

    @Override
    public void run() {
        try {
            produceIPS(queue);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }

    public void produceIPS(BlockingQueue<IPStruct> queue) throws InterruptedException {
        BufferedReader reader = null;
        try {
            reader = new BufferedReader(new FileReader(new File(FILE_PATH)));
            String line = null;
            while ((line = reader.readLine()) != null) {
                IPStruct ipStruct = formatIPStruct(line);
                if (null != ipStruct) {
                    produced.incrementAndGet();
                    queue.put(ipStruct);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            finished = true;
            if (null != reader) {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public IPStruct formatIPStruct(String line) {
        String[] addr = line.split("__");
        if (addr.length <= 2 || addr[0].indexOf(".") == -1 || addr[1].indexOf(".") == -1) {
            return null;
        }
        IPStruct ipStruct = new IPStruct();
        ipStruct.setStartIP(addr[0]);
        ipStruct.setEndIP(addr[1]);
        ipStruct.setAddress(addr[2]);
        return ipStruct;
    }
}

class ThreadQueryIPLocation implements Runnable {
    private final BlockingQueue<IPStruct> inputQueue;
    private final BlockingQueue<IPStruct> outputQueue;

    ThreadQueryIPLocation(BlockingQueue<IPStruct> inputQueue, BlockingQueue<IPStruct> outputQueue) {
        this.inputQueue = inputQueue;
        this.outputQueue = outputQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                IPStruct ipStruct = inputQueue.take();
                try {
                    queryIPLocation(ipStruct);
                } catch (RuntimeException e) {
                    // e.g. an unexpected response: keep the record instead of killing the thread
                    ipStruct.setStatus("Failed");
                    e.printStackTrace();
                }
                outputQueue.add(ipStruct);
            }
        } catch (InterruptedException ex) {
            // interrupted by shutdownNow() at the end of main(): just exit
        }
    }

    private static final int RETRY = 5;
    private static final String BASE_URL = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll";

    public void queryIPLocation(IPStruct obj) {
        String url = BASE_URL + "&ip=" + obj.getStartIP();
        String result = null;
        //System.out.println("url-->" + url);
        for (int j = 0; j <= RETRY; j++) {
            try {
                result = HttpUtil.getJsonStringFromUrl(url);
            } catch (Exception e) {
                if (j == RETRY) {
                    obj.setStatus("Failed");
                    break;
                }
                continue;
            }
            if (null != result) {
                JSONObject jsonObject = JSONUtil.toJSONObject(result);
                Integer status = (Integer) jsonObject.get("status");
                if (status == 0) {
                    obj.setStatus("SUCCESS");
                    procOK(jsonObject, obj);
                    break;
                } else if (status == 1) {
                    obj.setStatus("SERVER_ERROR");
                    break;
                } else if (status == 2) {
                    obj.setStatus("INVALID_PARAMETER");
                    break;
                } else if (status == 5) {
                    obj.setStatus("INVALID_AK");
                    break;
                } else if (status == 101) {
                    obj.setStatus("DISABLE_SERVICE");
                    break;
                } else if (status == 345) {
                    // per-minute quota exceeded: retry, give up on the last attempt
                    if (j == RETRY) {
                        obj.setStatus("MINUTE_QUERY_LIMIT");
                    }
                } else if (status == 355) {
                    obj.setStatus("DAY_QUERY_LIMIT");
                    break;
                }
            }
        }
    }

    public void procOK(JSONObject result, IPStruct obj) {
        JSONObject contentJSON = JSONUtil.toJSONObject(result.get("content"));
        JSONObject pointJSON = JSONUtil.toJSONObject(contentJSON.get("point").toString());
        obj.setAddress((String) contentJSON.get("address"));
        obj.setPosition(pointJSON.get("x") + "," + pointJSON.get("y"));
    }
}

class ThreadWriteFile implements Runnable {
    public static final List<IPStruct> succIPStructs = new ArrayList<IPStruct>();
    public static final List<IPStruct> failIPStructs = new ArrayList<IPStruct>();
    public static final String SEPARATOR = "|";
    public static final int FILE_SAVE_SIZE = 100;
    public static final String SUCC_PATH = "D:\\success.txt";
    public static final String FAIL_PATH = "D:\\fail.txt";
    public static final AtomicInteger counter = new AtomicInteger(0);
    private final BlockingQueue<IPStruct> queue;

    ThreadWriteFile(BlockingQueue<IPStruct> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                IPStruct ipStruct = queue.take();
                // compare strings with equals(), not ==
                if ("SUCCESS".equals(ipStruct.getStatus())) {
                    saveSuccIPStruct(ipStruct);
                } else {
                    saveFailIPStruct(ipStruct);
                }
                count(); // count only after the record is stored, so main() can rely on it
            }
        } catch (InterruptedException ex) {
            // interrupted by shutdownNow() at the end of main(): just exit
        }
    }

    private static void count() {
        System.out.println("count:" + counter.incrementAndGet());
    }

    public static void writeResult(String lines, String path) {
        try {
            if (null == lines || lines.trim().equals("")) {
                return;
            }
            FileWriter writer = new FileWriter(path, true);
            writer.write(lines);
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // static synchronized: both writer instances share the static lists,
    // so they must lock on the same object (the class), not on "this"
    public static synchronized void saveSuccIPStruct(IPStruct ipStruct) {
        succIPStructs.add(ipStruct);
        if (succIPStructs.size() >= FILE_SAVE_SIZE) {
            saveResult(SUCC_PATH, succIPStructs);
            succIPStructs.clear();
        }
    }

    public static synchronized void saveFailIPStruct(IPStruct ipStruct) {
        failIPStructs.add(ipStruct);
        if (failIPStructs.size() >= FILE_SAVE_SIZE) {
            saveResult(FAIL_PATH, failIPStructs);
            failIPStructs.clear();
        }
    }

    public static synchronized void saveResult(String path, List<IPStruct> ipStructs) {
        StringBuilder lines = new StringBuilder();
        for (IPStruct ipStruct : ipStructs) {
            lines.append(ipStruct.getStartIP()).append(SEPARATOR);
            lines.append(ipStruct.getEndIP()).append(SEPARATOR);
            lines.append(ipStruct.getAddress()).append(SEPARATOR);
            if ("SUCCESS".equals(ipStruct.getStatus())) {
                lines.append(ipStruct.getPosition()).append("\n");
            } else {
                lines.append(ipStruct.getStatus()).append("\n");
            }
        }
        writeResult(lines.toString(), path);
    }
}
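Both versions decide what to do with each Baidu status code through a long if/else chain inside the retry loop. A table-driven sketch of the same rules in Python, which keeps the code-to-name mapping in one place; the codes and names are taken from the code in this post (I have not re-checked them against Baidu's current documentation), and classify() replays a hypothetical list of per-attempt results instead of calling the API:

```python
# Codes that end the retry loop immediately, with the status names used above
FINAL_STATUS = {
    0: "SUCCESS",
    1: "SERVER_ERROR",
    2: "INVALID_PARAMETER",
    5: "INVALID_AK",
    101: "DISABLE_SERVICE",
    355: "DAY_QUERY_LIMIT",
}
# Codes that are retried; the name is recorded only if the last attempt hits it
RETRY_STATUS = {345: "MINUTE_QUERY_LIMIT"}


def classify(attempts, retry=5):
    """attempts: status code per attempt, None meaning the request raised.
    Returns the status string the record ends up with."""
    status = ""
    for j, code in enumerate(attempts[:retry + 1]):
        last = j == retry
        if code is None:
            if last:
                status = "Failed"
            continue
        if code in FINAL_STATUS:
            return FINAL_STATUS[code]
        if code in RETRY_STATUS and last:
            status = RETRY_STATUS[code]
    return status


print(classify([345, 345, 0]), classify([None] * 6))
```

Unknown codes are simply retried and leave the status empty, which matches what both original loops do.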
I ran the Java code in Eclipse on Windows:
total cost:1132948
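The total cost value is the difference of two System.currentTimeMillis() readings, so it is in milliseconds: 1,132,948 ms is about 18 minutes 53 seconds. Going by running time alone, the Java version was therefore about 3 minutes slower than the Python one. The experts often say that Python cares more about programmer productivity than about absolute performance; in any case, it is quite possible that the gap comes from problems in my own code rather than from the languages. Both programs spend most of their time waiting on the Baidu API, and the two runs were made in different environments (the Python script timed from the shell, the Java program run in Eclipse on Windows), so the numbers are not a strict comparison.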
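A quick check of the unit conversion, putting both runs in the same units (the variable names are just for this sketch):

```python
java_ms = 1132948             # "total cost" printed by the Java program
python_s = 15 * 60 + 46.962   # "real" reported by time for the Python run

minutes, rest_ms = divmod(java_ms, 60_000)  # currentTimeMillis() differences are in ms
print(f"Java:   {minutes} min {rest_ms / 1000:.3f} s")
print(f"Python: {python_s / 60:.2f} min")
print(f"Java - Python: {java_ms / 1000 - python_s:.1f} s")
```

That is 18 min 52.948 s for Java against 946.962 s for Python, a difference of about 186 s.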
I'm not really trying to decide whether Python or Java is better; I just wanted to learn the two side by side, and wrote this down as a record.