BlazeDS 是 Adobe公司的一个开源项目,相对应的有一个商业质量的软件LiveCycleDS. 这两个项目都是为了使能(Enable) Flex RIA的Client端与JavaEE服务器互操作而创建的项目。进而Adobe希望Flash的企业版本Flex可以在丰富的互联网应用中与JavaEE共同成为占有一席之地的编程框架。
BlazeDS功能也非常强大,除了支持传统的HTTP以及基于HTTP的SOAP通信协议以外,还支持AMF(Adobe Message Format)协议,Adobe声称该协议的传输效率是传统协议的10倍(笔者没有进行过验证)。除此以外,还支持一些传统的HTTP协议不能完成的服务器端推消息的操作。当然几年前就有了Comet以及现在的Websocket,但是Adobe的BlazeDS在更早就提供了非常完善的多种可以支持错误恢复的消息通信机制。比如:实时的流媒体单向通道,准实时的long polling,以及piggybacking等等。它考虑到当时浏览器的限制,比如IE在不进行破戒的情况下仅支持2个并发连接,Firefox仅支持8个并发连接等等,如果没有办法建立实时的流媒体通道,那么框架将自动fallback到准实时的long polling模式等等。还可以支持多人共同参与互动的Dashboard等等。线上有很多相关样例资源以及教程,总而言之,为了满足丰富的互联网的用户体验,BlazeDS在通信机制上确实下了不少功夫。然而Flex还是没有被广泛认可,不少人都觉得它实在是比较慢。再加上苹果和安卓系统都去掉了对Flash的原生支持。这条技术路线的前景可谓惨淡。不过本文不是为了宣传推销BlazeDS,而是站在生命周期的视角来探讨BlazeDS这个开源项目中FlexClient.java中的一些潜在问题。
在大概了解了项目背景之后,那么我们更容易理解FlexClient这个类的职责了。在Server Side, 每一个FlexClient对象代表了一个与服务器通信的Flex的Client,可能是在AIR运行时当中,也可能是在浏览器的AVM运行时当中。FlexClient相对MessageBroker来讲,他是完成消息推送到其他MessageClient的负责方;相对Endpoint来讲,FlexClient是完成从该Endpoint对应的消息队列Poll消息下发给客户端的负责方。
源代码连接: FlexClient.java
/** * Flag indicating whether the instance is valid; once invalidated this flag is * set to false. */ boolean valid;
/** * Flag used to break cycles during invalidation. */ /* package visibility for FlexClientManager */ volatile boolean invalidating; /** * Instance level lock to sync for state changes. */ final Object lock = new Object();
/** * @exclude * Constructs a new FlexClient instance having the specified Id. * * @param manager The FlexClientManager managing this instance. * @param id The Id for this instance. */ public FlexClient(FlexClientManager manager, String id) { this.id = id; flexClientManager = manager; updateLastUse(); valid = true; if (Log.isDebug()) Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("FlexClient created with id '" + this.id + "'."); }
/** * Invalidates the FlexClient. */ public void invalidate() { synchronized (lock) { if (!valid || invalidating) return; // Already shutting down. invalidating = true; // This thread gets to shut the FlexClient down. flexClientManager.removeFlexClient(this); cancelTimeout(); } // Unregister from all FlexSessions. if (!sessions.isEmpty()) { for (FlexSession session : sessions) unregisterFlexSession(session); } // Invalidate associated MessageClient subscriptions. if (messageClients != null && !messageClients.isEmpty()) { for (MessageClient messageClient : messageClients) { messageClient.removeMessageClientDestroyedListener(this); messageClient.invalidate(); } messageClients.clear(); } // Notify destroy listeners that we're shutting the FlexClient down. if (destroyedListeners != null && !destroyedListeners.isEmpty()) { for (FlexClientListener destroyListener : destroyedListeners) { destroyListener.clientDestroyed(this); } destroyedListeners.clear(); } // Unbind all attributes. if (attributes != null && !attributes.isEmpty()) { Set<String> keySet = attributes.keySet(); String[] keys = keySet.toArray(new String[keySet.size()]); for (String key : keys) removeAttribute(key); } // Close any registered push handlers. if (endpointPushHandlers != null && !endpointPushHandlers.isEmpty()) { for (EndpointPushHandler handler : endpointPushHandlers.values()) { handler.close(true /* notify Channel of disconnect */); } endpointPushHandlers = null; } synchronized (lock) { valid = false; invalidating = false; } if (Log.isDebug()) Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("FlexClient with id '" + this.id + "' has been invalidated."); }
以上两段代码包含了全部对于两个状态标记属性的赋值操作。除此以外,一些依赖于Valid状态的方法,进行checkValid的判断,个别方法单独加了lock用来同步,其他方法也通过volatile来保证了有序性(由于编译器会对源代码进行优化而重新调整代码顺序,详见Java Memory Model或者Java Language Specification):
/** * Utility method that tests validity and throws an exception if the instance * has been invalidated. */ protected void checkValid() { synchronized (lock) { if (!valid) { MessageException e = new MessageException(); e.setMessage(FLEX_CLIENT_INVALIDATED); throw e; } } }
/** * @exclude * Poll for outbound messages for the FlexClient. * This method is only invoked by internal code while processing a client poll request; it * is not intended for general public use. * Poll requests that trigger this method come from client-side polling channels and the request * is not specific to a single Consumer/MessageClient instance so process any queued messages for * the specified endpoint across all subscriptions. * * @param endpointId The Id of the endpoint that received the poll request. * @return The flush result including messages to return in the poll response and * an optional wait time for the next poll/flush. */ public FlushResult poll(String endpointId) { synchronized (lock) { checkValid(); EndpointQueue queue = outboundQueues.get(endpointId); if (queue != null) return internalPoll(queue); // Otherwise, the client is not subscribed. throwNotSubscribedException(endpointId); } return null; }
* List of registered FlexClient attribute listeners.
private volatile CopyOnWriteArrayList<FlexClientAttributeListener> attributeListeners;
... /** * Adds a FlexClient attribute listener that will be notified when an * attribute is added, removed or changed. If the attribute implements * FlexClientBindingListener, it will be notified before any * FlexClientAttributeListeners are notified. * * @param listener The listener to add. */ public void addClientAttributeListener(FlexClientAttributeListener listener) { if (listener != null) { checkValid(); if (attributeListeners == null) { synchronized (lock) { if (attributeListeners == null) attributeListeners = new CopyOnWriteArrayList<FlexClientAttributeListener>(); } } attributeListeners.addIfAbsent(listener); } }
图2 Valid状态可以执行的方法
/** * Removes a FlexClient attribute listener. * * @param listener The listener to remove. */ public void removeClientAttributeListener(FlexClientAttributeListener listener) { // No need to check validity; removing a listener is always ok. if (listener != null && attributeListeners != null) attributeListeners.remove(listener); } /** * Removes a FlexClient destroyed listener. * * @see flex.messaging.client.FlexClientListener * * @param listener The listener to remove. */ public void removeClientDestroyedListener(FlexClientListener listener) { // No need to check validity; removing a listener is always ok. if (listener != null && destroyedListeners != null) destroyedListeners.remove(listener); }
/** * @exclude * Registers an <tt>EndpointPushHandler</tt> for the specified endpoint to handle pushing messages * to remote clients. * * @param handler The <tt>EndpointPushHandler</tt> to register. * @param endpointId The endpoint to register for. */ public void registerEndpointPushHandler(EndpointPushHandler handler, String endpointId) { synchronized (lock) { if (endpointPushHandlers == null) endpointPushHandlers = new HashMap<String, EndpointPushHandler>(1); if (endpointPushHandlers.containsKey(endpointId)) { MessageException me = new MessageException(); me.setMessage(ENDPOINT_PUSH_HANDLER_ALREADY_REGISTERED, new Object[] {getId(), endpointId}); throw me; } endpointPushHandlers.put(endpointId, handler); } }
图3 使用lock进行同步的方法
/** * @exclude * Used internally to disassociate a MessageClient (subscription) from a FlexClient. * * @param messageClient The MessageClient to disassociate from the FlexClient. */ public void unregisterMessageClient(MessageClient messageClient) { if (messageClients != null && messageClients.remove(messageClient)) { messageClient.removeMessageClientDestroyedListener(this); String endpointId = messageClient.getEndpointId(); // Manage the outbound queue that this subscription uses. synchronized (lock) { EndpointQueue queue = outboundQueues.get(endpointId); if (queue != null) { // Decrement the ref count of MessageClients using this queue. queue.messageClientRefCount--; // Unregister the message client from the outbound throttle // manager (if one exists). OutboundQueueThrottleManager tm = queue.processor.getOutboundQueueThrottleManager(); if (tm != null) tm.unregisterAllSubscriptions(messageClient.getDestinationId()); // If we're not attempting to notify the remote client that this MessageClient has // been invalidated, remove any associated messages from the queue. if (!messageClient.isAttemptingInvalidationClientNotification()) { Object messageClientId = messageClient.getClientId(); for (Iterator<Message> iter = queue.messages.iterator(); iter.hasNext(); ) { Message message = iter.next(); if (message.getClientId().equals(messageClientId)) iter.remove(); } } // If no active subscriptions require the queue, clean it up if possible. if (queue.messageClientRefCount == 0) { if (queue.messages.isEmpty() || messageClient.isClientChannelDisconnected()) { if (queue.asyncPoll != null) // Close out async long-poll if one is registered. { FlushResult flushResult = internalFlush(queue); // If the MessageClient isn't attempting client notification, override // and do so in this case to suppress the next poll request from the remote client // which will fail triggering an unnecessary channel disconnect on the client. if (!messageClient.isAttemptingInvalidationClientNotification()) { CommandMessage msg = new CommandMessage(); msg.setClientId(messageClient.getClientId()); msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION); List<Message> messages = flushResult.getMessages(); if (messages == null) messages = new ArrayList<Message>(1); messages.add(msg); } completeAsyncPoll(queue.asyncPoll, flushResult); } // Remove the empty, unused queue. outboundQueues.remove(endpointId); } // Otherwise, the queue is being used by a polling client or contains messages // that will be written by a delayed flush. // Leave it in place. Once the next poll request or delayed flush occurs the // queue will be cleaned up at that point. See internalFlush() and shutdownQueue(). } // Make sure to notify any threads waiting on this queue that may be associated // with the subscription that's gone away. synchronized (queue) { queue.notifyAll(); } } // And if this subscription was associated with an endpoint push handler, unregister it. if (endpointPushHandlers != null) { EndpointPushHandler handler = endpointPushHandlers.get(endpointId); if (handler != null) handler.unregisterMessageClient(messageClient); } } } }
