public class ThriftExporter extends RemoteExporter implements InitializingBean { private static final Logger LOGGER = LoggerFactory.getLogger(ThriftExporter.class); public static final String CONTENT_TYPE_HESSIAN = "application/x-thrift"; protected TProcessorFactory processorFactory_; protected TTransportFactory inputTransportFactory_ = new TTransportFactory(); protected TTransportFactory outputTransportFactory_ = new TTransportFactory(); protected TProtocolFactory inputProtocolFactory_; protected TProtocolFactory outputProtocolFactory_; protected TServerEventHandler eventHandler_; protected Class<?> processorClass; @Override public void afterPropertiesSet() throws Exception { // LocationThrfitTestService.Processor<LocationThrfitTestService.Iface> processor = new LocationThrfitTestService.Processor<LocationThrfitTestService.Iface>(new LocationThrfitTestServiceImpl()); // LocationThrfitTestService.Processor<LocationThrfitTestService.Iface> processor = new LocationThrfitTestService.Processor<LocationThrfitTestService.Iface>((LocationThrfitTestService.Iface) getProxyForService()); Object service = getService(); Class<?> serviceInterface = getServiceInterface(); Constructor<?> constructor = processorClass.getConstructor(serviceInterface); TProcessor processor = (TProcessor) constructor.newInstance(getProxyForService()); processorFactory_ = new TProcessorFactory(processor); TBinaryProtocol.Factory portFactory = new TBinaryProtocol.Factory(true, true); inputProtocolFactory_ = portFactory; outputProtocolFactory_ = portFactory; eventHandler_ = null; } public void invoke(InputStream inputStream, OutputStream outputStream) throws Throwable { //Assert.notNull(this.skeleton, "Thrift exporter has not been initialized"); //doInvoke(this.skeleton, inputStream, outputStream); TIOStreamTransport client_ = new TIOStreamTransport(inputStream, outputStream); TProcessor processor = null; TTransport inputTransport = null; TTransport outputTransport = null; TProtocol inputProtocol = null; TProtocol outputProtocol = null; TServerEventHandler eventHandler = null; ServerContext connectionContext = null; try { processor = processorFactory_.getProcessor(client_); inputTransport = inputTransportFactory_.getTransport(client_); outputTransport = outputTransportFactory_.getTransport(client_); inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); eventHandler = getEventHandler(); if (eventHandler != null) { connectionContext = eventHandler.createContext(inputProtocol, outputProtocol); } // we check stopped_ first to make sure we're not supposed to be shutting // down. this is necessary for graceful shutdown. // while (true) { // // if (eventHandler != null) { // eventHandler.processContext(connectionContext, inputTransport, outputTransport); // } // // if(stopped_ || !processor.process(inputProtocol, outputProtocol)) { // break; // } // } if (eventHandler != null) { eventHandler.processContext(connectionContext, inputTransport, outputTransport); } processor.process(inputProtocol, outputProtocol); } catch (TSaslTransportException ttx) { // Something thats not SASL was in the stream, continue silently } catch (TTransportException ttx) { // Assume the client died and continue silently } catch (TException tx) { LOGGER.error("Thrift error occurred during processing of message.", tx); } catch (Exception x) { LOGGER.error("Error occurred during processing of message.", x); } finally { if (eventHandler != null) { eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); } if (inputTransport != null) { inputTransport.close(); } if (outputTransport != null) { outputTransport.close(); } if (client_.isOpen()) { client_.close(); } } } public TServerEventHandler getEventHandler() { return eventHandler_; } public void setProcessorClass(Class<?> processorClass) { this.processorClass = processorClass; } }