想研究一下各大网站首页有多少个链接,于是用Ruby写了一个脚本,用Beanstalk消息队列,把任务放入Beanstalk中,同时开启多个子进程,如20个,并行运行,同时抓取网页,先抓取完成的进程再次读取Beanstalk任务,继续执行,直到没有任务后,进程退出,主进程等待所有子进程退出后,打印抓取的消息。
Ruby的多线程实际是只能跑在单cpu上,并且同一时刻cpu只处理一个线程,所以采用多进程抓取,消息队列采用最简单的Beanstalk,需要安装Beanstalkd服务。
示例代码主要抓取电商网站测试。
代码可以直接运行,需要ruby 1.9版本,1.9一下要稍微修改才能运行。
[代码] [Ruby]代码
001 |
#!/usr/bin/env ruby |
002 |
#encoding: UTF-8 |
003 |
004 |
# 抓取每一个站点的首页链接数量 |
005 |
# require 'rubygems' # 1.8.7 |
006 |
require 'ap' # gem install awesome_print
|
007 |
require 'json'
|
008 |
require 'net/http'
|
009 |
require 'nokogiri' # gem install nokogiri
|
010 |
require 'forkmanager' # gem install parallel-forkmanager
|
011 |
require 'beanstalk-client' # gem install beanstalk-client
|
012 |
013 |
class MultipleCrawler
|
014 |
015 |
class Crawler
|
016 |
def initialize(user_agent, redirect_limit= 1 )
|
017 |
@user_agent = user_agent
|
018 |
@redirect_limit = redirect_limit
|
019 |
@timeout = 20
|
020 |
end
|
021 |
attr_accessor :user_agent , :redirect_limit , :timeout
|
022 |
|
023 |
def fetch(website)
|
024 |
print "Pid:#{Process.pid}, fetch: #{website}\n"
|
025 |
redirect, url = @redirect_limit , website
|
026 |
start_time = Time .now
|
027 |
redirecting = false
|
028 |
begin
|
029 |
begin
|
030 |
uri = URI .parse(url)
|
031 |
req = Net:: HTTP ::Get. new (uri.path)
|
032 |
req.add_field( 'User-Agent' , @user_agent )
|
033 |
res = Net:: HTTP .start(uri.host, uri.port) do |http|
|
034 |
http.read_timeout = @timeout
|
035 |
http.request(req)
|
036 |
end
|
037 |
if res.header[ 'location' ] # 遇到重定向,则url设定为location,再次抓取
|
038 |
url = res.header[ 'location' ]
|
039 |
redirecting = true
|
040 |
end
|
041 |
redirect -= 1
|
042 |
end while redirecting and redirect>= 0
|
043 |
opened_time = ( Time .now - start_time).round( 4 ) # 统计打开网站耗时
|
044 |
encoding = res.body.scan(/<meta.+?charset=["'\s]*([\w-]+)/i)[ 0 ]
|
045 |
encoding = encoding ? encoding[ 0 ].upcase : 'GB18030'
|
046 |
html = 'UTF-8' ==encoding ? res.body : res.body.force_encoding( 'GB2312' ==encoding || 'GBK' ==encoding ? 'GB18030' : encoding).encode( 'UTF-8' )
|
047 |
doc = Nokogiri:: HTML (html)
|
048 |
processed_time = ( Time .now - start_time - opened_time).round( 4 ) # 统计分析链接耗时, 1.8.7, ('%.4f' % float).to_f 替换 round(4)
|
049 |
[opened_time, processed_time, doc.css( 'a[@href]' ).size, res.header[ 'server' ]]
|
050 |
rescue =>e
|
051 |
e.message
|
052 |
end
|
053 |
end
|
054 |
end
|
055 |
|
056 |
def initialize(websites, beanstalk_jobs, pm_max= 1 , user_agent= '' , redirect_limit= 1 )
|
057 |
@websites = websites # 网址数组
|
058 |
@beanstalk_jobs = beanstalk_jobs # beanstalk服务器地址和管道参数
|
059 |
@pm_max = pm_max # 最大并行运行进程数
|
060 |
@user_agent = user_agent # user_agent 伪装成浏览器访问
|
061 |
@redirect_limit = redirect_limit # 允许最大重定向次数
|
062 |
|
063 |
@ipc_reader , @ipc_writer = IO .pipe # 缓存结果的 ipc 管道
|
064 |
end
|
065 |
|
066 |
attr_accessor :user_agent , :redirect_limit
|
067 |
|
068 |
def init_beanstalk_jobs # 准备beanstalk任务
|
069 |
beanstalk = Beanstalk::Pool. new (* @beanstalk_jobs )
|
070 |
#清空beanstalk的残留消息队列
|
071 |
begin
|
072 |
while job = beanstalk.reserve( 0 . 1 )
|
073 |
job.delete
|
074 |
end
|
075 |
rescue Beanstalk::TimedOut
|
076 |
print "Beanstalk queues cleared!\n"
|
077 |
end
|
078 |
@websites .size.times{|i| beanstalk.put(i)} # 将所有的任务压栈
|
079 |
beanstalk.close
|
080 |
rescue => e
|
081 |
puts e
|
082 |
exit
|
083 |
end
|
084 |
|
085 |
def process_jobs # 处理任务
|
086 |
start_time = Time .now
|
087 |
pm = Parallel::ForkManager. new ( @pm_max )
|
088 |
@pm_max .times do |i|
|
089 |
pm.start(i) and next # 启动后,立刻 next 不会等待进程执行完,这样才可以并行运算
|
090 |
beanstalk = Beanstalk::Pool. new (* @beanstalk_jobs )
|
091 |
@ipc_reader .close # 关闭读取管道,子进程只返回数据
|
092 |
loop{
|
093 |
begin
|
094 |
job = beanstalk.reserve( 0 . 1 ) # 检测超时为0.1秒,因为任务以前提前压栈
|
095 |
index = job.body
|
096 |
job.delete
|
097 |
website = @websites [index.to_i]
|
098 |
result = Crawler. new ( @user_agent ).fetch(website)
|
099 |
@ipc_writer .puts( ({website=>result}).to_json )
|
100 |
rescue Beanstalk::DeadlineSoonError, Beanstalk::TimedOut, SystemExit, Interrupt
|
101 |
break
|
102 |
end
|
103 |
}
|
104 |
@ipc_writer .close
|
105 |
pm.finish( 0 )
|
106 |
end
|
107 |
@ipc_writer .close
|
108 |
begin
|
109 |
pm.wait_all_children # 等待所有子进程处理完毕
|
110 |
rescue SystemExit, Interrupt # 遇到中断,打印消息
|
111 |
print "Interrupt wait all children!\n"
|
112 |
ensure
|
113 |
results = read_results
|
114 |
ap results, :indent => - 4 , :index => false # 打印处理结果
|
115 |
print "Process end, total: #{@websites.size}, crawled: #{results.size}, time: #{'%.4f' % (Time.now - start_time)}s.\n"
|
116 |
end
|
117 |
end
|
118 |
|
119 |
def read_results # 通过管道读取子进程抓取返回的数据
|
120 |
results = {}
|
121 |
while result = @ipc_reader .gets
|
122 |
results.merge! JSON .parse(result)
|
123 |
end
|
124 |
@ipc_reader .close
|
125 |
results
|
126 |
end
|
127 |
|
128 |
def run # 运行入口
|
129 |
init_beanstalk_jobs
|
130 |
process_jobs
|
131 |
end
|
132 |
end |
133 |
134 |
websites = %w( |
135 |
http://www.51buy.com/ http://www.360buy.com/ http://www.tmall.com/ http://www.taobao.com/ |
136 |
http://china.alibaba.com/ http://www.paipai.com/ http://shop.qq.com/ http://www.lightinthebox.com/ |
137 |
http://www.amazon.cn/ http://www.newegg.com.cn/ http://www.vancl.com/ http://www.yihaodian.com/ |
138 |
http://www.dangdang.com/ http://www.m18.com/ http://www.suning.com/ http://www.hstyle.com/ |
139 |
) |
140 |
beanstalk_jobs = [[ 'localhost:11300' ], 'crawler-jobs' ]
|
141 |
user_agent = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:13.0) Gecko/20100101 Firefox/13.0'
|
142 |
pm_max = 10
|
143 |
144 |
MultipleCrawler. new (websites, beanstalk_jobs, pm_max, user_agent).run
|
相关推荐
用ruby写的网页抓取脚本,可用于在线文档下载以及整站下载。
这个库的核心理念是将工作分解成可独立执行的部分,然后在多个线程或进程中并行运行这些部分。这在处理大数据集、执行计算密集型任务或进行批量操作时特别有用。 并行处理的基本概念包括并行和并发。并行处理是指多...
### Ruby 实现网页图片抓取 #### 知识点概览 1. **Nokogiri 库介绍与使用** 2. **Open-URI 库的使用** 3. **Ruby 基本语法:模块、方法定义与调用** 4. **网页数据解析** 5. **图片下载与保存** #### 详细解释 ##...
`ForkOff`是Ruby中用于实现简单并行处理的工具,它基于操作系统级别的`fork`系统调用,允许程序创建子进程来同时执行任务,从而提高性能。 `ForkOff`库的主要优点在于其轻量级和易用性。与线程相比,子进程拥有独立...
在Ruby编程语言中,网页图片抓取是一种常见的网络爬虫技术,主要用于自动化地从网站上下载图片资源。这里,我们将详细解析标题为“ruby实现网页图片抓取”的代码,了解如何利用Ruby进行网页图片的抓取和下载。 首先...
Kimura - 用Ruby编写的现代Web抓取框架,与Headless Chromium / Firefox,PhantomJS或简单的HTTP请求一起开箱即用,并允许抓取用交互JavaScript呈现的网站
Ruby 支持多种并发模型,包括多线程、多进程及使用协程库(如 Goliath 或 Reel)。下面是一个使用进程进行并发处理的示例代码: ```ruby require 'process' Process.fork do puts "This is a new process" end ...
Ruby Ruby Ruby Ruby Ruby Ruby
并行在并行进程(>使用所有CPU)或线程(>加速阻止操作)中运行任何代码。 最适合于地图缩减或例如并行下载/上传。 安装gem install parallel并行并行运行任何代码以并行处理(>使用所有CPU)或线程(>加速阻止操作...
Eye在保留Bluepill的简洁性的同时,增加了更多功能,如支持更多的事件回调、更丰富的日志记录以及更灵活的进程监控策略。比如,Eye允许用户自定义健康检查,以确保进程在正常工作,并在检测到问题时采取相应措施。 ...
Ruby语言在进行进程管理时,提供了多种方法,其中之一就是`POSIX::Spawn`。这个库是Ruby的一个扩展,它提供了一种高效的方式来创建新的进程,尤其是对于Unix-like系统(包括Linux和Mac OS X)。`POSIX::Spawn`库的...
本项目专注于Android应用程序的自动化测试,使用了Python编程语言结合appium、pytest以及allure框架,实现了一套高效的多机并行测试解决方案,尤其针对华为设备进行了优化。 首先,Appium是一个开源的自动化测试...
这对于避免多线程环境下的竞争条件非常重要,但也限制了Ruby程序在多核处理器上的并行性能。 - **MRI内部机制**:MRI(Matz's Ruby Interpreter)是Ruby的标准实现,它通过GIL保证了内存安全性和数据一致性。 - **...
`childprocess`库就是为了满足这种需求而诞生的,它是一个跨平台的Ruby库,允许开发者在Windows、Linux和macOS等操作系统上方便地管理子进程。 `childprocess`库的设计目标是提供一个简单易用且功能丰富的API,以...
God:过程管理的Ruby框架...使服务器进程和任务保持运行应该是部署过程中的简单部分。 God的目标是成为最简单,功能最强大的监视应用程序。 文档请参阅REPO_ROOT / doc中的内部仓库文档。 请参阅位于http:/的在线文档