`
canofy
  • 浏览: 829688 次
  • 性别: Icon_minigender_1
  • 来自: 北京、四川
社区版块
存档分类
最新评论

http stream

    博客分类:
  • j2EE
 
阅读更多
StringBuilder sb = new StringBuilder();   
sb.append("HTTP/1.1 200 OK\r\n");   
sb.append("Content-Type: text/plain\r\n");   
sb.append("Transfer-Encoding: chunked\r\n\r\n");   
sb.append("25\r\n");           
sb.append("This is the data in the first chunk\r\n"); // 37 bytes   
sb.append("\r\n1A\r\n");   
sb.append("and this is the second one"); // 26 bytes   
sb.append("\r\n0\r\n\r\n");  


十六进制包长+\r\n+报文包+\r\n  为一个传输单元

0+\r\n+\r\n 当遇到这种空传输单元时结束

下面是客户端例子

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;



public class Client {
	public boolean isAsync=false;
	
	
	/**
	 * 建立socket
	 * @param ip
	 * @param port
	 * @return
	 * @throws IOException 
	 * @throws NumberFormatException 
	 * @throws PachiraAsrSocketCreateException
	 */
	protected SocketChannel createSocketChannel(String ip,String port) throws NumberFormatException, IOException {
		SocketChannel socketChannel=null;
			if(isAsync){
				 socketChannel = SocketChannel.open();  
				 socketChannel.configureBlocking(false);  
		        //向服务端发起连接  
		        if (!socketChannel.connect(new InetSocketAddress(ip, Integer.parseInt(port)))){  
		            //不断地轮询连接状态,直到完成连接  
		            while (!socketChannel.finishConnect()){  
		                //在等待连接的时间里,可以执行其他任务,以充分发挥非阻塞IO的异步特性  
		                //这里为了演示该方法的使用,只是一直打印"."  
		            	try {
							Thread.sleep(10);
						} catch (InterruptedException e) {
						}
//		                SrvLogger.debug(getClass(), "");  
		            }  
		        }
			}else{
				socketChannel = SocketChannel.open();
				socketChannel.connect(new InetSocketAddress(ip, Integer.parseInt(port)));
			}
		
		return socketChannel;
	}
	
	
	/**
	 * 关闭socket
	 * @param socketChannel
	 * @param uuid
	 * @throws IOException 
	 */
	protected void closeSocketChannel(SocketChannel socketChannel) throws IOException{
		if(socketChannel!=null){
			socketChannel.close();
		}
	}
	/**
	 * 传输数据
	 * @param socket
	 * @param in
	 * @param uuid
	 * @param audioType
	 * @throws IOException 
	 */
	protected boolean sendStringData(final SocketChannel socketChannel ,final String str) throws IOException{
		ByteBuffer buffer=ByteBuffer.wrap(str.getBytes(), 0, str.length());
		int size=0;
		int wl=0;
		System.out.println("buf.limit="+buffer.limit());
		wl=socketChannel.write(buffer);
		while (buffer.hasRemaining()) {
			if (wl < 0){ 
		        System.out.println("sendData len is -1;size="+size);
		        break;
		    } 
			if (wl == 0) {
				System.out.println("sendData len is 0 ;size="+size);
			}
			size+=wl;
			
		}
		
		buffer.flip();
		
		return true;
	}
	
	
	/**
	 * 传输数据
	 * @param socket
	 * @param in
	 * @param uuid
	 * @param audioType
	 */
	protected boolean sendData(final SocketChannel socketChannel ,final InputStream is){
		FutureTask<Integer> task  = new FutureTask<Integer>(new Callable<Integer>(){  
		    public Integer call() {
		    		System.out.println("sendData start...;");
		    		byte[] buf = new byte[8096];
		    		int totalSize=0;
		    		int sendTotalSize=0;
		    		try {
		    			int read = is.read(buf, 0, buf.length);
		    			while (read > 0) {
		    				totalSize+=read;
	    					ByteBuffer buffer=ByteBuffer.wrap(buf, 0, read);
    						int size=0;
    						int wl=0;
    						wl=socketChannel.write(buffer);
		    				while (buffer.hasRemaining()) {
		    					if (wl < 0){ 
	    					        System.out.println("sendData len is -1;size="+size);
	    					        break;
	    					    } 
		    					if (wl == 0) {
		    						System.out.println("sendData len is 0 ;size="+size);
		    					}
		    					size+=wl;
		    					
		    				}
		    				
		    				buffer.flip();
		    				sendTotalSize+=read;
		    				read = is.read(buf, 0, buf.length);
		    			}
		    			sendStringData(socketChannel, "------------V2ymHFg03ehbqgZCaKO6jy--");
		    			System.out.println("sendData end,sendTotalSize="+sendTotalSize+";totalSize="+totalSize);
		    		}catch (Exception e) {
		    			e.printStackTrace();
		    		}finally{
		    			
		    		}
		    		
		    		
		    	return new Integer(8); 
		    }
		});
		ExecutorService sendDataPool=Executors.newCachedThreadPool();
		sendDataPool.execute(task);
		return true;
		
	}
	
	
	/**
	 * 传输数据
	 * 十六进制包长+\r\n+报文包+\r\n  为一个传输单元

		0+\r\n+\r\n 当遇到这种空传输单元时结束
	 * @param socket
	 * @param in
	 * @param uuid
	 * @param audioType
	 */
	protected boolean sendDataChunk(final SocketChannel socketChannel ,final InputStream is){
		FutureTask<Integer> task  = new FutureTask<Integer>(new Callable<Integer>(){  
		    public Integer call() throws IOException {
		    		System.out.println("sendData start...;");
		    		sendStringData(socketChannel, "\r\n");
		    		String parameter="------------V2ymHFg03ehbqgZCaKO6jy\r\nContent-Disposition: form-data;name=\"chon.wav\";opcode=\"transcribe_audio\";sessionid=\"\";filename=\"chon.wav\";type=\"23\";\r\n\r\n";
		    		sendStringData(socketChannel, Integer.toHexString(parameter.length())+"\r\n");
		    		sendStringData(socketChannel, parameter+"\r\n");
//		    		sendStringData(socketChannel, Integer.toHexString("1234".length())+"\r\n1234\r\n");
//		    		sendStringData(socketChannel, Integer.toHexString("1234".length())+"\r\n1234\r\n");
//		    		sendStringData(socketChannel, Integer.toHexString("1234".length())+"\r\n1234\r\n");
//		    		sendStringData(socketChannel, Integer.toHexString("1234".length())+"\r\n1234\r\n");
//		    		sendStringData(socketChannel, Integer.toHexString("------------V2ymHFg03ehbqgZCaKO6jy".length())+"\r\n");
//		    		sendStringData(socketChannel, "------------V2ymHFg03ehbqgZCaKO6jy\r\n");
//		    		String parameter="Content-Disposition: form-data;name=\"chon.wav\";opcode=\"transcribe_audio\";sessionid=\"\";filename=\"chon.wav\";type=\"23\";";
//		    		sendStringData(socketChannel, Integer.toHexString(parameter.length())+"\r\n");
//		    		sendStringData(socketChannel, parameter+"\r\n");
		    		
		    		byte[] buf = new byte[8096];
		    		int totalSize=0;
		    		int sendTotalSize=0;
		    		try {
		    			int read = is.read(buf, 0, buf.length);
		    			while (read > 0) {
		    				
		    				totalSize+=read;
	    					ByteBuffer buffer=ByteBuffer.wrap(buf, 0, read);
	    					
	    					String hex= Integer.toHexString(read);
	    					System.out.println("read="+read+";hex="+hex);
	    					sendStringData(socketChannel,hex+"\r\n");
    						int size=0;
    						int wl=0;
//    						System.out.println("send..");
    						wl=socketChannel.write(buffer);
//    						System.out.println("send...");
		    				while (buffer.hasRemaining()) {
		    					if (wl < 0){ 
	    					        System.out.println("sendData len is -1;size="+size);
	    					        break;
	    					    } 
		    					if (wl == 0) {
		    						System.out.println("sendData len is 0 ;size="+size);
		    					}
		    					size+=wl;
		    					
		    				}
		    				sendStringData(socketChannel, "\r\n");
		    				buffer.flip();
		    				sendTotalSize+=read;
		    				read = is.read(buf, 0, buf.length);
		    				Thread.sleep(50);
		    			}
		    			sendStringData(socketChannel, Integer.toHexString("------------V2ynHFg03ehbqgZCaKO6jy--".length())+"\r\n");
		    			sendStringData(socketChannel, "------------V2ymHFg03ehbqgZCaKO6jy--");
		    			sendStringData(socketChannel, "\r\n");
		    			sendStringData(socketChannel, "0\r\n\r\n");
		    			System.out.println("sendData end,sendTotalSize="+sendTotalSize+";totalSize="+totalSize);
		    		}catch (Exception e) {
		    			e.printStackTrace();
		    		}finally{
		    			
		    		}
		    		
		    		
		    	return new Integer(8); 
		    }
		});
		ExecutorService sendDataPool=Executors.newCachedThreadPool();
		sendDataPool.execute(task);
		return true;
		
	}
	
	/**
	 * 读取
	 * @param inputStream
	 * @param buf
	 * @return
	 * @throws IOException
	 */
	protected boolean readData(SocketChannel socketChannel, ByteBuffer buf) {
		boolean ret = true;
		long count=0;
		try {
			count = socketChannel.read(buf);
//			if(this.isAsync){
				while(count<buf.limit()){
					if(count==-1){
						System.out.println("readData count is -1");
						return false;
					}
					count += socketChannel.read(buf);
					try {
						Thread.sleep(10);
					} catch (InterruptedException e) {
						return false;
					}
				}
//				System.out.println("buffer.position()="+buf.position()+";buffer.limit()="+buf.limit());
//				System.out.println("count="+count);
				
//			}
			
			if(count>0){
				buf.flip();
			}
		} catch (Exception e) {
			ret=false;
		}finally{
			System.out.println("readData count="+count+";bufLen="+buf.limit());
		}
		
		return ret;
	}
	
	
	/**
	 * 读取
	 * @param inputStream
	 * @param buf
	 * @return
	 * @throws IOException
	 */
	protected boolean readDataBySocket(SocketChannel socketChannel) throws IOException {
		Socket socket=socketChannel.socket();
		InputStream in=socket.getInputStream();
		byte[] buf1=new byte[7];
		while(this.read(in, buf1)){
			System.out.println("result"+new String(buf1));
		}
		
		return false;
	}
	
	protected boolean read(InputStream inputStream, byte[] buf)
			throws IOException {
		boolean ret = true;
		int totalSize = buf.length;
		int read = inputStream.read(buf, 0, buf.length);
		while (read < totalSize) {
			read += inputStream.read(buf, read, (totalSize - read));
		}

		return ret;

	}
	
	
	public void nonstream() throws IOException{
		String ip="127.0.0.1";
		String port="8080";
		File file=new File("I:/1/pase/90s_9.wav");
		FileInputStream fis=new FileInputStream(file);
		SocketChannel socketChannel=createSocketChannel(ip, port);
		String parameter="------------V2ynHFg03ehbqgZCaKO6jy\r\nContent-Disposition: form-data;name=\"chon.wav\";opcode=\"transcribe_audio\";sessionid=\"\";filename=\"chon.wav\";type=\"0\";";
		sendStringData(socketChannel, "POST /project/uploader HTTP/1.1\r\n");
		sendStringData(socketChannel, "Accept: */*\r\n");
		sendStringData(socketChannel, "User-Agent: Mozilla/4.0\r\n");
		sendStringData(socketChannel, "Content-Length: "+(file.length()+parameter.length())+"\r\n");
		sendStringData(socketChannel, "Accept-Language: en-us\r\n");
		sendStringData(socketChannel, "Accept-Encoding: gzip, deflate\r\n");
		sendStringData(socketChannel, "Host: 127.0.0.1\r\n");
		sendStringData(socketChannel, "Content-Type: multipart/form-data;boundary=----------V2ymHFg03ehbqgZCaKO6jy\r\n");
		sendStringData(socketChannel, "\r\n");
		sendStringData(socketChannel, parameter+"\r\n");
		sendStringData(socketChannel, "\r\n");
		//send file1449930
		sendData(socketChannel, fis);
//		client.sendStringData(socketChannel, "------------V2ymHFg03ehbqgZCaKO6jy--");
		
		ByteBuffer bb=ByteBuffer.allocate(2000);
		readData(socketChannel, bb);
		byte[] b=new byte[bb.limit()];
		bb.get(b, 0, bb.limit()-1);
		System.out.println(new String(b));
	}
	
	public static void main(String[] args) throws NumberFormatException, IOException {
		String ip="localhost";
		String port="8080";
		Client client=new Client();
//		File file=new File("I:/1/a.txt");
		File file=new File("I:/1/pase/90s_9.wav");
		FileInputStream fis=new FileInputStream(file);
		SocketChannel socketChannel=client.createSocketChannel(ip, port);
		
		client.sendStringData(socketChannel, "POST /project/uploader HTTP/1.1\r\n");
		client.sendStringData(socketChannel, "Accept: */*\r\n");
		client.sendStringData(socketChannel, "User-Agent: Mozilla/4.0\r\n");
//		client.sendStringData(socketChannel, "Content-Length: "+(file.length()+parameter.length())+"\r\n");
		client.sendStringData(socketChannel, "Transfer-Encoding: chunked\r\n");
		client.sendStringData(socketChannel, "Accept-Language: en-us\r\n");
		client.sendStringData(socketChannel, "Accept-Encoding: gzip, deflate\r\n");
		client.sendStringData(socketChannel, "Host: 127.0.0.1\r\n");
		client.sendStringData(socketChannel, "Content-Type: multipart/form-data;boundary=----------V2ynHFg03ehbqgZCaKO6jy\r\n");
		
		
		//send file1449930
		client.sendDataChunk(socketChannel, fis);
		while(true){
			System.out.println("read start....");
			ByteBuffer bb=ByteBuffer.allocate(200);
			boolean flag=client.readData(socketChannel, bb);
			byte[] b=new byte[bb.limit()];
			bb.get(b, 0, bb.limit()-1);
			System.out.println(new String(b,"UTF-8"));
			if(!flag){
				System.out.println("socket close....");
				client.closeSocketChannel(socketChannel);
				break;
			}
		}
		System.out.println("read data end....");
		System.exit(0);
	}
	
	
}
分享到:
评论

相关推荐

    httpstream:HTTPStream 是 Python 的 HTTP 客户端库,具有易于使用的 API 并支持增量 JSON 文档检索

    安装HTTPStream 托管在 PyPI 上,因此要安装,只需使用pip : pip install httpstream快速开始&gt;&gt;&gt; from httpstream import get&gt;&gt;&gt; get("https://api.duckduckgo.com/?q=neo4j&format=json").content{'Abstract': '...

    node-httpstream:HTTP资源的可靠可读流

    httpstream:HTTP资源的可靠可读流 httpstream提供了一个Readable流接口(用于Node 0.10及更高版本),该接口通过从上次中断的地方重试请求来抽象化上游的瞬时故障。 httpstream需要Node 0.10或更高版本以及node-...

    IOS stream模拟http请求获取数据

    ios 有三个demo 1.NSURL 下载网络图片(block和delegate两种方式) 2.NSURLSession 下载网络图片,请求json数据(自己封装的网络block) 3.利用 NSstream 来模拟http请求获取数据

    VLC_stream.rar_VLC组播_igmp_vlc http stream_vlc visual_vlc播放组播流

    VLC 可以播放组播流, 该软件能够根据IGMP joint, leave, quiery report 自动控制VLC的开关。这样可以自适应的减少网络的带宽。 适合与在局域网能搭建视频播放平台。 希望和朋友们进行交流。QQ119818027.

    stream-http:浏览器中的流节点http

    根据其名称, stream-http尽可能在请求完成之前尝试向其调用者提供数据。 支持反压,允许浏览器仅在消耗时以最快的速度从服务器提取数据,其受以下支持: Chrome&gt; = 58(使用fetch和WritableStream ) 下列...

    streamsets-帮助手册

    支持 GET 和 POST 等 HTTP 请求方法。 **2.9.2 负载和参数** 可以自定义发送给外部系统的负载内容和 URL 参数。 **2.9.3 示例** 例如,当数据流处理完一批数据后,可以发送一个 POST 请求给外部监控系统。 **...

    h5stream-master

    2. **Streaming Protocol Support**: 可能支持不同的流媒体协议,如HLS (HTTP Live Streaming)、DASH (Dynamic Adaptive Streaming over HTTP) 或RTMP (Real-Time Messaging Protocol),使得用户可以流畅地在线观看...

    stream插件上传文件

    Node.js中有许多流行的Stream库,如`multer`用于文件上传,`http2-streams`用于HTTP/2 Stream操作,以及`fs.createReadStream`和`fs.createWriteStream`等内置API,可以帮助开发者轻松实现Stream插件的功能。...

    Stream跨域上传文件

    在Web开发中,跨域(Cross-...总的来说,Stream跨域上传文件涉及到了前端的文件处理、HTTP请求、CORS策略,以及后端的数据接收和保存等多个环节。通过Stream2和Stream3这两个工程,开发者可以深入理解并实践这一过程。

    ngx_stream_ssl_preread_module调研.docx

    用户可以通过官方文档 http://nginx.org/en/docs/stream/ngx_stream_ssl_preread_module.html 查看该模块的所有指令。 使用示例 以反向代理 www.baidu.com 为例,百度 resolve 出来的地址是 39.156.66.18:443。...

    http-client-stream

    http-客户端-流用法使用 http 请求作为流: var http = require ( 'http-client-stream' )var endpoint = http ( 'http://www.google.com' )var stream = endpoint . createStream ( )process . stdin . pipe ( ...

    H5stream 中文文档

    H5stream的配置文件位于conf/h5ss.conf,配置文件中定义了HTTP、RTSP、RTMP、FLV、HLS、WEBRTC等服务器的配置,例如RTSP服务器配置中SSL代表RTSP over TCP/TLS。配置文件还支持日志和线程池的设置,以及视频源的配置...

    WCF Stream 传输

    本示例主要关注的是WCF中的Stream传输模式,这是一种高效的数据传输方式,特别适用于处理大容量数据,如文件上传或下载。 Stream传输模式在WCF中允许数据以流的形式进行处理,而不是一次性加载到内存中。这种方式...

    Laravel开发-stream-parser

    当我们处理大量数据时,例如读取大型文件或处理HTTP流,传统的逐行读取或一次性加载整个文件的方式可能会消耗大量内存,这时"stream-parser"的概念就显得尤为重要。本文将详细探讨在Laravel开发中如何使用stream-...

    go-http-stream-reader:在 Go 中使用长时间运行的 HTTP 流(例如 Twitter 流 API)

    go-http-stream-reader 使 Go 应用程序能够使用长时间运行的 HTTP 请求流,例如 。 如果出现错误(例如临时网络中断、停止或关闭连接等),它会自动重新连接,并以尊重远程主机退避规则的可扩展方式重新连接。 ...

    SocketStream.zip

    1. **实时通信**:SocketStream利用WebSocket协议提供双向实时通信,使得服务器和客户端可以即时交换数据,无需频繁地发送HTTP请求,从而提高了效率和用户体验。 2. **模块化**:SocketStream采用了模块化设计,...

    前端开源库-stream-concat

    在实际应用中,`stream-concat`常用于文件操作,比如合并多个文件内容,或者在读取HTTP响应流时将不同响应部分组合在一起。它还可以与Node.js的其他流工具如`zlib`(压缩/解压缩)或`crypto`(加密/解密)配合使用,...

    springboot整合stream使用rabbitmq作为消息中间件

    - **请求/响应模式**: 通过使用RabbitMQ的TTL(Time To Live)和回执机制,可以实现类似HTTP的请求-响应模型。 在实际开发中,使用Spring Boot Stream和RabbitMQ的组合可以极大地提升微服务架构的灵活性和可靠性,...

    adodb.stream

    在编程中,特别是VBScript、VBA或者Classic ASP中,`ADODB.Stream`经常被用来读取、写入和操作文件,进行数据传输或处理HTTP请求。以下将详细阐述`ADODB.Stream`的相关知识点: 1. **什么是ADODB.Stream?** `...

Global site tag (gtag.js) - Google Analytics