论坛首页 编程语言技术论坛

Ruby on Rails + AJAX + Mongrel + JMS/MQ/MOM = 即时通讯

浏览 16020 次
该帖已经被评为精华帖
作者 正文
   发表时间:2007-07-02  
前文研究过利用 Flash XMLSocket 作即时通讯的 Juggernaut,虽然它可以简易的做到即时通讯,但使用非 HTTP 通讯和非开放标准这两点很让人在意,有满有其他的解决方案?

ActiveMQ 的 AJAX Client

在思考的过程中,发觉我想做的不正是使用消息通讯的软件吗?这类软件己经有一个很好的 Model:Messaging Oriented Middleware(MOM)。接着我跑到 Apache ActiveMQ 的网站研究有没有可用的方案。果然,在 AMQ 的源码中就有一个 AJAX 的 messaging client。

ActiveMQ 是开源的 JMS 专案,技术成熟而且被很多其他产品应用。它的 AJAX Client 利用连接 Server 上的 Messaging Servlet ,由Servlet 再向 AMQ 发送讯息。等等,AJAX 的 A 不就是非同步的意思吗?怎样用 AJAX 做到即时通讯?答案是结合 Poll 和 AJAX。Browser 首先用 AJAX 连接到 Servlet (Poll) ,如果有讯息就会即时把讯息返回,如果没有讯息就会让 client 等待,直到指定的时限(约30秒、比一般 browser 和 socket 的 timeout 短 )。每个 AJAX request 完结后 client 会立即重试,这样 client 便可以得到有如 push 的效果了。比起一般 Server Push 的 Comet ,这种做法更符合一般 HTTP 的 Request - Response 模式,由于与普通 HTTP 无异,一般的 Firewall 和 Browser 也支援。

在 Rails 中的 AJAX Poll

ActiveMQ 的方法虽好,但它是一个 Servlet ,我们难道要另架一台 Java Application Server?就算不介意架这台 Server ,还要用 proxy 之类的方法解决 AJAX 不能 Cross-Domain 的问题。有没有方法只用 Rails 就做到这种 AJAX Poll 的效果?

答案是 No and Yes。用普通的方法 Rails 不能做到以上效果。Rails 是一个单线程的环境 ,也就是说全部 Request 也由同一支 Thread 控制。在以上情境中“叫 Client 在 Request 中等待直至有信息”等于叫整个 server 停下来。幸好,Rails 单线程不等于 Server 也必须单线程。Adam的 Comet with Rails + Mongrel http://adam.blogs.bitscribe.net/2007/05/08/comet-with-rails-mongrel/ 中示范了怎样用自订 HttpHandler 让 Rails 可以持续地跟 Client 连接。我们可以在 Browser 呼叫 AJAX,在 Mongrel 的 HttpHandler 中把这些 Request 转成向 MOM 的呼叫,把消息化为 HTTP Response 送回 browser,如下图。



理论差不多了,看看怎样实作吧!

在 Rails 和 Mongrel 作即时股票报价软件

ActiveMQ 的 Web Demo 包括一个股票报价软件。它分成 Consumer 和 Producer 两部份,Consumer 是收取服价资料的 Client ,Producer 则是发报股价的 Server。我们的目标就是要在 Rails 实作这个软件。

1. 环境设定

这个 project 使用 Apache ActiveMQ 作 MOM,使用 Mongrel 作 Rails Server,使用 Stomp 连接这两者。安装 AMQ 很容易,只需去它的下载网站下载就行了,解压后在 AMQ 的 folder 输入以下指令可以启动 AMQ:

./bin/activemq


接着我们安装 Mongrel 和 Stomp:

gem install mongrel
gem install stomp


当然我们需要一个 Rails project

rails Messaging


准备工作完成,可以写程式了!
2. Producer - 股票报价的消息源

首先我们作一个 Controller "portfolio" 给这个 project,我们分别用 producer 和 index 两个 action 代表 Producer 和 Consumer。

cd Messaging
./script/generate controller portfolio index producer

Producer 的工作是找出最新的股价,再把它们送去 AMQ。 Producer 的主要源码如下:

档案一:app/views/layouts/portfolio.rhtml
<html>
<head>
<title>My Portfolio</title>
<link rel="stylesheet" type="text/css" href="/stylesheets/style.css"></link>
<%= javascript_include_tag :defaults %>
<%= yield :scripts %>
<style>
      .stocks                { border: 1 solid black; }
      .stocks td, .stocks th { width: 200; text-align: right; }
      .stocks .up            { background-color: #9f9; }
      .stocks .down          { background-color: #f99; }
    </style>
</head>
<body>

<%= yield %>


档案二:app/view/portfolio/producer.rhtml
<% content_for("scripts") do %>
<meta http-equiv='refresh' content='2'/>
<% end %>

				
<h1>Portfolio Producer</h1>

Stock updated (<%= @counter %>), next update in 2 seconds.
<ul>
<% @prices.each() { |stock, price| %>
	<li><%= stock %> - <%= price %> </li>
<% } %>
</ul>



portfolio.rhtml 是 profolio controller 的 layout ,包括基本的 CSS 和 script。producer.rhtml 是 producer 的 view,显示股价更新次数和最新股价。在 HTML HEAD 中我加入了一句 META REFRESH,目的是定期更新页面,模拟真实股价更新的情况。

档案三:app/controllers/portfolio_controller.rb
# produce stock price update, and send to MQ using STOMP
  def producer  
    @counter = $counter += 1
    @prices = $prices
    conn = Stomp::Connection.new '', '', 'localhost', 61613, false
    
    begin
      STOCKS.each() { |stock|
        out = create_stock_text(stock)
        conn.send '/topic/STOCKS.' + stock.upcase , out, {'persistent'=>'false'}
      }
    ensure
      conn.disconnect
    end
  end


portfolio_controller.rb 中的 producer action 就是股价提供者:

    * Stomp::Connection.new 向 AMQ Server 开启一个新的 STOMP 连接。
    * create_stock_text(stock) 会找出最新的股价。这里我们随机计出数值和变动,实际上这会由报价机构提供
    * conn.send 把讯息传去 AMQ 上指定的 topic 里,所有订阅了这个 topic 的 client 也会收到这个讯息。

完成这几个档案后,只需要启动 Rails 、 AMQ ,再把 browser 指向 http://127.0.0.1:3000/portfolio/producer 就可以看到如下的画面:


不错,但究竟是否真的有 message 产生呢?我们可以手动连接上 AMQ 看。只要开启一个 terminal 输入以下指令:

telnet localhost 61613
Trying ::1...
Connected to localhost.
Escape character is '^]'.


看到 connected 的字句表示己经用 Stomp 连结上了。接着输入以下指令 (^@ 是 ASCII 的 Null Character,可用 Ctrl + @ 输入):

CONNECT

^@
SUBSCRIBE
destination: /topic/STOCKS.*

^@


这会连接上 AMQ 并收取所有 /topic/STOCKS.* 的讯息。如果己经开启了 Producer ,可以收到类似下面的讯息:

MESSAGE
destination:/topic/STOCKS.IBMW
content-type:text/plain; charset=UTF-8
content-length:83
timestamp:1183307032064
priority:0
expires:0
message-id:ID:core.local-59817-1183285008394-3:735:-1:1:1

<price stock='IBMW' bid='35.9353301170448' offer='35.9712654471618' movement='up'/>


有消息发布者,当然也要有消息接收者吧!

3. Consumer - 用 Mongrel 的 HttpHandler 作股票机

我们把 /portfolio/index 设定为 consumer 。它的源码如下:
档案四:app/view/portfolio/index.rhtml
<% content_for("scripts") do %>
<script type="text/javascript" src="/javascripts/amq.js"></script>
<script type="text/javascript">amq.uri='/amq';</script>
<script type="text/javascript" src="/javascripts/portfolio.js"></script>
<% end %>

<h1>My Portfolio</h1>

<p>This example displays an example stock portfolio. In a real system
this page would be generated dynamically based on the users current
stock portfolio</p>

<table class="stocks">
 <tr>
  <th>Stock</th>
  <th>Description</th>
  <th>Amount</th>
  <th>Price</th>
  <th>Value</th>
  <th>Cost</th>
  <th>P & L</th>
 </tr>
 <tr id="IBMW">
  <td>IBMW</td>
  <td>IBM Stock</td>
  <td id="amount">1000</td>
  <td id="price"></td>
  <td id="value"></td>
  <td id="cost">19000</td>
  <td id="pl"></td>
 </tr>
 <tr id="MSFT">
  <td>MSFT</td>
  <td>Microsoft</td>
  <td id="amount">6000</td>
  <td id="price"></td>
  <td id="value"></td>
  <td id="cost">22000</td>
  <td id="pl"></td>
 </tr>
 <tr id="BEAS">
  <td>BEAS</td>
  <td>BEA Stock</td>
  <td id="amount">1100</td>
  <td id="price"></td>
  <td id="value"></td>
  <td id="cost">12342</td>
  <td id="pl"></td>
 </tr>
 <tr id="SUNW">
  <td>SUNW</td>
  <td>Sun Microsystems Inc</td>
  <td id="amount">3000</td>
  <td id="price"></td>
  <td id="value"></td>
  <td id="cost">7700</td>
  <td id="pl"></td>
 </tr>
</table>

<a href="/portfolio/producer" target="_blank"><p>Start a Portfolio Producer</p></a>

</body>
</html>


这取自 ActiveMQ 的 Web Demo。要注意的是最顶部的三句 javascript。首先它会载入 amq.js ,再把 amq 的 URL 设为 /amq ,接着再载入 portfolio.js。

档案五:public/javascripts/portfolio.js
var priceHandler = 
{
  _price: function(message) 
  {
    if (message != null) {
		
      var price = parseFloat(message.getAttribute('bid'))
      var symbol = message.getAttribute('stock')
      var movement = message.getAttribute('movement')
      if (movement == null) {
	        movement = 'up'
      }
	    
      var row = document.getElementById(symbol)
      if (row) {
		    // perform portfolio calculations
		    var value = asFloat(find(row, 'amount')) * price
		    var pl = value - asFloat(find(row, 'cost'))
		    
		    // now lets update the HTML DOM
		    find(row, 'price').innerHTML = fixedDigits(price, 2)
		    find(row, 'value').innerHTML = fixedDigits(value, 2)
		    find(row, 'pl').innerHTML    = fixedDigits(pl, 2)
		    find(row, 'price').className = movement
		    find(row, 'pl').className    = pl >= 0 ? 'up' : 'down'
      }
    }
  }
};


function portfolioPoll(first)
{
   if (first)
   {
     amq.addListener('stocks','/topic/STOCKS.*',priceHandler._price);
   }
}

amq.addPollHandler(portfolioPoll);


/// -----------------
// Original code by Joe Walnes
// -----------------

/*** Convenience methods, added as mixins to standard classes (object prototypes) ***/

/**
 * Return number as fixed number of digits. 
 */
function fixedDigits(t, digits) {
    return (t.toFixed) ? t.toFixed(digits) : this
}

/** 
 * Find direct child of an element, by id. 
 */
function find(t, id) {
    for (i = 0; i < t.childNodes.length; i++) {
        var child = t.childNodes[i]
        if (child.id == id) {
            return child
        }
    }
    return null
}

/**
 * Return the text contents of an element as a floating point number. 
 */
function asFloat(t) {
    return parseFloat(t.innerHTML)
}

这同样取自 AMQ,主要是 amq.addListener('stocks','/topic/STOCKS.*',priceHandler._price);,这让 browser 当收到由 /topic/STOCKS.* 传来的讯息时会执行指定的 javascript 更新画面。

档案六:lib/ajax_mq_handler.rb
require 'rubygems'
require 'stomp'
require 'timeout'

URL = '/amq'
CHANNEL = ''
ID = 'stocks'
DEST = '/topic/STOCKS.*'

class AmqHandler < Mongrel::HttpHandler
  RESP_OK = "<ajax-response>\n</ajax-response>"
  TIMEOUT = 30
  
  def logger()
    @@logger = Logger.new(STDOUT) unless defined?(@logger)
    @@logger
  end
  
  def process(request, response)
    logger.debug "handle AJAX request"
    
    query = Mongrel::HttpRequest.query_parse(request.params["QUERY_STRING"])
    method = request.params["REQUEST_METHOD"]
    
    if method == 'POST'
      process_post(request, response, query)
    else
      process_get(request, response, query)
    end
  end
  
  # Handle POST, use to send message
  # Not implemented
  # TODO: Implement it when needed
  def process_post(request, response, query)
    logger.debug "post method not supported"
    write_response(response, "<message type='WARN' value='Method not supported'/>")
  end
  
  # Handle GET, use to receive message
  def process_get(request, response, query)
    begin
      logger.debug "connecting stomp"
      conn = Stomp::Connection.new '', '', 'localhost', 61613, false 
      conn.subscribe DEST, { :ack =>"auto" }
    rescue
      logger.error "Error connecting server... #{$!}\n"
      write_response(response,  "<message type='ERROR' value=#{Mongrel::HttpRequest.escape($!)}/>")
      return
    end
    
    begin
      msg = ""
      timeout = query['timeout'].nil? ? TIMEOUT : (query['timeout'].to_i / 1000)      
      Timeout::timeout(timeout + 0.1) {
        msg = receive_messages(conn)
      }
      logger.debug "output: #{msg}"
      
      write_response(response,  msg)      
    rescue Timeout::Error      
      logger.debug "timeout occurred"
      write_response(response, "<message type='TIMEOUT'/>")
    rescue
      logger.error "an error occurred: #{$!}"      
      write_response(response, "<message type='ERROR' value='#{Mongrel::HttpRequest.escape($!)}'/>")
    end
  end
  
  private   
  # Return all pending message
  def receive_messages(conn)
    msg = conn.receive    
    output = msg.body    
    begin
      Timeout::timeout(0.01) {
        while !msg.nil?
          output += msg.body
          msg = conn.receive
        end
      }
    rescue Timeout::Error
      # do nothing, timeout is expected
    end
  
    return output
  end
  
  def write_response(response, output)
    response.start(200) do |head, out|    
      head["Content-Type"] = "text/xml"
      out.write "<ajax-response>\n<response id='#{ID}' destination='#{DEST}'>"
      out.write output
      out.write "</response>\n</ajax-response>"
    end
  end
end

uri URL, :handler => AmqHandler.new, :in_front => true



终于到了最重要的档案了,ajax_mq_handler.rb 延申了 Mongrel::HttpHandler,Mongrel 会把 Request 和 Response object 交给 Handler。它会如 producer 一样连接去 AMQ。不同的是它只接收讯息 (技术上可以做到发讯息,但基于安全和懒的理由暂时不作这部份)。这里使用了 Timeout ,在 30 秒内没有新讯息的话它会返回 timeout 的讯息。如果在 timeout 前有讯息,它会尝试返回全部讯息。编写 HttpHandler 时要留意这并非在 Rails 的环境中,一般 Rails 的 variable 和 helper 这里也用不到,因此尽量只在这里做必要的工作,其他的还是留给 Rails 吧。

虽然造了一个 HttpHandler,但 Mongrel 不会自动去用它。要让 Mongrel 使用这个 Handler,可以在 Mongrel 设定档加入以下指令:

档案七:config/mongrel_conf.yml
:config_script: lib/ajax_mq_handler.rb


再在启动 Server 时使用以下指令:

mongrel_rails start -C ./config/mongrel_conf.yml


是时候看看成果了:



同时把 browser 指向 http://127.0.0.1/portfolio 和 http://127.0.0.1/portfolio/producer ,你会留意到当 producer 更新的时候, consumer 也同时更新了!神奇吗?这是用全开源的东西实作出来的成果啊!
总结

我们成功地在 Rails 中作了一个只用 AJAX 而即时更新的软件,这过程中我们用 ApacheMQ 作 MOM,用 STOMP 去连结 MOM 和 Rails 以及用 Mongrel HttpHandler 去让 AJAX 读取消息。当然这只能算是一个 proof of concept ,如有错漏敬请指正。接着可以想想怎样作一个完整的 plugin 简化这个过程。有甚么建议或想法欢迎指教!

整个 Producer 和 Consumer 的 Rails 软件可以在这里下载http://www.reality.hk/%7Esiuying/software/messaging.tgz,它以 MIT License 形式发布。
   发表时间:2007-07-02  
这样最大的问题就是密集处理会有更大的负荷。

例如,原来的定时刷新3s请求一次网页

按照上述方案,就是有新消息才会更新。如果聊天非常密集,比如3s内说了好多句话,每一次有新消息都会重新向服务器提交请求。如果3s内说了n句话,处于同一个聊天下的所有客户端都会集体就是向服务器请求n次。显然不能满足一些需求。

当然,如果不是那么频繁的话,效果应该还好。我实验过类似的例子,php实现的。不过没有测试负载,只是2,3个人之间实验了一下,体验确实不错。假的comet么。呵呵。但是大量的,频繁的聊天呢?上述情况是我想象的,也没有实验。

等大家一起讨论。
0 请登录后投票
   发表时间:2007-07-02  
如果想做comet的话,shooting_star这个项目非常值得关注。个人感觉他的事件与内容分离的设计非常方便,不过貌似也没有解决“密集处理会有更大的负荷”的问题。

详细的 http://rubyforge.org/projects/shooting-star/
0 请登录后投票
   发表时间:2007-07-02  
poppy 写道
这样最大的问题就是密集处理会有更大的负荷。

例如,原来的定时刷新3s请求一次网页

按照上述方案,就是有新消息才会更新。如果聊天非常密集,比如3s内说了好多句话,每一次有新消息都会重新向服务器提交请求。如果3s内说了n句话,处于同一个聊天下的所有客户端都会集体就是向服务器请求n次。显然不能满足一些需求。

当然,如果不是那么频繁的话,效果应该还好。我实验过类似的例子,php实现的。不过没有测试负载,只是2,3个人之间实验了一下,体验确实不错。假的comet么。呵呵。但是大量的,频繁的聊天呢?上述情况是我想象的,也没有实验。

等大家一起讨论。


还没有考虑处理大量请求的问题。

不过相对于持久连线的 comet,把每个讯息都用请求/回应方式处理不是更容易用群集去解决问题吗?当然了这也是我的想像,没有实作也不知道是否可行!
0 请登录后投票
   发表时间:2007-07-03  
http://eve.woygo.com 我的IM,有兴趣一起研究啊
0 请登录后投票
   发表时间:2007-07-07  
这种方案我以前用过,但最大的问题是,http servlet采用线程来处理每个http request,所以一般只能支持几百个用户同时连接。
0 请登录后投票
   发表时间:2007-07-08  
balaschen,

明白你的意思,不过 activemq 的 ajax client 使用了 Threadless Waiting 技巧。用它的话可得到比普通的 polling 更佳的效能。

当然 Rails/Mongrel 暂时没有这些,但这个架构本身把讯息保存放在 MOM ,把事务逻辑放在 Rails ,Ajax handler 用 Mongrel 作,有需要的时候作 cluster 似乎也可行的。
0 请登录后投票
论坛首页 编程语言技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics