`

多线程典型问题实现(生产者消费者问题、理发师问题与哲学家问题)

阅读更多

本文主要讨论了三个典型的多线程交互问题:生产者消费者问题、理发师问题与哲学家问题。我对上述三个问题做了基本的处理和编程实现(Ruby&Erlang)。

 

生产者消费者问题

各种语言实现本问题: http://dada.perl.it/shootout/prodcons_allsrc.html

./lib/utils.rb:

$KCODE = 'utf8'

require 'thread'
alias putsOld puts

Stdout_mutex = Mutex.new

def puts(*args)
  Stdout_mutex.synchronize do
    putsOld args
    $stdout.flush
  end
end

cp.rb:

$KCODE = 'utf8'

require '../lib/utils.rb'

# Two important problem:
# Empty and full
class CPQueue
  def initialize(num)
    @num = num
    @point = 0
    @items = []
    @semaphore = Mutex.new
    @resource = ConditionVariable.new
    @full = ConditionVariable.new
  end

  def pop
    @semaphore.synchronize do
      @resource.wait(@semaphore) while @point<=0
      item = @items[@point]
      @point -= 1
      @full.signal
      item
    end
  end

  def push(item)
    @semaphore.synchronize do
      @full.wait(@semaphore) while @point>=@num
      @point +=1
      @items[@point] = item
      @resource.signal
    end
  end
end

class BasicCP
  def initialize(queue)
    @queue = queue
  end
end

class Consumer < BasicCP
  @@CC = 0
  def consume
    product = @queue.pop
    @@CC += 1
    puts "#{@@CC}:Product #{product} consumed."
  end
end


class Producer < BasicCP
  @@iNum = 1
  def produce
    product = @@iNum
    @@iNum +=1
    @queue.push(product)
    puts "   Product #{product} produced."
  end
end

nn = []
100.times do |i|
  nn[i] = 10
  sized_queue = CPQueue.new(1+rand(nn[i]))

  consumer_threads = []
  (1+rand(nn[i])).times {
    consumer_threads << Thread.new {
      consumer = Consumer.new(sized_queue)
      sleep((rand(nn[i]))/100.0)
      consumer.consume
    }
  }

  (10+rand(3)).times{
    Thread.new {
      producer = Producer.new(sized_queue)
      sleep((rand(nn[i]))/100.0)
      producer.produce
    }
  }
  consumer_threads.each { |thread| thread.join }
end

 

cp.rb: 在考虑多线程程序时,我们会遇到资源问题。从表面上看,我们只会处理一个信号量就是储物柜(Queue),但是在考虑时,我们需要观察这个信号量描述对象的临界点(空的/满了),然后做信号通信。

-module(cp).
-export([main/0]).

main() ->
  forTest(1, 10)
  .

forTest(N,N) ->
  test(N);
forTest(M, N) ->
  test(M),
  forTest(M+1, N).

%num() ->
%  random:seed(),
%  random:uniform(100).

test(N) ->
  spawn(
    fun() ->
      Q = spawn(fun() -> queue({run, [], [] ,[] , N})end),
      T = N,
      forCP(1, T, fun()->
          spawn(fun() -> customer({run, Q}) end)
      end),
      forCP(1, T, fun()->
          spawn(fun() -> producer({run, Q, T}) end)
      end)
    end).

forCP(N, N, F) ->
  F();
forCP(M, N, F) ->
  F(),
  forCP(M+1, N, F).

queue({run, Items, CustomersWL, ProducerWL, N}) ->
  receive
    {From, Opt} ->
      {NI, NC, NP} = queue([From, Opt, Items, CustomersWL, ProducerWL, N]),
      queue({run, NI, NC, NP, N})
  end;

%Empty
queue([From, {pop}, [], C, P, _]) ->
  From ! {self() , {wait}},
  {[], [From|C], P};

%Data
queue([From, {pop}, [T|L], NC, NP, _]) ->
  From ! {self(), {data, T}},
  %Remove From in NC
  %Call NP
  isOk(NP),
  {L, NC, NP};

queue([From, {push, Data}, NI, NC, NP, N]) ->
  if
    length(NI) < N ->
      From ! {self(), {ok}},
      %Remove From in NP
      %Call NC
      isOk(NC),
      {[Data|NI], NC, NP};
    length(NI) >= N ->
      From ! {self(), {full}},
      {NI, NC, [From|NP]}
  end.

customer({run, Queue}) ->
  Queue ! {self(), {pop}},
  receive
    {NewQueue, {wait}} -> customer({wait, NewQueue});
    {_, {data, T}} -> customer({data, T})
  end;

customer({wait, _}) ->
  receive
    {From, ok} -> customer({run, From})
  end;

customer({data, D}) ->
  io:format("Custome ~p~n", [D]).

producer({run, Queue, N}) ->
  Queue ! {self(),{push, N}},
  receive
    {_, {ok}} -> io:format("Produce ~p~n", [N]);
    {NewQueue, {full}} -> producer({full, NewQueue, N})
  end;

producer({full, _, N}) ->
  receive
    {From, ok} -> producer({run, From, N})
  end.

isOk([])->
  {ok};
isOk([T|L])->
  T ! {self(), ok},
  isOk(L).

 理发师问题

bc.rb:

$KCODE = 'utf8'

require '../lib/utils.rb'

class BCQueue
  def initialize(num)
    @num = num
    @point = 0
    @items = []
    @semaphore = Mutex.new
  end

  def pop
    @semaphore.synchronize do
      if @point > 0
        item = @items[@point]
        @point -= 1
        item
      else
        nil
      end
    end
  end

  def push(item)
    @semaphore.synchronize do
      if @point<@num
        @point +=1
        @items[@point] = item
        true
      else
        false
      end
    end
  end
end

class BasicBC
  def initialize(queue)
    @queue = queue
  end
end

class Barber < BasicBC

  def initialize(queue)
    super(queue)
    @workSem = Mutex.new
    @workSem.synchronize{ @isWork = false }
    @sem = Mutex.new
  end

  def work
    @workSem.synchronize{ @isWork = true }
    while true
      item = @queue.pop
      if item
        item.begin_cut
        sleep(rand(10)/1000.0)
        item.end_cut
      else
        @workSem.synchronize{ @isWork = false }
        puts "Barber Sleep."
        break
      end
    end
  end

  def wakeup
    work
  end

  def work?
    @workSem.synchronize{ @isWork }
  end

  def semaphore
    @sem
  end

end

class Customer < BasicBC
  @@num = 0
  def initialize(queue, barber)
    super(queue)
    @num = @@num+=1
    @barber = barber
  end

  def get_cut
    if @barber.work?
      unless @queue.push(self)
        puts "C##{@num} left."
      else
        puts "C##{@num} wait."
      end
    else
      @barber.semaphore.synchronize do
        @queue.push(self)
        puts "C##{@num} Wakeup"
        @barber.wakeup
      end
    end
  end

  def begin_cut
    puts "C##{@num} Begin"
  end

  def end_cut
    puts "Finish #{@num}"
  end

end

queue = BCQueue.new(5)

barber = Barber.new(queue)

barber.work

customers = []
100.times do
  customers << Thread.new do
    sleep(rand(100)/1000.0)
    customer = Customer.new(queue, barber)
    customer.get_cut
  end
end

customers.each {|c| c.join}

 bc.erl:

-module(bc).
-export([main/0]).

main() ->
  forTest(10, 10)
  .

forTest(N,N) ->
  test(N);
forTest(M, N) ->
  test(M),
  forTest(M+1, N).

test(N) ->
  spawn(
    fun() ->
        Q = spawn(fun() -> queue({run, [], N})end),
      T = N,
      B = spawn(fun() -> barber({run, Q}) end),
      forCP(1, T+3, fun()->
          spawn(fun() -> customer({run, Q, B}) end)
      end)
    end).

forCP(N, N, F) ->
  F();
forCP(M, N, F) ->
  F(),
  forCP(M+1, N, F).

queue({run, Items, N}) ->
  receive
    {From, Opt} ->
      NI = queue([From, Opt, Items, N]),
      queue({run, NI, N})
  end;

%Empty
queue([From, {pop, Time}, [], _]) ->
  %io:format("Pop:NULL~n",[]),
  From ! {self() , {empty}, Time},
  [];

%Data
queue([From, {pop, Time}, L, _]) ->
  %io:format("Pop:~p~n",[T]),
  queue([From, {popReverse, Time}, L]);
queue([From, {popReverse, Time}, L]) ->
  [T|NewL] = lists:reverse(L),
  From ! {self(), {data, T}, Time},
  lists:reverse(NewL);


queue([From, {push, Data}, NI, N]) ->
  %io:format("New item:~p~n",[Data]),
  if
    length(NI) < N ->
      From ! {self(), ok},
      [Data|NI];
    length(NI) >= N ->
      From ! {self(), full},
      NI
  end.

barber({run, Queue}) ->
  Time = now(),
  Queue ! {self(), {pop, Time}},
  %io:format("Send pop from ~p~n",[self()]),
  barber({run, Queue, Time});

barber({run, Queue, Time}) ->
  %io:format("Waiting request from ~p~n", [Time]),
  receive
    {Queue, {empty}, Time} ->
      barber({sleep, Queue});
    {Queue, {data, T}, Time} ->
      barber({send, T, Queue});
    {Queue, _, _} ->
      %io:format("Old request coming at ~p~n",[NewTime]),
      barber({run, Queue, Time});
    {From, require} ->
      %io:format("Working in Require~n",[]),
      %From ! {self(), working},
      barber({send, From, Queue})
  end;


barber({sleep, Queue}) ->
  io:format("Barber sleep~n",[]),
  receive
    {From, require} ->
      %io:format("Require from ~p~n",[From]),
      barber({send, From, Queue})
  end;

barber({send, D, Queue})->
  %io:format("Pop from queue~n",[]),
  D ! {self(), cutting},
  barber({data, D, Queue});

barber({data, D, Queue}) ->
  receive
    {D , finish} ->
      io:format("Barber worked for ~p~n", [D]),
      barber({run, Queue});
    {From, require} ->
      From ! {self(), working},
      barber({data, D, Queue})
  end.

customer({run, Queue, Barber}) ->
  Barber ! {self(), require},
  %io:format("~p send require to ~p ~n",[self(), Barber]),
  customer({wait,Queue, Barber});

customer({working, Queue, Barber}) ->
  Queue ! {self(), {push,self()}},
  receive
    {_, full} ->
      io:format("~p Left~n",[self()]);
    {_, ok} ->
      io:format("~p waiting~n",[self()]),
      customer({wait,Queue, Barber})
  end;

customer({wait, Queue, Barber}) ->
  receive
    {Barber, cutting} ->
      %io:format("Survive to ~p~n", [self()]),
      Barber ! {self(), finish};
    {Barber, working} ->
      customer({working, Queue, Barber})
  end.

 bc.erl: 利用时间戳控制过期信息。从这个例子中,我们可以看到在编写消息传递(利用邮箱系统交互)时需要注意消息的对应和实时性。

哲学家问题

ph.rb:

$KCODE = 'utf8'

require '../lib/utils.rb'

class Kit
  def initialize(num)
    @size = num
    @kits = [true] * @size
    @sem = Mutex.new
  end

  def require(l)
    r = (l+1) % @size
    @sem.synchronize do
      if @kits[l] and @kits[r]
        @kits[l] = @kits[r] = false
        true
      else
        false
      end
    end
  end

  def release(l)
    r = (l+1) % @size
    @sem.synchronize do
      @kits[l] = @kits[r] = true
    end
  end

end

class Phils
  # The num begin from zero!
  def initialize(num, kits)
    @num = num
    @kits = kits
  end

  def live
    100.times do
      think
      eat
    end
  end

  def eat
    while not @kits.require(@num)
      puts "#{@num} Wait Eating"
      sleep(rand(100)/1000.0)
    end
    puts "#{@num} Eating"
    sleep(rand(100)/1000.0)
    @kits.release(@num)
  end

  def think
    puts "#{@num} Thinking"
    sleep(rand(100)/1000.0)
  end

end

N = 5
kit = Kit.new(N)

phs = []
N.times do |i|
  phs<<Thread.new do
    person = Phils.new(i, kit)
    person.live
  end
end

phs.each {|t| t.join}

 ph.erl:

-module(ph).
-export([main/0]).

main() ->
  KitsArray = array:set(0, true,array:set(4, true,array:set(3, true,array:set(2, true,array:set(1, true, array:new(5)) ) ) ) ),
  K = spawn(fun() -> kits(run, KitsArray) end),
  spawn(fun() -> philosopher(run, K, 4, 10) end),
  spawn(fun() -> philosopher(run, K, 3, 10) end),
  spawn(fun() -> philosopher(run, K, 2, 10) end),
  spawn(fun() -> philosopher(run, K, 1, 10) end),
  spawn(fun() -> philosopher(run, K, 0, 10) end).

kits(run, KitsArray) ->
  Size = array:size(KitsArray),
  receive
    {From, require, N} ->
      L = array:get(N, KitsArray),
      R = array:get((N+1)rem Size, KitsArray),
      kits(From, L andalso R, KitsArray, N);
    {_, release, N} ->
      kits(release, KitsArray, N)
  end.

kits(From, true, KitsArray, N) ->
  From ! ok,
  L = N,
  R = (N+1) rem array:size(KitsArray),
  kits(run, array:set(R, false, array:set(L, false, KitsArray)));

kits(From, false, KitsArray, _) ->
  From ! wait,
  kits(run, KitsArray).

kits(release, KitsArray, N) ->
  L = N,
  R = (N+1) rem array:size(KitsArray),
  kits(run, array:set(R, true, array:set(L, true, KitsArray))).

philosopher(run,_, _, 0) ->
  true;
philosopher(run,Kits, N, T) ->
  philosopher(thinking,Kits, N, T);

philosopher(thinking,Kits, N, T) ->
  io:format("~p(~p) is thinking~n", [N, T]),
  sleep(10),
  philosopher(eating,Kits, N, T);

philosopher(eating,Kits, N, T) ->
  Kits ! {self(), require, N},
  receive
    wait ->
      io:format("~p(~p) is waiting~n", [N, T]),
      sleep(5),
      philosopher(eating, Kits, N, T);
    ok ->
      io:format("~p(~p) is eating~n", [N, T]),
      sleep(8),
      Kits ! {self(), release, N},
      philosopher(run, Kits,N, T -1)
  end.

sleep(T) ->
  receive
  after T ->
      true
  end.

 在这里,我们需要重新思考一下我们在编写程序时需要注意的问题。其实这不突然,我们在编写上面两个问题的演示解答时只注重了正确性,而在实际应用中还要考虑很多问题。如果说这些问题只是个简单处理也行,但是面对一些情况(譬如本例的效率问题时)和问题时,我们甚至需要换一种解决方法。
正确性、健壮性、可靠性、效率、易用性、可读性(可理解性)、可扩展性、可复用性、兼容性、可移植性
而在编写这个程序时,我们需要考虑效率问题,毕竟这个问题与上面不同。上面两个问题都有自己的问题:效率瓶颈(货物仓库、理发师自己和椅子),但是本问题中的资源是分散的,不存在绝对的中心。但是我们在解决这个问题的时候,有可能引入一些瓶颈元素:管程、集中资源分配等。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics