`

Programming Ruby(读书笔记)-12章(Fibers,线程,进程)

    博客分类:
  • Ruby
阅读更多

线程与进程可以程序工作在并行状态。Fiber挂起执行中的某部分,进而执行另外部分。

12.1 Fibers

Ruby 1.9后引入。Fiber可理解了一种程序间的协作机制(coroutine mechanism)。使得我们编程像是多线程程序,但又不引用多线程的问题。下面示例读取文本并计算每个单词的出现次数

#常规的程序写法
counts = Hash.new(0)
File.foreach("testfile") do |line|
  line.scan(/\w+/) do |word|
    word = word.downcase
    counts[word] += 1
  end
end
counts.keys.sort.each {|k| print "#{k}:#{counts[k]}"}

produces:
and:1 is:3 line:3 on:1 one:1 so:1 this:3 three:1 two:1

 上面代码有点混乱,就是把扫描行取出单词的逻辑与计算单词的出现次数的逻辑混在一起了。我们应该写一个方法用于读取文本,获取单词,然后yield每一个单词给统计块。使用Fiber的写法如下:

words = Fiber.new do
  File.foreach("testfile") do |line|
    line.scan(/\w+/) do |word|
      Fiber.yield word.downcase
    end
  end
  nil
end

counts = Hash.new(0)
while word = words.resume
  counts[word] += 1
end
counts.keys.sort.each {|k| print "#{k}:#{counts[k]}"}

 Fiber的构造方法接收代码块,返回Fiber对象。代码块中的代码不会立即执行。当调用Fiber#resume时,代码块中的代码将执行-读取文件,解析单词,完成后,当调用Fiber.yield时,Ruby将挂起当前执行,把控制权交给调用Fiber#resume的代码,并且resume方法返回Fiber#yield传入的值。循环(foreach)完成后,返回nil。这边的while也将停止。

Fiber.resume的方法就像调用一个方法。当返回nil以后,如果再试图调用Fiber.resume时,Ruby将抛出FiberError。

 

Fiber常被用来制造序号值,下面的示例用来产生可以被2整除,但不能被3整除的数

twos = Fiber.new do
  num = 2
  loop do
    Fiber.yield(num) unless num %3 ==0
    num += 2
  end
end
10.times { print twos.resume, " " }
produces:
2 4 8 10 14 16 20 22 26 28

 Fiber是对象,可被用来传参,赋给变量,但是Fiber只允许在创建它的线程中执行。

 

Fibers,Coroutines,and Continuations

 原生的fiber有限制-fiber只允许返回给调用给resume的代码。Ruby有两个基本包扩展了这种行为。当加载进来后,fiber有一个transfer方法,允许transfer控制到其它fiber。

------------------

A related but more general mechanism is the continuation. A continuation is a way of

recording the state of your running program (where it is, the current binding, and so on)

and then resuming from that state at some point in the future. You can use continuations to

implementcoroutines(andothernewcontrolstructures).Continuationshavealsobeenused

to store the state of a running web application between requests—a continuation is created

when the application sends a response to the browser; then, when the next request arrives

from that browser, the continuation is invoked, and the application continues from where

it left off. You enable continuations in Ruby by requiring the continuation library, described

in the library section on page 739.

--------------------

12.2 多线程

 Ruby1.8及之前,线程不是操作系统级别,而只是Ruby解释器(interpreter)进行多个线程切换。1.9开始,线程是操作系统级别的。对于多核是一种利好。但是由于需要兼容旧的代码,Ruby实际上操作系统上的线程现时还是只有一个,并非真正意义上的多线程。

创建线程

require 'net/http'

pages = %w(www.rubycentral.org slashdot.org)

threads = pages.map do |page_to_fetch|
  Thread.new(page_to_fetch) do |url|
    http = Net::HTTP.new(url, 80)
    print "Fetching: #{url}\n"
    resp = http.get('/')
    print "Got #{url}: #{resp.message}\n"
  end
end
threads.each {|thr| thr.join}

produces:
Fetching: www.rubycentral.org
Fetching: slashdot.org
Got slashdot.org:OK
Got www.rubycentral.org:OK
#Thread.new(...)可以接收任意多个参数,用于后面的块使用。上例中没有直接使用page_to_fetch,是因为考虑同步的问题,所以使用Thread内变量。
另外,使用print而不是puts,是因为puts包括两个过程,写参数,然后开始一个新的行。而print本身已经包含一个新行。

 

Manipulating Threads

Thread#join 等待执行完成,可接收一个时间参数;如果超时后返回nil

Thread#value 返回线程执行的最后一个表达式的值

Thread#current 返回当前线程

Thread#list 返回当前所有线程,包括已经停止的

Thread#alive? 是否存活

Thread#status 状态

Thread#priority= 设置优先线

 

Thread variables线程变量

可把线程对象看成Hash,使用[]=写,使用[]取。

count = 0
threads = 10.times.map |i|
  Thread.new do
    sleep(rand(0.))
    Thread.current[:mycount] = count
    count += 1
  end
end
threads.each {|thr| thr.join; print thr[:mycount], "  "}
puts "count = #{count}"
这个没有考虑同步的问题(count的race condition)

 

Threads and Exceptions

当线程抛出一个未处理的异常时,Ruby怎么处理,依赖两个配置项:abort_on_exception flag与interpreter’s $DEBUG flag。

当abort_on_exception=false,并且$DEBUG是not enabled。抛出异常时会终止线程。默认是这样。

当join一个抛出了异常的线程时,会在调用方再次raise这个异常。

threads = 4.times.map do |i|
  Thread.new(i) do |i|
    raise "Boom" if i == 1
    print "#{i}\n"
  end
end
puts "waiting"
threads.each do |t|
  begin
    t.join
  rescue RuntimeError => e
    print "Failed: #{e.message}"
  end
end
puts "Done"

produces:
waiting
0
2
3
Failed: BoomDone

 但是如果abort_on_exception设为true,或使用-d来开户debug模式时,线程抛出异常会终止主线程。

Thread.abort_on_exception = true
threads = 4.times.map do |number|
......
produces:
0
2
prog.rb:4:in `block (2 levels) in <main>': Boom! (RuntimeError)

 

12.3 控制线程的调度

正确的设计是线程只做它们自己的事,与其它线程不相干,比如线程间需要互相等待,就不好了。但是Ruby提供了一些方法支持这样。

Thread#stop

Thread#run

Thread#pass 类似于java的Thread.yield

 

12.4 Mutual Exclusion互斥

类似于synchronized,Ruby提供了Mutex。

sum = 0
mutex = Mutex.new
threads = 10.times.map do
  Thread.new do
    100_000.times do
      mutex.lock
      new_value = sum + 1
      print "#{new_value}   " if new_value % 250_000 == 0
      sum = new_value
      mutex_unlock
    end
  end
end
threads.each(&:join)
puts "\nsum = #{sum}"
produces:
250000 500000 750000 1000000
sum = 1000000

 Ruby提供了Mutex#synchronize方法,接收一个需要同步执行的块代码。

...
mutex = Mutex.new
....
  mutex.synchronize do
  ....
  end
...

 Mutex#try_lock用于尝试获取锁,如果获取不到(已经被其它线程获取并且没有释放,就返回false)。

下面的示例:使用Mutex#try_lock

rate_mutex = Mutex.new
exchange_rates = ExchangeRates.new
exchange_rates.update_from_online_feed

Thread.new do
  loop do
    sleep 3600
    rate_mutex.synchronize do
      exchange_rates.update_fro_online_feed
    end
  end
end

loop do
  print "Enter currency code and amount: "
  line = gets
  if rate_mutex.try_lock
    puts(exchange_rates.convert(line)) ensure rate_mutex.unlock
  else
    puts "Sorry, rates being updated. Try again in a minute"
  end
end

 如果在获取锁期间,希望临时释放锁,让其它线程有机会运行它,可使用Mutex#sleep。这个后面可跟一个时间。

mutex.sleep 3600
...

 

Queues and Condition variables

Ruby包提供了有Queue与线程间的Condition变量。

 

12.5 Running Multiple Processes运行多进程

当希望利用多CPU,或者在Ruby中起动不是Ruby编写的程序时,可使用多进程机制。

 

Spawning New Processes产生新进程

system("tar xzf test.tgz")
`date`

 简单的方式:使用Object#system或者用反引号。

Object#system:在子进程中执行命令。如果命令找到了并且执行正常,则返回true;如果命令没有找到,就raise一个异常,如果命令找到了但是执行出错,返回false,当错误发生以后,可以使用$?来获取exit code。

但是system()这种方式不会返回程序的输出内容。

`command`可以返回程序的输出内容,但是注意常常需要使用String#chomp来删除行尾符号

 

当Ruby进程需要与子进程通信时,可使用IO.popen。当使用这方法时,Ruby会启动一个执行这个命令在子进程中,并且搭建一个主进程与子进程间的管道。将子进程的标准输入与输出与Ruby的IO object连接进来。

pig = IO.popen("local/util/pig", "w+")
pig.puts "ice cream after they go to bed"
pig.close_write
puts pig.gets
produces:
iceway eamcray afterway eythay ogay otay edbay

 

Independent children

子进程的执行不阻塞主进程。

exec("sort testfile > output.txt") if fork.nil?
...主线程继续
Process.wait

 Object#fork返回进程ID,但是在子进程中返回nil

..................

 

Blocks and Subprocesses块与子进程

IO.popen传入块代码时,类似于File.open的逻辑。

IO.popen("date") {|f| puts "Date is #{f.gets}" }
produces:
Date is Mon May 27 12:31:17 CDT 2013

IO对象在块执行完成后,自动关闭。

 

 

fork do
  puts "In child, pid = #$$"
  exit 99
end

pid = Process.wait
puts "Child terminated, pid = #{pid}, status = #{$?.exitstatus}"
produces:
In child, pid = 22033
Child terminated, pid = 22033, status = 99
#$$解释器进程ID
windows不支持。

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics