HttpCore Component Example Programs in Java (HttpComponents: HttpCore Examples)
Reposted from: http://www.myexception.cn/program/586831.html
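Part I. Basic Concepts
HttpCore is a set of components that implement the most fundamental aspects of the HTTP protocol, yet are sufficient to develop full-featured client-side and server-side HTTP services with a minimal footprint.
HttpCore has the following scope and goals (@Author: 南磊):
1. HttpCore scope
■ A consistent API for building client, proxy, and server-side HTTP services
■ A consistent API for building both synchronous and asynchronous HTTP services
■ A set of low-level components based on the blocking (classic) and non-blocking (NIO) I/O models
2. HttpCore goals
■ Implement the most fundamental aspects of HTTP transport
■ Balance good performance against the clarity and expressiveness of the API
■ A small (predictable) memory footprint
■ A self-contained library (no dependencies beyond the JRE)
3. What HttpCore is not
■ A replacement for HttpClient
■ A replacement for a Servlet container, or a competitor to the Servlet API
Part II. Source Code Walkthrough
1. Elemental HTTP GET program (synchronous, blocking I/O model)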
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.examples;
import java.net.Socket;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpClientConnection;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestExecutor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;
/**
* Elemental example for executing multiple GET requests sequentially.
* <p>
* Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
* It is NOT intended to demonstrate the most efficient way of building an HTTP client.
*/
public class ElementalHttpGet {
    public static void main(String[] args) throws Exception {
        HttpParams params = new SyncBasicHttpParams();
        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
        HttpProtocolParams.setContentCharset(params, "UTF-8");
        HttpProtocolParams.setUserAgent(params, "HttpComponents/1.1");
        HttpProtocolParams.setUseExpectContinue(params, true);
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Required protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                // Recommended protocol interceptors
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();
        HttpContext context = new BasicHttpContext(null);
        HttpHost host = new HttpHost("localhost", 8080);
        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();
        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();
        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);
        try {
            String[] targets = {
                    "/",
                    "/servlets-examples/servlet/RequestInfoExample",
                    "/somewhere%20in%20pampa"};
            for (int i = 0; i < targets.length; i++) {
                if (!conn.isOpen()) {
                    Socket socket = new Socket(host.getHostName(), host.getPort());
                    conn.bind(socket, params);
                }
                BasicHttpRequest request = new BasicHttpRequest("GET", targets[i]);
                System.out.println(">> Request URI: " + request.getRequestLine().getUri());
                request.setParams(params);
                httpexecutor.preProcess(request, httpproc, context);
                HttpResponse response = httpexecutor.execute(request, conn, context);
                response.setParams(params);
                httpexecutor.postProcess(response, httpproc, context);
                System.out.println("<< Response: " + response.getStatusLine());
                System.out.println(EntityUtils.toString(response.getEntity()));
                System.out.println("==============");
                if (!connStrategy.keepAlive(response, context)) {
                    conn.close();
                } else {
                    System.out.println("Connection kept alive...");
                }
            }
        } finally {
            conn.close();
        }
    }
}
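To make the role of the request interceptors in the GET example more concrete, here is a minimal JDK-only sketch of roughly what the request head looks like once the chain has run. The class RequestHeadSketch and the method buildGetHead are hypothetical names made up for this illustration and are not part of HttpCore; the header order simply follows the order of the interceptors in the example, and the exact bytes HttpCore writes may differ in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: approximates the request head produced by the interceptor chain. */
final class RequestHeadSketch {

    // Hypothetical helper, not an HttpCore API.
    static String buildGetHead(String host, int port, String uri, String userAgent) {
        Map<String, String> headers = new LinkedHashMap<>();
        // RequestContent: a GET carries no entity, so no Content-Length/Transfer-Encoding here
        // RequestTargetHost: the Host header that HTTP/1.1 requires
        headers.put("Host", host + ":" + port);
        // RequestConnControl: asks the server to keep the connection open
        headers.put("Connection", "Keep-Alive");
        // RequestUserAgent: taken from the protocol parameters
        headers.put("User-Agent", userAgent);
        // RequestExpectContinue: only applies to requests that enclose an entity

        StringBuilder head = new StringBuilder();
        head.append("GET ").append(uri).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> h : headers.entrySet()) {
            head.append(h.getKey()).append(": ").append(h.getValue()).append("\r\n");
        }
        head.append("\r\n"); // an empty line terminates the head
        return head.toString();
    }

    public static void main(String[] args) {
        System.out.print(buildGetHead("localhost", 8080, "/", "HttpComponents/1.1"));
    }
}
```

Because the Connection header requests keep-alive, the example can send all three GET requests over one socket as long as DefaultConnectionReuseStrategy agrees after each response.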
2. Elemental HTTP POST program (synchronous, blocking I/O model)
package org.apache.http.examples;
import java.io.ByteArrayInputStream;
import java.net.Socket;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpClientConnection;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestExecutor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;
/**
* Elemental example for executing multiple POST requests sequentially.
* <p>
* Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
* It is NOT intended to demonstrate the most efficient way of building an HTTP client.
*/
public class ElementalHttpPost {
    public static void main(String[] args) throws Exception {
        HttpParams params = new SyncBasicHttpParams();
        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
        HttpProtocolParams.setContentCharset(params, "UTF-8");
        HttpProtocolParams.setUserAgent(params, "Test/1.1");
        HttpProtocolParams.setUseExpectContinue(params, true);
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Required protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                // Recommended protocol interceptors
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();
        HttpContext context = new BasicHttpContext(null);
        HttpHost host = new HttpHost("localhost", 8080);
        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();
        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();
        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);
        try {
            HttpEntity[] requestBodies = {
                    new StringEntity(
                            "This is the first test request", "UTF-8"),
                    new ByteArrayEntity(
                            "This is the second test request".getBytes("UTF-8")),
                    new InputStreamEntity(
                            new ByteArrayInputStream(
                                    "This is the third test request (will be chunked)"
                                    .getBytes("UTF-8")), -1)
            };
            for (int i = 0; i < requestBodies.length; i++) {
                if (!conn.isOpen()) {
                    Socket socket = new Socket(host.getHostName(), host.getPort());
                    conn.bind(socket, params);
                }
                BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST",
                        "/servlets-examples/servlet/RequestInfoExample");
                request.setEntity(requestBodies[i]);
                System.out.println(">> Request URI: " + request.getRequestLine().getUri());
                request.setParams(params);
                httpexecutor.preProcess(request, httpproc, context);
                HttpResponse response = httpexecutor.execute(request, conn, context);
                response.setParams(params);
                httpexecutor.postProcess(response, httpproc, context);
                System.out.println("<< Response: " + response.getStatusLine());
                System.out.println(EntityUtils.toString(response.getEntity()));
                System.out.println("==============");
                if (!connStrategy.keepAlive(response, context)) {
                    conn.close();
                } else {
                    System.out.println("Connection kept alive...");
                }
            }
        } finally {
            conn.close();
        }
    }
}
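The third request body in the POST example is created with a length of -1, so its size is unknown up front and it is sent with chunked transfer encoding. As a rough illustration of what that means on the wire, the JDK-only sketch below frames a byte array as chunks. ChunkedSketch and encodeChunked are hypothetical names for this note, not HttpCore classes, and the fixed chunk size is an assumption made for demonstration; HttpCore picks its own chunk boundaries.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustration only: frames a body using HTTP/1.1 chunked transfer coding. */
final class ChunkedSketch {

    // Hypothetical helper: each chunk is "<size in hex>\r\n<data>\r\n"; a zero-size chunk ends the body.
    static byte[] encodeChunked(byte[] body, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int off = 0; off < body.length; off += chunkSize) {
            int len = Math.min(chunkSize, body.length - off);
            writeAscii(out, Integer.toHexString(len) + "\r\n"); // chunk size line
            out.write(body, off, len);                          // chunk data
            writeAscii(out, "\r\n");                            // chunk terminator
        }
        writeAscii(out, "0\r\n\r\n"); // last chunk, no trailers
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.US_ASCII);
        out.write(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        byte[] framed = encodeChunked("hello world".getBytes(StandardCharsets.US_ASCII), 5);
        // Make the CR LF pairs visible when printing
        System.out.println(new String(framed, StandardCharsets.US_ASCII).replace("\r\n", "\\r\\n"));
    }
}
```

The first two bodies (StringEntity and ByteArrayEntity) know their length, so they can be sent with a Content-Length header instead.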
3. Elemental HTTP server program (synchronous, blocking I/O model)
package org.apache.http.examples;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.Locale;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpServerConnection;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpResponseFactory;
import org.apache.http.impl.DefaultHttpServerConnection;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpRequestHandlerRegistry;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.util.EntityUtils;
/**
* Basic, yet fully functional and spec compliant, HTTP/1.1 file server.
* <p>
* Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
* It is NOT intended to demonstrate the most efficient way of building an HTTP file server.
*
*
*/
public class ElementalHttpServer {
    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Please specify document root directory");
            System.exit(1);
        }
        Thread t = new RequestListenerThread(8080, args[0]);
        t.setDaemon(false);
        t.start();
    }
    static class HttpFileHandler implements HttpRequestHandler {
        private final String docRoot;
        public HttpFileHandler(final String docRoot) {
            super();
            this.docRoot = docRoot;
        }
        public void handle(
                final HttpRequest request,
                final HttpResponse response,
                final HttpContext context) throws HttpException, IOException {
            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }
            String target = request.getRequestLine().getUri();
            if (request instanceof HttpEntityEnclosingRequest) {
                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
                byte[] entityContent = EntityUtils.toByteArray(entity);
                System.out.println("Incoming entity content (bytes): " + entityContent.length);
            }
            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
            if (!file.exists()) {
                response.setStatusCode(HttpStatus.SC_NOT_FOUND);
                StringEntity entity = new StringEntity(
                        "<html><body><h1>File " + file.getPath() +
                        " not found</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("File " + file.getPath() + " not found");
            } else if (!file.canRead() || file.isDirectory()) {
                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
                StringEntity entity = new StringEntity(
                        "<html><body><h1>Access denied</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("Cannot read file " + file.getPath());
            } else {
                response.setStatusCode(HttpStatus.SC_OK);
                FileEntity body = new FileEntity(file, ContentType.create("text/html", (Charset) null));
                response.setEntity(body);
                System.out.println("Serving file " + file.getPath());
            }
        }
    }
    static class RequestListenerThread extends Thread {
        private final ServerSocket serversocket;
        private final HttpParams params;
        private final HttpService httpService;
        public RequestListenerThread(int port, final String docroot) throws IOException {
            this.serversocket = new ServerSocket(port);
            this.params = new SyncBasicHttpParams();
            this.params
                .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
                .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
                .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
                .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
                .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpComponents/1.1");
            // Set up the HTTP protocol processor
            HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
                    new ResponseDate(),
                    new ResponseServer(),
                    new ResponseContent(),
                    new ResponseConnControl()
            });
            // Set up request handlers
            HttpRequestHandlerRegistry reqistry = new HttpRequestHandlerRegistry();
            reqistry.register("*", new HttpFileHandler(docroot));
            // Set up the HTTP service
            this.httpService = new HttpService(
                    httpproc,
                    new DefaultConnectionReuseStrategy(),
                    new DefaultHttpResponseFactory(),
                    reqistry,
                    this.params);
        }
        @Override
        public void run() {
            System.out.println("Listening on port " + this.serversocket.getLocalPort());
            while (!Thread.interrupted()) {
                try {
                    // Set up HTTP connection
                    Socket socket = this.serversocket.accept();
                    DefaultHttpServerConnection conn = new DefaultHttpServerConnection();
                    System.out.println("Incoming connection from " + socket.getInetAddress());
                    conn.bind(socket, this.params);
                    // Start worker thread
                    Thread t = new WorkerThread(this.httpService, conn);
                    t.setDaemon(true);
                    t.start();
                } catch (InterruptedIOException ex) {
                    break;
                } catch (IOException e) {
                    System.err.println("I/O error initialising connection thread: "
                            + e.getMessage());
                    break;
                }
            }
        }
    }
    static class WorkerThread extends Thread {
        private final HttpService httpservice;
        private final HttpServerConnection conn;
        public WorkerThread(
                final HttpService httpservice,
                final HttpServerConnection conn) {
            super();
            this.httpservice = httpservice;
            this.conn = conn;
        }
        @Override
        public void run() {
            System.out.println("New connection thread");
            HttpContext context = new BasicHttpContext(null);
            try {
                while (!Thread.interrupted() && this.conn.isOpen()) {
                    this.httpservice.handleRequest(this.conn, context);
                }
            } catch (ConnectionClosedException ex) {
                System.err.println("Client closed connection");
            } catch (IOException ex) {
                System.err.println("I/O error: " + ex.getMessage());
            } catch (HttpException ex) {
                System.err.println("Unrecoverable HTTP protocol violation: " + ex.getMessage());
            } finally {
                try {
                    this.conn.shutdown();
                } catch (IOException ignore) {}
            }
        }
    }
}
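4. Asynchronous HTTP GET program (non-blocking I/O model: the NIO framework)
I. I/O reactor
HttpCore NIO is based on the Reactor pattern as described by Doug Lea. The purpose of an I/O reactor is to react to I/O events and to dispatch event notifications to individual I/O sessions. The main idea of the pattern is to break away from the one-thread-per-connection model imposed by classic blocking I/O. The IOReactor interface represents an abstract object that implements the Reactor pattern. Internally, IOReactor implementations encapsulate the functionality of the NIO java.nio.channels.Selector.
I/O reactors usually employ a small number of dispatch threads (often just one) to dispatch I/O event notifications to a much larger number (often several thousand) of I/O sessions or connections. It is generally recommended to have one dispatch thread per CPU core.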
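To illustrate the reactor idea described above without any HttpCore classes, here is a minimal sketch that uses only java.nio: a single dispatch thread waits on a Selector and serves every connection from the same loop. MiniReactor and echoOnce are hypothetical names invented for this note, the echo behaviour is just a stand-in for real protocol handling, and the loop is deliberately simplified (for example, it spins on write instead of registering OP_WRITE). It is not how HttpCore's IOReactor is implemented.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Illustration only: one dispatch thread multiplexing all connections through a Selector. */
final class MiniReactor implements Runnable {
    private final Selector selector;
    private final ServerSocketChannel server;
    private volatile boolean running = true;

    MiniReactor() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: the OS picks a free port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    void shutdown() {
        running = false;
        selector.wakeup(); // unblocks select() so the loop can observe the flag
    }

    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (running) {
                selector.select(); // one thread waits on every registered channel
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch != null) {
                            ch.configureBlocking(false);
                            ch.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isValid() && key.isReadable()) {
                        SocketChannel ch = (SocketChannel) key.channel();
                        buf.clear();
                        if (ch.read(buf) < 0) {
                            ch.close(); // peer closed the connection
                            continue;
                        }
                        buf.flip();
                        while (buf.hasRemaining()) {
                            ch.write(buf); // simplification: real reactors register OP_WRITE instead
                        }
                    }
                }
            }
            for (SelectionKey key : selector.keys()) {
                key.channel().close();
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Starts a reactor, sends one message from a blocking client, and returns the echo. */
    static String echoOnce(String message) {
        try {
            MiniReactor reactor = new MiniReactor();
            Thread dispatch = new Thread(reactor, "dispatch");
            dispatch.start();
            try (Socket client = new Socket("127.0.0.1", reactor.server.socket().getLocalPort())) {
                client.setSoTimeout(5000);
                byte[] out = message.getBytes(StandardCharsets.UTF_8);
                client.getOutputStream().write(out);
                byte[] in = new byte[out.length];
                new DataInputStream(client.getInputStream()).readFully(in);
                return new String(in, StandardCharsets.UTF_8);
            } finally {
                reactor.shutdown();
                dispatch.join();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echoOnce("ping"));
    }
}
```

The point of the sketch is the shape of run(): no thread is parked per connection, so the number of open sessions is limited by selector capacity rather than by thread count.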
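II. I/O event dispatchers
IOReactor implementations use the IOEventDispatch interface to notify clients of events pending for a particular session. All methods of IOEventDispatch are executed on a dispatch thread of the I/O reactor. It is therefore important that processing in the event methods does not block the dispatch thread for long; while the thread is blocked, the I/O reactor cannot react to other events.
Generic I/O events defined by the IOEventDispatch interface:
- connected: triggered when a new session has been created.
- inputReady: triggered when the session has pending input.
- outputReady: triggered when the session is ready for output.
- timeout: triggered when the session has timed out.
- disconnected: triggered when the session has been terminated.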
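Since every event method runs on the dispatch thread, one common way to keep that thread responsive is to hand slow work to a separate worker pool and return immediately. The sketch below shows this with plain java.util.concurrent. DispatchOffloadSketch, its inputReady method, and the String session identifiers are all hypothetical stand-ins chosen for illustration; HttpCore does not prescribe this approach and its real IOEventDispatch methods take an IOSession.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: an event callback that offloads slow work instead of blocking the caller. */
final class DispatchOffloadSketch {
    private final AtomicInteger ids = new AtomicInteger();
    private final ExecutorService workers = Executors.newFixedThreadPool(2,
            r -> new Thread(r, "worker-" + ids.incrementAndGet()));
    private final Map<String, String> processedBy = new ConcurrentHashMap<>();

    // Simulated event method: invoked on the dispatch thread, so it must return quickly.
    void inputReady(String session) {
        workers.execute(() -> {
            try {
                Thread.sleep(50); // stands in for slow application logic
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            processedBy.put(session, Thread.currentThread().getName());
        });
    }

    // Waits for outstanding work and reports which thread handled each session.
    Map<String, String> shutdownAndGet() {
        workers.shutdown();
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processedBy;
    }

    // The calling thread plays the role of the dispatch thread here.
    static Map<String, String> demo(String... sessions) {
        DispatchOffloadSketch dispatch = new DispatchOffloadSketch();
        for (String session : sessions) {
            dispatch.inputReady(session);
        }
        return dispatch.shutdownAndGet();
    }

    public static void main(String[] args) {
        System.out.println(demo("session-1", "session-2", "session-3"));
    }
}
```

Each call to inputReady costs the caller only the time needed to enqueue a task, so three sessions with 50 ms of work each do not hold up the dispatch loop for 150 ms.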
package org.apache.http.examples.nio;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
/**
* Asynchronous HTTP/1.1 client.
*/
public class NHttpClient {
    public static void main(String[] args) throws Exception {
        // HTTP parameters for the client
        HttpParams params = new SyncBasicHttpParams();
        params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
            .setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");
        // Create HTTP protocol processing chain
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Use standard client-side protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
        // Create client-side HTTP protocol handler
        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
        // Create client-side I/O event dispatch
        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);
        // Create client-side I/O reactor
        IOReactorConfig config = new IOReactorConfig();
        config.setIoThreadCount(1);
        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
        // Create HTTP connection pool
        BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, params);
        // Limit total number of connections to just two
        pool.setDefaultMaxPerRoute(2);
        pool.setMaxTotal(2);
        // Run the I/O reactor in a separate thread
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    // Ready to go!
                    ioReactor.execute(ioEventDispatch);
                } catch (InterruptedIOException ex) {
                    System.err.println("Interrupted");
                } catch (IOException e) {
                    System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }
        });
        // Start the client thread
        t.start();
        // Create HTTP requester
        HttpAsyncRequester requester = new HttpAsyncRequester(
                httpproc, new DefaultConnectionReuseStrategy(), params);
        // Execute HTTP GETs against the following hosts
        HttpHost[] targets = new HttpHost[] {
                new HttpHost("www.apache.org", 80, "http"),
                new HttpHost("www.verisign.com", 443, "https"),
                new HttpHost("www.google.com", 80, "http")
        };
        final CountDownLatch latch = new CountDownLatch(targets.length);
        for (final HttpHost target: targets) {
            BasicHttpRequest request = new BasicHttpRequest("GET", "/");
            requester.execute(
                    new BasicAsyncRequestProducer(target, request),
                    new BasicAsyncResponseConsumer(),
                    pool,
                    new BasicHttpContext(),
                    // Handle HTTP response from a callback
                    new FutureCallback<HttpResponse>() {
                        public void completed(final HttpResponse response) {
                            latch.countDown();
                            System.out.println(target + "->" + response.getStatusLine());
                        }
                        public void failed(final Exception ex) {
                            latch.countDown();
                            System.out.println(target + "->" + ex);
                        }
                        public void cancelled() {
                            latch.countDown();
                            System.out.println(target + " cancelled");
                        }
                    });
        }
        latch.await();
        System.out.println("Shutting down I/O reactor");
        ioReactor.shutdown();
        System.out.println("Done");
    }
}
5. Asynchronous HTTP server program (non-blocking I/O model: the NIO framework)
This example demonstrates the use of HttpCore NIO to build an asynchronous (non-blocking) HTTP server capable of direct channel (zero copy) data transfer.
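The JDK primitive behind this kind of direct channel transfer is FileChannel.transferTo, which lets the operating system move file bytes to the target channel without first copying them into a Java buffer where the platform supports it. The sketch below is a JDK-only illustration under stated assumptions: DirectTransferSketch, transfer, and roundTrip are hypothetical names for this note, and the in-memory sink is only there so the example runs without a network peer (against such a sink the JDK falls back to ordinary copying). How HttpCore wires this into its content encoders is not shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustration only: channel-to-channel file transfer with FileChannel.transferTo. */
final class DirectTransferSketch {

    // Hypothetical helper: pushes the whole file into the target channel and returns the byte count.
    // Assumes a blocking target; a non-blocking one could accept 0 bytes and would need readiness handling.
    static long transfer(Path file, WritableByteChannel target) {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = source.size();
            long position = 0;
            while (position < size) {
                // transferTo may move fewer bytes than requested, hence the loop
                position += source.transferTo(position, size - position, target);
            }
            return position;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes content to a temporary file, transfers it into memory, and returns what arrived.
    static String roundTrip(String content) {
        try {
            Path tmp = Files.createTempFile("direct-transfer", ".txt");
            try {
                Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                try (WritableByteChannel channel = Channels.newChannel(sink)) {
                    transfer(tmp, channel);
                }
                return new String(sink.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("served without an intermediate buffer"));
    }
}
```

In a real server the target would be the socket channel of the connection, which is where the zero-copy benefit actually appears.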
package org.apache.http.examples.nio;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Locale;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NFileEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
/**
* HTTP/1.1 file server based on the non-blocking I/O model and capable of direct channel
* (zero copy) data transfer.
*/
public class NHttpServer {
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Please specify document root directory");
System.exit(1);
}
// Document root directory
File docRoot = new File(args[0]);
int port = 8080;
if (args.length >= 2) {
port = Integer.parseInt(args[1]);
}
// HTTP parameters for the server
HttpParams params = new SyncBasicHttpParams();
params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
// Create HTTP protocol processing chain
HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
// Use standard server-side protocol interceptors
new ResponseDate(),
new ResponseServer(),
new ResponseContent(),
new ResponseConnControl()
});
// Create request handler registry
HttpAsyncRequestHandlerRegistry reqistry = new HttpAsyncRequestHandlerRegistry();
// Register the default handler for all URIs
reqistry.register("*", new HttpFileHandler(docRoot));
// Create server-side HTTP protocol handler
HttpAsyncService protocolHandler = new HttpAsyncService(
httpproc, new DefaultConnectionReuseStrategy(), reqistry, params) {
@Override
public void connected(final NHttpServerConnection conn) {
System.out.println(conn + ": connection open");
super.connected(conn);
}
@Override
public void closed(final NHttpServerConnection conn) {
System.out.println(conn + ": connection closed");
super.closed(conn);
}
};
// Create HTTP connection factory
NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;
if (port == 8443) {
// Initialize SSL context
ClassLoader cl = NHttpServer.class.getClassLoader();
URL url = cl.getResource("my.keystore");
if (url == null) {
System.out.println("Keystore not found");
System.exit(1);
}
KeyStore keystore = KeyStore.getInstance("jks");
keystore.load(url.openStream(), "secret".toCharArray());
KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(
KeyManagerFactory.getDefaultAlgorithm());
kmfactory.init(keystore, "secret".toCharArray());
KeyManager[] keymanagers = kmfactory.getKeyManagers();
SSLContext sslcontext = SSLContext.getInstance("TLS");
sslcontext.init(keymanagers, null, null);
connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);
} else {
connFactory = new DefaultNHttpServerConnectionFactory(params);
}
// Create server-side I/O event dispatch
IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
// Create server-side I/O reactor
ListeningIOReactor ioReactor = new DefaultListeningIOReactor();
try {
// Listen of the given port
ioReactor.listen(new InetSocketAddress(port));
// Ready to go!
ioReactor.execute(ioEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException e) {
System.err.println("I/O error: " + e.getMessage());
}
System.out.println("Shutdown");
}
static class HttpFileHandler implements HttpAsyncRequestHandler<HttpRequest> {
private final File docRoot;
public HttpFileHandler(final File docRoot) {
super();
this.docRoot = docRoot;
}
public HttpAsyncRequestConsumer<HttpRequest> processRequest(
final HttpRequest request,
final HttpContext context) {
// Buffer request content in memory for simplicity
return new BasicAsyncRequestConsumer();
}
public void handle(
final HttpRequest request,
final HttpAsyncExchange httpexchange,
final HttpContext context) throws HttpException, IOException {
HttpResponse response = httpexchange.getResponse();
handleInternal(request, response, context);
httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
}
private void handleInternal(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " method not supported");
}
String target = request.getRequestLine().getUri();
final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
if (!file.exists()) {
response.setStatusCode(HttpStatus.SC_NOT_FOUND);
NStringEntity entity = new NStringEntity(
"<html><body><h1>File " + file.getPath() +
" not found</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("File " + file.getPath() + " not found");
} else if (!file.canRead() || file.isDirectory()) {
response.setStatusCode(HttpStatus.SC_FORBIDDEN);
NStringEntity entity = new NStringEntity(
"<html><body><h1>Access denied</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("Cannot read file " + file.getPath());
} else {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
response.setStatusCode(HttpStatus.SC_OK);
NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));
response.setEntity(body);
System.out.println(conn + ": serving file " + file.getPath());
}
}
}
}
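The branch order in `handleInternal` above (missing file, then unreadable file or directory, then success) can be isolated as a small pure function. The following is a minimal sketch using only the JDK; the class `FileStatusSketch` and the method `statusFor` are hypothetical names introduced for illustration and are not part of HttpCore.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

// Hypothetical helper (not an HttpCore class): mirrors the status decision
// made by HttpFileHandler.handleInternal in the example above.
final class FileStatusSketch {

    static final int SC_OK = 200;
    static final int SC_FORBIDDEN = 403;
    static final int SC_NOT_FOUND = 404;

    // Returns the HTTP status code the file handler would choose for this file.
    static int statusFor(File file) {
        if (!file.exists()) {
            return SC_NOT_FOUND;
        } else if (!file.canRead() || file.isDirectory()) {
            return SC_FORBIDDEN;
        }
        return SC_OK;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway document root with a single page in it
        File docRoot = Files.createTempDirectory("docroot").toFile();
        File page = new File(docRoot, "index.html");
        Files.write(page.toPath(), "<html></html>".getBytes("UTF-8"));

        System.out.println("existing file: " + statusFor(page));
        System.out.println("directory:     " + statusFor(docRoot));
        System.out.println("missing file:  " + statusFor(new File(docRoot, "missing.html")));

        page.delete();
        docRoot.delete();
    }
}
```

Keeping the decision separate from the response-building code makes it straightforward to check each branch without starting an I/O reactor.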
6. Asynchronous HTTP reverse proxy (based on the non-blocking I/O model: NIO framework)
This example demonstrates how HttpCore NIO can be used to build an asynchronous, fully streaming reverse HTTP proxy.
package org.apache.http.examples.nio;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.EnglishReasonPhraseCatalog;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.pool.NIOConnFactory;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerResolver;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.nio.protocol.HttpAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
/**
* Asynchronous, fully streaming HTTP/1.1 reverse proxy.
*/
public class NHttpReverseProxy {
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.out.println("Usage: NHttpReverseProxy <hostname> [port]");
System.exit(1);
}
URI uri = new URI(args[0]);
int port = 8080;
if (args.length > 1) {
port = Integer.parseInt(args[1]);
}
// Target host
HttpHost targetHost = new HttpHost(
uri.getHost(),
uri.getPort() > 0 ? uri.getPort() : 80,
uri.getScheme() != null ? uri.getScheme() : "http");
System.out.println("Reverse proxy to " + targetHost);
HttpParams params = new SyncBasicHttpParams();
params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 30000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "Test/1.1")
.setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");
IOReactorConfig config = new IOReactorConfig();
config.setIoThreadCount(1);
final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(config);
final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(config);
// Set up HTTP protocol processor for incoming connections
HttpProcessor inhttpproc = new ImmutableHttpProcessor(
new HttpResponseInterceptor[] {
new ResponseDate(),
new ResponseServer(),
new ResponseContent(),
new ResponseConnControl()
});
// Set up HTTP protocol processor for outgoing connections
HttpProcessor outhttpproc = new ImmutableHttpProcessor(
new HttpRequestInterceptor[] {
new RequestContent(),
new RequestTargetHost(),
new RequestConnControl(),
new RequestUserAgent(),
new RequestExpectContinue()
});
ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler();
HttpAsyncRequester executor = new HttpAsyncRequester(
outhttpproc, new ProxyOutgoingConnectionReuseStrategy(), params);
ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, params);
connPool.setMaxTotal(100);
connPool.setDefaultMaxPerRoute(20);
HttpAsyncRequestHandlerRegistry handlerRegistry = new HttpAsyncRequestHandlerRegistry();
handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool));
ProxyServiceHandler serviceHandler = new ProxyServiceHandler(
inhttpproc, new ProxyIncomingConnectionReuseStrategy(), handlerRegistry, params);
final IOEventDispatch connectingEventDispatch = new DefaultHttpClientIODispatch(
clientHandler, params);
final IOEventDispatch listeningEventDispatch = new DefaultHttpServerIODispatch(
serviceHandler, params);
Thread t = new Thread(new Runnable() {
public void run() {
try {
connectingIOReactor.execute(connectingEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException ex) {
ex.printStackTrace();
} finally {
try {
listeningIOReactor.shutdown();
} catch (IOException ex2) {
ex2.printStackTrace();
}
}
}
});
t.start();
try {
listeningIOReactor.listen(new InetSocketAddress(port));
listeningIOReactor.execute(listeningEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException ex) {
ex.printStackTrace();
} finally {
try {
connectingIOReactor.shutdown();
} catch (IOException ex2) {
ex2.printStackTrace();
}
}
}
static class ProxyHttpExchange {
private final ByteBuffer inBuffer;
private final ByteBuffer outBuffer;
private volatile String id;
private volatile HttpHost target;
private volatile HttpAsyncExchange responseTrigger;
private volatile IOControl originIOControl;
private volatile IOControl clientIOControl;
private volatile HttpRequest request;
private volatile boolean requestReceived;
private volatile HttpResponse response;
private volatile boolean responseReceived;
private volatile Exception ex;
public ProxyHttpExchange() {
super();
this.inBuffer = ByteBuffer.allocateDirect(10240);
this.outBuffer = ByteBuffer.allocateDirect(10240);
}
public ByteBuffer getInBuffer() {
return this.inBuffer;
}
public ByteBuffer getOutBuffer() {
return this.outBuffer;
}
public String getId() {
return this.id;
}
public void setId(final String id) {
this.id = id;
}
public HttpHost getTarget() {
return this.target;
}
public void setTarget(final HttpHost target) {
this.target = target;
}
public HttpRequest getRequest() {
return this.request;
}
public void setRequest(final HttpRequest request) {
this.request = request;
}
public HttpResponse getResponse() {
return this.response;
}
public void setResponse(final HttpResponse response) {
this.response = response;
}
public HttpAsyncExchange getResponseTrigger() {
return this.responseTrigger;
}
public void setResponseTrigger(final HttpAsyncExchange responseTrigger) {
this.responseTrigger = responseTrigger;
}
public IOControl getClientIOControl() {
return this.clientIOControl;
}
public void setClientIOControl(final IOControl clientIOControl) {
this.clientIOControl = clientIOControl;
}
public IOControl getOriginIOControl() {
return this.originIOControl;
}
public void setOriginIOControl(final IOControl originIOControl) {
this.originIOControl = originIOControl;
}
public boolean isRequestReceived() {
return this.requestReceived;
}
public void setRequestReceived() {
this.requestReceived = true;
}
public boolean isResponseReceived() {
return this.responseReceived;
}
public void setResponseReceived() {
this.responseReceived = true;
}
public Exception getException() {
return this.ex;
}
public void setException(final Exception ex) {
this.ex = ex;
}
public void reset() {
this.inBuffer.clear();
this.outBuffer.clear();
this.target = null;
this.id = null;
this.responseTrigger = null;
this.clientIOControl = null;
this.originIOControl = null;
this.request = null;
this.requestReceived = false;
this.response = null;
this.responseReceived = false;
this.ex = null;
}
}
static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> {
private final HttpHost target;
private final HttpAsyncRequester executor;
private final BasicNIOConnPool connPool;
private final AtomicLong counter;
public ProxyRequestHandler(
final HttpHost target,
final HttpAsyncRequester executor,
final BasicNIOConnPool connPool) {
super();
this.target = target;
this.executor = executor;
this.connPool = connPool;
this.counter = new AtomicLong(1);
}
public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest(
final HttpRequest request,
final HttpContext context) {
ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange");
if (httpExchange == null) {
httpExchange = new ProxyHttpExchange();
context.setAttribute("http-exchange", httpExchange);
}
synchronized (httpExchange) {
httpExchange.reset();
String id = String.format("%08X", this.counter.getAndIncrement());
httpExchange.setId(id);
httpExchange.setTarget(this.target);
return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool);
}
}
public void handle(
final ProxyHttpExchange httpExchange,
final HttpAsyncExchange responseTrigger,
final HttpContext context) throws HttpException, IOException {
synchronized (httpExchange) {
Exception ex = httpExchange.getException();
if (ex != null) {
System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex);
int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
String message = ex.getMessage();
if (message == null) {
message = "Unexpected error";
}
response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));
responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered");
// The error response has been submitted; do not fall through
return;
}
HttpResponse response = httpExchange.getResponse();
if (response != null) {
responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange));
System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered");
return;
}
// No response yet. Keep the trigger so that ProxyResponseConsumer can submit the response later.
httpExchange.setResponseTrigger(responseTrigger);
}
}
}
static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> {
private final ProxyHttpExchange httpExchange;
private final HttpAsyncRequester executor;
private final BasicNIOConnPool connPool;
private volatile boolean completed;
public ProxyRequestConsumer(
final ProxyHttpExchange httpExchange,
final HttpAsyncRequester executor,
final BasicNIOConnPool connPool) {
super();
this.httpExchange = httpExchange;
this.executor = executor;
this.connPool = connPool;
}
public void close() throws IOException {
}
public void requestReceived(final HttpRequest request) {
synchronized (this.httpExchange) {
System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine());
this.httpExchange.setRequest(request);
this.executor.execute(
new ProxyRequestProducer(this.httpExchange),
new ProxyResponseConsumer(this.httpExchange),
this.connPool);
}
}
public void consumeContent(
final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setClientIOControl(ioctrl);
// Receive data from the client
ByteBuffer buf = this.httpExchange.getInBuffer();
int n = decoder.read(buf);
System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read");
if (decoder.isCompleted()) {
System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read");
}
// If the buffer is full, suspend client input until there is free
// space in the buffer
if (!buf.hasRemaining()) {
ioctrl.suspendInput();
System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input");
}
// If there is some content in the input buffer make sure origin
// output is active
if (buf.position() > 0) {
if (this.httpExchange.getOriginIOControl() != null) {
this.httpExchange.getOriginIOControl().requestOutput();
System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");
}
}
}
}
public void requestCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
this.completed = true;
System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed");
this.httpExchange.setRequestReceived();
if (this.httpExchange.getOriginIOControl() != null) {
this.httpExchange.getOriginIOControl().requestOutput();
System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");
}
}
}
public Exception getException() {
return null;
}
public ProxyHttpExchange getResult() {
return this.httpExchange;
}
public boolean isDone() {
return this.completed;
}
public void failed(final Exception ex) {
System.out.println("[client->proxy] " + ex.toString());
}
}
static class ProxyRequestProducer implements HttpAsyncRequestProducer {
private final ProxyHttpExchange httpExchange;
public ProxyRequestProducer(final ProxyHttpExchange httpExchange) {
super();
this.httpExchange = httpExchange;
}
public void close() throws IOException {
}
public HttpHost getTarget() {
synchronized (this.httpExchange) {
return this.httpExchange.getTarget();
}
}
public HttpRequest generateRequest() {
synchronized (this.httpExchange) {
HttpRequest request = this.httpExchange.getRequest();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine());
// Rewrite request!!!!
if (request instanceof HttpEntityEnclosingRequest) {
BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest(
request.getRequestLine());
r.setEntity(((HttpEntityEnclosingRequest) request).getEntity());
return r;
} else {
return new BasicHttpRequest(request.getRequestLine());
}
}
}
public void produceContent(
final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setOriginIOControl(ioctrl);
// Send data to the origin server
ByteBuffer buf = this.httpExchange.getInBuffer();
buf.flip();
int n = encoder.write(buf);
buf.compact();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written");
// If there is space in the buffer and the message has not been
// transferred, make sure the client is sending more data
if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) {
if (this.httpExchange.getClientIOControl() != null) {
this.httpExchange.getClientIOControl().requestInput();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request client input");
}
}
if (buf.position() == 0) {
if (this.httpExchange.isRequestReceived()) {
encoder.complete();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written");
} else {
// Input buffer is empty. Wait until the client fills up
// the buffer
ioctrl.suspendOutput();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output");
}
}
}
}
public void requestCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request completed");
}
}
public boolean isRepeatable() {
return false;
}
public void resetRequest() {
}
public void failed(final Exception ex) {
System.out.println("[proxy->origin] " + ex.toString());
}
}
static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> {
private final ProxyHttpExchange httpExchange;
private volatile boolean completed;
public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) {
super();
this.httpExchange = httpExchange;
}
public void close() throws IOException {
}
public void responseReceived(final HttpResponse response) {
synchronized (this.httpExchange) {
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine());
this.httpExchange.setResponse(response);
HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();
if (responseTrigger != null && !responseTrigger.isCompleted()) {
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered");
responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange));
}
}
}
public void consumeContent(
final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setOriginIOControl(ioctrl);
// Receive data from the origin
ByteBuffer buf = this.httpExchange.getOutBuffer();
int n = decoder.read(buf);
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read");
if (decoder.isCompleted()) {
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read");
}
// If the buffer is full, suspend origin input until there is free
// space in the buffer
if (!buf.hasRemaining()) {
ioctrl.suspendInput();
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input");
}
// If there is some content in the input buffer make sure client
// output is active
if (buf.position() > 0) {
if (this.httpExchange.getClientIOControl() != null) {
this.httpExchange.getClientIOControl().requestOutput();
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");
}
}
}
}
public void responseCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
if (this.completed) {
return;
}
this.completed = true;
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed");
this.httpExchange.setResponseReceived();
if (this.httpExchange.getClientIOControl() != null) {
this.httpExchange.getClientIOControl().requestOutput();
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");
}
}
}
public void failed(final Exception ex) {
synchronized (this.httpExchange) {
if (this.completed) {
return;
}
this.completed = true;
this.httpExchange.setException(ex);
HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();
if (responseTrigger != null && !responseTrigger.isCompleted()) {
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex);
int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
String message = ex.getMessage();
if (message == null) {
message = "Unexpected error";
}
response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));
responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
}
}
}
public boolean cancel() {
synchronized (this.httpExchange) {
if (this.completed) {
return false;
}
failed(new InterruptedIOException("Cancelled"));
return true;
}
}
public ProxyHttpExchange getResult() {
return this.httpExchange;
}
public Exception getException() {
return null;
}
public boolean isDone() {
return this.completed;
}
}
static class ProxyResponseProducer implements HttpAsyncResponseProducer {
private final ProxyHttpExchange httpExchange;
public ProxyResponseProducer(final ProxyHttpExchange httpExchange) {
super();
this.httpExchange = httpExchange;
}
public void close() throws IOException {
this.httpExchange.reset();
}
public HttpResponse generateResponse() {
synchronized (this.httpExchange) {
HttpResponse response = this.httpExchange.getResponse();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine());
// Rewrite response!!!!
BasicHttpResponse r = new BasicHttpResponse(response.getStatusLine());
r.setEntity(response.getEntity());
return r;
}
}
public void produceContent(
final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setClientIOControl(ioctrl);
// Send data to the client
ByteBuffer buf = this.httpExchange.getOutBuffer();
buf.flip();
int n = encoder.write(buf);
buf.compact();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written");
// If there is space in the buffer and the message has not been
// transferred, make sure the origin is sending more data
if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) {
if (this.httpExchange.getOriginIOControl() != null) {
this.httpExchange.getOriginIOControl().requestInput();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " request origin input");
}
}
if (buf.position() == 0) {
if (this.httpExchange.isResponseReceived()) {
encoder.complete();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written");
} else {
// Input buffer is empty. Wait until the origin fills up
// the buffer
ioctrl.suspendOutput();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " suspend client output");
}
}
}
}
public void responseCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response completed");
}
}
public void failed(final Exception ex) {
System.out.println("[client<-proxy] " + ex.toString());
}
}
static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
@Override
public boolean keepAlive(final HttpResponse response, final HttpContext context) {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
boolean keepAlive = super.keepAlive(response, context);
if (keepAlive) {
System.out.println("[client->proxy] connection kept alive " + conn);
}
return keepAlive;
}
};
static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
@Override
public boolean keepAlive(final HttpResponse response, final HttpContext context) {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
boolean keepAlive = super.keepAlive(response, context);
if (keepAlive) {
System.out.println("[proxy->origin] connection kept alive " + conn);
}
return keepAlive;
}
};
static class ProxyServiceHandler extends HttpAsyncService {
public ProxyServiceHandler(
final HttpProcessor httpProcessor,
final ConnectionReuseStrategy reuseStrategy,
final HttpAsyncRequestHandlerResolver handlerResolver,
final HttpParams params) {
super(httpProcessor, reuseStrategy, handlerResolver, params);
}
@Override
protected void log(final Exception ex) {
ex.printStackTrace();
}
@Override
public void connected(final NHttpServerConnection conn) {
System.out.println("[client->proxy] connection open " + conn);
super.connected(conn);
}
@Override
public void closed(final NHttpServerConnection conn) {
System.out.println("[client->proxy] connection closed " + conn);
super.closed(conn);
}
}
static class ProxyClientProtocolHandler extends HttpAsyncRequestExecutor {
public ProxyClientProtocolHandler() {
super();
}
@Override
protected void log(final Exception ex) {
ex.printStackTrace();
}
@Override
public void connected(final NHttpClientConnection conn,
final Object attachment) throws IOException, HttpException {
System.out.println("[proxy->origin] connection open " + conn);
super.connected(conn, attachment);
}
@Override
public void closed(final NHttpClientConnection conn) {
System.out.println("[proxy->origin] connection closed " + conn);
super.closed(conn);
}
}
static class ProxyConnPool extends BasicNIOConnPool {
public ProxyConnPool(final ConnectingIOReactor ioreactor, final HttpParams params) {
super(ioreactor, params);
}
public ProxyConnPool(
final ConnectingIOReactor ioreactor,
final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
final HttpParams params) {
super(ioreactor, connFactory, params);
}
@Override
public void release(final BasicNIOPoolEntry entry, boolean reusable) {
System.out.println("[proxy->origin] connection released " + entry.getConnection());
super.release(entry, reusable);
StringBuilder buf = new StringBuilder();
PoolStats totals = getTotalStats();
buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
buf.append(" of ").append(totals.getMax()).append("]");
System.out.println("[proxy->origin] " + buf.toString());
}
}
}
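Both `produceContent` methods in the proxy above rely on the same `ByteBuffer` idiom: `flip()` to switch the shared buffer from filling to draining, a write that may be partial, then `compact()` to keep the unwritten bytes and switch back to filling. The following is a minimal sketch of that idiom using only `java.nio`. The classes `BufferRelaySketch` and `LimitedSink` are hypothetical; `LimitedSink` merely stands in for a `ContentEncoder` that accepts a limited number of bytes per call.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical illustration (not HttpCore code) of the flip/write/compact
// pattern used by ProxyRequestProducer and ProxyResponseProducer.
final class BufferRelaySketch {

    // Stand-in for a slow peer: accepts at most 'limit' bytes per write call.
    static final class LimitedSink implements WritableByteChannel {
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();
        private final int limit;

        LimitedSink(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                out.write(src.get());
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }

        byte[] bytes() {
            return out.toByteArray();
        }
    }

    // Drains as much buffered data as the sink accepts; returns bytes written.
    static int drain(ByteBuffer buf, WritableByteChannel sink) throws IOException {
        buf.flip();               // switch from filling to draining
        int n = sink.write(buf);  // may write fewer bytes than are buffered
        buf.compact();            // keep the remainder, switch back to filling
        return n;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8);
        LimitedSink sink = new LimitedSink(3);
        buf.put(new byte[] {1, 2, 3, 4, 5});
        // Keep draining until the buffer is empty, as the proxy does across
        // successive produceContent callbacks
        while (buf.position() > 0) {
            int n = drain(buf, sink);
            System.out.println(n + " bytes written, " + buf.position() + " still buffered");
        }
    }
}
```

After `compact()`, `buf.position()` equals the number of bytes still waiting to be written, which is why the proxy tests `buf.position() == 0` to decide between completing the message and suspending output.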