`
wslfh2005
  • 浏览: 12942 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

深入解析Apache Mina源码(2)——Mina的事件模型

阅读更多

1、深入解析Apache Mina源码(1)——Mina的过滤器机制实现

2、深入解析Apache Mina源码(2)——Mina的事件模型

 

一、观察者模式的本来面目

工作时间长了,会发现代码中的很多东西都是相通相似的,就说JAVA的事件机制其实就是观察者模式的实现,学会了观察者模式,事件机制自己无师自通,先以我的角度看看什么是观察者模式。

观察者模式,顾名思义,应该有观察者(抽象观察者“Observer”角色)和被观察者(抽象主题“Subject”角色),被观察者一旦有变化就通知观察者更新自己。

形象图:



下面是观察者模式的类图:




从我的角度简单点理解,我们知道面向对象编程不过是对象之间的相互调用,那么被观察者应该有个列表(List)来把所有观察者引用起来,当有事件要通知观察者时,我们从List中把所有观察者类取出来挨个调用观察者的方法(如update())。这个过程中只有充分利用了面向对象编程的多态,继承等特性才能实现。

下面是代码实现:

被观察者(抽象主题类):

package com.lifanghu.observer;

import java.util.ArrayList;
import java.util.List;

/**
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-14 下午06:18:26
 * @name com.lifanghu.observer.Subject.java
 * @version 1.0
 */
public abstract class Subject {

    private List<Observer> observers = new ArrayList<Observer>();

    /**
     * 增加一个观察者
     * @param observer
     * @author lifh
     */
    public void addObserver(Observer observer) {
        observers.add(observer);
    }
    /**
     * 删除一个观察者
     * @param observer
     * @author lifh
     */
    public void removeObserver(Observer observer) {
        observers.remove(observer);
    }
    /**
     * 通知所有观察者
     * @author lifh
     */
    public void notifyObservers() {
        for (Observer observer : observers) {
            observer.update(this);
        }
    }
} 

  

实际被观察者(真实主题角色):

package com.lifanghu.observer;

/**
 * 实际被观察者
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-14 下午10:48:02
 * @name com.lifanghu.observer.XiaoMing.java
 * @version 1.0
 */
public class XiaoMing extends Subject {

    private String state;

    //实际要通进行通知的方法
    public void change() {
        this.notifyObservers();
    }
    public String getState() {
        return state;
    }
    public void setState(String state) {
        this.state = state;
    }
}
 

 

抽象观察者:

package com.lifanghu.observer;

/**
 * 实际观察者
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-14 下午10:54:07
 * @name com.lifanghu.observer.XiaoWang.java
 * @version 1.0
 */
public class XiaoWang implements Observer {

    public void update(Subject subject) {
        XiaoMing xm = (XiaoMing) subject;
        System.out.println("小王得到通知:" + xm.getState() + ";小王说:活该!");
    }
} 

  

具体观察者1:

package com.lifanghu.observer;

/**
 * 实际观察者
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-14 下午10:54:07
 * @name com.lifanghu.observer.XiaoWang.java
 * @version 1.0
 */
public class XiaoWang implements Observer {

    public void update(Subject subject) {
        XiaoMing xm = (XiaoMing) subject;
        System.out.println("小王得到通知:" + xm.getState() + ";小王说:活该!");
    }
}
 

具体观察者2:

package com.lifanghu.observer;

/**
 * 实际观察者
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-14 下午10:56:17
 * @name com.lifanghu.observer.ZhangShan.java
 * @version 1.0
 */
public class ZhangShan implements Observer {

    public void update(Subject subject) {
        XiaoMing xm = (XiaoMing) subject;
        System.out.println("张三得到通知:" + xm.getState() + ";张三说:快吃药吧!");
    }
}
 

 

客户端调用逻辑:

package com.lifanghu.observer;

/**
 * 实际观察者
 * @author lifh
 * @mail wslfh2005@163.com
 * @since 2012-6-14 下午10:56:17
 * @name com.lifanghu.observer.ZhangShan.java
 * @version 1.0
 */
public class ZhangShan implements Observer {

    public void update(Subject subject) {
        XiaoMing xm = (XiaoMing) subject;
        System.out.println("张三得到通知:" + xm.getState() + ";张三说:快吃药吧!");
    }
}
 

 

输出结果:

 写道
张三得到通知:小明病了;张三说:快吃药吧!
小王得到通知:小明病了;小王说:活该!
 

说明:对于观察者模式JDK是有相应的实现支持的,内置观察者模式主要有2个类,一个是类Observable,一个是接口类Observer ,大家可以上网去查,或者直接看它的源码,这里就不多说了。

 

二、从观察者模式到事件机制

关于事件机制上面也谈了,它其实就是观察者模式的实现,关于两者的联系也可以看iteye上的另一讨论http://www.iteye.com/topic/182643,里面说的已经很清楚了。

关于事件模型我想说的是里面的三个元素:事件源,事件本身,监听者,对应观察者模式的三个元素分别是:被观察者,观察者方法要传入的参数,观察者。

关于事件机制JDK也有相应的实现java.util.EventListenerjava.util.EventObject 

 

 

三、Mina中的事件以及监听器的实现

Mina 中有主要有三种事件:

1org.apache.mina.core.service包下面对IoService接口实现和IoSession接口实现的监听,它不是一个纯的事件模型,没有事件本身(EventObject)的存在。

2org.apache.mina.core.session包下在线程池环境下对于事件的触发从而进入过滤器链进行的处理,它不是一个纯事件的模型,少了监听器(EventListener)的实现。

3org.apache.mina.core.future包下对于异步IO操作(IoFuture)进行监听。它也不是一个纯的事件模型,也是少了事件本身(EventObject)的存在。

下面分别介绍下这三种事件细节。

1service包下面主要有两个类是用于监听模式的,监听者IoServiceListener和它的帮助类IoServiceListenerSupportIoServiceListener接口继承了EventListener,标明自己是个监听者接口,主要监听servicesession的创建,空闲,销毁事件的。IoServiceListenerSupport作为它的帮助类,实际上充当了事件源的角色,它存储了IoServiceListener的成员列表。

 

 

/** A list of {@link IoServiceListener}s. */
private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();

 里面有增加和删除监听者的方法:

 

    /**
     * Adds a new listener.
     * 
     * @param listener The added listener
     */
    public void add(IoServiceListener listener) {
        if (listener != null) {
            listeners.add(listener);
        }
    }

    /**
     * Removes an existing listener.
     * 
     * @param listener The listener to remove
     */
    public void remove(IoServiceListener listener) {
        if (listener != null) {
            listeners.remove(listener);
        }
    }
 

 

通知监听者的方法:

 

    /**
     * Calls {@link IoServiceListener#serviceActivated(IoService)}
     * for all registered listeners.
     * 通知方法
     */
    public void fireServiceActivated() {
        if (!activated.compareAndSet(false, true)) {
            // The instance is already active
            return;
        }

        activationTime = System.currentTimeMillis();

        // Activate all the listeners now
        for (IoServiceListener listener : listeners) {//循环提取出listeners执行本身的方法
            try {
                listener.serviceActivated(service);
            } catch (Throwable e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            }
        }
    }
 

 

再来看接口IoService,有对监听器的操作:

 

    /**
     * Adds an {@link IoServiceListener} that listens any events related with
     * this service.
     */
    void addListener(IoServiceListener listener);

    /**
     * Removed an existing {@link IoServiceListener} that listens any events
     * related with this service.
     */
    void removeListener(IoServiceListener listener);

 它的实现AbstractIoService里面可以看到,实际是调用的IoServiceListenerSupport的方法执行的有关操作:

 

    /**
     * {@inheritDoc}
     */
    public final void addListener(IoServiceListener listener) {
        listeners.add(listener);
    }

    /**
     * {@inheritDoc}
     */
    public final void removeListener(IoServiceListener listener) {
        listeners.remove(listener);
    }

 看下AbstractIoAcceptor类的调用方法:

 

        if (activate) {
            //触发监听器的服务创建事件
            getListeners().fireServiceActivated();
        }

 2session包下的事件模型主要还是基于多线程模型来实现的,可以看到IoEvent实现了Runnable接口,它作为任务放入到线程池中去执行,看它的run方法:

 

    // 此类实现了 Runnable 它的存在主要为了线程池过滤器ExecutorFilter来使用的。
    // 这样如果加入了ExecutorFilter后,后面所有的数据处理都是多线程方式进行的。
    // 一般放在handler前面,也就是所有过滤器的后面,因为handler主要做业务处理,可能会有数据库的操作,比较耗时,适合多线程。
    public void run() {
        //激活各种事件
        fire();
    }

 IO事件类型:

 

package org.apache.mina.core.session;

/**
 * An {@link Enum} that represents the type of I/O events and requests.
 * Most users won't need to use this class.  It is usually used by internal
 * components to store I/O events.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public enum IoEventType {
    SESSION_CREATED,//session创建
    SESSION_OPENED,//session打开
    SESSION_CLOSED,//session关闭
    MESSAGE_RECEIVED,//消息接收
    MESSAGE_SENT,//消息发送
    SESSION_IDLE,//session空闲
    EXCEPTION_CAUGHT,//发生异常
    WRITE,//写事件
    CLOSE,//关闭session事件
}

 它有一个实现类IoFilterEvent,我们知道如果我们想要利用mina的多线程处理,需要如下加一个过滤器:

 

connector.getFilterChain().addLast("executor", new ExecutorFilter());

 在这个时候我们的IoEvent就派上了用场,ExecutorFilter是一个实现了线程池(线程池在后面的文章中会介绍)的过滤器,看下ExecutorFilter的调用:

 

@Override
public final void sessionOpened(NextFilter nextFilter, IoSession session) {
    if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
        IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,
            session, null); 
        //触发事件
        fireEvent(event);
    } else {
        nextFilter.sessionOpened(session);
    }
}
/**
 * Fires the specified event through the underlying executor.
 * 
 * @param event The filtered event
 */
protected void fireEvent(IoFilterEvent event) {
    //将事件提交给线程池执行
    executor.execute(event);
}

 可以看到这个session下的事件模型没有监听器,所以基本就没有事件模式的概念了,但是还是有一些事件模型的一些影子,所以可以认为它是一个伪事件模型。

 

 

3future包下面的事件模型,是比较常规的事件模型,这是一个异步调用后的事件模型,当设定的异步操作完成后会调用监听器的operationComplete方法,事件源为IoFuture等,监听器是IoFutureListener,它只有一个方法:

 

 

/**
 * Invoked when the operation associated with the {@link IoFuture}
 * has been completed even if you add the listener after the completion.
 * 只有一个处理完成的回调方法,当线程池将我们的任务处理完成后会调用此方法。
 * @param future  The source {@link IoFuture} which called this
 *                callback.
 */
void operationComplete(F future);

 看下事件源的代码:

 

package org.apache.mina.core.future;

import java.util.concurrent.TimeUnit;

import org.apache.mina.core.session.IoSession;

/**
 * Represents the completion of an asynchronous I/O operation on an 
 * {@link IoSession}.
 * Can be listened for completion using a {@link IoFutureListener}.
 * 因为有了线程池模型才会有异步的概念。
 * 我们将各种IO操作(连接,关闭,读,写)以任务的方式放入队列中并返回IoFuture供线程池去处理。
 * 处理过程和处理完成的操作会改变IoFuture的状态。
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 */
public interface IoFuture {
    /**
     * Returns the {@link IoSession} which is associated with this future.
     */
    IoSession getSession();

    /**
     * Wait for the asynchronous operation to complete.
     * The attached listeners will be notified when the operation is 
     * completed.
     */
    IoFuture await() throws InterruptedException;

    /**
     * Wait for the asynchronous operation to complete with the specified timeout.
     * 等待操作在指定时间内完成
     * @return <tt>true</tt> if the operation is completed.
     */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Wait for the asynchronous operation to complete with the specified timeout.
     *
     * @return <tt>true</tt> if the operation is completed.
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Wait for the asynchronous operation to complete uninterruptibly.
     * The attached listeners will be notified when the operation is 
     * completed.
     * 等待异步操作完成,完成后会触发注册的监听器。
     * @return the current IoFuture
     */
    IoFuture awaitUninterruptibly();

    /**
     * Wait for the asynchronous operation to complete with the specified timeout
     * uninterruptibly.
     *
     * @return <tt>true</tt> if the operation is completed.
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

    /**
     * Wait for the asynchronous operation to complete with the specified timeout
     * uninterruptibly.
     *
     * @return <tt>true</tt> if the operation is finished.
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * @deprecated Replaced with {@link #awaitUninterruptibly()}.
     */
    @Deprecated
    void join();

    /**
     * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
     */
    @Deprecated
    boolean join(long timeoutMillis);

    /**
     * Returns if the asynchronous operation is completed.
     */
    boolean isDone();

    /**
     * Adds an event <tt>listener</tt> which is notified when
     * this future is completed. If the listener is added
     * after the completion, the listener is directly notified.
     * 增加异步完成后的监听者
     */
    IoFuture addListener(IoFutureListener<?> listener);

    /**
     * Removes an existing event <tt>listener</tt> so it won't be notified when
     * the future is completed.
     * 删除异步完成后的监听者
     */
    IoFuture removeListener(IoFutureListener<?> listener);
}

 看下调用监听者的方法类DefaultIoFuture的代码:

 

/**
 * Sets the result of the asynchronous operation, and mark it as finished.
 */
public void setValue(Object newValue) {
    synchronized (lock) {
        // Allow only once.
        if (ready) {
            return;
        }

        result = newValue;
        ready = true;
        if (waiters > 0) {
            lock.notifyAll();
        }
    }

    //调用监听者的方法
    notifyListeners();
}

 在此我们可以学到JAVA的异步只有在多线程操作下才有可能实现。

 

 

四、推荐文章

 

参考文章:

1. 观察者模式

http://ttitfly.iteye.com/blog/152512

 

2. JAVA与模式》之观察者模式

http://www.cnblogs.com/java-my-life/archive/2012/05/16/2502279.html

 

3. Java事件机制理解及应用

http://blog.csdn.net/JianZhiZG/article/details/1427073

 

4. java 事件机制

http://www.blogjava.net/chenweicai/archive/2007/04/13/110350.html

 

五、总结

 

 

事件模型作为mina中一个重要的设计模式很好的体现了JAVA高内聚,低耦合的设计思想,也让用户的调用代码简洁易用,本文章本着抛砖引玉的态度希望大家能指出缺点,共同讨论。

每天进步一点点,不做无为的码农。。。。。

2012617日星期日

码农虎虎

http://weibo.com/hurtigf

http://www.lifanghu.com/

wslfh2005@163.com

  • 大小: 28.3 KB
  • 大小: 11.6 KB
分享到:
评论
1 楼 u014037638 2015-01-15  
感谢分享。。。。看到这里你可以加个Future模式的讲解了,呵呵,最后面提了一下

相关推荐

    apache-mina源码

    在这个"apache-mina源码"中,我们可以深入理解MINA的设计原理和实现细节。 MINA的核心概念包括: 1. **IoSession**:IoSession是MINA中的核心组件,代表了服务端和客户端之间的连接。它包含了会话的状态信息,如...

    apache-mina-2.0.4.rar_apache mina_mina

    在深入研究Apache Mina的源码之前,了解其核心组件是必要的: 1. **Filter Chain**:Mina的核心设计模式之一是过滤器链。每个连接都有一系列过滤器,它们按照顺序处理入站和出站事件。过滤器可以实现特定功能,如...

    深入理解 Apache Mina

    最近一直在看 Mina 的源码,用了 Mina 这么长时间,说实话,现在才开始对 Mina 有了一 些 深刻的理解,关于 Mina 的基本知识的介绍,这里就不多说了,网上已经有很多不错的文 章 都对 Mina 做了较深刻的剖析,现在...

    mina 源码

    深入研究"apache-mina-2.0.4"源码,我们可以学习到MINA如何实现非阻塞I/O,过滤器链的构建和事件传播机制,以及如何定制和扩展MINA以满足特定需求。例如,可以查看IoSession的实现,了解其如何管理会话状态;研究...

    深入理解Apache Mina (6)---- Java Nio ByteBuffer与Mina ByteBuffer的区别

    Apache Mina是一个高性能的网络应用框架,主要用于简化网络...通过阅读《深入理解Apache Mina (6)---- Java Nio ByteBuffer与Mina ByteBuffer的区别》的相关资料,可以更深入地理解这两个类的具体实现和应用场景。

    Apache Mina 入门Demo

    1. **Mina架构**:Apache Mina的核心设计基于事件驱动和非阻塞I/O模型,这种模型特别适合处理大量并发连接。它将网络通信层抽象为一组服务,如TCP/IP协议栈,让你专注于业务逻辑而不是底层细节。 2. **IoSession...

    深入理解Apache_Mina

    在深入探讨Apache Mina框架之前,先要明白它在当今IT行业中的应用背景和重要性。Apache Mina是一个网络应用框架,用于帮助开发高性能和高可扩展性的网络应用。Mina为网络编程提供了一套简洁的API,简化了事件驱动...

    Apache MINA框架相关资料

    3. **Mina2源码分析**(Mina2源码分析.doc):源码分析文档通常由经验丰富的开发者编写,通过深入剖析MINA的源代码,揭示其内部工作原理,帮助开发者理解MINA如何实现非阻塞I/O,以及如何高效地处理网络连接和数据...

    apache-mina-2.0.4架包及源码各pdf学习教程

    apache-mina-2.0.4 架包 源码 学习教程.apache mina是Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序...

    Apache Mina Server 2.0 抢鲜体验

    7. **源码分析**:由于标签中提到了“源码”,因此,对于有志于深入理解Mina工作原理的开发者来说,阅读和分析Apache Mina的源码可以帮助他们更好地优化自己的网络应用程序,提高性能和稳定性。 8. **性能优化**:...

    Mina源码解析

    2. **Mina的事件模型**: Mina采用事件驱动的编程模型,通过I/O事件(如连接建立、数据读写、连接关闭等)触发相应的处理器。这些事件由IoAdapter抽象类进行封装,并由IoHandler接口处理。开发者可以通过继承...

    apache mina-spring 服务端程序

    Apache Mina是一个高度可扩展的网络通信框架,主要用于构建...这包括理解Mina的事件模型、Spring的bean管理和网络编程的最佳实践。同时,这也涉及到如何处理网络异常,优化性能,以及实现安全性和稳定性等方面的知识。

    apache-mina-2.0.7架包与源码

    源码包("apache-mina-2.0.7-src.zip")对于开发者来说非常有价值,因为它允许他们深入理解MINA的工作原理,进行自定义修改,以及调试和优化代码。通过查看源码,开发者可以学习MINA如何处理网络I/O事件,如何实现非...

    Mina 1.1.7 示例源码(apache.mina.example)

    它的异步事件驱动模型使得Mina在处理大量并发连接时表现出色。 2. **Mina的核心组件** - **IoSession**: 代表一个网络连接,是Mina中数据传输的通道,包含了连接的状态信息和会话属性。 - **Filter Chain**: 过滤...

    Mina2源码分析

    ### Mina2源码分析——核心模块解析 #### 概述 Mina2是一个高性能、可扩展的网络应用框架,支持多种传输协议如TCP、UDP等,并提供了丰富的API供开发者使用。本文旨在深入剖析Mina2的核心部分,帮助读者更好地理解和...

    apache-mina-2.0.7-src.zip

    1. **核心库**:包含MINA的核心组件,如Buffer、Filter、Session、ProtocolCodec等,这些组件构成了MINA的核心架构,提供事件驱动的网络编程模型。 2. **示例**:MINA提供了一系列的示例程序,展示了如何使用MINA...

    apache mina 学习笔记三(子项目FtpServer)

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个Java框架,用于构建高性能、高可用性的网络应用程序。MINA 提供了一种简单而强大的API,开发者可以使用它来处理TCP/IP和UDP/IP协议,如...

    MINA源码与例子

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,主要用于简化开发高质量的网络服务。MINA的目标是为开发者提供一个简单易用但功能强大的库,...

Global site tag (gtag.js) - Google Analytics