shuiyutian

Python Multithreading vs. Java Multithreading


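    I have been learning Python recently, and online I came across someone asking about multithreading in Python. The task: read IP information from a file, use the Baidu Maps open API to look up the latitude and longitude for each IP, and then sort the results into "request succeeded" and "request failed", writing each group to its own file. Since I am learning too, I took it on as an exercise and implemented it in both Python 3 and Java.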

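    A quick look at the requirements shows two likely performance bottlenecks:

    1. querying the Baidu Maps open API;

    2. writing the result files to disk.

    So I decided to handle these two parts separately, and the producer-consumer pattern was the natural fit.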
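
    The producer-consumer split described above can be sketched in a few lines. This is only an illustration under stated assumptions, not the implementation used in this post: fake_lookup, SENTINEL, query_worker, writer_worker and run_pipeline are hypothetical names, no real network request is made, and the "files" are two in-memory lists.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker that tells a worker to stop


def fake_lookup(ip):
    """Stand-in for the slow API call; no real request is made."""
    return {"ip": ip, "ok": not ip.endswith(".0")}


def query_worker(todo, done):
    # Stage 1: take an IP, "query" it, hand the result to the writer stage.
    while True:
        ip = todo.get()
        if ip is SENTINEL:
            break
        done.put(fake_lookup(ip))


def writer_worker(done, succeeded, failed):
    # Stage 2: a single consumer sorts results into two buckets.
    while True:
        result = done.get()
        if result is SENTINEL:
            break
        (succeeded if result["ok"] else failed).append(result["ip"])


def run_pipeline(ips, worker_count=4):
    todo, done = queue.Queue(), queue.Queue()
    succeeded, failed = [], []
    workers = [threading.Thread(target=query_worker, args=(todo, done))
               for _ in range(worker_count)]
    writer = threading.Thread(target=writer_worker,
                              args=(done, succeeded, failed))
    for t in workers + [writer]:
        t.start()
    for ip in ips:
        todo.put(ip)
    for _ in workers:        # one sentinel per query worker
        todo.put(SENTINEL)
    for t in workers:
        t.join()
    done.put(SENTINEL)       # every result is queued by now; stop the writer
    writer.join()
    return sorted(succeeded), sorted(failed)


if __name__ == "__main__":
    print(run_pipeline(["61.0.0.0", "61.157.0.1", "61.139.0.0", "61.130.93.7"]))
```

    Because the two stages communicate only through queues, the slow lookups and the writing never block each other, and the sentinels give every thread a clean way to exit.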

    The IP file has the following format (my test file has 41,255 lines):

   

    61.0.0.0__61.255.255.255__亚洲____

    61.157.0.0__61.157.255.255__四川____

    61.139.0.0__61.139.127.255__四川____

    61.130.93.0__61.130.254.255__浙江____
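
    As a small illustration of this format, the sketch below splits one line on the double underscore and keeps the first three fields. IPRange and parse_line are hypothetical names introduced for the example; the validity check mirrors the one used in the listings in this post (at least three fields, and a dot in both addresses).

```python
from typing import NamedTuple, Optional


class IPRange(NamedTuple):
    start_ip: str
    end_ip: str
    region: str


def parse_line(line: str) -> Optional[IPRange]:
    """Parse 'start__end__region____'; return None for lines that do not fit."""
    fields = line.strip().split("__")
    if len(fields) <= 2 or "." not in fields[0] or "." not in fields[1]:
        return None
    return IPRange(fields[0], fields[1], fields[2])


if __name__ == "__main__":
    print(parse_line("61.157.0.0__61.157.255.255__四川____"))
    print(parse_line("not an ip line"))
```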

    

   

    Here is the Python 3 implementation:

#coding:utf-8
import urllib.request
import json
import threading
import queue

FILE_ENCODING = "utf-8"
successList = []
failList = []
successPath = r"success.txt"
failPath = r"failed.txt"
ipPath = "ip.txt"
queryCount = 5
threadCount = 20
lock = threading.RLock()
separator = "|"
saveCount = 100
successIndex = 0
failIndex = 0
theCount = 0


queryQueue = queue.Queue()
processedQueue = queue.Queue()

class IPStruct(object):
    def __init__(self):
        self.startIP = None
        self.endIP = None
        self.address = ""
        self.position = ""
        self.status = ""


class QueryIPLocation(object):
    _retry = 5
    
    def query(self, objList):
        for obj in objList:
            url = self.get_url(obj)#.encode('utf-8')
#            print("url:%s" % url)
            for j in range(0, self._retry + 1):
                try:
                    result = urllib.request.urlopen(url,timeout=5)
                except Exception:
                    if j == self._retry:
                        obj.status = "Failed"
                        break
                    continue

                if result.getcode() != 200:
                    if j == self._retry:
                        obj.status = "Failed"
                else:
                    jsonRet = json.loads(result.read().decode())
                    if jsonRet['status'] == 0:
                        obj.status = "SUCCESS"
                        self.proc_ok(jsonRet, obj)
                        break
                    elif jsonRet['status'] == 1:
                        obj.status = "SERVER_ERROR"
                        break
                    elif jsonRet['status'] == 2:
                        obj.status = "INVALID_PARAMETER"
                        break
                    elif jsonRet["status"] == 5:
                        obj.status = "INVALID_AK"
                        break
                    elif jsonRet["status"] == 101:
                        obj.status = "DISABLE_SERVICE"
                        break
                    elif jsonRet['status'] == 345:
                        if j == self._retry:
                            obj.status = "MINUTE_QUERY_LIMIT"
                    elif jsonRet["status"] == 355:
                        obj.status = "DAY_QUERY_LIMIT"
                        break

    def get_url(self, obj):
        base = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll"
        return base + "&ip=" + str(obj.startIP)

    def proc_ok(self, result, obj):
        point = result["content"]["point"]
        address = result["content"]["address"]
        obj.address = address
        obj.position = str(point['x']) + "," + str(point['y'])

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self,queryQueue,processedQueue):
        threading.Thread.__init__(self)
        self.queryQueue = queryQueue
        self.processedQueue = processedQueue

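    def run(self):
        app = QueryIPLocation()
        while True:
            queryList = self.queryQueue.get()
            try:
                app.query(queryList)
            except Exception as exc:
                # Do not re-raise: that would kill this worker, and the items
                # of the current batch would never reach the writer threads.
                print("error while querying a batch:", exc)
                for item in queryList:
                    if not item.status:
                        item.status = "Failed"
            finally:
                for item in queryList:
                    self.processedQueue.put(item)
                self.queryQueue.task_done()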
    
class ThreadFile(threading.Thread):
    """Threaded save File"""
    
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        global successIndex
        global failIndex
        global theCount
        while True:
            item = self.queue.get()
            try:
                # One lock guards the counters and both result lists.
                with lock:
                    theCount += 1
                    print("theCount:", theCount)
                    if item.status == "SUCCESS":
                        successList.append(item)
                        successIndex += 1
                        if successIndex == saveCount:
                            save_result(successPath, resultSequence=successList)
                            successIndex = 0
                            del successList[:]
                    else:
                        failList.append(item)
                        failIndex += 1
                        if failIndex == saveCount:
                            save_result(failPath, resultSequence=failList)
                            failIndex = 0
                            del failList[:]
            finally:
                # Always mark the item done so processedQueue.join() can return.
                self.queue.task_done()

def save_result(filePath, resultSequence=()):
    """Append the given results to filePath, one record per line."""
    lines = []
    for item in resultSequence:
        # Successful lookups record the coordinates; failures record the status.
        last = item.position if item.status == "SUCCESS" else item.status
        lines.append(separator.join([item.startIP, item.endIP, item.address, last]) + "\n")
    # open() raises OSError on failure, so no explicit check is needed.
    with open(filePath, "a", encoding=FILE_ENCODING) as file:
        file.writelines(lines)

def get_ip_iter(file):
    ipList = []
    for line in file:
        addr = line.split("__")
        count = len(addr)
        if count <= 2 or "." not in addr[0] or "." not in addr[1]:
            continue
        obj = IPStruct()
        obj.startIP = addr[0]
        obj.endIP = addr[1]
        obj.address = addr[2]
        ipList.append(obj)
        if len(ipList) == queryCount:
            yield ipList
            del ipList[:]
    else:
        if len(ipList) > 0:
            yield ipList

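def main():
    print("start reading file")
    # Query workers: consume batches from queryQueue, feed processedQueue.
    for i in range(threadCount):
        t = ThreadUrl(queryQueue, processedQueue)
        t.daemon = True
        t.start()

    # open() raises OSError on failure, so no explicit check is needed.
    with open(ipPath, "r", encoding=FILE_ENCODING) as ipFile:
        for queryList in get_ip_iter(ipFile):
            # Copy the batch: get_ip_iter reuses and clears the same list.
            queryQueue.put(list(queryList))

    # Writer workers: consume processedQueue and append to the result files.
    for i in range(2):
        dt = ThreadFile(processedQueue)
        dt.daemon = True
        dt.start()

    queryQueue.join()
    processedQueue.join()

    # Flush the records that did not fill a complete batch of saveCount.
    save_result(successPath, resultSequence=successList)
    save_result(failPath, resultSequence=failList)


if __name__ == "__main__":
    main()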

    Running it with time python3 ***.py gives:

real    15m46.962s
user    0m38.042s
sys     1m0.162s

    As you can see, the run took about 15 minutes 47 seconds.
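
    The three figures also hint at where the time went. As a rough check, and assuming the usual meaning of the time output (real is wall-clock time, user and sys are CPU time), the snippet below compares CPU time with wall-clock time; to_seconds is a hypothetical helper written for this example.

```python
def to_seconds(minutes, seconds):
    """Convert a 'XmY.ZZZs' pair from the time output into seconds."""
    return minutes * 60 + seconds


real = to_seconds(15, 46.962)      # wall-clock time
user = to_seconds(0, 38.042)       # CPU time spent in user code
sys_time = to_seconds(1, 0.162)    # CPU time spent in the kernel

cpu = user + sys_time
cpu_share = cpu / real
print(f"wall clock: {real:.1f} s, CPU: {cpu:.1f} s, CPU share: {cpu_share:.1%}")
# prints: wall clock: 947.0 s, CPU: 98.2 s, CPU share: 10.4%
```

    Only about a tenth of the elapsed time was spent on the CPU, which fits the earlier guess that the job is dominated by waiting on the API and the disk rather than by computation.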

 

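    Since Java is what I had always worked in before, I got curious about which multithreaded implementation, Python or Java, would run faster, so I implemented the same requirements in Java as well, with the same number of threads.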

    Here is the Java implementation:

    

package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import net.sf.json.JSONObject;
import util.HttpUtil;
import util.JSONUtil;

public class IPQueryTest {
	
   
    
	public static void main(String[] args) throws InterruptedException {
		
		long start = System.currentTimeMillis();
		BlockingQueue<IPStruct> fileQueue = new LinkedBlockingQueue<IPStruct>();
		BlockingQueue<IPStruct> ipQueue = new LinkedBlockingQueue<IPStruct>();
		
		ExecutorService exec = Executors.newCachedThreadPool();
		exec.execute(new ThreadReadFile(fileQueue));

		for(int i=0;i<20;i++){
			exec.execute(new ThreadQueryIPLocation(fileQueue,ipQueue));
		}

		for(int i=0;i<2 ;i++){
			exec.execute(new ThreadWriteFile(ipQueue));
		}
		
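		// Heuristic completion check: both queues being empty does not prove that the
		// work is done. It can be true before the reader has queued its first line, or
		// while query threads still hold items they have taken but not yet passed on.
		while(true){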
			Thread.sleep(10);
			if(fileQueue.size() == 0 && ipQueue.size() == 0){
				ThreadWriteFile.saveResult(ThreadWriteFile.SUCC_PATH, ThreadWriteFile.succIPStructs);
				ThreadWriteFile.saveResult(ThreadWriteFile.FAIL_PATH, ThreadWriteFile.failIPStructs);
				long stend = System.currentTimeMillis();
				System.out.println("total cost:" + (stend - start));
				break;
			}
			/*else{
				System.out.println("fileQueue size:" + fileQueue.size() + ",ipQueue size:" + ipQueue.size());
			}*/
		}
	}
}

class IPStruct {
	private String startIP;
	private String endIP;
	private String address;
	private String position;
	private String status;

	public String getStartIP() {
		return startIP;
	}

	public void setStartIP(String startIP) {
		this.startIP = startIP;
	}

	public String getEndIP() {
		return endIP;
	}

	public void setEndIP(String endIP) {
		this.endIP = endIP;
	}

	public String getAddress() {
		return address;
	}

	public void setAddress(String address) {
		this.address = address;
	}

	public String getPosition() {
		return position;
	}

	public void setPosition(String position) {
		this.position = position;
	}

	public String getStatus() {
		return status;
	}

	public void setStatus(String status) {
		this.status = status;
	}
}

class ThreadReadFile implements Runnable{
	private final BlockingQueue<IPStruct> queue;
	private static final String FILE_PATH = "D:\\ip.txt";

	ThreadReadFile(BlockingQueue<IPStruct> q){
		this.queue = q;
	}
	
	@Override
	public void run() {
		try{
			produceIPS(queue);
		}catch(InterruptedException ex){
			ex.printStackTrace();
		}
	}
	
	public void produceIPS(BlockingQueue<IPStruct> queue)throws InterruptedException{
		File file = null;
		BufferedReader reader = null;
		try{
			file = new File(FILE_PATH);
			reader = new BufferedReader(new FileReader(file));
			String line = null;
			IPStruct ipStruct = null;
			while((line = reader.readLine()) != null){
				ipStruct = formatIPStruct(line);
				if(null != ipStruct){
					queue.add(ipStruct);
				}
			}
		}catch(IOException e){
			e.printStackTrace();
		}finally{
			if(null != reader){
				try {
					reader.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	public IPStruct formatIPStruct(String line){
		String[] addr = line.split("__");
		if(addr.length <= 2 || addr[0].indexOf(".")== -1 || addr[1].indexOf(".") == -1){
			return null;
		}
		IPStruct ipStruct = new IPStruct();
		ipStruct.setStartIP(addr[0]);
		ipStruct.setEndIP(addr[1]);
		ipStruct.setAddress(addr[2]);
		return ipStruct;
	}
}

class ThreadQueryIPLocation implements Runnable{
	private final BlockingQueue<IPStruct> inputQueue;
	private final BlockingQueue<IPStruct> outputQueue;
	
	ThreadQueryIPLocation(BlockingQueue<IPStruct> inputQueue,BlockingQueue<IPStruct> outputQueue){
		this.inputQueue = inputQueue;
		this.outputQueue = outputQueue;
	}
	@Override
	public void run() {
		try{
			while(true){
				IPStruct ipStruct = inputQueue.take();
				queryIPLocation(ipStruct);
				outputQueue.add(ipStruct);
			}
		}
		catch(InterruptedException ex){
			ex.printStackTrace();
		}
	}
	
	private static final int RETRY = 5;
	private static final String BASE_URL = "http://api.map.baidu.com/location/ip?ak=7E814968d7b3ee0440678cb17cb4aa29&coor=bd09ll";
	
	public void queryIPLocation(IPStruct obj) {
		String url = null;
		String result = null;
		url = BASE_URL + "&ip=" + obj.getStartIP();
		//System.out.println("url-->" + url);
		for (int j = 0; j <= RETRY; j++) {
			try {
				result = HttpUtil.getJsonStringFromUrl(url);
			} catch (Exception e) {
				if (j == RETRY) {
					obj.setStatus("Failed");
					break;
				}
				continue;
			}

			if (null != result) {
				JSONObject jsonObject = JSONUtil.toJSONObject(result);
				Integer status = (Integer) jsonObject.get("status");
				if (status == 0) {
					obj.setStatus("SUCCESS");
					procOK(jsonObject, obj);
					break;
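				} else if (status == 1) {
					obj.setStatus("SERVER_ERROR");
					break;
				} else if (status == 2) {
					// Same case as in the Python version; retrying cannot fix a bad parameter.
					obj.setStatus("INVALID_PARAMETER");
					break;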
				} else if (status == 5) {
					obj.setStatus("INVALID_AK");
					break;
				} else if (status == 101) {
					obj.setStatus("DISABLE_SERVICE");
					break;
				} else if (status == 345) {
					if (j == RETRY) {
						obj.setStatus("MINUTE_QUERY_LIMIT");
					}
				} else if (status == 355) {
					obj.setStatus("DAY_QUERY_LIMIT");
					break;
				}
			}
		}
	}

	public void procOK(JSONObject result, IPStruct obj) {
		JSONObject contentJSON = JSONUtil.toJSONObject(result.get("content"));
		JSONObject pointJSON = JSONUtil.toJSONObject(contentJSON.get("point").toString());
		obj.setAddress((String) contentJSON.get("address"));
		obj.setPosition(pointJSON.get("x") + "," + pointJSON.get("y"));
	}
}

class ThreadWriteFile implements Runnable{
	
	public static final List<IPStruct> succIPStructs = new ArrayList<IPStruct>();
	public static final List<IPStruct> failIPStructs = new ArrayList<IPStruct>();
	public static final String SEPARATOR = "|";
	public static final int FILE_SAVE_SIZE = 100;
	public static final String SUCC_PATH = "D:\\success.txt";
	public static final String FAIL_PATH = "D:\\fail.txt";
	public static AtomicInteger counter = new AtomicInteger(0);
	
	
	private final BlockingQueue<IPStruct> queue;
	
	ThreadWriteFile(BlockingQueue<IPStruct> queue){
		this.queue = queue;
	}
	
	@Override
	public void run() {
		try{
			while(true){
				IPStruct ipStruct = queue.take();
				count();
				if("SUCCESS".equals(ipStruct.getStatus())){
					saveSuccIPStruct(ipStruct);
				}
				else{
					saveFailIPStruct(ipStruct);
				}
			}
		}
		catch(InterruptedException ex){
			ex.printStackTrace();
		}
	}
	private void count(){
		// incrementAndGet is atomic, so the printed value is the one this call produced.
		System.out.println("count:" + counter.incrementAndGet());
	}
	
	public static void writeResult(String lines,String path){
		try {
			if(null == lines || lines.trim().equals("")){
				return;
			}
			FileWriter writer = new FileWriter(path,true);
			writer.write(lines);
			writer.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public void saveSuccIPStruct(IPStruct ipStruct){
		// The list is static and shared by every writer thread, so lock on the list
		// itself; a synchronized instance method would only lock this one Runnable.
		synchronized(succIPStructs){
			succIPStructs.add(ipStruct);
			if(succIPStructs.size() >= FILE_SAVE_SIZE){
				saveResult(SUCC_PATH, succIPStructs);
				succIPStructs.clear();
			}
		}
	}
	
	public void saveFailIPStruct(IPStruct ipStruct){
		// Same reasoning as above: lock on the shared static list.
		synchronized(failIPStructs){
			failIPStructs.add(ipStruct);
			if(failIPStructs.size() >= FILE_SAVE_SIZE){
				saveResult(FAIL_PATH, failIPStructs);
				failIPStructs.clear();
			}
		}
	}
	
	public static void saveResult(String path,List<IPStruct> ipStructs){
		StringBuffer lines = new StringBuffer();
		for(IPStruct ipStruct : ipStructs){
			lines.append(ipStruct.getStartIP()).append(SEPARATOR);
			lines.append(ipStruct.getEndIP()).append(SEPARATOR);
			lines.append(ipStruct.getAddress()).append(SEPARATOR);
			if("SUCCESS".equals(ipStruct.getStatus())){
				lines.append(ipStruct.getPosition()).append("\n");
			}
			else{
				lines.append(ipStruct.getStatus()).append("\n");
			}
		}
		writeResult(lines.toString(),path);
	}
}

    The Java code was run from Eclipse on Windows:

    

total cost:1132948

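    The figure is in milliseconds: 1,132,948 ms is about 1,133 seconds, or roughly 18 minutes 53 seconds. On wall-clock time alone, then, the Java run was about 3 minutes slower than the Python run (about 15 minutes 47 seconds). The experts like to say that Python cares more about programmer productivity than about absolute performance, but a job like this probably spends most of its time waiting on the API, so the raw speed of the language may matter little here. It is also quite possible that my own code is responsible for the gap.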
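
    Since total cost is printed in milliseconds while the time output is in minutes and seconds, a quick conversion makes the two runs directly comparable. The snippet below is only a sanity check on the two figures reported in this post; split_ms is a hypothetical helper.

```python
def split_ms(total_ms):
    """Split a millisecond count into (whole minutes, remaining seconds)."""
    minutes, rem_ms = divmod(total_ms, 60_000)
    return minutes, rem_ms / 1000


java_ms = 1_132_948                    # "total cost" printed by the Java run
python_ms = 15 * 60 * 1000 + 46_962    # "real 15m46.962s" from the Python run

print(split_ms(java_ms))               # (18, 52.948)
print(split_ms(python_ms))             # (15, 46.962)
print((java_ms - python_ms) / 1000)    # difference in seconds: 185.986
```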

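    I did not write this to settle whether Python or Java is better; I just find that learning by comparison works well, and wanted to keep a record.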

    
