在Http上传中,Apache common fileupload 的文件上传组件要求传入Inputstream对象。
而Netty中数据是按块(HttpChunk)来传送数据,没有直接的流。
因此要想在Netty中使用Apache Common Fileupload,则必须将httpchunk适配成InputStream。
实现Apache FileUpload
/**
* 用Netty来实现上传
*/
public class NettyFileUpload extends FileUpload {
private NettyRequestContext context;
public static final boolean isMultipartContent(HttpRequest request) {
if (HttpMethod.POST != request.getMethod()) {
return false;
}
if (request.getHeaders("Content-Type") == null && request.getHeaders("Content-Type").size() == 0) {
return false;
}
String contentType = request.getHeaders("Content-Type").get(0);
if (contentType == null) {
return false;
}
if (contentType.toLowerCase().startsWith("multipart/")) {
return true;
}
return false;
}
public NettyFileUpload(NettyRequestContext context) {
this.context = context;
}
public NettyFileUpload(FileItemFactory fileItemFactory) {
super(fileItemFactory);
}
public FileItemIterator getItemIterator() throws FileUploadException, IOException {
return super.getItemIterator(context);
}
public class NettyRequestContext implements RequestContext {
private String encoding;
private String contentType;
private int contentLength = -1;
/**
* 上传的内容流
*/
private InputStream inputStream;
public NettyRequestContext(String encoding, String contentType,
int contentLength, InputStream inputStream) {
this.encoding = encoding;
this.contentType = contentType;
this.contentLength = contentLength;
this.inputStream = inputStream;
}
@Override
public String getCharacterEncoding() {
return encoding;
}
@Override
public String getContentType() {
return contentType;
}
@Override
public int getContentLength() {
return contentLength;
}
@Override
public InputStream getInputStream() throws IOException {
// 不能直接用request的流,因为有HttpChunk
return inputStream;
}
@Override
public String toString() {
return "ContentLength=" + this.getContentLength() + ", ContentType="
+ this.getContentType();
}
public void closeInputStream() throws IOException {
getInputStream().close();
}
}
适配成InputStream:
public class NettyChunkInputStream extends InputStream {
private BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128);
private HttpChunk currentChunk = null;
private volatile boolean closed;
public boolean putChunk(HttpChunk chunk) throws IOException {
if (!closed) {
try {
chunkQueue.put(chunk);
} catch (InterruptedException e) {
throw new IOException(e);
}
return true;
}
throw new IOException(" this inputstream has been closed!");
}
@Override
public int read() throws IOException {
byte resultByte = -1;
try {
if (getChunk().getContent().readable()) {
resultByte = getChunk().getContent().readByte();
} else if (!getChunk().isLast()) {
nextChunk();
if (getChunk().getContent().readable()) {
resultByte = getChunk().getContent().readByte();
} else {
return -1;
}
} else {
return -1;
}
} catch (InterruptedException e) {
throw new IOException(e);
}
// InputStream.read()返回0-255之间的int
return resultByte >= 0 ? resultByte : 256 + resultByte;
}
private HttpChunk getChunk() throws InterruptedException {
if (currentChunk == null) {
currentChunk = chunkQueue.take();
}
return currentChunk;
}
private void nextChunk() throws InterruptedException {
currentChunk = chunkQueue.take();
}
@Override
public int available() throws IOException {
throw new UnsupportedOperationException("unsupport available()");
}
@Override
public void close() throws IOException {
chunkQueue = null;
closed = true;
}
public boolean isClosed() {
return closed;
}
}
Netty FileUpload应用:
public class NettyUploadHandler extends SimpleChannelUpstreamHandler {
private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(32);
private boolean hasReadChunk;
private NettyChunkInputStream chunkStream = new NettyChunkInputStream();
private NettyRequestContext context;
private volatile Map<String, String> resultMap = null;
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (!hasReadChunk) {
handleHttpRequest(ctx, e);
} else {
handleHttpChunk(e);
}
}
private void handleHttpRequest(ChannelHandlerContext ctx, MessageEvent e) throws IOException {
HttpRequest request = (HttpRequest) e.getMessage();
if (isUploadFile(request)) {
handleUploadRequest(request);
} else {
ctx.sendUpstream(e);
}
}
private void handleUploadRequest(HttpRequest request) throws IOException {
context = new NettyRequestContext("UTF-8", request.getHeader("Content-Type"), -1, chunkStream);
if (request.isChunked()) {
hasReadChunk = true;
} else {
HttpChunk chunk = new DefaultHttpChunk(request.getContent());
chunkStream.putChunk(chunk);
}
startUpload();
}
private void handleHttpChunk(MessageEvent e) throws IOException {
if (isUploadFinished()) {
writeResult(e.getChannel());
return;
}
HttpChunk chunk = (HttpChunk) e.getMessage();
chunkStream.putChunk(chunk);
if (chunk.isLast()) {
for (;;) {
if (isUploadFinished()) {
writeResult(e.getChannel());
return;
}
}
}
}
private boolean isUploadFinished() {
return resultMap != null || chunkStream.isClosed();
}
private boolean isUploadFile(HttpRequest request) {
return request.getUri().equals("/upload/uploadfile") && NettyFileUpload.isMultipartContent(request);
}
private void startUpload() {
EXECUTOR.execute(new UploadTask());
}
private void writeResult(Channel channel) {
String json = JsonUtil.beanToJson(resultMap);
byte[] data = json.getBytes();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setContent(buffer);
response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes()));
channel.write(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
}
class UploadTask implements Runnable {
public UploadTask() {
super();
}
@Override
public void run() {
long start = System.currentTimeMillis();
try {
NettyFileUpload upload = new NettyFileUpload(context);
FileItemIterator iter = upload.getItemIterator();
while (iter.hasNext()) {
FileItemStream item = iter.next();
//这里处理逻辑
}
resultMap = handler.getResult();
context.closeInputStream();
long end = System.currentTimeMillis();
System.out.println("spend time : " + (end - start));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
该NettyChunkInputStream必须一个线程来putChunk(...),另一个线程使用getInputStream()来消耗数据。
PS:可以在NettyChunkInputStream中重写InputStream.read(bs,offset,len),避免每次调用read()都进行边界判断,使之效率更高。
分享到:
相关推荐
赠送jar包:netty-common-4.1.68.Final.jar; 赠送原API文档:netty-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-common-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.68.Final....
赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....
赠送jar包:netty-common-4.1.24.Final.jar; 赠送原API文档:netty-common-4.1.24.Final-javadoc.jar; 赠送源代码:netty-common-4.1.24.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.24.Final....
赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....
赠送jar包:netty-common-4.1.23.Final.jar; 赠送原API文档:netty-common-4.1.23.Final-javadoc.jar; 赠送源代码:netty-common-4.1.23.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.23.Final....
赠送jar包:netty-common-4.1.73.Final.jar; 赠送原API文档:netty-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-common-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.73.Final....
赠送jar包:netty-common-4.1.11.Final.jar; 赠送原API文档:netty-common-4.1.11.Final-javadoc.jar; 赠送源代码:netty-common-4.1.11.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.11.Final....
赠送jar包:netty-transport-native-unix-common-4.1.68.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.68....
赠送jar包:netty-common-4.1.27.Final.jar; 赠送原API文档:netty-common-4.1.27.Final-javadoc.jar; 赠送源代码:netty-common-4.1.27.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.27.Final....
赠送jar包:netty-common-4.1.11.Final.jar; 赠送原API文档:netty-common-4.1.11.Final-javadoc.jar; 赠送源代码:netty-common-4.1.11.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.11.Final....
赠送jar包:netty-common-4.1.16.Final.jar; 赠送原API文档:netty-common-4.1.16.Final-javadoc.jar; 赠送源代码:netty-common-4.1.16.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.16.Final....
在本文中,我们将深入探讨Netty在实际应用中的实例——对象传递调用,以及如何解决TCP粘包问题。同时,我们还会讨论Java序列化方案在Netty中的编解码对比。 首先,让我们来看看TCP粘包问题。在TCP协议中,由于其...
Netty在Android开发中的应用实战系列(二)——— Encoder | Decoder | Handler 的使用:https://azhon.blog.csdn.net/article/details/100831777 Netty在Android开发中的应用实战系列(三)——— 心跳处理 | 断线...
Netty 是一个高性能、...总之,“最新Netty中文文档CHM版”对于想要学习和使用Netty的开发者来说,是一份宝贵的参考资料,它可以帮助你快速理解Netty的工作原理,掌握其主要功能,从而在实际开发中发挥出Netty的潜力。
赠送jar包:netty-common-4.1.73.Final.jar; 赠送原API文档:netty-common-4.1.73.Final-javadoc.jar; 赠送源代码:netty-common-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.73.Final....
赠送jar包:netty-common-4.1.74.Final.jar; 赠送原API文档:netty-common-4.1.74.Final-javadoc.jar; 赠送源代码:netty-common-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.74.Final....
Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码
赠送jar包:netty-transport-native-unix-common-4.1.74.Final.jar; 赠送原API文档:netty-transport-native-unix-common-4.1.74.Final-javadoc.jar; 赠送源代码:netty-transport-native-unix-common-4.1.74....
Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源...