`

Ruby多进程并行抓取网页

阅读更多

 

想研究一下各大网站首页有多少个链接,于是用Ruby写了一个脚本,用Beanstalk消息队列,把任务放入Beanstalk中,同时开启多个子进程,如20个,并行运行,同时抓取网页,先抓取完成的进程再次读取Beanstalk任务,继续执行,直到没有任务后,进程退出,主进程等待所有子进程退出后,打印抓取的消息。 
Ruby的多线程实际是只能跑在单cpu上,并且同一时刻cpu只处理一个线程,所以采用多进程抓取,消息队列采用最简单的Beanstalk,需要安装Beanstalkd服务。 
示例代码主要抓取电商网站测试。 
代码可以直接运行,需要ruby 1.9版本,1.9一下要稍微修改才能运行。

 

[代码] [Ruby]代码

001 #!/usr/bin/env ruby
002 #encoding: UTF-8
003  
004 # 抓取每一个站点的首页链接数量
005 # require 'rubygems'            # 1.8.7
006 require 'ap'                # gem install awesome_print
007 require 'json'
008 require 'net/http'
009 require 'nokogiri'          # gem install nokogiri
010 require 'forkmanager'       # gem install parallel-forkmanager
011 require 'beanstalk-client'  # gem install beanstalk-client
012  
013 class MultipleCrawler
014  
015     class Crawler
016         def initialize(user_agent, redirect_limit=1)
017             @user_agent = user_agent
018             @redirect_limit = redirect_limit
019             @timeout 20
020         end
021         attr_accessor :user_agent:redirect_limit:timeout
022          
023         def fetch(website)
024             print "Pid:#{Process.pid}, fetch: #{website}\n"
025             redirect, url = @redirect_limit, website
026             start_time = Time.now
027             redirecting = false
028             begin
029                 begin
030                     uri = URI.parse(url)
031                     req = Net::HTTP::Get.new(uri.path)
032                     req.add_field('User-Agent'@user_agent)
033                     res = Net::HTTP.start(uri.host, uri.port) do |http|
034                         http.read_timeout = @timeout
035                         http.request(req)
036                     end
037                     if res.header['location'# 遇到重定向,则url设定为location,再次抓取
038                         url = res.header['location']
039                         redirecting = true
040                     end
041                     redirect -= 1
042                 end while redirecting and redirect>=0
043                 opened_time = (Time.now - start_time).round(4# 统计打开网站耗时
044                 encoding = res.body.scan(/<meta.+?charset=["'\s]*([\w-]+)/i)[0]
045                 encoding = encoding ? encoding[0].upcase : 'GB18030'
046                 html = 'UTF-8'==encoding ? res.body : res.body.force_encoding('GB2312'==encoding || 'GBK'==encoding ? 'GB18030' : encoding).encode('UTF-8')
047                 doc = Nokogiri::HTML(html)
048                 processed_time = (Time.now - start_time - opened_time).round(4)# 统计分析链接耗时, 1.8.7, ('%.4f' % float).to_f 替换 round(4)
049                 [opened_time, processed_time, doc.css('a[@href]').size, res.header['server']]
050             rescue =>e
051                 e.message 
052             end
053         end
054     end
055      
056     def initialize(websites, beanstalk_jobs, pm_max=1, user_agent='', redirect_limit=1)
057         @websites = websites                # 网址数组
058         @beanstalk_jobs = beanstalk_jobs    # beanstalk服务器地址和管道参数
059         @pm_max = pm_max                    # 最大并行运行进程数
060         @user_agent = user_agent            # user_agent 伪装成浏览器访问
061         @redirect_limit = redirect_limit    # 允许最大重定向次数
062          
063         @ipc_reader@ipc_writer IO.pipe # 缓存结果的 ipc 管道
064     end
065      
066     attr_accessor :user_agent:redirect_limit
067      
068     def init_beanstalk_jobs # 准备beanstalk任务
069         beanstalk = Beanstalk::Pool.new(*@beanstalk_jobs)
070         #清空beanstalk的残留消息队列
071         begin
072             while job = beanstalk.reserve(0.1)
073                 job.delete
074             end
075         rescue Beanstalk::TimedOut
076             print "Beanstalk queues cleared!\n"
077         end
078         @websites.size.times{|i| beanstalk.put(i)} # 将所有的任务压栈
079         beanstalk.close
080         rescue => e
081             puts e
082             exit
083     end
084      
085     def process_jobs # 处理任务
086         start_time = Time.now
087         pm = Parallel::ForkManager.new(@pm_max)
088         @pm_max.times do |i|
089             pm.start(i) and next # 启动后,立刻 next 不会等待进程执行完,这样才可以并行运算
090             beanstalk = Beanstalk::Pool.new(*@beanstalk_jobs)
091             @ipc_reader.close    # 关闭读取管道,子进程只返回数据
092             loop{
093                 begin
094                     job = beanstalk.reserve(0.1# 检测超时为0.1秒,因为任务以前提前压栈
095                     index = job.body
096                     job.delete
097                     website = @websites[index.to_i]
098                     result = Crawler.new(@user_agent).fetch(website)
099                     @ipc_writer.puts( ({website=>result}).to_json )
100                 rescue Beanstalk::DeadlineSoonError, Beanstalk::TimedOut, SystemExit, Interrupt
101                     break
102                 end
103             }
104             @ipc_writer.close
105             pm.finish(0)   
106         end
107         @ipc_writer.close
108         begin
109             pm.wait_all_children        # 等待所有子进程处理完毕
110         rescue SystemExit, Interrupt    # 遇到中断,打印消息
111             print "Interrupt wait all children!\n"
112         ensure
113             results = read_results
114             ap results, :indent => -4 :index=>false # 打印处理结果
115             print "Process end, total: #{@websites.size}, crawled: #{results.size}, time: #{'%.4f' % (Time.now - start_time)}s.\n"
116         end
117     end
118      
119     def read_results # 通过管道读取子进程抓取返回的数据
120         results = {}
121         while result = @ipc_reader.gets
122             results.merge! JSON.parse(result)
123         end
124         @ipc_reader.close
125         results
126     end
127      
128     def run # 运行入口
129         init_beanstalk_jobs
130         process_jobs
131     end
132 end
133  
134 websites = %w(
135 http://www.51buy.com/ http://www.360buy.com/ http://www.tmall.com/ http://www.taobao.com/
136 http://china.alibaba.com/ http://www.paipai.com/ http://shop.qq.com/ http://www.lightinthebox.com/
137 http://www.amazon.cn/ http://www.newegg.com.cn/ http://www.vancl.com/ http://www.yihaodian.com/
138 http://www.dangdang.com/ http://www.m18.com/ http://www.suning.com/ http://www.hstyle.com/
139 )
140 beanstalk_jobs = [['localhost:11300'],'crawler-jobs']
141 user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:13.0) Gecko/20100101 Firefox/13.0'
142 pm_max = 10
143  
144 MultipleCrawler.new(websites, beanstalk_jobs, pm_max, user_agent).run
2
0
分享到:
评论

相关推荐

    网页抓取脚本(by ruby)

    用ruby写的网页抓取脚本,可用于在线文档下载以及整站下载。

    Ruby-ParallelRuby让并行处理简单和快速

    这个库的核心理念是将工作分解成可独立执行的部分,然后在多个线程或进程中并行运行这些部分。这在处理大数据集、执行计算密集型任务或进行批量操作时特别有用。 并行处理的基本概念包括并行和并发。并行处理是指多...

    Ruby实现网页图片抓取

    ### Ruby 实现网页图片抓取 #### 知识点概览 1. **Nokogiri 库介绍与使用** 2. **Open-URI 库的使用** 3. **Ruby 基本语法:模块、方法定义与调用** 4. **网页数据解析** 5. **图片下载与保存** #### 详细解释 ##...

    Ruby-forkoffruby简单的并行处理

    `ForkOff`是Ruby中用于实现简单并行处理的工具,它基于操作系统级别的`fork`系统调用,允许程序创建子进程来同时执行任务,从而提高性能。 `ForkOff`库的主要优点在于其轻量级和易用性。与线程相比,子进程拥有独立...

    ruby实现网页图片抓取

    在Ruby编程语言中,网页图片抓取是一种常见的网络爬虫技术,主要用于自动化地从网站上下载图片资源。这里,我们将详细解析标题为“ruby实现网页图片抓取”的代码,了解如何利用Ruby进行网页图片的抓取和下载。 首先...

    Ruby-Kimura用Ruby编写的现代Web抓取框架

    Kimura - 用Ruby编写的现代Web抓取框架,与Headless Chromium / Firefox,PhantomJS或简单的HTTP请求一起开箱即用,并允许抓取用交互JavaScript呈现的网站

    线程与并发:Ruby并行世界的探索之旅

    Ruby 支持多种并发模型,包括多线程、多进程及使用协程库(如 Goliath 或 Reel)。下面是一个使用进程进行并发处理的示例代码: ```ruby require 'process' Process.fork do puts "This is a new process" end ...

    Ruby Ruby Ruby Ruby Ruby Ruby

    Ruby Ruby Ruby Ruby Ruby Ruby

    ruby代码ruby代码ruby代码ruby代码ruby代码ruby代码

    ruby代码ruby代码ruby代码ruby代码ruby代码ruby代码ruby代码ruby代码

    Ruby:并行处理变得简单而快速-Ruby开发

    并行在并行进程(&gt;使用所有CPU)或线程(&gt;加速阻止操作)中运行任何代码。 最适合于地图缩减或例如并行下载/上传。 安装gem install parallel并行并行运行任何代码以并行处理(&gt;使用所有CPU)或线程(&gt;加速阻止操作...

    Ruby-Eye进程监控工具灵感来自Bluepill和God

    Eye在保留Bluepill的简洁性的同时,增加了更多功能,如支持更多的事件回调、更丰富的日志记录以及更灵活的进程监控策略。比如,Eye允许用户自定义健康检查,以确保进程在正常工作,并在检测到问题时采取相应措施。 ...

    Ruby-posixspawnRubys快速进程spawn基于posixspawn系统接口

    Ruby语言在进行进程管理时,提供了多种方法,其中之一就是`POSIX::Spawn`。这个库是Ruby的一个扩展,它提供了一种高效的方式来创建新的进程,尤其是对于Unix-like系统(包括Linux和Mac OS X)。`POSIX::Spawn`库的...

    Working with Ruby Threads

    这对于避免多线程环境下的竞争条件非常重要,但也限制了Ruby程序在多核处理器上的并行性能。 - **MRI内部机制**:MRI(Matz's Ruby Interpreter)是Ruby的标准实现,它通过GIL保证了内存安全性和数据一致性。 - **...

    Ruby-childprocess跨平台的ruby库来管理子进程

    `childprocess`库就是为了满足这种需求而诞生的,它是一个跨平台的Ruby库,允许开发者在Windows、Linux和macOS等操作系统上方便地管理子进程。 `childprocess`库的设计目标是提供一个简单易用且功能丰富的API,以...

    Ruby进程监控器-Ruby开发

    God:过程管理的Ruby框架...使服务器进程和任务保持运行应该是部署过程中的简单部分。 God的目标是成为最简单,功能最强大的监视应用程序。 文档请参阅REPO_ROOT / doc中的内部仓库文档。 请参阅位于http:/的在线文档

Global site tag (gtag.js) - Google Analytics