- 浏览: 366028 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
cywhfe:
看完了关于quickfixj的文章,国内最服这一系列。lz高人 ...
QuickFix/J 源代码研究(三) -
leonana:
你好,这篇文档记录了很多实用的场景,非常有价值。(但显然有些相 ...
GIT基本概念和用法总结 -
wujyou:
太感谢了,刚好和我的问题完全一样
在MAC上查找和设置$JAVA_HOME -
huangfq:
总结的不错,但是我看到后面有点晕
GIT基本概念和用法总结 -
LIPENGYU:
有错误。JAVA_HOME=`/usr/libexec/jav ...
在MAC上查找和设置$JAVA_HOME
QuickFix/J 源代码研究(一)
guibin.beijing@gmail.com
(〇)QuickFix/J简介
FIX是Financial Information eXchange的简称。FIX是一种专门为实时电子证券交易设计的标准消息协议。FIX协议由FIX protocol, Ltd(FPL)所有并维护。FIX协议的网址为http://www.fixprotocol.org
QuickFix/J是实现了FIX协议所有版本及其功能的开源软件,100%使用JAVA实现。
QuickFix/J的网址为http://www.quickfixj.org
QuickFix/J的源代码可以从http://sourceforge.net/projects/quickfixj/files/QuickFIX_J 下载,也可以去QuickFix/J的官方网站,进入下载页面下载源代码。
那么能用QuickFix/J做什么事情呢?关注股票的兄弟们一定留意过LevelII这个名词,他是中国股票交易新行情的简称。简单说,可以将QuickFix/J的代码改造一下,就用来接受深圳证券交易的LevelII行情数据。当然接受行情不是免费的,需要诸多的商务手续,但是本文仅仅讨论QuickFix/J的开源代码的设计和实现,并且侧重于QuickFix/J的客户端实现。服务器端留在以后的文章介绍。关于上海证券交易所的LevelII数据的格式和接受,跟深圳的有诸多的不同,也留在以后讨论。
首先QuickFixJ代码功能主要有两大部分,一部分是Fix协议数据的解析,另外一部分是客户端跟服务器端建立连接并维持回话,传输数据。第一部分将主要介绍QuickFix/J的传输部分的实现。
(一) QuickFix/J传输功能部分
QuickFix/J的连接管理和传输功能是基于MINA框架实现的。MINA是什么?MINA是Apache旗下的一个网络应用框架,能够帮助大家轻松的开发高性能、高扩展性的网络程序。它使用NIO在传输协议(比如TCP/IP,UDP/IP)之上提供了抽象的、事件驱动的、异步处理的API。MINA的网址为http://mina.apache.org。
A). QuickFix/J客户端用到的主要类的功能说明(V1.5.0)
1. quickfix.Initiator:定义了一些从配置中获取通信协议、主机、端口及重连接的时间间隔的KEY,仅仅是key而已,没有其他的。
2. quickfix.mina.SessionConnector:定义了一些helper方法,为initiator和acceptor提供公用的功能,比如获取Session,创建Session,动态添加/删除Session,判断是否已经登陆。在SessionConnector中定义了SessionTimerTask,这个Timer的主要目的是例行检查和更新Session的状态,发现有问题及时操作。SessionTimerTask发现登陆状态错误之后能自动重连登陆、更新Session时间戳、发送心跳消息。检查的具体逻辑在Session.next()中,细节请参考5。
3. quickfix.mina.initiator.AbstractSocketInitiator:是SocketInitiator的基础抽象基类,继承了SessionConnector和Initiator。在QuickFix/J中提供了两种默认的具体实现,分别是SocketInitiator和ThreadedSocketInitiator。这两种具体实现的功能都一样,两者的区别仅仅是处理消息时使用线程的策略不同,具体请参考7和8。抽象类AbstractSocketInitiator提供的功能有:
a) 遍历配置文件取得所有[session]节的配置并创建相应的FixSession (如果[session]中没有指定ConnectionType或者明确指定了ConnectionType为initiator,则建立FixSession(quickfix.Session),其他类型的ConnectionType无效,如acceptor)。配置文件中可以指定多个[session]。
b) 通过已经生成的FixSession和传入的eventHandlingStrategy创建IoSessionInitiator,并保存入initiators(Set类型的缓存)中。
那么FixSession和IoSessionInitiator有什么区别呢?请参考5、6。
c) 启动、关闭客户端(initiator)。启动initiator时首先启动应用层的SessionTimer(请参考2),然后启动连接层的initiator(IoSessionInitiator)。关闭initiator时,先关闭连接层的initiator(IoSessionInitiator),再关闭应用层的SessionTimer。
4. quickfix.SessionID:是Session的唯一标识。SessionID中包含beginString(必须),senderCompID(必须),senderSubID (可选),senderLocationID(可选),targetCompID(必须),targetSubID(可选),targetSubID(可选),targetLocationID(可选),sessionQualifier(可选)。sessionQualifer用于区分具有相同的targetCompID不同的session,只能用在initiator角色中。SessionID.toString生成的可读的Session ID字符串组成为:beginString:senderCompID/senderSubID/senderLocationID->targetCompID/targetSubID/targetLocationID/sessionQualifier。如果可选值未设置则在Session ID字符串中默认空字符串。
5. quickfix.Session:Session是FIX消息通讯中最基本的抽象。
a) fixSession维护Session内部消息的自增序列号、自动错误恢复、与通信对方(counterpart)建立通信信道(communication channel)。
b) Session是独立于特定的传输层协议的。Session被新建时,消息序列号置为1,每次通信序列号自增,直到Session被重置(reset)。每个Session能够跨越多个传输连接(并非同时跨越,而是说第一次网络连接断开后,随后重连,虽然底层的网络连接已经是新建的了,但是Session还能保持跟断网之前是同一个Session)。
c) fixSession中核心逻辑在next()方法中。
quickfix.Session.next():主要检查了以下8个状态,并且根据不同的状态做了响应的操作。next()在SessionTimerTask中被定时执行,作为例行检查Session状态的任务。
1) 检查Session是否是enabled。enabled状态表示什么?enable是Session中记录当前是否处于登陆状态。当执行了logon时将enabled置为true,当logout时将enabled置为false。
如果不是enabled状态并且不是loggedOn状态,则说明正常退出登陆。结束此次检查并返回。
如果不是enabled状态,但是是loggedOn状态说明Session上次非正常logout或者logout消息已经发出,正在返回的途中,那么就检查是否isLogoutSent,如果还没有发送logout,那么现在就发送logout消息。
2) 检查完Session的enabled状态之后,检查checkSessionTime。没有必要接收到每条消息都去检查Session的时间戳,顶多1秒检查一次就可以了,否则容易影响性能。如果checkSessionTime失败,则reset Session。(reset时使用了java.util.concurrent.automatic. AutomicBoolean 作为锁,使得reset成为一个原子操作)reset做了什么事情?首先回调用户接口onBeforeSessionReset,然后生成logout消息,并在state的中记录logoutSent的状态为true,然后关闭I/O连接,关闭I/O连接之后,设置一系列state的状态。最后resetState(将messageStore重置)。
3) 检查hasResponder,如果没有responder,则返回。关于responder的解释,请参考15。
4) 检查isLogonReceived,只有receivedLogon才算真正处于登陆状态。如果没有收到logon响应,检查state看是否需要发送logon消息(对Initiator而言,如果还未发logon则需要发送logon消息)如果需要发送logon消息,检查此时是否可以发送logon消息(为了防止不断重复发送logon,对发送logon消息有频率限制),如果可以发送则回调用户接口canLogon,判断用户是否允许发送logon消息,如果不允许则直接返回;如果允许则generateLogon并发送。发送完毕logon消息之后,返回。
5) 检查getHeartBeatInterval是否为0。看到这里,发现了一个QuickFix/J的bug,搜索了全部代码,没有发现作为客户端的Initiator在任何地方设置了state的heartBeatInterval。而作为服务器端的Acceptor在quickfix.mina.acceptor.AcceptorIoHandler.processMessage中设置了state的heartBeatInterval,代码如下:
因此在写客户化的Initiator时,记得fix这个bug,在客户端收到了来自服务器端Logon消息响应之后,从中取出HeartBtInt字段的值,设置到fixSession的heartBeatInterval中去。设置可以参考上述代码。
6) 检查state isLogoutTimedOut(logout消息已经发出去,并且Session已经超时,说明客户端确实已经收到了服务器端发送的Logon消息,并且客户端已经logout了),则断开I/O连接。
7) 检查state isWithinHeartBeat。目前发现QuickFix/J并没有设置state.withinHeartBeat,始终使用的是默认初始值false,不知道为何?既然这个检查始终返回false,则不会返回,继续下一步检查。
8) 检查state isTimedOut(如果距离上次收到HeartBeat已经超过了2.4*HeartBeat秒,则认为已经TimeOut)。如果已经timeOut,没有取消HeartBeatCheck的话,则断开I/O连接,然后回调用户的onHeartBeatTimeout接口。如果仍然未timeout,则根据需要发送TestRequest或者Heartbeat消息。
STEP1.0.0中的Logon消息响应中,还给出了MaxFailInt,即允许心跳最大失败次数,超过此次数之后便认为对方已经超时,可以断开连接,准备重新登陆。为Session设置MaxFailInt可以将取到的MaxFailInt乘以HeartBtInt乘以1000换算成秒,然后为Session添加API,使得能够对state的isTimedOut属性进行操作设置,然后在在此判断timeout。
首先为quickfix.SessionState添加一个属性
然后添加一个setTimeout一个getTimeout,然后为Session添加响应的setter和getter。最后在收到了Logon消息的响应时调用Session的setTimeout设置超时时间。
quickfix.Session.next(message):
首先检查SessionTime,如果超过1秒未刷新则刷新时间戳;如果发现Session不存在,则执行reset,重置Session,并返回。
然后检查消息的header中的beginString,如果和SessionID中记录的beginString不匹配则抛异常,返回。
然后检查msgType,如果是Logon消息,根据版本的不同,设置Session中的targetDefaultApplVerID。targetDefaultApplVerID有什么用处呢?因为FIXT协议和之前版本的FIX协议的ApplVerID有不同的组成格式,这个targetDefaultApplVerID主要是给quickfix.MessageCracker用来解析FIXT消息的。
然后检查dataDictionaryProvider, 根据customApplVerID和applVerID取到数据字典(applicationDataDictionary及sessionDataDictionary,细节请参考16)。
然后根据取到的字典验证message是否合法,不合法的根据情况打印警告日志或者直接抛出异常退出。
然后根据消息类型,分别做相应的状态设置(通过nextLogon,nextHeartBeat,nextTestRequest,nextSequenceReset,nextLogout),然后调用用户call back API(在verify中)。
处理完毕该消息之后,调用nextQueued()递归处理下一个队列中的消息。
请注意verify(message)函数,所有的普通消息通过这个函数去回调application的fromApp(message, sessionID)的。verify -> veriry -> fromCallback -> fromAdmin/fromApp。关于消息的解析,其中普通Message是通过quickfix.MessageUtils.parse将String类型的消息解析成Message。
next(message)被SingleThreadedEventHandlingStrategy或者ThreadPerSessionEventHandlingStrategy处理每一条收到的消息时调用,是处理收到消息的源头。
quickfix.Session.next(string):就是next(message)的重载函数,将输入的字符串解析成Message,然后调用next(message)处理。
quickfix.Session.verify(msg, checkTooHigh, checkTooLow):收到的消息的MsgSeqNum与预期的不一样时所做的状态检查和操作。操作比如,如果发现丢包,则自动重发ResendRequest请求或者仅仅记录报警日志;如果发现重复MsgSeqNum,如果消息中设置了可能重复标志,则正常返回,否则说明消息有误,则Logout,disconnect。
quickfix.Session.verify(msg):verify(msg, checkTooHigh, checkTooLow)的重载函数。根据是否是Admin类型的消息及从配置中读取的checkGapFieldOnAdminMessage确定checkTooHigh和checkTooLow这两个标志。如果不是Admin消息则checkTooXX,如果checkGapFieldOnAdminMessage为真则checkTooXX。
quickfix.Session.doTargetTooHigh(message):收到的消息MsgSeqNum比预期的大,即丢失了消息,先检查是否需要reset或者disconnect(至于是否需要reset或者disconnect由创建Session时通过构造函数的参数指定,默认均是不需要,也可以通过指定配置ResetOnError和DisconnectOnError),如果需要则reset或者disconnect后返回。如果不需要,则将这条消息入队(enqueue)存入state中,然后生成ResendRequest并发送出去,请求从expectedTargetNum开始到此消息前的所有消息。由此可见,在state的messageQueue中存储的消息都是因为发现消息序号跳跃缺失消息后,临时存储的跳跃之后新消息。而这些临时存储在state中的消息,会在收到并处理完每种消息之后都会去检查并处理,比如next(msg),nextHeartBeat(msg),nextLogon(msg),nextReject(msg),nextTestRequest(msg)这些处理过程的最后都会检查并处理state临时存储的消息。
quickfix.Session.doTargetTooLow(message):收到的消息MsgSeqNum比预期的小,即收到了重复消息,应该做的事情。
quickfix.Session.parseMessage(stringData):将输入的字符串数据解析成Message。实际是调用quickfix.parse(session, stringData)解析的。
quickfix.Session.send(String messageString):真正通过底层Mina发送消息给对方的API。在QuickFix/J中,send(msgString)通过responder发送消息,responder最终调用了Mina的IoSession.write(object)将数据写入网络中。
responder是通过quickfix.Session.setResponder(Responder responder)在quickfix.mina.initiator.InitiatorIoHandler中创建Session时设置的。关于responder的细节请参考15。
quickfix.Session.sendRaw(message, int num):
Session内部的helper方法,首先将传入的消息和该消息的MsgSeqNum拼装好,把消息头中与系统相关的参数设置好,参数比如BeginString,SenderCompID,TargetCompID,SendingTime。
然后根据消息类型回调响应的用户call back API,如果是Admin类型的消息,回调用户的toAdmin,非Admin类型的消息,回调用户的toApp。如果是LOGON消息,还要对是否重置SeqNum做相应的处理(具体处理过程是:如果消息中有ResetSeqNumFlag这个字段,并且这个字段被设置true则重置。如果重置,则首先resetState,取到期望的seqNum设置到消息头的MsgSeqNum字段中。无论是否重置,都将resetSeqNumFlag写入state中)。
最后把拼装好的message调用toString转换成字符串,然后调用send(string)方法真正的将消息发送到网络上去。
整个处理过程需要锁定SenderMsgSeqNum,直到全部操作完成返回。因为如果call back调用失败,则roll back会更高效。
sendRaw被Session中generateXXX调用,比如generateHeartbeat,generateLogon,generateLogout,generateReject,generateBusinessReject,generateResendRequest,generateSequenceReset,generateTestRequest。这些generateXXX中生成了相应消息之后,最后调用sendRaw把消息通过网络发送出去。
quickfix.Session.send(Message message):做了两件事,首先去掉消息头中的PossDupFlag和OrigSendingTime,然后调用sendRaw把消息发送到网络上去(设置seqNum为0)。主意send(message)返回的boolean并不能说明消息是否成功发送出去,仅仅是说明消息已经成功放入发送队列了,因为默认情况下QuickFix/J使用异步IO发送网络数据。
6. quickfix.mina.initiator.IoSessionInitiator:使用MINA提供的传输层的API,建立、维护同服务器之间的传输层的网络连接,而不是应用层的网络连接。这些网络功能都在一个叫做quickfix.mina.initiator.IoSessionInitiator.ConnectTask的一个私有的TimerTask中实现。具体实现功能有连接(包括普通连接和加密SSL连接)、重连、判断是否应该重连、处理连接异常、启动和停止ConnectTask。
7. quickfix.SocketInitiator:使用单独的线程去为所有的Session处理消息。SocketInitiator提供的功能有:
a) 初始化,即用eventHandlingStrategy创建Initiator,然后注册此SocketInitiator所管理的全部Session,然后启动Initiator,最后调用eventHandlingStrategy.blockInThread()在另外的后台线程中去处理SessionTimer收到的插入队列的消息。启动Initiator做的事情依次是:先启动SessionTimer去监听从传输层过来的消息,如果没有Logon则先Logon,然后在收到消息后回调用户代码处理消息;启动reconnectTask去建立和维护传输层的网络连接。
b) 启动Initiator,在另外的线程中后台(Daemon)处理消息。
c) 阻塞Initiator,在同一线程中处理消息。
d) 停止Initiator。分为强制停止和非强制停止。强制或者非强制Logout所有FixSession,停止连接层的Initiator,取消注册所有此SocketInitiator所管理的全部Session。
e) 关于a) b)如何处理来自底层的消息的逻辑,请参考11和12。因为这里所谓的处理消息实际上是直接或者间接调用了SingleThreadedEventHandlingStrategy的block处理消息。
8. quickfix.ThreadedSocketInitiator:为每一个Session使用一个单独的线程去处理消息。功能参考7。除了线程工作模式不一样,功能和7完全一样。
9. quickfix.SessionState: Session和对方通信过程中使用的helper类。主要功能就是存储了Session的所有状态,并且提供了响应API访问这些状态。状态包括heartBeatInterval,heartBeatMillis,是否需要heartBeat,判断Session所在应用程序的角色(客户端Initiator or 服务器端Acceptor),lastReceivedTime,lastSentTime,获取logger,判断作为客户端的Initiator登陆消息是否已经发出,判断作为服务器端的Acceptor登陆消息是否收到,判断是否需要登陆,判断登陆是否TimeOut,messageStore,testRequestCounter,判断是否需要TestRequest,判断是否处于TimeOut状态,将收到的Message入队(enqueue),出队(dequeue),锁定和解锁发送/接受的Sequence Number,获取、设置自增的下一个Sequence Number,重置(即将Sequence清空,重新从1开始计数),设置、获取Logout的原因。
10. quickfix.mina.EventHandlingStrategy:用于不同版本FIX协议处理事件的策略的接口,是应用级处理消息回调接口的根源。当传输层消息到达时调用此接口onMessage,可以这么理解,onMessage是EventHandlingStrategy的输入,这个输入来自底层。getSessionConnector获取和这个策略相关的SessionConnector,即获取和这个Session相关的Initiator/Acceptor去处理响应的输入消息,一般情况下是逐层将消息向上层传出,回调用户的函数处理该消息。getQueueSize获取当前被处理消息队列的长度。EventHandlingStrategy一般作为AbstractSocketInitiator的成员,建立IoSessionInitiator时传给IoSessionInitiator,请参考3 b)。目前在QuickFix/J中有两个具体实现,分别是quickfix.mina.SingleThreadedEventHandlingStrategy和quickfix.mina.ThreadPerSessionEventHandlingStrategy。这两个具体实现类的说明请参考11,12。
11. quickfix.mina.SingleThreadedEventHandlingStrategy:是QuickFix/J处理消息的核心类。处理消息时即便有多个Session也使用单线程模式。
a) 为了不阻塞输入,那么就需要一个eventQueue来临时快速的存储收到的所有消息。
b) onMessage接到底层传入的消息包装成SessionMessageEvent,首先将其存入eventQueue。
c) 那么SessionMessageEvent里面有什么?SessionEvent仅仅是把fixSession和Message包装到一起,并且提供了处理Message的方法processMessage。可以这样理解,onMessage是事件处理策略底层Message的输入,SessionEvent中的processMessage是Message的输出,未来被应用程序级别的函数处理。
d) getSessionConnector获取需要处理应用级的connector以便处理eventQueue中的消息。
e) getMessage从eventQueue中取出SessionMessageEvent待处理。
f) block就是应用程序级别处理消息的入口。block判断HandlingMessage是否应该继续运新,如果是则从消息队列中取出SessionMessageEvent,调用其中的processMessage去处理该Message。
g) processMessage如何处理了收到的消息呢?它会调用fixSession的next方法,将消息传给Session,由fixSession再接力将消息回调到用户手中。请参考5。
h) 也许你会注意到处理消息的block在run中始终被调用,而且没有任何sleep时间,难道它在没有消息的时候始终不停的死循环运行且丝毫不休息?CPU会保持100%?实际上效果不是这样的,其中的秘密在于它使用了BlockingQueue做到了和sleep相同效果的事情。在没有消息的时候,这个循环会每休息一秒再执行下一次循环。如何做到这样的效果呢?原因是如果eventQueue中如果没有消息,而该eventQueue设置了阻塞超时1000毫秒,则取消息的操作会等待最多1000毫秒,如果没有等到消息则超时退出不再等待,执行完毕本次循环,如果等到了则按照正常流程处理消息。这样做最大的好处就是,如果eventQueue中有事件,那么就会连续不断的处理,如果没有消息,就会休息timeout毫秒再查看。
i) blockInThread,在新启动的后台线程中处理SessionMessageEvent
12. quickfix.mina.ThreadPerSessionEventHandlingStrategy:同样是QuickFix/J处理消息的核心类。和单线程模式不同的是该策略会为每个session启动一个新的线程去处理消息。
a) 由于是每个Session对应一个线程,因此该策略内部需要一个称之为dispatchersMap作为缓存为每个Session保存响应的处理线程(MessageDispatchingThread)引用。
b) 当onMessage收到来自底层的输入消息时,根据输入的fixSession从dispatchers中取到相应的处理线程,并将该消息加入(enqueue)到该线程内部的消息队列中待处理。
c) 每个dispatcher(MessageDispatchingThread类型)内部均维护了自己的消息队列,和单线程模式不同在于,消息队列中的消息仅仅是Message,不是SessionMessageEvent。处理Message的逻辑从单线程中的SessionMessageEvent中移出到dispatcher中。
13. quickfix.DataDictionaryProvider:是一个接口,为指定的session protocol或者application version提供数据字典。getSessionDataDictionary根据提供的beginString 即协议版本获取相应的数据字典。getApplicationDataDictionary根据提供的application version ID和custom application ID获取数据字典。application version ID在FIXT.1.1之前由BeginString字段确定。custom application ID是可选值,不是必须的。
14. quickfix.DefaultDataDictionaryProvider:是QuickFix/J提供的DataDictionaryProvider的默认实现。在DefaultDataDictionaryProvider中,有两种数据字典,一种是传输用的数据字典,一种是应用程序用的数据字典,分别缓存在两个Map中。这个DefaultDataDictionaryProvider是在创建Session时由默认的DefaultSessionFactory根据beginString创建的。addApplicationDictionary和addTransportDictionary分别用于向DefaultDataDictionaryProvider添加新的数据字典。目前QuickFix/J的实现中,在DefaultSessionFactory中初始化Session时添加字典。对于fixt之前版本的数据字典,每个数据字典会被同时添加进入到传输数据字典和应用程序数据字典中。
15. quickfix.Responder:这是个接口,供Session使用,用于发送原始FIX消息(raw FIX message)或者切断和对方的I/O连接。send(stringData)发送原始的FIX消息,disconnect()切断底层的连接,getRemoteIPAddress()提供对方的IP。目前的QuickFix/J中quickfix.mina.IoSessionResponder实现了Responder接口。IoSessionResponder.send(string)调用了Mina的IoSession.write(object)将数据写入网络中。默认是异步写入。(如果同步写入在创建responder时显示传入synchronousWrites为true。同步写入实际是write(object)之后,取到ioSession的WriteFuture,然后向WriteFuture发送join指令。)
16. quickfix.DataDictionary:为不同版本的FIX协议提供消息的元数据(metadata),提供helper方法帮助判断field类型,判断消息的类型,判断组、组件的类型,检查message、field,field等。
总共有3种方式生成DataDictionary,首先可以通过一个系统路径读入配置文件生成,或者载入inputStream生成,或者从已有的DataDictionary拷贝生成。
DataDictionary中最复杂的方法就是load,思想是根据fixXXX.xml中的定义,判断xml文件是否合法,然后根据DTD取到各个section中的信息并存入预先定义好的Map、List、Set中,便于之后的Helper方法能够快速准确的搜索和判断相关信息。
17. quickfix.CompositeLog:将日志信息输出到多个logger中。多个logger在new CompositeLog(Log[] logs)时传入。但是多个logger没有优先级,也不能控制多个logger,仅仅是日志来了,同时写入这多个logger中。CompositeLog由CompositeLogFactory创建。
QuickFix/J中其他的logger还有:
quickfix.ScreenLog,将日志通过System.out打印到屏幕,相应的有quickfix.ScreenLogFactory。
quickfix.FileLog,将日志写入本地文件中。相应的有quickfix.FileLogFactory。
quickfix.JdbcLog,将日志写入数据库中。相应的有quickfix.JdbcLogFactory。
quickfix.SLF4JLog,使用SLF4J wrapper写日志,SLF4J支持JDK logging,log4j等。
另外多数Log的实现都extends了quickfix.AbstractLog。AbstractLog的作用是添加了对HeartBeat是否记录日志的开关。
B). 网络数据在QuickFix/J中的流向
ConnectTask -> ioConnector.connect(sockAddress, ioHandler) -> MINA建立和服务器端的通信。收到网络数据,ioConnector触发相应事件,并把事件交给ioHandler(InitiatorIoHandler)的processMessage -> processMessage中调用eventHandlingStrategy.onMessage(quickfixSession, message) ,将消息向外回调 -> SingleThreadedEventHandlingStrategy.onMessage(quickfixSession, message) 将收到的消息入队(enQueue)到eventQueue -> SingleThreadedEventHandlingStrategy.blockInThread中启动单独的后台线程,依次从eventQueue取出消息处理,向session回调 -> SessionMessageEvent.quickfixSession.next(message) -> quickFixSession.next根据msgType判断回调 ->逐层回调 (verify -> veriry -> fromCallback -> fromAdmin/fromApp),从fromAdmin/fromApp(msg, sessionID)回调用户处理逻辑。
Guibin
2010-12-30
能不能介绍一下!
等我把核心内容整理完毕之后会及时补充周边的介绍性内容和背景知识的。
能不能介绍一下!
guibin.beijing@gmail.com
(〇)QuickFix/J简介
FIX是Financial Information eXchange的简称。FIX是一种专门为实时电子证券交易设计的标准消息协议。FIX协议由FIX protocol, Ltd(FPL)所有并维护。FIX协议的网址为http://www.fixprotocol.org
QuickFix/J是实现了FIX协议所有版本及其功能的开源软件,100%使用JAVA实现。
QuickFix/J的网址为http://www.quickfixj.org
QuickFix/J的源代码可以从http://sourceforge.net/projects/quickfixj/files/QuickFIX_J 下载,也可以去QuickFix/J的官方网站,进入下载页面下载源代码。
那么能用QuickFix/J做什么事情呢?关注股票的兄弟们一定留意过LevelII这个名词,他是中国股票交易新行情的简称。简单说,可以将QuickFix/J的代码改造一下,就用来接受深圳证券交易的LevelII行情数据。当然接受行情不是免费的,需要诸多的商务手续,但是本文仅仅讨论QuickFix/J的开源代码的设计和实现,并且侧重于QuickFix/J的客户端实现。服务器端留在以后的文章介绍。关于上海证券交易所的LevelII数据的格式和接受,跟深圳的有诸多的不同,也留在以后讨论。
首先QuickFixJ代码功能主要有两大部分,一部分是Fix协议数据的解析,另外一部分是客户端跟服务器端建立连接并维持回话,传输数据。第一部分将主要介绍QuickFix/J的传输部分的实现。
(一) QuickFix/J传输功能部分
QuickFix/J的连接管理和传输功能是基于MINA框架实现的。MINA是什么?MINA是Apache旗下的一个网络应用框架,能够帮助大家轻松的开发高性能、高扩展性的网络程序。它使用NIO在传输协议(比如TCP/IP,UDP/IP)之上提供了抽象的、事件驱动的、异步处理的API。MINA的网址为http://mina.apache.org。
A). QuickFix/J客户端用到的主要类的功能说明(V1.5.0)
1. quickfix.Initiator:定义了一些从配置中获取通信协议、主机、端口及重连接的时间间隔的KEY,仅仅是key而已,没有其他的。
2. quickfix.mina.SessionConnector:定义了一些helper方法,为initiator和acceptor提供公用的功能,比如获取Session,创建Session,动态添加/删除Session,判断是否已经登陆。在SessionConnector中定义了SessionTimerTask,这个Timer的主要目的是例行检查和更新Session的状态,发现有问题及时操作。SessionTimerTask发现登陆状态错误之后能自动重连登陆、更新Session时间戳、发送心跳消息。检查的具体逻辑在Session.next()中,细节请参考5。
3. quickfix.mina.initiator.AbstractSocketInitiator:是SocketInitiator的基础抽象基类,继承了SessionConnector和Initiator。在QuickFix/J中提供了两种默认的具体实现,分别是SocketInitiator和ThreadedSocketInitiator。这两种具体实现的功能都一样,两者的区别仅仅是处理消息时使用线程的策略不同,具体请参考7和8。抽象类AbstractSocketInitiator提供的功能有:
a) 遍历配置文件取得所有[session]节的配置并创建相应的FixSession (如果[session]中没有指定ConnectionType或者明确指定了ConnectionType为initiator,则建立FixSession(quickfix.Session),其他类型的ConnectionType无效,如acceptor)。配置文件中可以指定多个[session]。
b) 通过已经生成的FixSession和传入的eventHandlingStrategy创建IoSessionInitiator,并保存入initiators(Set类型的缓存)中。
那么FixSession和IoSessionInitiator有什么区别呢?请参考5、6。
c) 启动、关闭客户端(initiator)。启动initiator时首先启动应用层的SessionTimer(请参考2),然后启动连接层的initiator(IoSessionInitiator)。关闭initiator时,先关闭连接层的initiator(IoSessionInitiator),再关闭应用层的SessionTimer。
4. quickfix.SessionID:是Session的唯一标识。SessionID中包含beginString(必须),senderCompID(必须),senderSubID (可选),senderLocationID(可选),targetCompID(必须),targetSubID(可选),targetSubID(可选),targetLocationID(可选),sessionQualifier(可选)。sessionQualifer用于区分具有相同的targetCompID不同的session,只能用在initiator角色中。SessionID.toString生成的可读的Session ID字符串组成为:beginString:senderCompID/senderSubID/senderLocationID->targetCompID/targetSubID/targetLocationID/sessionQualifier。如果可选值未设置则在Session ID字符串中默认空字符串。
5. quickfix.Session:Session是FIX消息通讯中最基本的抽象。
a) fixSession维护Session内部消息的自增序列号、自动错误恢复、与通信对方(counterpart)建立通信信道(communication channel)。
b) Session是独立于特定的传输层协议的。Session被新建时,消息序列号置为1,每次通信序列号自增,直到Session被重置(reset)。每个Session能够跨越多个传输连接(并非同时跨越,而是说第一次网络连接断开后,随后重连,虽然底层的网络连接已经是新建的了,但是Session还能保持跟断网之前是同一个Session)。
c) fixSession中核心逻辑在next()方法中。
quickfix.Session.next():主要检查了以下8个状态,并且根据不同的状态做了响应的操作。next()在SessionTimerTask中被定时执行,作为例行检查Session状态的任务。
1) 检查Session是否是enabled。enabled状态表示什么?enable是Session中记录当前是否处于登陆状态。当执行了logon时将enabled置为true,当logout时将enabled置为false。
如果不是enabled状态并且不是loggedOn状态,则说明正常退出登陆。结束此次检查并返回。
如果不是enabled状态,但是是loggedOn状态说明Session上次非正常logout或者logout消息已经发出,正在返回的途中,那么就检查是否isLogoutSent,如果还没有发送logout,那么现在就发送logout消息。
2) 检查完Session的enabled状态之后,检查checkSessionTime。没有必要接收到每条消息都去检查Session的时间戳,顶多1秒检查一次就可以了,否则容易影响性能。如果checkSessionTime失败,则reset Session。(reset时使用了java.util.concurrent.automatic. AutomicBoolean 作为锁,使得reset成为一个原子操作)reset做了什么事情?首先回调用户接口onBeforeSessionReset,然后生成logout消息,并在state的中记录logoutSent的状态为true,然后关闭I/O连接,关闭I/O连接之后,设置一系列state的状态。最后resetState(将messageStore重置)。
3) 检查hasResponder,如果没有responder,则返回。关于responder的解释,请参考15。
4) 检查isLogonReceived,只有receivedLogon才算真正处于登陆状态。如果没有收到logon响应,检查state看是否需要发送logon消息(对Initiator而言,如果还未发logon则需要发送logon消息)如果需要发送logon消息,检查此时是否可以发送logon消息(为了防止不断重复发送logon,对发送logon消息有频率限制),如果可以发送则回调用户接口canLogon,判断用户是否允许发送logon消息,如果不允许则直接返回;如果允许则generateLogon并发送。发送完毕logon消息之后,返回。
5) 检查getHeartBeatInterval是否为0。看到这里,发现了一个QuickFix/J的bug,搜索了全部代码,没有发现作为客户端的Initiator在任何地方设置了state的heartBeatInterval。而作为服务器端的Acceptor在quickfix.mina.acceptor.AcceptorIoHandler.processMessage中设置了state的heartBeatInterval,代码如下:
qfSession.setHeartBeatInterval(heartbeatInterval);
因此在写客户化的Initiator时,记得fix这个bug,在客户端收到了来自服务器端Logon消息响应之后,从中取出HeartBtInt字段的值,设置到fixSession的heartBeatInterval中去。设置可以参考上述代码。
6) 检查state isLogoutTimedOut(logout消息已经发出去,并且Session已经超时,说明客户端确实已经收到了服务器端发送的Logon消息,并且客户端已经logout了),则断开I/O连接。
7) 检查state isWithinHeartBeat。目前发现QuickFix/J并没有设置state.withinHeartBeat,始终使用的是默认初始值false,不知道为何?既然这个检查始终返回false,则不会返回,继续下一步检查。
8) 检查state isTimedOut(如果距离上次收到HeartBeat已经超过了2.4*HeartBeat秒,则认为已经TimeOut)。如果已经timeOut,没有取消HeartBeatCheck的话,则断开I/O连接,然后回调用户的onHeartBeatTimeout接口。如果仍然未timeout,则根据需要发送TestRequest或者Heartbeat消息。
STEP1.0.0中的Logon消息响应中,还给出了MaxFailInt,即允许心跳最大失败次数,超过此次数之后便认为对方已经超时,可以断开连接,准备重新登陆。为Session设置MaxFailInt可以将取到的MaxFailInt乘以HeartBtInt乘以1000换算成秒,然后为Session添加API,使得能够对state的isTimedOut属性进行操作设置,然后在在此判断timeout。
首先为quickfix.SessionState添加一个属性
private long timeoutMs = 24000L;
然后添加一个setTimeout一个getTimeout,然后为Session添加响应的setter和getter。最后在收到了Logon消息的响应时调用Session的setTimeout设置超时时间。
quickfix.Session.next(message):
首先检查SessionTime,如果超过1秒未刷新则刷新时间戳;如果发现Session不存在,则执行reset,重置Session,并返回。
然后检查消息的header中的beginString,如果和SessionID中记录的beginString不匹配则抛异常,返回。
然后检查msgType,如果是Logon消息,根据版本的不同,设置Session中的targetDefaultApplVerID。targetDefaultApplVerID有什么用处呢?因为FIXT协议和之前版本的FIX协议的ApplVerID有不同的组成格式,这个targetDefaultApplVerID主要是给quickfix.MessageCracker用来解析FIXT消息的。
然后检查dataDictionaryProvider, 根据customApplVerID和applVerID取到数据字典(applicationDataDictionary及sessionDataDictionary,细节请参考16)。
然后根据取到的字典验证message是否合法,不合法的根据情况打印警告日志或者直接抛出异常退出。
然后根据消息类型,分别做相应的状态设置(通过nextLogon,nextHeartBeat,nextTestRequest,nextSequenceReset,nextLogout),然后调用用户call back API(在verify中)。
处理完毕该消息之后,调用nextQueued()递归处理下一个队列中的消息。
请注意verify(message)函数,所有的普通消息通过这个函数去回调application的fromApp(message, sessionID)的。verify -> veriry -> fromCallback -> fromAdmin/fromApp。关于消息的解析,其中普通Message是通过quickfix.MessageUtils.parse将String类型的消息解析成Message。
next(message)被SingleThreadedEventHandlingStrategy或者ThreadPerSessionEventHandlingStrategy处理每一条收到的消息时调用,是处理收到消息的源头。
quickfix.Session.next(string):就是next(message)的重载函数,将输入的字符串解析成Message,然后调用next(message)处理。
quickfix.Session.verify(msg, checkTooHigh, checkTooLow):收到的消息的MsgSeqNum与预期的不一样时所做的状态检查和操作。操作比如,如果发现丢包,则自动重发ResendRequest请求或者仅仅记录报警日志;如果发现重复MsgSeqNum,如果消息中设置了可能重复标志,则正常返回,否则说明消息有误,则Logout,disconnect。
quickfix.Session.verify(msg):verify(msg, checkTooHigh, checkTooLow)的重载函数。根据是否是Admin类型的消息及从配置中读取的checkGapFieldOnAdminMessage确定checkTooHigh和checkTooLow这两个标志。如果不是Admin消息则checkTooXX,如果checkGapFieldOnAdminMessage为真则checkTooXX。
quickfix.Session.doTargetTooHigh(message):收到的消息MsgSeqNum比预期的大,即丢失了消息,先检查是否需要reset或者disconnect(至于是否需要reset或者disconnect由创建Session时通过构造函数的参数指定,默认均是不需要,也可以通过指定配置ResetOnError和DisconnectOnError),如果需要则reset或者disconnect后返回。如果不需要,则将这条消息入队(enqueue)存入state中,然后生成ResendRequest并发送出去,请求从expectedTargetNum开始到此消息前的所有消息。由此可见,在state的messageQueue中存储的消息都是因为发现消息序号跳跃缺失消息后,临时存储的跳跃之后新消息。而这些临时存储在state中的消息,会在收到并处理完每种消息之后都会去检查并处理,比如next(msg),nextHeartBeat(msg),nextLogon(msg),nextReject(msg),nextTestRequest(msg)这些处理过程的最后都会检查并处理state临时存储的消息。
quickfix.Session.doTargetTooLow(message):收到的消息MsgSeqNum比预期的小,即收到了重复消息,应该做的事情。
quickfix.Session.parseMessage(stringData):将输入的字符串数据解析成Message。实际是调用quickfix.parse(session, stringData)解析的。
quickfix.Session.send(String messageString):真正通过底层Mina发送消息给对方的API。在QuickFix/J中,send(msgString)通过responder发送消息,responder最终调用了Mina的IoSession.write(object)将数据写入网络中。
responder是通过quickfix.Session.setResponder(Responder responder)在quickfix.mina.initiator.InitiatorIoHandler中创建Session时设置的。关于responder的细节请参考15。
quickfix.Session.sendRaw(message, int num):
Session内部的helper方法,首先将传入的消息和该消息的MsgSeqNum拼装好,把消息头中与系统相关的参数设置好,参数比如BeginString,SenderCompID,TargetCompID,SendingTime。
然后根据消息类型回调响应的用户call back API,如果是Admin类型的消息,回调用户的toAdmin,非Admin类型的消息,回调用户的toApp。如果是LOGON消息,还要对是否重置SeqNum做相应的处理(具体处理过程是:如果消息中有ResetSeqNumFlag这个字段,并且这个字段被设置true则重置。如果重置,则首先resetState,取到期望的seqNum设置到消息头的MsgSeqNum字段中。无论是否重置,都将resetSeqNumFlag写入state中)。
最后把拼装好的message调用toString转换成字符串,然后调用send(string)方法真正的将消息发送到网络上去。
整个处理过程需要锁定SenderMsgSeqNum,直到全部操作完成返回。因为如果call back调用失败,则roll back会更高效。
sendRaw被Session中generateXXX调用,比如generateHeartbeat,generateLogon,generateLogout,generateReject,generateBusinessReject,generateResendRequest,generateSequenceReset,generateTestRequest。这些generateXXX中生成了相应消息之后,最后调用sendRaw把消息通过网络发送出去。
quickfix.Session.send(Message message):做了两件事,首先去掉消息头中的PossDupFlag和OrigSendingTime,然后调用sendRaw把消息发送到网络上去(设置seqNum为0)。主意send(message)返回的boolean并不能说明消息是否成功发送出去,仅仅是说明消息已经成功放入发送队列了,因为默认情况下QuickFix/J使用异步IO发送网络数据。
6. quickfix.mina.initiator.IoSessionInitiator:使用MINA提供的传输层的API,建立、维护同服务器之间的传输层的网络连接,而不是应用层的网络连接。这些网络功能都在一个叫做quickfix.mina.initiator.IoSessionInitiator.ConnectTask的一个私有的TimerTask中实现。具体实现功能有连接(包括普通连接和加密SSL连接)、重连、判断是否应该重连、处理连接异常、启动和停止ConnectTask。
7. quickfix.SocketInitiator:使用单独的线程去为所有的Session处理消息。SocketInitiator提供的功能有:
a) 初始化,即用eventHandlingStrategy创建Initiator,然后注册此SocketInitiator所管理的全部Session,然后启动Initiator,最后调用eventHandlingStrategy.blockInThread()在另外的后台线程中去处理SessionTimer收到的插入队列的消息。启动Initiator做的事情依次是:先启动SessionTimer去监听从传输层过来的消息,如果没有Logon则先Logon,然后在收到消息后回调用户代码处理消息;启动reconnectTask去建立和维护传输层的网络连接。
b) 启动Initiator,在另外的线程中后台(Daemon)处理消息。
c) 阻塞Initiator,在同一线程中处理消息。
d) 停止Initiator。分为强制停止和非强制停止。强制或者非强制Logout所有FixSession,停止连接层的Initiator,取消注册所有此SocketInitiator所管理的全部Session。
e) 关于a) b)如何处理来自底层的消息的逻辑,请参考11和12。因为这里所谓的处理消息实际上是直接或者间接调用了SingleThreadedEventHandlingStrategy的block处理消息。
8. quickfix.ThreadedSocketInitiator:为每一个Session使用一个单独的线程去处理消息。功能参考7。除了线程工作模式不一样,功能和7完全一样。
9. quickfix.SessionState: Session和对方通信过程中使用的helper类。主要功能就是存储了Session的所有状态,并且提供了响应API访问这些状态。状态包括heartBeatInterval,heartBeatMillis,是否需要heartBeat,判断Session所在应用程序的角色(客户端Initiator or 服务器端Acceptor),lastReceivedTime,lastSentTime,获取logger,判断作为客户端的Initiator登陆消息是否已经发出,判断作为服务器端的Acceptor登陆消息是否收到,判断是否需要登陆,判断登陆是否TimeOut,messageStore,testRequestCounter,判断是否需要TestRequest,判断是否处于TimeOut状态,将收到的Message入队(enqueue),出队(dequeue),锁定和解锁发送/接受的Sequence Number,获取、设置自增的下一个Sequence Number,重置(即将Sequence清空,重新从1开始计数),设置、获取Logout的原因。
10. quickfix.mina.EventHandlingStrategy:用于不同版本FIX协议处理事件的策略的接口,是应用级处理消息回调接口的根源。当传输层消息到达时调用此接口onMessage,可以这么理解,onMessage是EventHandlingStrategy的输入,这个输入来自底层。getSessionConnector获取和这个策略相关的SessionConnector,即获取和这个Session相关的Initiator/Acceptor去处理响应的输入消息,一般情况下是逐层将消息向上层传出,回调用户的函数处理该消息。getQueueSize获取当前被处理消息队列的长度。EventHandlingStrategy一般作为AbstractSocketInitiator的成员,建立IoSessionInitiator时传给IoSessionInitiator,请参考3 b)。目前在QuickFix/J中有两个具体实现,分别是quickfix.mina.SingleThreadedEventHandlingStrategy和quickfix.mina.ThreadPerSessionEventHandlingStrategy。这两个具体实现类的说明请参考11,12。
11. quickfix.mina.SingleThreadedEventHandlingStrategy:是QuickFix/J处理消息的核心类。处理消息时即便有多个Session也使用单线程模式。
a) 为了不阻塞输入,那么就需要一个eventQueue来临时快速的存储收到的所有消息。
b) onMessage接到底层传入的消息包装成SessionMessageEvent,首先将其存入eventQueue。
c) 那么SessionMessageEvent里面有什么?SessionEvent仅仅是把fixSession和Message包装到一起,并且提供了处理Message的方法processMessage。可以这样理解,onMessage是事件处理策略底层Message的输入,SessionEvent中的processMessage是Message的输出,未来被应用程序级别的函数处理。
d) getSessionConnector获取需要处理应用级的connector以便处理eventQueue中的消息。
e) getMessage从eventQueue中取出SessionMessageEvent待处理。
f) block就是应用程序级别处理消息的入口。block判断HandlingMessage是否应该继续运新,如果是则从消息队列中取出SessionMessageEvent,调用其中的processMessage去处理该Message。
g) processMessage如何处理了收到的消息呢?它会调用fixSession的next方法,将消息传给Session,由fixSession再接力将消息回调到用户手中。请参考5。
h) 也许你会注意到处理消息的block在run中始终被调用,而且没有任何sleep时间,难道它在没有消息的时候始终不停的死循环运行且丝毫不休息?CPU会保持100%?实际上效果不是这样的,其中的秘密在于它使用了BlockingQueue做到了和sleep相同效果的事情。在没有消息的时候,这个循环会每休息一秒再执行下一次循环。如何做到这样的效果呢?原因是如果eventQueue中如果没有消息,而该eventQueue设置了阻塞超时1000毫秒,则取消息的操作会等待最多1000毫秒,如果没有等到消息则超时退出不再等待,执行完毕本次循环,如果等到了则按照正常流程处理消息。这样做最大的好处就是,如果eventQueue中有事件,那么就会连续不断的处理,如果没有消息,就会休息timeout毫秒再查看。
i) blockInThread,在新启动的后台线程中处理SessionMessageEvent
12. quickfix.mina.ThreadPerSessionEventHandlingStrategy:同样是QuickFix/J处理消息的核心类。和单线程模式不同的是该策略会为每个session启动一个新的线程去处理消息。
a) 由于是每个Session对应一个线程,因此该策略内部需要一个称之为dispatchersMap作为缓存为每个Session保存响应的处理线程(MessageDispatchingThread)引用。
b) 当onMessage收到来自底层的输入消息时,根据输入的fixSession从dispatchers中取到相应的处理线程,并将该消息加入(enqueue)到该线程内部的消息队列中待处理。
c) 每个dispatcher(MessageDispatchingThread类型)内部均维护了自己的消息队列,和单线程模式不同在于,消息队列中的消息仅仅是Message,不是SessionMessageEvent。处理Message的逻辑从单线程中的SessionMessageEvent中移出到dispatcher中。
13. quickfix.DataDictionaryProvider:是一个接口,为指定的session protocol或者application version提供数据字典。getSessionDataDictionary根据提供的beginString 即协议版本获取相应的数据字典。getApplicationDataDictionary根据提供的application version ID和custom application ID获取数据字典。application version ID在FIXT.1.1之前由BeginString字段确定。custom application ID是可选值,不是必须的。
14. quickfix.DefaultDataDictionaryProvider:是QuickFix/J提供的DataDictionaryProvider的默认实现。在DefaultDataDictionaryProvider中,有两种数据字典,一种是传输用的数据字典,一种是应用程序用的数据字典,分别缓存在两个Map中。这个DefaultDataDictionaryProvider是在创建Session时由默认的DefaultSessionFactory根据beginString创建的。addApplicationDictionary和addTransportDictionary分别用于向DefaultDataDictionaryProvider添加新的数据字典。目前QuickFix/J的实现中,在DefaultSessionFactory中初始化Session时添加字典。对于fixt之前版本的数据字典,每个数据字典会被同时添加进入到传输数据字典和应用程序数据字典中。
15. quickfix.Responder:这是个接口,供Session使用,用于发送原始FIX消息(raw FIX message)或者切断和对方的I/O连接。send(stringData)发送原始的FIX消息,disconnect()切断底层的连接,getRemoteIPAddress()提供对方的IP。目前的QuickFix/J中quickfix.mina.IoSessionResponder实现了Responder接口。IoSessionResponder.send(string)调用了Mina的IoSession.write(object)将数据写入网络中。默认是异步写入。(如果同步写入在创建responder时显示传入synchronousWrites为true。同步写入实际是write(object)之后,取到ioSession的WriteFuture,然后向WriteFuture发送join指令。)
16. quickfix.DataDictionary:为不同版本的FIX协议提供消息的元数据(metadata),提供helper方法帮助判断field类型,判断消息的类型,判断组、组件的类型,检查message、field,field等。
总共有3种方式生成DataDictionary,首先可以通过一个系统路径读入配置文件生成,或者载入inputStream生成,或者从已有的DataDictionary拷贝生成。
DataDictionary中最复杂的方法就是load,思想是根据fixXXX.xml中的定义,判断xml文件是否合法,然后根据DTD取到各个section中的信息并存入预先定义好的Map、List、Set中,便于之后的Helper方法能够快速准确的搜索和判断相关信息。
17. quickfix.CompositeLog:将日志信息输出到多个logger中。多个logger在new CompositeLog(Log[] logs)时传入。但是多个logger没有优先级,也不能控制多个logger,仅仅是日志来了,同时写入这多个logger中。CompositeLog由CompositeLogFactory创建。
QuickFix/J中其他的logger还有:
quickfix.ScreenLog,将日志通过System.out打印到屏幕,相应的有quickfix.ScreenLogFactory。
quickfix.FileLog,将日志写入本地文件中。相应的有quickfix.FileLogFactory。
quickfix.JdbcLog,将日志写入数据库中。相应的有quickfix.JdbcLogFactory。
quickfix.SLF4JLog,使用SLF4J wrapper写日志,SLF4J支持JDK logging,log4j等。
另外多数Log的实现都extends了quickfix.AbstractLog。AbstractLog的作用是添加了对HeartBeat是否记录日志的开关。
B). 网络数据在QuickFix/J中的流向
ConnectTask -> ioConnector.connect(sockAddress, ioHandler) -> MINA建立和服务器端的通信。收到网络数据,ioConnector触发相应事件,并把事件交给ioHandler(InitiatorIoHandler)的processMessage -> processMessage中调用eventHandlingStrategy.onMessage(quickfixSession, message) ,将消息向外回调 -> SingleThreadedEventHandlingStrategy.onMessage(quickfixSession, message) 将收到的消息入队(enQueue)到eventQueue -> SingleThreadedEventHandlingStrategy.blockInThread中启动单独的后台线程,依次从eventQueue取出消息处理,向session回调 -> SessionMessageEvent.quickfixSession.next(message) -> quickFixSession.next根据msgType判断回调 ->逐层回调 (verify -> veriry -> fromCallback -> fromAdmin/fromApp),从fromAdmin/fromApp(msg, sessionID)回调用户处理逻辑。
Guibin
2010-12-30
评论
6 楼
JavaLuSir
2014-07-07
好文章,赞一个
5 楼
hsbcnet
2012-02-18
我也只是用过quickfix/j写过一些应用,没看过源代码。收藏!
4 楼
tryto21
2011-11-18
在做股指的基金期货接口,正好用quickfix/j,感谢。
3 楼
dickwin
2011-09-27
只是从技术上来说吧
如果券商的前台和fix协议还是有一部分区别的
如果券商的前台和fix协议还是有一部分区别的
2 楼
guibin
2010-12-28
mercyblitz 写道
能不能介绍一下!
等我把核心内容整理完毕之后会及时补充周边的介绍性内容和背景知识的。
1 楼
mercyblitz
2010-12-27
能不能介绍一下!
相关推荐
QuickFIX,全称为"QuickFIX/J",是一个开源的、跨平台的金融信息交换(FIX)协议实现,专为高效、可靠地处理金融交易而设计。FIX协议是金融业广泛采用的标准,用于在金融机构之间传递订单、报价和其他市场数据。...
QuickFIX/j是QuickFIX家族的一员,是一个开源、跨平台的Java实现,旨在简化FIX消息的处理。本教程将通过一个简单的Java示例介绍如何使用QuickFIX/j来理解和应用FIX协议。 首先,我们需要理解FIX协议的基础。FIX协议...
本文将深入探讨C# QuickFix的使用、例子和源代码分析。 首先,FIX协议是金融机构之间通信的标准,用于传输订单、报价、交易确认等各种金融信息。QuickFixN库使得C#开发者能够轻松地解析、创建和发送这些FIX消息,...
QuickFIX /转到 网站: : 邮件列表: Go中实现的开源库 入门和文档 安装 要安装QuickFIX / Go,请使用go get : $ go get github.com/quickfixgo/quickfix 保持最新 要更新QuickFIX / Go到最新版本,请使用go ...
QuickFIX / J 这是官方的QuickFIX / J项目存储库。介绍QuickFIX / J是用于FIX协议(FIX版本4.0-5.0SP2 / FIXT1.1)的全功能消息传递引擎。 它是流行的C ++ QuickFIX引擎的100%Java开源实现。 金融信息交换(FIX)...
QuickFIX / J是流行的QuickFIX开源FIX协议引擎的100%Java实现。 QuickFIX / J功能包括对FIX协议版本4.0至4.4和5.0 / FIXT1.1(www.fixprotocol.org)的支持。 请注意,SourceForge SVN存储库是只读的。 当前仓库...
1. 源代码文件:展示了 QuickFix 的内部实现,包括消息解析、网络通信和事件处理等功能。 2. 示例代码:提供了使用 QuickFix 进行FIX消息交互的基本示例,可以帮助初学者快速上手。 3. 配置文件:通常为 `.ini` 或 `...
QuickFIX/J 2.2.0 是一个针对金融行业的开源消息引擎,专门处理基于FIX(Financial Information eXchange)协议的数据交换。FIX协议是全球金融市场广泛采用的标准,用于在金融机构之间传输交易和市场数据。QuickFIX/...
QuickFix,全称为"Financial Information eXchange"(金融信息交换),是一个开源的、跨平台的C++库,专门用于处理FIX(金融信息交换协议)消息。FIX协议是金融行业中广泛使用的标准通信协议,用于在金融机构之间...
*数据收集选项,包括每日历史数据,日内数据和实时数据* MATLAB 中的模型构建和原型设计* 回测和校准模型*与现有的库和软件进行交互以执行交易(X_Trader,QuickFIX / J,消息队列) 建议您观看网络研讨会以了解...
受启发,一个应该测试我们以前的quickfix安装。 从根本上扩展了原始示例SSL证书上的vanilla示例。 检查证书 重用先前的代码,此子任务旨在并与它们运行简单的连接。 考虑到本小节非常取决于您的具体操作系统。 检测...
在"quickfix"这个压缩包文件中,可能包含了QuickFix的源代码、文档、示例项目以及配置文件等资源。通过这些资源,开发者可以学习QuickFix的工作原理,理解其API使用方法,并将QuickFix应用到实际的交易系统中。对于...
`.whl`文件是Python的Wheel格式,是一种预编译的Python包,可以直接通过pip安装,无需编译源代码,这简化了安装过程。 使用QuickFix Python库,开发者可以方便地集成FIX通信功能到他们的应用程序中,无论是创建订单...
在"quickfix-1.13.3.rar"文件中,我们可以找到Quick Fix框架的1.13.3版本源代码或库。这可能包含了用于集成到你项目的API,以及用于调试和配置的文档。开发者可以通过这个库来构建自己的FIX应用程序,如订单管理系统...
quickfix, 在Go中,修复协议库 quickfix 网站:http://www.quickfixgo.org邮件列表:GoogleGo中实现的开源修复协议库入门和文档用户手册API文档安装要安装 quickfix/围棋,请使用 go g
在“Lenovo Quick Fix 联想快速修复工具打包.zip”压缩包中,包含了一个名为“Lenovo Quick Fix 联想快速修复工具打包.txt”的文件,这个文件很可能是该工具的使用指南或者下载链接信息。用户可以通过解压并查看此...
FIXimulator是一款基于Java开发的FIX(Financial Information eXchange)交易模拟器,它利用了开源的QuickFIX/J库,这是FIX协议的一个强大的Java实现。FIX协议是一种广泛用于金融市场的通信协议,用于在金融机构之间...
通过对这些源代码的研究,我们可以学习到如何在实际项目中运用Qt和C++,理解其设计思想,提升我们的编程能力。同时,对于初学者来说,这样的个人记录也是一份宝贵的参考资料,能帮助他们更好地理解和掌握Qt框架的...