`
buerkai
  • 浏览: 169546 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

openfire(8)openfire中的mina框架使用

 
阅读更多

在openfire中与客户端之间的交互代码主要在org.jivesoftware.openfire.nio,org.jivesoftware.openfire.net这个2个包中。

 

当openfire启动的时候,会去加载连接管理中心ConnectionManagerImpl这个类,其中有几个方法值得去细细阅读,

 private synchronized void createListeners() {

        if (isSocketStarted || sessionManager == null || deliverer == null || router == null || serverName == null) {

            return;

        }

        // Create the port listener for s2s communication

        createServerListener(localIPAddress);

        // Create the port listener for Connections Multiplexers

        createConnectionManagerListener();

        // Create the port listener for external components

        createComponentListener();

        // Create the port listener for clients

        createClientListeners();

        // Create the port listener for secured clients

        createClientSSLListeners();

    }

//这个地方只要是建立5个socket,下面介绍一下:

1. createServerListener,此主要是建立一个基于java.net.ServerSocket。

 

 private void createServerListener(String localIPAddress) {

        // Start servers socket unless it's been disabled.

        if (isServerListenerEnabled()) {

            int port = getServerListenerPort();

            try {

                serverSocketThread = new SocketAcceptThread(this, new ServerPort(port, serverName,

                        localIPAddress, false, null, ServerPort.Type.server));

                ports.add(serverSocketThread.getServerPort());

                serverSocketThread.setDaemon(true);

                serverSocketThread.setPriority(Thread.MAX_PRIORITY);

            }

            catch (Exception e) {

                System.err.println("Error creating server listener on port " + port + ": " +

                        e.getMessage());

                Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);

            }

        }

    }

进入SocketAcceptThread会发现其真面目,

public class SocketAcceptThread extends Thread {

 

    /**

     * Holds information about the port on which the server will listen for connections.

     */

    private ServerPort serverPort;

 

    private SocketAcceptingMode acceptingMode;

 

    public SocketAcceptThread(ConnectionManager connManager, ServerPort serverPort)

            throws IOException {

        super("Socket Listener at port " + serverPort.getPort());

        // Listen on a specific network interface if it has been set.

        String interfaceName = JiveGlobals.getXMLProperty("network.interface");

        InetAddress bindInterface = null;

        if (interfaceName != null) {

            if (interfaceName.trim().length() > 0) {

                bindInterface = InetAddress.getByName(interfaceName);

                // Create the new server port based on the new bind address

                serverPort = new ServerPort(serverPort.getPort(),

                        serverPort.getDomainNames().get(0), interfaceName, serverPort.isSecure(),

                        serverPort.getSecurityType(), serverPort.getType());

            }

        }

        this.serverPort = serverPort;

        // Set the blocking reading mode to use

        acceptingMode = new BlockingAcceptingMode(connManager, serverPort, bindInterface);

    }

 

    /**

     * Retrieve the port this server socket is bound to.

     *

     * @return the port the socket is bound to.

     */

    public int getPort() {

        return serverPort.getPort();

    }

 

    /**

     * Returns information about the port on which the server is listening for connections.

     *

     * @return information about the port on which the server is listening for connections.

     */

    public ServerPort getServerPort() {

        return serverPort;

    }

 

    /**

     * Unblock the thread and force it to terminate.

     */

    public void shutdown() {

        acceptingMode.shutdown();

    }

 

    /**

     * About as simple as it gets.  The thread spins around an accept

     * call getting sockets and handing them to the SocketManager.

     */

    @Override

public void run() {

        acceptingMode.run();

        // We stopped accepting new connections so close the listener

        shutdown();

    }

}

 

class BlockingAcceptingMode extends SocketAcceptingMode {

 

private static final Logger Log = LoggerFactory.getLogger(BlockingAcceptingMode.class);

 

    protected BlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,

            InetAddress bindInterface) throws IOException {

        super(connManager, serverPort);

        serverSocket = new ServerSocket(serverPort.getPort(), -1, bindInterface);

    }

 

    /**

     * About as simple as it gets.  The thread spins around an accept

     * call getting sockets and creating new reading threads for each new connection.

     */

    @Override

public void run() {

        while (notTerminated) {

            try {

                Socket sock = serverSocket.accept();

                if (sock != null) {

                    Log.debug("Connect " + sock.toString());

                    SocketReader reader =

                            connManager.createSocketReader(sock, false, serverPort, true);

                    Thread thread = new Thread(reader, reader.getName());

                    thread.setDaemon(true);

                    thread.setPriority(Thread.NORM_PRIORITY);

                    thread.start();

                }

            }

            catch (IOException ie) {

                if (notTerminated) {

                    Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),

                            ie);

                }

            }

            catch (Throwable e) {

                Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);

            }

        }

    }

}

//此处只要这个服务器,接受到一个请求,就会创建一个新的线程去解析相关数据,并做逻辑处理。

 

 @Override

public void run() {

        try {

            socketReader.reader.getXPPParser().setInput(new InputStreamReader(

                    ServerTrafficCounter.wrapInputStream(socket.getInputStream()), CHARSET));

 

            // Read in the opening tag and prepare for packet stream

            try {

                socketReader.createSession();

            }

            catch (IOException e) {

                Log.debug("Error creating session", e);

                throw e;

            }

 

            // Read the packet stream until it ends

            if (socketReader.session != null) {

                readStream();

            }

 

        }

        catch (EOFException eof) {

            // Normal disconnect

        }

        catch (SocketException se) {

            // The socket was closed. The server may close the connection for several

            // reasons (e.g. user requested to remove his account). Do nothing here.

        }

        catch (AsynchronousCloseException ace) {

            // The socket was closed.

        }

        catch (XmlPullParserException ie) {

            // It is normal for clients to abruptly cut a connection

            // rather than closing the stream document. Since this is

            // normal behavior, we won't log it as an error.

            // Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);

        }

        catch (Exception e) {

            if (socketReader.session != null) {

                Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +

                        socketReader.session, e);

            }

        }

        finally {

            if (socketReader.session != null) {

                if (Log.isDebugEnabled()) {

                    Log.debug("Logging off " + socketReader.session.getAddress() + " on " + socketReader.connection);

                }

                try {

                    socketReader.session.close();

                }

                catch (Exception e) {

                    Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());

                }

            }

            else {

                // Close and release the created connection

                socketReader.connection.close();

                Log.debug(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());

            }

            socketReader.shutdown();

        }

    }

 

//createSession中进行报文的解析,

protected void createSession()

            throws UnauthorizedException, XmlPullParserException, IOException {

        XmlPullParser xpp = reader.getXPPParser();

        for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {

            eventType = xpp.next();

        }

 

        // Check that the TO attribute of the stream header matches the server name or a valid

        // subdomain. If the value of the 'to' attribute is not valid then return a host-unknown

        // error and close the underlying connection.

        System.out.println(reader.getXPPParser().toString());

        String host = reader.getXPPParser().getAttributeValue("", "to");

        if (validateHost() && isHostUnknown(host)) {

            StringBuilder sb = new StringBuilder(250);

            sb.append("<?xml version='1.0' encoding='");

            sb.append(CHARSET);

            sb.append("'?>");

            // Append stream header

            sb.append("<stream:stream ");

            sb.append("from=\"").append(serverName).append("\" ");

            sb.append("id=\"").append(StringUtils.randomString(5)).append("\" ");

            sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");

            sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");

            sb.append("version=\"1.0\">");

            // Set the host_unknown error

            StreamError error = new StreamError(StreamError.Condition.host_unknown);

            sb.append(error.toXML());

            // Deliver stanza

            connection.deliverRawText(sb.toString());

            // Close the underlying connection

            connection.close();

            // Log a warning so that admins can track this cases from the server side

            Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host +

                    ". Connection: " + connection);

        }

 

        // Create the correct session based on the sent namespace. At this point the server

        // may offer the client to secure the connection. If the client decides to secure

        // the connection then a <starttls> stanza should be received

        else if (!createSession(xpp.getNamespace(null))) {

            // No session was created because of an invalid namespace prefix so answer a stream

            // error and close the underlying connection

            StringBuilder sb = new StringBuilder(250);

            sb.append("<?xml version='1.0' encoding='");

            sb.append(CHARSET);

            sb.append("'?>");

            // Append stream header

            sb.append("<stream:stream ");

            sb.append("from=\"").append(serverName).append("\" ");

            sb.append("id=\"").append(StringUtils.randomString(5)).append("\" ");

            sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");

            sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");

            sb.append("version=\"1.0\">");

            // Include the bad-namespace-prefix in the response

            StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix);

            sb.append(error.toXML());

            connection.deliverRawText(sb.toString());

            // Close the underlying connection

            connection.close();

            // Log a warning so that admins can track this cases from the server side

            Log.warn("Closing session due to bad_namespace_prefix in stream header. Prefix: " +

                    xpp.getNamespace(null) + ". Connection: " + connection);

        }

    }

将相关信息转换为XMPP报文格式。

 

2.  createConnectionManagerListener();

 private void createConnectionManagerListener() {

        // Start multiplexers socket unless it's been disabled.

        if (isConnectionManagerListenerEnabled()) {

            // Create SocketAcceptor with correct number of processors

            multiplexerSocketAcceptor = buildSocketAcceptor();

            // Customize Executor that will be used by processors to process incoming stanzas

            ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager");

            int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);

            ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();

            eventExecutor.setCorePoolSize(eventThreads + 1);

            eventExecutor.setMaximumPoolSize(eventThreads + 1);

            eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);

            multiplexerSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);

            // Add the XMPP codec filter

            multiplexerSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));

 

        }

    }

 

从 multiplexerSocketAcceptor.bind(new InetSocketAddress(bindInterface, port), new MultiplexerConnectionHandler(serverName));中可以知道其报文处理是MultiplexerConnectionHandler。

MultiplexerConnectionHandler,ClientConnectionHandler,这个3个类都是继承与ConnectionHandler,下面主要讲解ConnectionHandler。

 

ConnectionHandler继承与IoHandlerAdapter,重写了IoHandlerAdapter的6个方法。

当客户端和服务器建立连接的时候会调用:

@Override

public void sessionOpened(IoSession session) throws Exception {

        // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.

        final XMLLightweightParser parser = new XMLLightweightParser(CHARSET);

        session.setAttribute(XML_PARSER, parser);

        // Create a new NIOConnection for the new session

        final NIOConnection connection = createNIOConnection(session);

        session.setAttribute(CONNECTION, connection);

        session.setAttribute(HANDLER, createStanzaHandler(connection));

        // Set the max time a connection can be idle before closing it. This amount of seconds

        // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)

        // before disconnecting them (at 100% of the max idle time). This prevents Openfire from

        // removing connections without warning.

        final int idleTime = getMaxIdleTime() / 2;

        if (idleTime > 0) {

            session.setIdleTime(IdleStatus.READER_IDLE, idleTime);

        }

    }

//当有一个连接被打开的时候,就会初始化XMPP 的解析器,业务数据解析,以及会话空闲时间。

 

当客户端向服务端发起消息时会调用:

 @Override

public void messageReceived(IoSession session, Object message) throws Exception {

        // Get the stanza handler for this session

        StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);

        // Get the parser to use to process stanza. For optimization there is going

        // to be a parser for each running thread. Each Filter will be executed

        // by the Executor placed as the first Filter. So we can have a parser associated

        // to each Thread

        int hashCode = Thread.currentThread().hashCode();

        XMPPPacketReader parser = parsers.get(hashCode);

        if (parser == null) {

            parser = new XMPPPacketReader();

            parser.setXPPFactory(factory);

            parsers.put(hashCode, parser);

        }

        // Update counter of read btyes

        updateReadBytesCounter(session);

        //System.out.println("RCVD: " + message);

        // Let the stanza handler process the received stanza

        try {

            handler.process((String) message, parser);

        } catch (Exception e) {

            Log.error("Closing connection due to error while processing message: " + message, e);

            Connection connection = (Connection) session.getAttribute(CONNECTION);

            connection.close();

        }

    }

 

 public void process(String stanza, XMPPPacketReader reader) throws Exception {

 

        boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");

        if (!sessionCreated || initialStream) {

            if (!initialStream) {

                // Allow requests for flash socket policy files directly on the client listener port

                if (stanza.startsWith("<policy-file-request/>")) {

                    String crossDomainText = FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +

                            XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +

                            FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0';

                    connection.deliverRawText(crossDomainText);

                    return;

                }

                else {

                    // Ignore <?xml version="1.0"?>

                    return;

                }

            }

            // Found an stream:stream tag...

            if (!sessionCreated) {

                sessionCreated = true;

                MXParser parser = reader.getXPPParser();

                parser.setInput(new StringReader(stanza));

                createSession(parser);

            }

            else if (startedTLS) {

                startedTLS = false;

                tlsNegotiated();

            }

            else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {

                startedSASL = false;

                saslSuccessful();

            }

            else if (waitingCompressionACK) {

                waitingCompressionACK = false;

                compressionSuccessful();

            }

            return;

        }

 

        // Verify if end of stream was requested

        if (stanza.equals("</stream:stream>")) {

            session.close();

            return;

        }

        // Ignore <?xml version="1.0"?> stanzas sent by clients

        if (stanza.startsWith("<?xml")) {

            return;

        }

        // Create DOM object from received stanza

        Element doc = reader.read(new StringReader(stanza)).getRootElement();

        if (doc == null) {

            // No document found.

            return;

        }

        String tag = doc.getName();

        if ("starttls".equals(tag)) {

            // Negotiate TLS

            if (negotiateTLS()) {

                startedTLS = true;

            }

            else {

                connection.close();

                session = null;

            }

        }

        else if ("auth".equals(tag)) {

            // User is trying to authenticate using SASL

            startedSASL = true;

            // Process authentication stanza

            saslStatus = SASLAuthentication.handle(session, doc);

        }

        else if (startedSASL && "response".equals(tag)) {

            // User is responding to SASL challenge. Process response

            saslStatus = SASLAuthentication.handle(session, doc);

        }

        else if ("compress".equals(tag)) {

            // Client is trying to initiate compression

            if (compressClient(doc)) {

                // Compression was successful so open a new stream and offer

                // resource binding and session establishment (to client sessions only)

                waitingCompressionACK = true;

            }

        }

        else {

            process(doc);

        }

    }

//此过程主要的工作是:

1.在调用此方法之前,会调用其编码解码器(XMPPDecoder),对来的报文进行解码。

2.获取XML解析对象XMPPPacketReader,统计接受的数据量,对数据进行解码。

3.根据不同的消息头做不同的处理

 

 

 

 

此篇文章就到此,稍后会有更多关于openfire的个人解读。

联系方式(qq):851392159

出处:http://buerkai.iteye.com

分享到:
评论

相关推荐

    Mina 框架研究与实现

    这一框架已被广泛应用于多个领域,如Red5项目中的RTMP协议实现、SubEtha.SMTP项目中的SMTP协议、Apache Qpid项目中的AMQP协议,以及Openfire项目中的XMPP协议等。这些实例充分展示了Mina框架在不同场景下的强大适用...

    MINA框架源码

    目前正在使用 MINA 的软件包括有:Apache Directory Project、AsyncWeb、AMQP(Advanced Message Queuing Protocol)、RED5 Server(Macromedia Flash Media RTMP)、ObjectRADIUS、Openfire 等等。

    目前最详尽的openfire介绍

    Openfire采用了Apache MINA框架来处理网络通信,MINA是一个强大的网络应用程序框架,旨在简化高性能和高可靠性网络应用的开发。MINA通过Java NIO提供跨不同传输类型(如TCP/IP、UDP/IP)的统一API,同时支持串口通讯...

    openfire处理消息流程及openfire详细信息

    Openfire采用Apache MINA框架来处理通信任务,MINA是一个高度灵活的网络应用程序框架,它基于Java NIO技术,提供了异步、事件驱动的API,适用于TCP/IP、UDP/IP等多种传输类型。MINA的特点包括: - 统一的API接口,...

    openfire综合介绍

    Openfire是一个开源的即时通讯(IM)服务器,它基于XMPP(Extensible Messaging and Presence Protocol)协议,该协议使用XML进行消息交换,以实现高度可扩展的通信。XMPP是一种标准化的协议,允许开发者构建即时...

    openfire消息机制和源码分析

    5. Apache MINA是一个提供高性能和高可靠性网络应用框架的Java库,它使用Java NIO框架来实现网络通信。MINA能够为TCP/IP、UDP/IP等不同类型的传输提供统一的事件驱动的异步API。它还提供了过滤器功能、多线程模型、...

    Openfire源码分析

    具体来说,Openfire中的`ConnectionHandler`类继承自MINA的`IoHandlerAdapter`类,负责客户端连接的创建与销毁,以及XML数据包的传递。此外,Openfire还根据连接类型的不同,进一步细分为`ClientConnectionHandler`...

    Mina状态机介绍和实例

    通过使用MINA框架可以可以省下处理底层I/O和线程并发等复杂工作,开发人员能够把更多的精力投入到业务设计和开发当中。MINA框架的应用比较广泛,应用的开源项目有Apache Directory、AsyncWeb、Apache Qpid、QuickFIX...

    OpenFire二次开发环境搭建

    通过以上详细介绍,我们可以看到OpenFire二次开发涉及的技术栈相当广泛,包括网络编程、Mina框架的应用以及XMPP协议的理解等多个方面。对于开发者而言,熟悉这些技术和概念是进行有效二次开发的基础。

    mina-1.1.7.tar(1).gz稳定版

    通过使用MINA框架可以可以省下处理底层I/O和线程并发等复杂工作,开发人员能够把更多的精力投入到业务设计和开发当中。MINA框架的应用比较广泛,应用的开源项目有Apache Directory、AsyncWeb、Apache Qpid、QuickFIX...

    openfire3.10.2缺少的jar包

    在使用Openfire的过程中,可能会遇到一些依赖库或jar包缺失的问题,这将导致服务无法正常启动或功能受限。标题提到的是"openfire3.10.2缺少的jar包",描述中指出这些jar包是最新的,日期为2015年10月27日。从提供的...

    openfire3.10.1源码编译缺少的jar包

    openfire3.10.1源码编译缺少的jar包: jetty-websocket-8.1.17.v20150415 jnsapi mina-core-1.1.7-sources.jar mina-filter-compression-1.1.7-sources.jar mina-filter-ssl-1.1.7-sources.jar mina-filter-ssl.jar ...

    openfire_3_10_2所有缺少的jar包整理

    《Openfire 3.10.2在Eclipse中的集成与缺失JAR包解析》 Openfire是一款开源的即时通讯服务器软件,它基于XMPP协议,为企业或组织提供了一个强大的、可扩展的通信平台。在将Openfire项目导入Eclipse进行开发时,可能...

    Openfire 性能优化

    Openfire 是一款基于 XMPP(Extensible Messaging and Presence Protocol)协议的即时通讯(IM)服务器,它使用 MINA 库的 Java NIO 实现,通常搭配 MySQL 数据库使用。默认情况下,Openfire 官方宣称可支持约 5000 ...

    Android XMPP 即时通讯 Openfire二次开发资料,非常齐

    Openfire 的通信处理基于 Apache MINA 框架实现,提供了一个通过 Java NIO 在不同的传输例如 TCP/IP 和 UDP/IP 上抽象的事件驱动的异步 API。 Openfire 中常见的类名后缀命名包括 Starter、Plugin、Listener、...

    openfire所需要的jar包

    在Openfire的运行环境中,Java Archive (JAR) 文件起着至关重要的作用,它们包含了必要的类库和资源,使得Openfire能够正常运行。本文将深入探讨"openfire所需要的jar包"以及与之相关的知识点。 首先,我们关注到的...

    openfire处理消息流程,及openfire详细信息.docx

    Apache MINA 是 Openfire 采用的网络应用程序框架,它为开发者提供了异步、事件驱动的网络编程接口,支持多种传输协议,如 TCP/IP、UDP/IP、串口通信以及虚拟机内部通信。MINA 的特性包括统一的 API、过滤器机制、...

    openfire综合介绍.doc

    在Openfire中,Apache MINA被用作网络应用程序框架,它提供了一种异步事件驱动的NIO(非阻塞I/O)机制,简化了高性能、高可靠性的网络应用开发。MINA支持多种传输协议,包括TCP/IP、UDP/IP,甚至串口通信,并且具备...

Global site tag (gtag.js) - Google Analytics