在openfire中与客户端之间的交互代码主要在org.jivesoftware.openfire.nio,org.jivesoftware.openfire.net这个2个包中。
当openfire启动的时候,会去加载连接管理中心ConnectionManagerImpl这个类,其中有几个方法值得去细细阅读,
private synchronized void createListeners() {
if (isSocketStarted || sessionManager == null || deliverer == null || router == null || serverName == null) {
return;
}
// Create the port listener for s2s communication
createServerListener(localIPAddress);
// Create the port listener for Connections Multiplexers
createConnectionManagerListener();
// Create the port listener for external components
createComponentListener();
// Create the port listener for clients
createClientListeners();
// Create the port listener for secured clients
createClientSSLListeners();
}
//这个地方只要是建立5个socket,下面介绍一下:
1. createServerListener,此主要是建立一个基于java.net.ServerSocket。
private void createServerListener(String localIPAddress) {
// Start servers socket unless it's been disabled.
if (isServerListenerEnabled()) {
int port = getServerListenerPort();
try {
serverSocketThread = new SocketAcceptThread(this, new ServerPort(port, serverName,
localIPAddress, false, null, ServerPort.Type.server));
ports.add(serverSocketThread.getServerPort());
serverSocketThread.setDaemon(true);
serverSocketThread.setPriority(Thread.MAX_PRIORITY);
}
catch (Exception e) {
System.err.println("Error creating server listener on port " + port + ": " +
e.getMessage());
Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);
}
}
}
进入SocketAcceptThread会发现其真面目,
public class SocketAcceptThread extends Thread {
/**
* Holds information about the port on which the server will listen for connections.
*/
private ServerPort serverPort;
private SocketAcceptingMode acceptingMode;
public SocketAcceptThread(ConnectionManager connManager, ServerPort serverPort)
throws IOException {
super("Socket Listener at port " + serverPort.getPort());
// Listen on a specific network interface if it has been set.
String interfaceName = JiveGlobals.getXMLProperty("network.interface");
InetAddress bindInterface = null;
if (interfaceName != null) {
if (interfaceName.trim().length() > 0) {
bindInterface = InetAddress.getByName(interfaceName);
// Create the new server port based on the new bind address
serverPort = new ServerPort(serverPort.getPort(),
serverPort.getDomainNames().get(0), interfaceName, serverPort.isSecure(),
serverPort.getSecurityType(), serverPort.getType());
}
}
this.serverPort = serverPort;
// Set the blocking reading mode to use
acceptingMode = new BlockingAcceptingMode(connManager, serverPort, bindInterface);
}
/**
* Retrieve the port this server socket is bound to.
*
* @return the port the socket is bound to.
*/
public int getPort() {
return serverPort.getPort();
}
/**
* Returns information about the port on which the server is listening for connections.
*
* @return information about the port on which the server is listening for connections.
*/
public ServerPort getServerPort() {
return serverPort;
}
/**
* Unblock the thread and force it to terminate.
*/
public void shutdown() {
acceptingMode.shutdown();
}
/**
* About as simple as it gets. The thread spins around an accept
* call getting sockets and handing them to the SocketManager.
*/
@Override
public void run() {
acceptingMode.run();
// We stopped accepting new connections so close the listener
shutdown();
}
}
class BlockingAcceptingMode extends SocketAcceptingMode {
private static final Logger Log = LoggerFactory.getLogger(BlockingAcceptingMode.class);
protected BlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,
InetAddress bindInterface) throws IOException {
super(connManager, serverPort);
serverSocket = new ServerSocket(serverPort.getPort(), -1, bindInterface);
}
/**
* About as simple as it gets. The thread spins around an accept
* call getting sockets and creating new reading threads for each new connection.
*/
@Override
public void run() {
while (notTerminated) {
try {
Socket sock = serverSocket.accept();
if (sock != null) {
Log.debug("Connect " + sock.toString());
SocketReader reader =
connManager.createSocketReader(sock, false, serverPort, true);
Thread thread = new Thread(reader, reader.getName());
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
thread.start();
}
}
catch (IOException ie) {
if (notTerminated) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),
ie);
}
}
catch (Throwable e) {
Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
}
}
}
}
//此处只要这个服务器,接受到一个请求,就会创建一个新的线程去解析相关数据,并做逻辑处理。
@Override
public void run() {
try {
socketReader.reader.getXPPParser().setInput(new InputStreamReader(
ServerTrafficCounter.wrapInputStream(socket.getInputStream()), CHARSET));
// Read in the opening tag and prepare for packet stream
try {
socketReader.createSession();
}
catch (IOException e) {
Log.debug("Error creating session", e);
throw e;
}
// Read the packet stream until it ends
if (socketReader.session != null) {
readStream();
}
}
catch (EOFException eof) {
// Normal disconnect
}
catch (SocketException se) {
// The socket was closed. The server may close the connection for several
// reasons (e.g. user requested to remove his account). Do nothing here.
}
catch (AsynchronousCloseException ace) {
// The socket was closed.
}
catch (XmlPullParserException ie) {
// It is normal for clients to abruptly cut a connection
// rather than closing the stream document. Since this is
// normal behavior, we won't log it as an error.
// Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
}
catch (Exception e) {
if (socketReader.session != null) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +
socketReader.session, e);
}
}
finally {
if (socketReader.session != null) {
if (Log.isDebugEnabled()) {
Log.debug("Logging off " + socketReader.session.getAddress() + " on " + socketReader.connection);
}
try {
socketReader.session.close();
}
catch (Exception e) {
Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());
}
}
else {
// Close and release the created connection
socketReader.connection.close();
Log.debug(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());
}
socketReader.shutdown();
}
}
//createSession中进行报文的解析,
protected void createSession()
throws UnauthorizedException, XmlPullParserException, IOException {
XmlPullParser xpp = reader.getXPPParser();
for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
eventType = xpp.next();
}
// Check that the TO attribute of the stream header matches the server name or a valid
// subdomain. If the value of the 'to' attribute is not valid then return a host-unknown
// error and close the underlying connection.
System.out.println(reader.getXPPParser().toString());
String host = reader.getXPPParser().getAttributeValue("", "to");
if (validateHost() && isHostUnknown(host)) {
StringBuilder sb = new StringBuilder(250);
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
// Append stream header
sb.append("<stream:stream ");
sb.append("from=\"").append(serverName).append("\" ");
sb.append("id=\"").append(StringUtils.randomString(5)).append("\" ");
sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");
sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");
sb.append("version=\"1.0\">");
// Set the host_unknown error
StreamError error = new StreamError(StreamError.Condition.host_unknown);
sb.append(error.toXML());
// Deliver stanza
connection.deliverRawText(sb.toString());
// Close the underlying connection
connection.close();
// Log a warning so that admins can track this cases from the server side
Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host +
". Connection: " + connection);
}
// Create the correct session based on the sent namespace. At this point the server
// may offer the client to secure the connection. If the client decides to secure
// the connection then a <starttls> stanza should be received
else if (!createSession(xpp.getNamespace(null))) {
// No session was created because of an invalid namespace prefix so answer a stream
// error and close the underlying connection
StringBuilder sb = new StringBuilder(250);
sb.append("<?xml version='1.0' encoding='");
sb.append(CHARSET);
sb.append("'?>");
// Append stream header
sb.append("<stream:stream ");
sb.append("from=\"").append(serverName).append("\" ");
sb.append("id=\"").append(StringUtils.randomString(5)).append("\" ");
sb.append("xmlns=\"").append(xpp.getNamespace(null)).append("\" ");
sb.append("xmlns:stream=\"").append(xpp.getNamespace("stream")).append("\" ");
sb.append("version=\"1.0\">");
// Include the bad-namespace-prefix in the response
StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix);
sb.append(error.toXML());
connection.deliverRawText(sb.toString());
// Close the underlying connection
connection.close();
// Log a warning so that admins can track this cases from the server side
Log.warn("Closing session due to bad_namespace_prefix in stream header. Prefix: " +
xpp.getNamespace(null) + ". Connection: " + connection);
}
}
将相关信息转换为XMPP报文格式。
2. createConnectionManagerListener();
private void createConnectionManagerListener() {
// Start multiplexers socket unless it's been disabled.
if (isConnectionManagerListenerEnabled()) {
// Create SocketAcceptor with correct number of processors
multiplexerSocketAcceptor = buildSocketAcceptor();
// Customize Executor that will be used by processors to process incoming stanzas
ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager");
int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);
ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();
eventExecutor.setCorePoolSize(eventThreads + 1);
eventExecutor.setMaximumPoolSize(eventThreads + 1);
eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
multiplexerSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);
// Add the XMPP codec filter
multiplexerSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
}
}
从 multiplexerSocketAcceptor.bind(new InetSocketAddress(bindInterface, port), new MultiplexerConnectionHandler(serverName));中可以知道其报文处理是MultiplexerConnectionHandler。
MultiplexerConnectionHandler,ClientConnectionHandler,这个3个类都是继承与ConnectionHandler,下面主要讲解ConnectionHandler。
ConnectionHandler继承与IoHandlerAdapter,重写了IoHandlerAdapter的6个方法。
当客户端和服务器建立连接的时候会调用:
@Override
public void sessionOpened(IoSession session) throws Exception {
// Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
final XMLLightweightParser parser = new XMLLightweightParser(CHARSET);
session.setAttribute(XML_PARSER, parser);
// Create a new NIOConnection for the new session
final NIOConnection connection = createNIOConnection(session);
session.setAttribute(CONNECTION, connection);
session.setAttribute(HANDLER, createStanzaHandler(connection));
// Set the max time a connection can be idle before closing it. This amount of seconds
// is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
// before disconnecting them (at 100% of the max idle time). This prevents Openfire from
// removing connections without warning.
final int idleTime = getMaxIdleTime() / 2;
if (idleTime > 0) {
session.setIdleTime(IdleStatus.READER_IDLE, idleTime);
}
}
//当有一个连接被打开的时候,就会初始化XMPP 的解析器,业务数据解析,以及会话空闲时间。
当客户端向服务端发起消息时会调用:
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
// Get the stanza handler for this session
StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
// Get the parser to use to process stanza. For optimization there is going
// to be a parser for each running thread. Each Filter will be executed
// by the Executor placed as the first Filter. So we can have a parser associated
// to each Thread
int hashCode = Thread.currentThread().hashCode();
XMPPPacketReader parser = parsers.get(hashCode);
if (parser == null) {
parser = new XMPPPacketReader();
parser.setXPPFactory(factory);
parsers.put(hashCode, parser);
}
// Update counter of read btyes
updateReadBytesCounter(session);
//System.out.println("RCVD: " + message);
// Let the stanza handler process the received stanza
try {
handler.process((String) message, parser);
} catch (Exception e) {
Log.error("Closing connection due to error while processing message: " + message, e);
Connection connection = (Connection) session.getAttribute(CONNECTION);
connection.close();
}
}
public void process(String stanza, XMPPPacketReader reader) throws Exception {
boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
if (!sessionCreated || initialStream) {
if (!initialStream) {
// Allow requests for flash socket policy files directly on the client listener port
if (stanza.startsWith("<policy-file-request/>")) {
String crossDomainText = FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +
XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +
FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0';
connection.deliverRawText(crossDomainText);
return;
}
else {
// Ignore <?xml version="1.0"?>
return;
}
}
// Found an stream:stream tag...
if (!sessionCreated) {
sessionCreated = true;
MXParser parser = reader.getXPPParser();
parser.setInput(new StringReader(stanza));
createSession(parser);
}
else if (startedTLS) {
startedTLS = false;
tlsNegotiated();
}
else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {
startedSASL = false;
saslSuccessful();
}
else if (waitingCompressionACK) {
waitingCompressionACK = false;
compressionSuccessful();
}
return;
}
// Verify if end of stream was requested
if (stanza.equals("</stream:stream>")) {
session.close();
return;
}
// Ignore <?xml version="1.0"?> stanzas sent by clients
if (stanza.startsWith("<?xml")) {
return;
}
// Create DOM object from received stanza
Element doc = reader.read(new StringReader(stanza)).getRootElement();
if (doc == null) {
// No document found.
return;
}
String tag = doc.getName();
if ("starttls".equals(tag)) {
// Negotiate TLS
if (negotiateTLS()) {
startedTLS = true;
}
else {
connection.close();
session = null;
}
}
else if ("auth".equals(tag)) {
// User is trying to authenticate using SASL
startedSASL = true;
// Process authentication stanza
saslStatus = SASLAuthentication.handle(session, doc);
}
else if (startedSASL && "response".equals(tag)) {
// User is responding to SASL challenge. Process response
saslStatus = SASLAuthentication.handle(session, doc);
}
else if ("compress".equals(tag)) {
// Client is trying to initiate compression
if (compressClient(doc)) {
// Compression was successful so open a new stream and offer
// resource binding and session establishment (to client sessions only)
waitingCompressionACK = true;
}
}
else {
process(doc);
}
}
//此过程主要的工作是:
1.在调用此方法之前,会调用其编码解码器(XMPPDecoder),对来的报文进行解码。
2.获取XML解析对象XMPPPacketReader,统计接受的数据量,对数据进行解码。
3.根据不同的消息头做不同的处理
此篇文章就到此,稍后会有更多关于openfire的个人解读。
联系方式(qq):851392159
相关推荐
内容概要:本文详细介绍了基于MATLAB GUI界面和卷积神经网络(CNN)的模糊车牌识别系统。该系统旨在解决现实中车牌因模糊不清导致识别困难的问题。文中阐述了整个流程的关键步骤,包括图像的模糊还原、灰度化、阈值化、边缘检测、孔洞填充、形态学操作、滤波操作、车牌定位、字符分割以及最终的字符识别。通过使用维纳滤波或最小二乘法约束滤波进行模糊还原,再利用CNN的强大特征提取能力完成字符分类。此外,还特别强调了MATLAB GUI界面的设计,使得用户能直观便捷地操作整个系统。 适合人群:对图像处理和深度学习感兴趣的科研人员、高校学生及从事相关领域的工程师。 使用场景及目标:适用于交通管理、智能停车场等领域,用于提升车牌识别的准确性和效率,特别是在面对模糊车牌时的表现。 其他说明:文中提供了部分关键代码片段作为参考,并对实验结果进行了详细的分析,展示了系统在不同环境下的表现情况及其潜在的应用前景。
嵌入式八股文面试题库资料知识宝典-计算机专业试题.zip
嵌入式八股文面试题库资料知识宝典-C and C++ normal interview_3.zip
内容概要:本文深入探讨了一款额定功率为4kW的开关磁阻电机,详细介绍了其性能参数如额定功率、转速、效率、输出转矩和脉动率等。同时,文章还展示了利用RMxprt、Maxwell 2D和3D模型对该电机进行仿真的方法和技术,通过外电路分析进一步研究其电气性能和动态响应特性。最后,文章提供了基于RMxprt模型的MATLAB仿真代码示例,帮助读者理解电机的工作原理及其性能特点。 适合人群:从事电机设计、工业自动化领域的工程师和技术人员,尤其是对开关磁阻电机感兴趣的科研工作者。 使用场景及目标:适用于希望深入了解开关磁阻电机特性和建模技术的研究人员,在新产品开发或现有产品改进时作为参考资料。 其他说明:文中提供的代码示例仅用于演示目的,实际操作时需根据所用软件的具体情况进行适当修改。
少儿编程scratch项目源代码文件案例素材-剑客冲刺.zip
少儿编程scratch项目源代码文件案例素材-几何冲刺 转瞬即逝.zip
内容概要:本文详细介绍了基于PID控制器的四象限直流电机速度驱动控制系统仿真模型及其永磁直流电机(PMDC)转速控制模型。首先阐述了PID控制器的工作原理,即通过对系统误差的比例、积分和微分运算来调整电机的驱动信号,从而实现转速的精确控制。接着讨论了如何利用PID控制器使有刷PMDC电机在四个象限中精确跟踪参考速度,并展示了仿真模型在应对快速负载扰动时的有效性和稳定性。最后,提供了Simulink仿真模型和详细的Word模型说明文档,帮助读者理解和调整PID控制器参数,以达到最佳控制效果。 适合人群:从事电力电子与电机控制领域的研究人员和技术人员,尤其是对四象限直流电机速度驱动控制系统感兴趣的读者。 使用场景及目标:适用于需要深入了解和掌握四象限直流电机速度驱动控制系统设计与实现的研究人员和技术人员。目标是在实际项目中能够运用PID控制器实现电机转速的精确控制,并提高系统的稳定性和抗干扰能力。 其他说明:文中引用了多篇相关领域的权威文献,确保了理论依据的可靠性和实用性。此外,提供的Simulink模型和Word文档有助于读者更好地理解和实践所介绍的内容。
嵌入式八股文面试题库资料知识宝典-2013年海康威视校园招聘嵌入式开发笔试题.zip
少儿编程scratch项目源代码文件案例素材-驾驶通关.zip
小区开放对周边道路通行能力影响的研究.pdf
内容概要:本文探讨了冷链物流车辆路径优化问题,特别是如何通过NSGA-2遗传算法和软硬时间窗策略来实现高效、环保和高客户满意度的路径规划。文中介绍了冷链物流的特点及其重要性,提出了软时间窗概念,允许一定的配送时间弹性,同时考虑碳排放成本,以达到绿色物流的目的。此外,还讨论了如何将客户满意度作为路径优化的重要评价标准之一。最后,通过一段简化的Python代码展示了遗传算法的应用。 适合人群:从事物流管理、冷链物流运营的专业人士,以及对遗传算法和路径优化感兴趣的科研人员和技术开发者。 使用场景及目标:适用于冷链物流企业,旨在优化配送路线,降低运营成本,减少碳排放,提升客户满意度。目标是帮助企业实现绿色、高效的物流配送系统。 其他说明:文中提供的代码仅为示意,实际应用需根据具体情况调整参数设置和模型构建。
少儿编程scratch项目源代码文件案例素材-恐怖矿井.zip
内容概要:本文详细介绍了基于STM32F030的无刷电机控制方案,重点在于高压FOC(磁场定向控制)技术和滑膜无感FOC的应用。该方案实现了过载、过欠压、堵转等多种保护机制,并提供了完整的源码、原理图和PCB设计。文中展示了关键代码片段,如滑膜观测器和电流环处理,以及保护机制的具体实现方法。此外,还提到了方案的移植要点和实际测试效果,确保系统的稳定性和高效性。 适合人群:嵌入式系统开发者、电机控制系统工程师、硬件工程师。 使用场景及目标:适用于需要高性能无刷电机控制的应用场景,如工业自动化设备、无人机、电动工具等。目标是提供一种成熟的、经过验证的无刷电机控制方案,帮助开发者快速实现并优化电机控制性能。 其他说明:提供的资料包括详细的原理图、PCB设计文件、源码及测试视频,方便开发者进行学习和应用。
基于有限体积法Godunov格式的管道泄漏检测模型研究.pdf
嵌入式八股文面试题库资料知识宝典-CC++笔试题-深圳有为(2019.2.28)1.zip
少儿编程scratch项目源代码文件案例素材-几何冲刺 V1.5.zip
Android系统开发_Linux内核配置_USB-HID设备模拟_通过root权限将Android设备转换为全功能USB键盘的项目实现_该项目需要内核支持configFS文件系统
C# WPF - LiveCharts Project
少儿编程scratch项目源代码文件案例素材-恐怖叉子 动画.zip
嵌入式八股文面试题库资料知识宝典-嵌⼊式⼯程师⾯试⾼频问题.zip