浏览 16020 次
该帖已经被评为精华帖
|
|
---|---|
作者 | 正文 |
发表时间:2007-07-02
ActiveMQ 的 AJAX Client 在思考的过程中,发觉我想做的不正是使用消息通讯的软件吗?这类软件己经有一个很好的 Model:Messaging Oriented Middleware(MOM)。接着我跑到 Apache ActiveMQ 的网站研究有没有可用的方案。果然,在 AMQ 的源码中就有一个 AJAX 的 messaging client。 ActiveMQ 是开源的 JMS 专案,技术成熟而且被很多其他产品应用。它的 AJAX Client 利用连接 Server 上的 Messaging Servlet ,由Servlet 再向 AMQ 发送讯息。等等,AJAX 的 A 不就是非同步的意思吗?怎样用 AJAX 做到即时通讯?答案是结合 Poll 和 AJAX。Browser 首先用 AJAX 连接到 Servlet (Poll) ,如果有讯息就会即时把讯息返回,如果没有讯息就会让 client 等待,直到指定的时限(约30秒、比一般 browser 和 socket 的 timeout 短 )。每个 AJAX request 完结后 client 会立即重试,这样 client 便可以得到有如 push 的效果了。比起一般 Server Push 的 Comet ,这种做法更符合一般 HTTP 的 Request - Response 模式,由于与普通 HTTP 无异,一般的 Firewall 和 Browser 也支援。 在 Rails 中的 AJAX Poll ActiveMQ 的方法虽好,但它是一个 Servlet ,我们难道要另架一台 Java Application Server?就算不介意架这台 Server ,还要用 proxy 之类的方法解决 AJAX 不能 Cross-Domain 的问题。有没有方法只用 Rails 就做到这种 AJAX Poll 的效果? 答案是 No and Yes。用普通的方法 Rails 不能做到以上效果。Rails 是一个单线程的环境 ,也就是说全部 Request 也由同一支 Thread 控制。在以上情境中“叫 Client 在 Request 中等待直至有信息”等于叫整个 server 停下来。幸好,Rails 单线程不等于 Server 也必须单线程。Adam的 Comet with Rails + Mongrel http://adam.blogs.bitscribe.net/2007/05/08/comet-with-rails-mongrel/ 中示范了怎样用自订 HttpHandler 让 Rails 可以持续地跟 Client 连接。我们可以在 Browser 呼叫 AJAX,在 Mongrel 的 HttpHandler 中把这些 Request 转成向 MOM 的呼叫,把消息化为 HTTP Response 送回 browser,如下图。 理论差不多了,看看怎样实作吧! 在 Rails 和 Mongrel 作即时股票报价软件 ActiveMQ 的 Web Demo 包括一个股票报价软件。它分成 Consumer 和 Producer 两部份,Consumer 是收取服价资料的 Client ,Producer 则是发报股价的 Server。我们的目标就是要在 Rails 实作这个软件。 1. 环境设定 这个 project 使用 Apache ActiveMQ 作 MOM,使用 Mongrel 作 Rails Server,使用 Stomp 连接这两者。安装 AMQ 很容易,只需去它的下载网站下载就行了,解压后在 AMQ 的 folder 输入以下指令可以启动 AMQ: ./bin/activemq 接着我们安装 Mongrel 和 Stomp: gem install mongrel gem install stomp 当然我们需要一个 Rails project rails Messaging 准备工作完成,可以写程式了! 2. Producer - 股票报价的消息源 首先我们作一个 Controller "portfolio" 给这个 project,我们分别用 producer 和 index 两个 action 代表 Producer 和 Consumer。 cd Messaging ./script/generate controller portfolio index producer Producer 的工作是找出最新的股价,再把它们送去 AMQ。 Producer 的主要源码如下: 档案一:app/views/layouts/portfolio.rhtml <html> <head> <title>My Portfolio</title> <link rel="stylesheet" type="text/css" href="/stylesheets/style.css"></link> <%= javascript_include_tag :defaults %> <%= yield :scripts %> <style> .stocks { border: 1 solid black; } .stocks td, .stocks th { width: 200; text-align: right; } .stocks .up { background-color: #9f9; } .stocks .down { background-color: #f99; } </style> </head> <body> <%= yield %> 档案二:app/view/portfolio/producer.rhtml <% content_for("scripts") do %> <meta http-equiv='refresh' content='2'/> <% end %> <h1>Portfolio Producer</h1> Stock updated (<%= @counter %>), next update in 2 seconds. <ul> <% @prices.each() { |stock, price| %> <li><%= stock %> - <%= price %> </li> <% } %> </ul> portfolio.rhtml 是 profolio controller 的 layout ,包括基本的 CSS 和 script。producer.rhtml 是 producer 的 view,显示股价更新次数和最新股价。在 HTML HEAD 中我加入了一句 META REFRESH,目的是定期更新页面,模拟真实股价更新的情况。 档案三:app/controllers/portfolio_controller.rb # produce stock price update, and send to MQ using STOMP def producer @counter = $counter += 1 @prices = $prices conn = Stomp::Connection.new '', '', 'localhost', 61613, false begin STOCKS.each() { |stock| out = create_stock_text(stock) conn.send '/topic/STOCKS.' + stock.upcase , out, {'persistent'=>'false'} } ensure conn.disconnect end end portfolio_controller.rb 中的 producer action 就是股价提供者: * Stomp::Connection.new 向 AMQ Server 开启一个新的 STOMP 连接。 * create_stock_text(stock) 会找出最新的股价。这里我们随机计出数值和变动,实际上这会由报价机构提供 * conn.send 把讯息传去 AMQ 上指定的 topic 里,所有订阅了这个 topic 的 client 也会收到这个讯息。 完成这几个档案后,只需要启动 Rails 、 AMQ ,再把 browser 指向 http://127.0.0.1:3000/portfolio/producer 就可以看到如下的画面: 不错,但究竟是否真的有 message 产生呢?我们可以手动连接上 AMQ 看。只要开启一个 terminal 输入以下指令: telnet localhost 61613 Trying ::1... Connected to localhost. Escape character is '^]'. 看到 connected 的字句表示己经用 Stomp 连结上了。接着输入以下指令 (^@ 是 ASCII 的 Null Character,可用 Ctrl + @ 输入): CONNECT ^@ SUBSCRIBE destination: /topic/STOCKS.* ^@ 这会连接上 AMQ 并收取所有 /topic/STOCKS.* 的讯息。如果己经开启了 Producer ,可以收到类似下面的讯息: MESSAGE destination:/topic/STOCKS.IBMW content-type:text/plain; charset=UTF-8 content-length:83 timestamp:1183307032064 priority:0 expires:0 message-id:ID:core.local-59817-1183285008394-3:735:-1:1:1 <price stock='IBMW' bid='35.9353301170448' offer='35.9712654471618' movement='up'/> 有消息发布者,当然也要有消息接收者吧! 3. Consumer - 用 Mongrel 的 HttpHandler 作股票机 我们把 /portfolio/index 设定为 consumer 。它的源码如下: 档案四:app/view/portfolio/index.rhtml <% content_for("scripts") do %> <script type="text/javascript" src="/javascripts/amq.js"></script> <script type="text/javascript">amq.uri='/amq';</script> <script type="text/javascript" src="/javascripts/portfolio.js"></script> <% end %> <h1>My Portfolio</h1> <p>This example displays an example stock portfolio. In a real system this page would be generated dynamically based on the users current stock portfolio</p> <table class="stocks"> <tr> <th>Stock</th> <th>Description</th> <th>Amount</th> <th>Price</th> <th>Value</th> <th>Cost</th> <th>P & L</th> </tr> <tr id="IBMW"> <td>IBMW</td> <td>IBM Stock</td> <td id="amount">1000</td> <td id="price"></td> <td id="value"></td> <td id="cost">19000</td> <td id="pl"></td> </tr> <tr id="MSFT"> <td>MSFT</td> <td>Microsoft</td> <td id="amount">6000</td> <td id="price"></td> <td id="value"></td> <td id="cost">22000</td> <td id="pl"></td> </tr> <tr id="BEAS"> <td>BEAS</td> <td>BEA Stock</td> <td id="amount">1100</td> <td id="price"></td> <td id="value"></td> <td id="cost">12342</td> <td id="pl"></td> </tr> <tr id="SUNW"> <td>SUNW</td> <td>Sun Microsystems Inc</td> <td id="amount">3000</td> <td id="price"></td> <td id="value"></td> <td id="cost">7700</td> <td id="pl"></td> </tr> </table> <a href="/portfolio/producer" target="_blank"><p>Start a Portfolio Producer</p></a> </body> </html> 这取自 ActiveMQ 的 Web Demo。要注意的是最顶部的三句 javascript。首先它会载入 amq.js ,再把 amq 的 URL 设为 /amq ,接着再载入 portfolio.js。 档案五:public/javascripts/portfolio.js var priceHandler = { _price: function(message) { if (message != null) { var price = parseFloat(message.getAttribute('bid')) var symbol = message.getAttribute('stock') var movement = message.getAttribute('movement') if (movement == null) { movement = 'up' } var row = document.getElementById(symbol) if (row) { // perform portfolio calculations var value = asFloat(find(row, 'amount')) * price var pl = value - asFloat(find(row, 'cost')) // now lets update the HTML DOM find(row, 'price').innerHTML = fixedDigits(price, 2) find(row, 'value').innerHTML = fixedDigits(value, 2) find(row, 'pl').innerHTML = fixedDigits(pl, 2) find(row, 'price').className = movement find(row, 'pl').className = pl >= 0 ? 'up' : 'down' } } } }; function portfolioPoll(first) { if (first) { amq.addListener('stocks','/topic/STOCKS.*',priceHandler._price); } } amq.addPollHandler(portfolioPoll); /// ----------------- // Original code by Joe Walnes // ----------------- /*** Convenience methods, added as mixins to standard classes (object prototypes) ***/ /** * Return number as fixed number of digits. */ function fixedDigits(t, digits) { return (t.toFixed) ? t.toFixed(digits) : this } /** * Find direct child of an element, by id. */ function find(t, id) { for (i = 0; i < t.childNodes.length; i++) { var child = t.childNodes[i] if (child.id == id) { return child } } return null } /** * Return the text contents of an element as a floating point number. */ function asFloat(t) { return parseFloat(t.innerHTML) } 这同样取自 AMQ,主要是 amq.addListener('stocks','/topic/STOCKS.*',priceHandler._price);,这让 browser 当收到由 /topic/STOCKS.* 传来的讯息时会执行指定的 javascript 更新画面。 档案六:lib/ajax_mq_handler.rb require 'rubygems' require 'stomp' require 'timeout' URL = '/amq' CHANNEL = '' ID = 'stocks' DEST = '/topic/STOCKS.*' class AmqHandler < Mongrel::HttpHandler RESP_OK = "<ajax-response>\n</ajax-response>" TIMEOUT = 30 def logger() @@logger = Logger.new(STDOUT) unless defined?(@logger) @@logger end def process(request, response) logger.debug "handle AJAX request" query = Mongrel::HttpRequest.query_parse(request.params["QUERY_STRING"]) method = request.params["REQUEST_METHOD"] if method == 'POST' process_post(request, response, query) else process_get(request, response, query) end end # Handle POST, use to send message # Not implemented # TODO: Implement it when needed def process_post(request, response, query) logger.debug "post method not supported" write_response(response, "<message type='WARN' value='Method not supported'/>") end # Handle GET, use to receive message def process_get(request, response, query) begin logger.debug "connecting stomp" conn = Stomp::Connection.new '', '', 'localhost', 61613, false conn.subscribe DEST, { :ack =>"auto" } rescue logger.error "Error connecting server... #{$!}\n" write_response(response, "<message type='ERROR' value=#{Mongrel::HttpRequest.escape($!)}/>") return end begin msg = "" timeout = query['timeout'].nil? ? TIMEOUT : (query['timeout'].to_i / 1000) Timeout::timeout(timeout + 0.1) { msg = receive_messages(conn) } logger.debug "output: #{msg}" write_response(response, msg) rescue Timeout::Error logger.debug "timeout occurred" write_response(response, "<message type='TIMEOUT'/>") rescue logger.error "an error occurred: #{$!}" write_response(response, "<message type='ERROR' value='#{Mongrel::HttpRequest.escape($!)}'/>") end end private # Return all pending message def receive_messages(conn) msg = conn.receive output = msg.body begin Timeout::timeout(0.01) { while !msg.nil? output += msg.body msg = conn.receive end } rescue Timeout::Error # do nothing, timeout is expected end return output end def write_response(response, output) response.start(200) do |head, out| head["Content-Type"] = "text/xml" out.write "<ajax-response>\n<response id='#{ID}' destination='#{DEST}'>" out.write output out.write "</response>\n</ajax-response>" end end end uri URL, :handler => AmqHandler.new, :in_front => true 终于到了最重要的档案了,ajax_mq_handler.rb 延申了 Mongrel::HttpHandler,Mongrel 会把 Request 和 Response object 交给 Handler。它会如 producer 一样连接去 AMQ。不同的是它只接收讯息 (技术上可以做到发讯息,但基于安全和懒的理由暂时不作这部份)。这里使用了 Timeout ,在 30 秒内没有新讯息的话它会返回 timeout 的讯息。如果在 timeout 前有讯息,它会尝试返回全部讯息。编写 HttpHandler 时要留意这并非在 Rails 的环境中,一般 Rails 的 variable 和 helper 这里也用不到,因此尽量只在这里做必要的工作,其他的还是留给 Rails 吧。 虽然造了一个 HttpHandler,但 Mongrel 不会自动去用它。要让 Mongrel 使用这个 Handler,可以在 Mongrel 设定档加入以下指令: 档案七:config/mongrel_conf.yml :config_script: lib/ajax_mq_handler.rb 再在启动 Server 时使用以下指令: mongrel_rails start -C ./config/mongrel_conf.yml 是时候看看成果了: 同时把 browser 指向 http://127.0.0.1/portfolio 和 http://127.0.0.1/portfolio/producer ,你会留意到当 producer 更新的时候, consumer 也同时更新了!神奇吗?这是用全开源的东西实作出来的成果啊! 总结 我们成功地在 Rails 中作了一个只用 AJAX 而即时更新的软件,这过程中我们用 ApacheMQ 作 MOM,用 STOMP 去连结 MOM 和 Rails 以及用 Mongrel HttpHandler 去让 AJAX 读取消息。当然这只能算是一个 proof of concept ,如有错漏敬请指正。接着可以想想怎样作一个完整的 plugin 简化这个过程。有甚么建议或想法欢迎指教! 整个 Producer 和 Consumer 的 Rails 软件可以在这里下载http://www.reality.hk/%7Esiuying/software/messaging.tgz,它以 MIT License 形式发布。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2007-07-02
这样最大的问题就是密集处理会有更大的负荷。
例如,原来的定时刷新3s请求一次网页 按照上述方案,就是有新消息才会更新。如果聊天非常密集,比如3s内说了好多句话,每一次有新消息都会重新向服务器提交请求。如果3s内说了n句话,处于同一个聊天下的所有客户端都会集体就是向服务器请求n次。显然不能满足一些需求。 当然,如果不是那么频繁的话,效果应该还好。我实验过类似的例子,php实现的。不过没有测试负载,只是2,3个人之间实验了一下,体验确实不错。假的comet么。呵呵。但是大量的,频繁的聊天呢?上述情况是我想象的,也没有实验。 等大家一起讨论。 |
|
返回顶楼 | |
发表时间:2007-07-02
如果想做comet的话,shooting_star这个项目非常值得关注。个人感觉他的事件与内容分离的设计非常方便,不过貌似也没有解决“密集处理会有更大的负荷”的问题。
详细的 http://rubyforge.org/projects/shooting-star/ |
|
返回顶楼 | |
发表时间:2007-07-02
poppy 写道 这样最大的问题就是密集处理会有更大的负荷。
例如,原来的定时刷新3s请求一次网页 按照上述方案,就是有新消息才会更新。如果聊天非常密集,比如3s内说了好多句话,每一次有新消息都会重新向服务器提交请求。如果3s内说了n句话,处于同一个聊天下的所有客户端都会集体就是向服务器请求n次。显然不能满足一些需求。 当然,如果不是那么频繁的话,效果应该还好。我实验过类似的例子,php实现的。不过没有测试负载,只是2,3个人之间实验了一下,体验确实不错。假的comet么。呵呵。但是大量的,频繁的聊天呢?上述情况是我想象的,也没有实验。 等大家一起讨论。 还没有考虑处理大量请求的问题。 不过相对于持久连线的 comet,把每个讯息都用请求/回应方式处理不是更容易用群集去解决问题吗?当然了这也是我的想像,没有实作也不知道是否可行! |
|
返回顶楼 | |
发表时间:2007-07-03
http://eve.woygo.com 我的IM,有兴趣一起研究啊
|
|
返回顶楼 | |
发表时间:2007-07-07
这种方案我以前用过,但最大的问题是,http servlet采用线程来处理每个http request,所以一般只能支持几百个用户同时连接。
|
|
返回顶楼 | |
发表时间:2007-07-08
balaschen,
明白你的意思,不过 activemq 的 ajax client 使用了 Threadless Waiting 技巧。用它的话可得到比普通的 polling 更佳的效能。 当然 Rails/Mongrel 暂时没有这些,但这个架构本身把讯息保存放在 MOM ,把事务逻辑放在 Rails ,Ajax handler 用 Mongrel 作,有需要的时候作 cluster 似乎也可行的。 |
|
返回顶楼 | |