`

多线程典型问题实现(生产者消费者问题、理发师问题与哲学家问题)

阅读更多

本文主要讨论了三个典型的多线程交互问题:生产者消费者问题、理发师问题与哲学家问题。我对上述三个问题做了基本的处理和编程实现(Ruby&Erlang)。

 

生产者消费者问题

各种语言实现本问题: http://dada.perl.it/shootout/prodcons_allsrc.html

./lib/utils.rb:

$KCODE = 'utf8'

require 'thread'
alias putsOld puts

Stdout_mutex = Mutex.new

def puts(*args)
  Stdout_mutex.synchronize do
    putsOld args
    $stdout.flush
  end
end

cp.rb:

$KCODE = 'utf8'

require '../lib/utils.rb'

# Two important problem:
# Empty and full
class CPQueue
  def initialize(num)
    @num = num
    @point = 0
    @items = []
    @semaphore = Mutex.new
    @resource = ConditionVariable.new
    @full = ConditionVariable.new
  end

  def pop
    @semaphore.synchronize do
      @resource.wait(@semaphore) while @point<=0
      item = @items[@point]
      @point -= 1
      @full.signal
      item
    end
  end

  def push(item)
    @semaphore.synchronize do
      @full.wait(@semaphore) while @point>=@num
      @point +=1
      @items[@point] = item
      @resource.signal
    end
  end
end

class BasicCP
  def initialize(queue)
    @queue = queue
  end
end

class Consumer < BasicCP
  @@CC = 0
  def consume
    product = @queue.pop
    @@CC += 1
    puts "#{@@CC}:Product #{product} consumed."
  end
end


class Producer < BasicCP
  @@iNum = 1
  def produce
    product = @@iNum
    @@iNum +=1
    @queue.push(product)
    puts "   Product #{product} produced."
  end
end

nn = []
100.times do |i|
  nn[i] = 10
  sized_queue = CPQueue.new(1+rand(nn[i]))

  consumer_threads = []
  (1+rand(nn[i])).times {
    consumer_threads << Thread.new {
      consumer = Consumer.new(sized_queue)
      sleep((rand(nn[i]))/100.0)
      consumer.consume
    }
  }

  (10+rand(3)).times{
    Thread.new {
      producer = Producer.new(sized_queue)
      sleep((rand(nn[i]))/100.0)
      producer.produce
    }
  }
  consumer_threads.each { |thread| thread.join }
end

 

cp.rb: 在考虑多线程程序时,我们会遇到资源问题。从表面上看,我们只会处理一个信号量就是储物柜(Queue),但是在考虑时,我们需要观察这个信号量描述对象的临界点(空的/满了),然后做信号通信。

-module(cp).
-export([main/0]).

main() ->
  forTest(1, 10)
  .

forTest(N,N) ->
  test(N);
forTest(M, N) ->
  test(M),
  forTest(M+1, N).

%num() ->
%  random:seed(),
%  random:uniform(100).

test(N) ->
  spawn(
    fun() ->
      Q = spawn(fun() -> queue({run, [], [] ,[] , N})end),
      T = N,
      forCP(1, T, fun()->
          spawn(fun() -> customer({run, Q}) end)
      end),
      forCP(1, T, fun()->
          spawn(fun() -> producer({run, Q, T}) end)
      end)
    end).

forCP(N, N, F) ->
  F();
forCP(M, N, F) ->
  F(),
  forCP(M+1, N, F).

queue({run, Items, CustomersWL, ProducerWL, N}) ->
  receive
    {From, Opt} ->
      {NI, NC, NP} = queue([From, Opt, Items, CustomersWL, ProducerWL, N]),
      queue({run, NI, NC, NP, N})
  end;

%Empty
queue([From, {pop}, [], C, P, _]) ->
  From ! {self() , {wait}},
  {[], [From|C], P};

%Data
queue([From, {pop}, [T|L], NC, NP, _]) ->
  From ! {self(), {data, T}},
  %Remove From in NC
  %Call NP
  isOk(NP),
  {L, NC, NP};

queue([From, {push, Data}, NI, NC, NP, N]) ->
  if
    length(NI) < N ->
      From ! {self(), {ok}},
      %Remove From in NP
      %Call NC
      isOk(NC),
      {[Data|NI], NC, NP};
    length(NI) >= N ->
      From ! {self(), {full}},
      {NI, NC, [From|NP]}
  end.

customer({run, Queue}) ->
  Queue ! {self(), {pop}},
  receive
    {NewQueue, {wait}} -> customer({wait, NewQueue});
    {_, {data, T}} -> customer({data, T})
  end;

customer({wait, _}) ->
  receive
    {From, ok} -> customer({run, From})
  end;

customer({data, D}) ->
  io:format("Custome ~p~n", [D]).

producer({run, Queue, N}) ->
  Queue ! {self(),{push, N}},
  receive
    {_, {ok}} -> io:format("Produce ~p~n", [N]);
    {NewQueue, {full}} -> producer({full, NewQueue, N})
  end;

producer({full, _, N}) ->
  receive
    {From, ok} -> producer({run, From, N})
  end.

isOk([])->
  {ok};
isOk([T|L])->
  T ! {self(), ok},
  isOk(L).

 理发师问题

bc.rb:

$KCODE = 'utf8'

require '../lib/utils.rb'

class BCQueue
  def initialize(num)
    @num = num
    @point = 0
    @items = []
    @semaphore = Mutex.new
  end

  def pop
    @semaphore.synchronize do
      if @point > 0
        item = @items[@point]
        @point -= 1
        item
      else
        nil
      end
    end
  end

  def push(item)
    @semaphore.synchronize do
      if @point<@num
        @point +=1
        @items[@point] = item
        true
      else
        false
      end
    end
  end
end

class BasicBC
  def initialize(queue)
    @queue = queue
  end
end

class Barber < BasicBC

  def initialize(queue)
    super(queue)
    @workSem = Mutex.new
    @workSem.synchronize{ @isWork = false }
    @sem = Mutex.new
  end

  def work
    @workSem.synchronize{ @isWork = true }
    while true
      item = @queue.pop
      if item
        item.begin_cut
        sleep(rand(10)/1000.0)
        item.end_cut
      else
        @workSem.synchronize{ @isWork = false }
        puts "Barber Sleep."
        break
      end
    end
  end

  def wakeup
    work
  end

  def work?
    @workSem.synchronize{ @isWork }
  end

  def semaphore
    @sem
  end

end

class Customer < BasicBC
  @@num = 0
  def initialize(queue, barber)
    super(queue)
    @num = @@num+=1
    @barber = barber
  end

  def get_cut
    if @barber.work?
      unless @queue.push(self)
        puts "C##{@num} left."
      else
        puts "C##{@num} wait."
      end
    else
      @barber.semaphore.synchronize do
        @queue.push(self)
        puts "C##{@num} Wakeup"
        @barber.wakeup
      end
    end
  end

  def begin_cut
    puts "C##{@num} Begin"
  end

  def end_cut
    puts "Finish #{@num}"
  end

end

queue = BCQueue.new(5)

barber = Barber.new(queue)

barber.work

customers = []
100.times do
  customers << Thread.new do
    sleep(rand(100)/1000.0)
    customer = Customer.new(queue, barber)
    customer.get_cut
  end
end

customers.each {|c| c.join}

 bc.erl:

-module(bc).
-export([main/0]).

main() ->
  forTest(10, 10)
  .

forTest(N,N) ->
  test(N);
forTest(M, N) ->
  test(M),
  forTest(M+1, N).

test(N) ->
  spawn(
    fun() ->
        Q = spawn(fun() -> queue({run, [], N})end),
      T = N,
      B = spawn(fun() -> barber({run, Q}) end),
      forCP(1, T+3, fun()->
          spawn(fun() -> customer({run, Q, B}) end)
      end)
    end).

forCP(N, N, F) ->
  F();
forCP(M, N, F) ->
  F(),
  forCP(M+1, N, F).

queue({run, Items, N}) ->
  receive
    {From, Opt} ->
      NI = queue([From, Opt, Items, N]),
      queue({run, NI, N})
  end;

%Empty
queue([From, {pop, Time}, [], _]) ->
  %io:format("Pop:NULL~n",[]),
  From ! {self() , {empty}, Time},
  [];

%Data
queue([From, {pop, Time}, L, _]) ->
  %io:format("Pop:~p~n",[T]),
  queue([From, {popReverse, Time}, L]);
queue([From, {popReverse, Time}, L]) ->
  [T|NewL] = lists:reverse(L),
  From ! {self(), {data, T}, Time},
  lists:reverse(NewL);


queue([From, {push, Data}, NI, N]) ->
  %io:format("New item:~p~n",[Data]),
  if
    length(NI) < N ->
      From ! {self(), ok},
      [Data|NI];
    length(NI) >= N ->
      From ! {self(), full},
      NI
  end.

barber({run, Queue}) ->
  Time = now(),
  Queue ! {self(), {pop, Time}},
  %io:format("Send pop from ~p~n",[self()]),
  barber({run, Queue, Time});

barber({run, Queue, Time}) ->
  %io:format("Waiting request from ~p~n", [Time]),
  receive
    {Queue, {empty}, Time} ->
      barber({sleep, Queue});
    {Queue, {data, T}, Time} ->
      barber({send, T, Queue});
    {Queue, _, _} ->
      %io:format("Old request coming at ~p~n",[NewTime]),
      barber({run, Queue, Time});
    {From, require} ->
      %io:format("Working in Require~n",[]),
      %From ! {self(), working},
      barber({send, From, Queue})
  end;


barber({sleep, Queue}) ->
  io:format("Barber sleep~n",[]),
  receive
    {From, require} ->
      %io:format("Require from ~p~n",[From]),
      barber({send, From, Queue})
  end;

barber({send, D, Queue})->
  %io:format("Pop from queue~n",[]),
  D ! {self(), cutting},
  barber({data, D, Queue});

barber({data, D, Queue}) ->
  receive
    {D , finish} ->
      io:format("Barber worked for ~p~n", [D]),
      barber({run, Queue});
    {From, require} ->
      From ! {self(), working},
      barber({data, D, Queue})
  end.

customer({run, Queue, Barber}) ->
  Barber ! {self(), require},
  %io:format("~p send require to ~p ~n",[self(), Barber]),
  customer({wait,Queue, Barber});

customer({working, Queue, Barber}) ->
  Queue ! {self(), {push,self()}},
  receive
    {_, full} ->
      io:format("~p Left~n",[self()]);
    {_, ok} ->
      io:format("~p waiting~n",[self()]),
      customer({wait,Queue, Barber})
  end;

customer({wait, Queue, Barber}) ->
  receive
    {Barber, cutting} ->
      %io:format("Survive to ~p~n", [self()]),
      Barber ! {self(), finish};
    {Barber, working} ->
      customer({working, Queue, Barber})
  end.

 bc.erl: 利用时间戳控制过期信息。从这个例子中,我们可以看到在编写消息传递(利用邮箱系统交互)时需要注意消息的对应和实时性。

哲学家问题

ph.rb:

$KCODE = 'utf8'

require '../lib/utils.rb'

class Kit
  def initialize(num)
    @size = num
    @kits = [true] * @size
    @sem = Mutex.new
  end

  def require(l)
    r = (l+1) % @size
    @sem.synchronize do
      if @kits[l] and @kits[r]
        @kits[l] = @kits[r] = false
        true
      else
        false
      end
    end
  end

  def release(l)
    r = (l+1) % @size
    @sem.synchronize do
      @kits[l] = @kits[r] = true
    end
  end

end

class Phils
  # The num begin from zero!
  def initialize(num, kits)
    @num = num
    @kits = kits
  end

  def live
    100.times do
      think
      eat
    end
  end

  def eat
    while not @kits.require(@num)
      puts "#{@num} Wait Eating"
      sleep(rand(100)/1000.0)
    end
    puts "#{@num} Eating"
    sleep(rand(100)/1000.0)
    @kits.release(@num)
  end

  def think
    puts "#{@num} Thinking"
    sleep(rand(100)/1000.0)
  end

end

N = 5
kit = Kit.new(N)

phs = []
N.times do |i|
  phs<<Thread.new do
    person = Phils.new(i, kit)
    person.live
  end
end

phs.each {|t| t.join}

 ph.erl:

-module(ph).
-export([main/0]).

main() ->
  KitsArray = array:set(0, true,array:set(4, true,array:set(3, true,array:set(2, true,array:set(1, true, array:new(5)) ) ) ) ),
  K = spawn(fun() -> kits(run, KitsArray) end),
  spawn(fun() -> philosopher(run, K, 4, 10) end),
  spawn(fun() -> philosopher(run, K, 3, 10) end),
  spawn(fun() -> philosopher(run, K, 2, 10) end),
  spawn(fun() -> philosopher(run, K, 1, 10) end),
  spawn(fun() -> philosopher(run, K, 0, 10) end).

kits(run, KitsArray) ->
  Size = array:size(KitsArray),
  receive
    {From, require, N} ->
      L = array:get(N, KitsArray),
      R = array:get((N+1)rem Size, KitsArray),
      kits(From, L andalso R, KitsArray, N);
    {_, release, N} ->
      kits(release, KitsArray, N)
  end.

kits(From, true, KitsArray, N) ->
  From ! ok,
  L = N,
  R = (N+1) rem array:size(KitsArray),
  kits(run, array:set(R, false, array:set(L, false, KitsArray)));

kits(From, false, KitsArray, _) ->
  From ! wait,
  kits(run, KitsArray).

kits(release, KitsArray, N) ->
  L = N,
  R = (N+1) rem array:size(KitsArray),
  kits(run, array:set(R, true, array:set(L, true, KitsArray))).

philosopher(run,_, _, 0) ->
  true;
philosopher(run,Kits, N, T) ->
  philosopher(thinking,Kits, N, T);

philosopher(thinking,Kits, N, T) ->
  io:format("~p(~p) is thinking~n", [N, T]),
  sleep(10),
  philosopher(eating,Kits, N, T);

philosopher(eating,Kits, N, T) ->
  Kits ! {self(), require, N},
  receive
    wait ->
      io:format("~p(~p) is waiting~n", [N, T]),
      sleep(5),
      philosopher(eating, Kits, N, T);
    ok ->
      io:format("~p(~p) is eating~n", [N, T]),
      sleep(8),
      Kits ! {self(), release, N},
      philosopher(run, Kits,N, T -1)
  end.

sleep(T) ->
  receive
  after T ->
      true
  end.

 在这里,我们需要重新思考一下我们在编写程序时需要注意的问题。其实这不突然,我们在编写上面两个问题的演示解答时只注重了正确性,而在实际应用中还要考虑很多问题。如果说这些问题只是个简单处理也行,但是面对一些情况(譬如本例的效率问题时)和问题时,我们甚至需要换一种解决方法。
正确性、健壮性、可靠性、效率、易用性、可读性(可理解性)、可扩展性、可复用性、兼容性、可移植性
而在编写这个程序时,我们需要考虑效率问题,毕竟这个问题与上面不同。上面两个问题都有自己的问题:效率瓶颈(货物仓库、理发师自己和椅子),但是本问题中的资源是分散的,不存在绝对的中心。但是我们在解决这个问题的时候,有可能引入一些瓶颈元素:管程、集中资源分配等。

分享到:
评论

相关推荐

    多线程代码 经典线程同步互斥问题 生产者消费者问题

    a: 创建一个线程 ...h: problem1 生产者消费者问题 (1生产者 1消费者 1缓冲区) problem1 more 生产者消费者问题 (1生产者 2消费者 4缓冲区) problem2 读者与写着问题 I: 信号量 semaphore 解决线程同步问题

    多线程简易实现生产者消费者模式

    6. **启动线程**:创建多个生产者和消费者线程,然后启动它们,线程会按照规则交替执行,实现生产与消费的平衡。 在提供的文件"stack"中,可能包含了一个使用栈作为缓冲区的生产者消费者模式实现。栈具有后进先出...

    使用多线程程序模拟实现单生产者/多消费者问题(Linux下C语言)。

    使用多线程程序模拟实现单生产者/多消费者问题。 要求“生产者”随机产生一个整数,“消费者 1”将这个整数加 1 后输出,“消 费者 2”将这个整数加 2 后输出,“消费者 3”将这个整数加 3 后输出,“消 费者 4”将...

    Java多线程实现生产者消费者

    本示例中的“生产者-消费者”模型是一种经典的多线程问题,它模拟了实际生产环境中的资源分配与消耗过程。下面我们将详细探讨如何在Java中实现这个模型。 首先,我们要理解生产者-消费者模型的基本概念。在这个模型...

    生产者和消费者问题以及哲学家就餐问题,JAVA实现的程序.rar

    生产者和消费者问题以及哲学家就餐问题是在多线程编程中常常被用来讨论并发控制的经典示例。这两个问题都涉及到资源的共享与访问同步,它们是理解并发编程中线程安全性和互斥机制的重要概念。 生产者消费者问题是多...

    JAVA实现线程间同步与互斥生产者消费者问题

    本项目通过一个生产者消费者问题的实例,展示了如何在Java中实现线程间的同步与互斥。 生产者消费者问题是经典的并发问题之一,它涉及到两个类型的线程:生产者和消费者。生产者负责生成数据(产品),而消费者则...

    java多线程实现生产者消费者问题

    用java多线程,实现生产者消费者同步和互斥的问题,操作系统中的经典问题

    12.2 Qt5多线程:使用信号量实现生产者和消费者

    生产者-消费者问题是并发编程中的一个经典模型,它描述了两个或多个线程之间的协作,其中一个或多个线程(生产者)负责生成数据,而其他线程(消费者)则负责处理这些数据。在实际应用中,这可以对应于例如数据生成...

    Linux c语言多线程实现生产者/消费者问题

    一组生产者线程与一组消费者线程通过缓冲区发生联系。生产者线程将生产的产品送入缓冲区,消费者线程则从中取出产品。缓冲区有N 个,是一个环形的缓冲池。 使用命令cc consumer.c -o consumer编译

    Linux 多线程实现生产者消费者模式.pdf

    总结来说,Linux多线程实现生产者消费者模式涵盖了多线程创建、线程同步与互斥、锁机制、条件变量以及线程屏障等核心知识点。这些知识点是Linux下进行高级多线程编程不可或缺的部分,对于理解现代操作系统中多任务...

    线程实现生产者消费者问题

    生产者消费者问题是多线程编程中的一个经典模型,用于演示如何在并发环境中通过共享资源进行协作。在这个模型中,生产者线程负责生成数据,而消费者线程则负责消费这些数据。问题的关键在于如何保证生产者不会在无处...

    操作系统中哲学家就餐问题和生产者消费者问题实验报告

    ### 操作系统中哲学家就餐问题和生产者消费者问题实验报告 #### 一、BACI并发程序设计系统概述 BACI(Berkeley ACI)是一个专为并发编程设计的系统,它允许程序员在诸如C++、C、Java等高级语言中嵌入特定的并发...

    用多线程同步方法解决生产者-消费者问题

    生产者-消费者问题是多线程同步的一个经典案例,主要探讨如何在并发环境下,确保生产者进程和消费者进程之间正确地共享资源,避免数据竞争和死锁。在这个问题中,生产者进程负责创建产品并将产品放入缓冲区,而消费...

    多线程——实现了生产者消费者问题,

    课程大作业啊,绝对真实,用多线程技术实现的,对初学多线程技术的童鞋绝对有帮助,亲,还不下载等什么呢

    java实现多线程经典模型生产者消费

    java实现多线程经典模型生产者消费java实现多线程经典模型生产者消费java实现多线程经典模型生产者消费java实现多线程经典模型生产者消费java实现多线程经典模型生产者消费java实现多线程经典模型生产者消费java实现...

    java多线程实现生产者消费者关系

    在实际应用中,我们常常会遇到一种典型的多线程问题——生产者消费者模型。这个模型描述了两种类型的线程:生产者线程负责创建或生产资源,而消费者线程则负责消耗这些资源。在Java中,我们可以利用同步机制来实现...

    用多线程同步方法解决生产者-消费者问题(操作系统课设

    多线程同步方法解决生产者-消费者问题 多线程同步方法是解决生产者-消费者问题的常用方法。生产者-消费者问题是操作系统中经典的问题之一,它是指在多线程环境下,多个生产者线程和消费者线程访问同一个共享缓冲区...

    电子科大操作系统课程报告信号量哲学家就餐,生产者消费者实验_信号量生产者消费者pv完整代码

    本实验主要研究了两个经典问题:哲学家就餐问题和生产者/消费者问题,通过这两个问题的解决,学生可以深入理解多线程编程、进程同步与互斥的原理以及信号量的应用。 哲学家就餐问题由荷兰计算机科学家埃德加·...

    理发师问题 linux 多线程

    理发师问题是多线程编程中的一个经典示例,它展示了如何在并发环境下处理资源的共享与同步。在Linux操作系统中,我们通常使用C语言和POSIX线程库(pthread)来实现多线程程序。本问题的核心是确保理发师与顾客之间的...

    多线程解决生产者与消费者问题

    本文通过多线程方法解决了生产者与消费者之间同步的问题

Global site tag (gtag.js) - Google Analytics