`
spjich
  • 浏览: 95095 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

(二)androidpn-server tomcat版源码解析之--push消息处理

阅读更多

在 (一)androidpn-server tomcat版源码解析之--项目启动这篇中,已经描述了整个推送服务器的启动过程,并且把握到了消息的入口即XmppIoHandler这个类,今天我将继续往下分析下面的核心代码,主要分为3大块,链接创建,消息的发送,链接关闭。

先贴一段XmppIoHandler的部分代码

/**
     * Invoked from an I/O processor thread when a new connection has been created.
     */
    public void sessionCreated(IoSession session) throws Exception {
        log.debug("sessionCreated()...");
        session.getConfig().setBothIdleTime(IDLE_TIME);;
    }

    /**
     * Invoked when a connection has been opened.
     */
    public void sessionOpened(IoSession session) throws Exception {
        log.debug("sessionOpened()...");
        log.debug("remoteAddress=" + session.getRemoteAddress());
        // Create a new XML parser
        XMLLightweightParser parser = new XMLLightweightParser("UTF-8");
        session.setAttribute(XML_PARSER, parser);
        // Create a new connection
        Connection connection = new Connection(session);
        session.setAttribute(CONNECTION, connection);
        session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName,
                connection));
    }

    /**
     * Invoked when a connection is closed.
     */
    public void sessionClosed(IoSession session) throws Exception {
        log.debug("sessionClosed()...");
        Connection connection = (Connection) session.getAttribute(CONNECTION);
        connection.close();
    }

    /**
     * Invoked with the related IdleStatus when a connection becomes idle.
     */
    public void sessionIdle(IoSession session, IdleStatus status)
            throws Exception {
        log.debug("sessionIdle()...");
        Connection connection = (Connection) session.getAttribute(CONNECTION);
        if (log.isDebugEnabled()) {
            log.debug("Closing connection that has been idle: " + connection);
        }
        connection.close();
    }

    /**
     * Invoked when any exception is thrown.
     */
    public void exceptionCaught(IoSession session, Throwable cause)
            throws Exception {
        log.debug("exceptionCaught()...");
        log.error(cause);
    }

    /**
     * Invoked when a message is received.
     */
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        log.debug("messageReceived()...");
        log.debug("RCVD: " + message);

        // Get the stanza handler
        StanzaHandler handler = (StanzaHandler) session
                .getAttribute(STANZA_HANDLER);

        // Get the XMPP packet parser
        int hashCode = Thread.currentThread().hashCode();
        XMPPPacketReader parser = parsers.get(hashCode);
        if (parser == null) {
            parser = new XMPPPacketReader();
            parser.setXPPFactory(factory);
            parsers.put(hashCode, parser);
        }

        // The stanza handler processes the message
        try {
            handler.process((String) message, parser);
        } catch (Exception e) {
            log.error(
                    "Closing connection due to error while processing message: "
                            + message, e);
            Connection connection = (Connection) session
                    .getAttribute(CONNECTION);
            connection.close();
        }
    }

    /**
     * Invoked when a message written by IoSession.write(Object) is sent out.
     */
    public void messageSent(IoSession session, Object message) throws Exception {
        log.debug("messageSent()...");
    }
  •  链接创建

androidpn-server的消息处理步奏为: sessionCreated->sessionOpened->Router->handle

对应的处理类为:XmppIoHandler- >StanzaHandler->PacketRouterIQAuthHandler.java->IQHandler.java

                                                                                                                            IQRegisterHandler.java

                                                                                                                            IQRosterHandler.java

 

                                                                                                                            PresenceUpdateHandler.java

androidpn是以xmpp为协议的,要看懂以下的处理步奏也就必须补充点xmpp的一个相关知识

xmpp的三种消息类型:

message:message是一种基本推送消息方法,它不要求响应。

presence:presence用来表明用户的状态,如:online、away、dnd(请勿打扰)等

iq:一种请求/响应机制,从一个实体从发送请求,另外一个实体接受请求,并进行响应。

XmppIoHandler.java->  sessionCreated:

public void sessionCreated(IoSession session) throws Exception {
        log.debug("sessionCreated()...");
        session.getConfig().setBothIdleTime(IDLE_TIME);;
    }

 client的请求第一次到达时候为session设置了一个idletime

 

XmppIoHandler.java-> sessionOpened

public void sessionOpened(IoSession session) throws Exception {
        log.debug("sessionOpened()...");
        log.debug("remoteAddress=" + session.getRemoteAddress());
        // Create a new XML parser
        XMLLightweightParser parser = new XMLLightweightParser("UTF-8");
        session.setAttribute(XML_PARSER, parser);
        // Create a new connection
        Connection connection = new Connection(session);
        session.setAttribute(CONNECTION, connection);
        session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName,
                connection));
    }

 初始化了xml解析器,并且创建了StanzaHandler实例,StanzaHandler这个类时后续处理的关键

StanzaHandler.java-> StanzaHandler

    /**
     * Constructor.
     * 
     * @param serverName the server name
     * @param connection the connection
     */
    public StanzaHandler(String serverName, Connection connection) {
        this.serverName = serverName;
        this.connection = connection;
        this.router = new PacketRouter();//该构造中初始化了三个router实现,分别是 MessageRouter,PresenceRouter,IQRouter,分别处理xmpp的三种消息类型
        notificationService = ServiceLocator.getNotificationService();
        notificationManager = new NotificationManager();
    }

 XmppIoHandler.java-> messageReceived

    public void messageReceived(IoSession session, Object message)
            throws Exception {
        log.debug("messageReceived()...");
        log.debug("RCVD: " + message);

        // Get the stanza handler
        StanzaHandler handler = (StanzaHandler) session
                .getAttribute(STANZA_HANDLER);

        // Get the XMPP packet parser
        int hashCode = Thread.currentThread().hashCode();
        XMPPPacketReader parser = parsers.get(hashCode);
        if (parser == null) {
            parser = new XMPPPacketReader();
            parser.setXPPFactory(factory);
            parsers.put(hashCode, parser);
        }

        // The stanza handler processes the message
        try {
            handler.process((String) message, parser);//###这个方法中,程序会根据xmpp的xml头来判断消息类型,并且传递到对应的Router处理类
        } catch (Exception e) {
            log.error(
                    "Closing connection due to error while processing message: "
                            + message, e);
            Connection connection = (Connection) session
                    .getAttribute(CONNECTION);
            connection.close();
        }
    }

 因为当前做的需求是消息推送,而MessageRouter实用在IM这种对话场合,所以MessageRouter并没有实现处理PresenceRouter和IQRouter都有对应的功能实现。

IQRoute.java -> IQRouter

 

public IQRouter() {
        sessionManager = SessionManager.getInstance();
        iqHandlers.add(new IQAuthHandler());
        iqHandlers.add(new IQRegisterHandler());
        iqHandlers.add(new IQRosterHandler());
        notificationService = ServiceLocator.getNotificationService();
    }

 构造中又实现了3和handler,用户建立好链接后又会对用户进行IQAuthHandler(鉴权),IQRegisterHandler(解析用户携带参数,往数据库中插入用户信息,将链接保存在sessionManager中),IQRosterHandler(没进行什么操作)

 

 

 

  • 消息的发送

以广播消息为例

SessionManager.java

/**
     * Returns a list that contains all authenticated client sessions.
     * 
     * @return a list that contains all client sessions
     */
    public Collection<ClientSession> getSessions() {
        return clientSessions.values();
    }

 获得链接池中所有链接

Session.java

 

    /**
     * Delivers the packet to the associated connection.
     * 
     * @param packet the packet to deliver
     */
    public void deliver(Packet packet) {
        if (connection != null && !connection.isClosed()) {
            connection.deliver(packet);
        }
    }

 Connection.java

/**
     * Delivers the packet to this connection (without checking the recipient).
     * 
     * @param packet the packet to deliver
     */
    public void deliver(Packet packet) {
        log.debug("SENT: " + packet.toXML());
        if (!isClosed()) {
            IoBuffer buffer = IoBuffer.allocate(4096);
            buffer.setAutoExpand(true);

            boolean errorDelivering = false;
            try {
                XMLWriter xmlSerializer = new XMLWriter(new IoBufferWriter(
                        buffer, (CharsetEncoder) encoder.get()),
                        new OutputFormat());
                xmlSerializer.write(packet.getElement());
                xmlSerializer.flush();
                buffer.flip();
                ioSession.write(buffer);//###通过mina的isSession对象传递
            } catch (Exception e) {
                log.debug("Connection: Error delivering packet" + "\n"
                        + this.toString(), e);
                errorDelivering = true;
            }
            if (errorDelivering) {
                close();
            } else {
                session.incrementServerPacketCount();
            }
        }
    }

 封装层次为isSession->Connection->Session

 

 

 

  • 链接关闭

XmppIoHandler.java

/**
     * Invoked when a connection is closed.
     */
    public void sessionClosed(IoSession session) throws Exception {
        log.debug("sessionClosed()...");
        Connection connection = (Connection) session.getAttribute(CONNECTION);
        connection.close();
    }

 Connection.java

/**
     * Closes the session including associated socket connection,
     * notifing all listeners that the channel is shutting down.
     */
    public void close() {
        boolean closedSuccessfully = false;
        synchronized (this) {
            if (!isClosed()) {
                try {
                    deliverRawText("</stream:stream>", false);
                } catch (Exception e) {
                    // Ignore
                }
                if (session != null) {
                    session.setStatus(Session.STATUS_CLOSED);
                }
                ioSession.close(false);
                closed = true;
                closedSuccessfully = true;
            }
        }
        if (closedSuccessfully) {
            notifyCloseListeners();
        }
    }

 此处使用了观察者模式,首先传递了Xmpp的结束xml“</stream:stream>”,再关闭了mina最低层的ioSession,再通知观察者,也就是androidpn封装的ClientSession在内存中删除该会话。

 

 

 

 

 原创文章,转载请声名出处  http://spjich.iteye.com/blog/2226149
0
0
分享到:
评论

相关推荐

    androidpn-client-0.5.0.zip和androidpn-server-0.5.0-bin.zip

    首先, 我们需要下载androidpn-client-0.5.0.zip和androidpn-server-0.5.0-bin.zip。 下载地址:http://sourceforge.net/projects/Androidpn/ 解压两个包,Eclipse导入client,配置好目标平台,打开raw/...

    androidpn-client-0.5.01.rar和androidpn-server-0.5.0-bin.zip可在真机上运行

    androidpn-server-0.5.0-bin.zip解压后,打卡bin目录下run.bat运行,之后在浏览器中输入http://127.0.0.1:7070/ 将androidpn-client-0.5.0解压后导入Eclipse,修改/raw/androidpn.properties中的xmppHost=xxx.xxx.x...

    androidpn-server-0.5.0-bin、androidpn-client-0.5.0、androidpn-demoapp-0.5.0

    1. **AndroidPN-Server-0.5.0-bin**: 这个压缩包包含了服务器端的二进制文件和必要的配置文件。服务器端是整个推送系统的核心,它负责接收来自应用开发者或后台系统的推送消息,并将这些消息推送到已注册的Android...

    androidpn-client-0.5.0和androidpn-server-0.5.0-bin

    总的来说,"androidpn-client-0.5.0"和"androidpn-server-0.5.0-bin"是构建高效、可靠的Android消息推送服务的基础,它们共同实现了从服务器到设备的无缝消息传递,极大地丰富了Android应用的功能性和用户体验。...

    androidpn-tomcat版本

    AndroidPN-tomcat版本是一个专为Android平台设计的推送通知服务,它采用了开源的Tomcat应用服务器作为其核心组件。这个版本确保了在部署和运行过程中可以顺利进行,为开发者提供了一个稳定且可靠的推送服务环境。 ...

    androidpn-tomcat-server端

    这个"androidpn-tomcat-server端"是AndroidPN服务端的实现,它使用了Apache Tomcat作为应用服务器来托管Java代码。Tomcat是一个流行的开源Java Servlet容器,支持Java EE的Web应用程序。 首先,我们要理解推送服务...

    androidpn-tomcat-0.5.0

    另一方面,"androidpn-server-tomcat.zip"则包含了AndroidPN服务端的部署包,这部分是基于Tomcat服务器运行的。Tomcat是Apache软件基金会的Jakarta项目下的一个开源产品,它实现了Java Servlet和JavaServer Pages...

    androidpn-client-0.5.0 AND androidpn-server-0.5.0

    使用Apndroid Push Notification 实现android信息推送,AndroidPn项目是使用XMPP协议实现信息推送的一个开源项目。内涵服务端和客户端源码

    androidpn-server-0.6.0.jar

    androidpn-server-0.6.0.jar

    androidpn-server-1.0.0.zip-jre7

    该程序包修改至开源项目androidpn-server-0.0.5,携带jre7(此为windows版本,如要运行在linix,需更换为linix jre7),无需java配置运行环境,实现离线推送功能,支持推送消息有效期设置,服务端能够自主判断消息...

    androidpn-client-0.5.0.zip

    "AndroidPN-client-0.5.0.zip" 是一个针对Android平台的Push Notification服务客户端的压缩包文件。Push Notification(简称PN)是一种允许服务器向移动设备推送消息的技术,无需应用程序处于活动状态。在这个版本...

    androidpn_tomcat整合(发布直接使用)

    AndroidPN-server是AndroidPN服务的后台部分,负责处理来自客户端的注册请求、存储设备信息以及发送推送通知。 接下来,我们讨论如何将AndroidPN-server集成到Tomcat中。Tomcat是一个轻量级的Java应用服务器,常...

    androidpn-client-0.5.0

    "AndroidPN-client-0.5.0"是一个针对Android平台的Push Notification服务的客户端库,主要功能是为了实现设备与服务器之间的即时通信。Push Notification服务在移动应用开发中扮演着重要角色,它允许服务器向已安装...

    androidpn-server-app-master

    "androidpn-server-app-master" 是一个基于Android的推送通知服务(Push Notification)的项目,它采用了Tomcat作为服务器端的应用程序容器。这个项目的核心目标是为Android设备提供实时的消息推送功能,使得应用...

    androidpn-client-0.5.0.rar_androidpn-server_消息推送an

    这是一个消息推送客户端,安装在自己的手机上,可以实现与andriodpn-sever的连接,接受推送的消息

    androidpn-bin-server-0.5.0

    在"androidpn-server-0.5.0"这个压缩包中,包含了服务端的源代码或二进制文件,开发者可以部署并配置这个服务器,以便为自己的Android应用提供推送通知服务。安装和配置过程可能涉及到设置环境变量、配置文件修改...

    androidpn-0.5.0 server+demoapp+client+成功配置方法

    内含:配置方法一份、androidpn-server-0.5.0-bin.zip、androidpn-demoapp-0.5.0-bin.zip、androidpn-client-0.5.0-bin.zip。三个压缩包都是从官网下载http://sourceforge.net/projects/androidpn/,未做任何修改。...

    androidpn-client-dragon

    androidpn-client-0.5.0 修改后的版本 修改原来的通知操作 为action配置 这样就可以定义自己的操作 action为: org.androidpn.client.NotificationProcess 请看文章: ...

    androidpn-client-Demo.zip_DEMO_androidpn-server_asmack _xmpp _推送

    基于XMPP协议、使用Androidpn服务器,Asmack编程的即时通讯IM客户端源代码;主要功能实现向客户端推送消息!

Global site tag (gtag.js) - Google Analytics