`
rcfalcon
  • 浏览: 227992 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

一个基于socket的资源共享平台的实现(二)

阅读更多

继续上次说的,其实任务调度宏观上普遍分为两种,实现上总的来说就是一个串行、一个并行,上次我们介绍的TCP传送服务我们是使用并行的实现的(任务池),这次我们再来一个串行的,这就简单多了,就是一个队列,FIFO,我们用它来实现下载任务(假设我们下载任务只能单独进行)。

 

我们用一个NS_Download_Pool类来封装对其的管理。

 

#ifndef NETSHARE_DOWNLOAD_POOL_H_
#define NETSHARE_DOWNLOAD_POOL_H_

#include "StdioFileEx/StdioFileEx.h"
#include "NetShareIndexManager.h"
#include "DownloadListDlg.h"
#include <string>
#include <vector>
#include <algorithm>

#include "cpplibbase.h"

using namespace std;

CString const DOWNLOAD_FILELIST_FILENAME("c:\\download_filelistfile");

//文件状态
enum FileStatus
{
	Ready = 0, //未下载
	Downloading, //下载中
	Failed, //下载失败
	Success, //完成
	Paused //暂停
};

//下载文件信息类
struct DownloadFileInfoType
{
	DownloadFileInfoType()
		: filename(_T(""))
		, size(_T(""))
		, ipaddr(_T(""))
		, status( Ready )
		, percent( 0 )
		, finish_size( 0 )
	{}

	bool operator==( DownloadFileInfoType& opt )
	{
		return ( filename == opt.filename &&
			size == opt.size &&
			ipaddr == opt.ipaddr &&
			targetdir == opt.targetdir );
	}

	CString filename;//文件名
	CString size;//大小
	CString ipaddr;//源IP
	CString targetdir;//保存地址
	FileStatus status;//文件状态
	double percent;//下载完成百分比
	DWORD finish_size;//完成的字节数
};

namespace
{
	bool IsComplete( DownloadFileInfoType const& unit )
	{
		return ( unit.status == Success );
	}
}

//下载池类
class NS_Download_Pool
{
public:
	NS_Download_Pool()
		: m_pDlg( NULL )
		, m_pList( NULL )
	{ Load(); }

	~NS_Download_Pool(){ Save(); }

	//增加
	void Add( DownloadFileInfoType info )
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		if( Exist( info ) )
			return;
		else
			files_.push_back( info );

	}

	void Save()
	{
		CStdioFileEx fp;
		fp.Open(DOWNLOAD_FILELIST_FILENAME,CFile::modeWrite|CFile::modeCreate);   //|CFile::modeCreate
		for( size_t i=0;i<files_.size();++i)
		{
			DownloadFileInfoType& opt = files_[i];
			fp.WriteString( opt.filename );
			fp.WriteString(_T("\n"));
			fp.WriteString( opt.size );
			fp.WriteString(_T("\n"));
			fp.WriteString( opt.ipaddr );
			fp.WriteString(_T("\n"));
			fp.WriteString( opt.targetdir );
			fp.WriteString(_T("\n"));
			CString write("");
			write.Format(_T("%d"),opt.status);
			fp.WriteString( write );
			fp.WriteString(_T("\n"));
			write.Format(_T("%.2lf"),opt.percent);
			fp.WriteString( write );
			fp.WriteString(_T("\n"));
			write.Format(_T("%d"),opt.finish_size);
			fp.WriteString( write );
			fp.WriteString(_T("\n"));
		}
		fp.Close(); 
	}

	void Load()
	{
		CStdioFileEx fp;
		if(fp.Open(DOWNLOAD_FILELIST_FILENAME,CFile::modeRead))   
		{
			CString test;
			while( NULL!= fp.ReadString( test ) )
			{
				if( test == "" )
					break;
				DownloadFileInfoType tmp;
				tmp.filename = test;
				fp.ReadString( tmp.size );
				fp.ReadString( tmp.ipaddr );
				fp.ReadString( tmp.targetdir );

				fp.ReadString( test );
				char* p = NetShareIndexManager::UnicodeToAnsi(test.GetBuffer(0));
				tmp.status = (FileStatus)(atoi( p ));
				delete p;

				fp.ReadString( test );
				p = NetShareIndexManager::UnicodeToAnsi(test.GetBuffer(0));
				tmp.percent = atof( p );
				delete p;

				fp.ReadString( test );
				p = NetShareIndexManager::UnicodeToAnsi(test.GetBuffer(0));
				tmp.finish_size = atoi( p );
				delete p;

				Add( tmp );
			}
			fp.Close();
		}
		else
		{
			Clear();
		}
	}

	//删除
	void Remove( DownloadFileInfoType& info )
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		vector<DownloadFileInfoType>::iterator iter;
		for( iter = files_.begin(); iter != files_.end(); ++iter )
		{
			if( *iter == info )
			{
				files_.erase( iter );
				return;
			}
		}
		Save();

	}

	//删除容器src内指定序号的元素
	template<class T>
	void Remove( vector<T>& src, vector<size_t> const& indexs )
	{
		vector<T> result;
		int count = 0;
		for( vector<T>::iterator iter = src.begin();
			 iter != src.end();
			 ++iter )
		{
			if( find( indexs.begin(), indexs.end(), count ) == indexs.end() )
			{
				result.push_back( *iter );
			}
			count++;
		}
		src = result;
	}

	//根据索引删除
	void Remove( vector<size_t> delete_indexs )
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		Remove( files_, delete_indexs );
		Save();
	}



	//移除所有已完成的下载
	void RemoveAllCompleted()
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		files_.erase( remove_if( files_.begin(), files_.end(), IsComplete ), files_.end() );
		Save();
		Show();
	}


	//清空
	void Clear()
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		files_.clear();
	}

	//弹出第一个元素
	DownloadFileInfoType Pop()
	{
		for( vector<DownloadFileInfoType>::iterator iter = files_.begin(); iter != files_.end(); ++iter )
		{
			//找未下的文件
			if( iter->status == Ready )
			{
				DownloadFileInfoType tmp = *iter;
				iter->status = Downloading;
				curr_iter_ = iter;
				return tmp;
			}
		}
		assert( TRUE );//不可能发生
		DownloadFileInfoType tmp;
		return tmp;
	}

	//下载成功
	void DownloadSuccess()
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		curr_iter_->status = Success;
		Save();
	}

	//下载失败
	void DownloadFailed( DWORD CompleteSize )
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		curr_iter_->status = Failed;
		curr_iter_->finish_size = CompleteSize;
		Save();
	}

	//设置当前下载已完成字节数
	void CurrFinish( DWORD size )
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		curr_iter_->finish_size = size;
	}

	//检测是否为空
	bool Empty()
	{
		for( vector<DownloadFileInfoType>::iterator iter = files_.begin(); iter != files_.end(); ++iter )
		{
			//找未下的文件
			if( iter->status == Ready )
			{
				return FALSE;
			}
		}
		return TRUE;
	}

	//检测是否存在
	bool Exist( DownloadFileInfoType info )
	{
		vector<DownloadFileInfoType>::iterator iter;
		for( iter = files_.begin(); iter != files_.end(); ++iter )
		{
			if( *iter == info )
			{
				return true;
			}
		}
		return false;
	}

	void SetDlgPtr( DownloadListDlg* p_dlg, CListCtrl* pList )
	{
		m_pDlg = p_dlg;
		m_pList = pList;
	}

	//重置下载
	void SetReady( vector<size_t> select_indexs )
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		//若用户没有点选具体文件
		if( select_indexs.size() == 0 )
		{
			for( size_t i = 0; i < files_.size(); ++i )
				if( files_[ i ].status != Success && files_[ i ].status != Downloading )
					files_[ i ].status = Ready;
			return;
		}

		//对具体文件操作
		for( size_t i = 0; i < select_indexs.size(); ++i )
		{
			if( files_[ select_indexs[i]].status != Success && files_[ select_indexs[i]].status != Downloading )
				files_[ select_indexs[i]].status = Ready;
		}
	}

	//停止
	void StopAll()
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		for( vector<DownloadFileInfoType>::iterator iter = files_.begin(); iter != files_.end(); ++iter )
		{
			//找未下的文件
			if( iter->status != Success )
			{
				iter->status = Paused;
			}
		}
	}

	//显示
	void Show()
	{
		cpplib::resource::MutexLock MyMutex(FileListMutex);
		try
		{
			if( !m_pDlg || !m_pList )
				return;	
			/*if( ::IsWindowVisible( *m_pDlg ) == FALSE )
				return;*/

			int theItemCount = m_pList->GetItemCount();
			vector<size_t> select_indexs;

			for( int i=0; i < theItemCount; i++ )
			{
				if( m_pList->GetItemState(i, LVIS_SELECTED) == LVIS_SELECTED )
				{
					select_indexs.push_back(i);
				}
			}


			UINT ListIndex = 0;
			wchar_t* pTmp;

			m_pList->DeleteAllItems();

			vector<DownloadFileInfoType>::iterator iter;
			for( iter = files_.begin(); iter != files_.end(); ++iter )
			{
				DownloadFileInfoType const& MyUnit = *iter;
				m_pList->InsertItem(ListIndex,MyUnit.filename);//文件名
				m_pList->SetItemText(ListIndex,1,MyUnit.size);//文件大小
				m_pList->SetItemText(ListIndex,2,MyUnit.ipaddr);//IP地址
				m_pList->SetItemText(ListIndex,3,MyUnit.targetdir);//保存路径
				CString status("");
				switch( MyUnit.status )
				{
				case Ready:
					status = "待下载";
					break;
				case Downloading:
					status = "下载中";
					break;
				case Failed:
					status = "停止";
					break;
				case Success:
					status = "已完成";
					break;
				case Paused:
					status = "暂停";
					break;
				default:
					status = "格式错误";
					break;
				}
				m_pList->SetItemText(ListIndex,4,status);//状态
 
				if( Success == MyUnit.status )
				{
					m_pList->SetItemText(ListIndex,5,CString("100.00"));//百分比
				}
				else
				{
					CString process( "" );

					CString tmp = MyUnit.size;
					char* p = NetShareIndexManager::UnicodeToAnsi( tmp );
					double size = atof( p );
					delete p;
					double percent = 0;
					if( size < 0.01 )
						percent = 0;
					else
						percent = (double)MyUnit.finish_size * 100 / (double)(size * 1024 * 1024 );
					process.Format(_T("%.2lf%%"),percent );

					m_pList->SetItemText(ListIndex,5,process);//百分比
				}
				ListIndex++;
			}

			for( size_t i=0; i < select_indexs.size(); i++ )
			{
				m_pList->SetItemState(select_indexs[i], LVIS_SELECTED,LVIS_SELECTED );
			}

			m_pDlg->UpdateData( FALSE );
		}//try
		catch(...)
		{}
	}

private:
	vector<DownloadFileInfoType> files_;
	vector<DownloadFileInfoType>::iterator curr_iter_; //当前下载的文件结构迭代器

	cpplib::resource::Mutex FileListMutex;
	DownloadListDlg* m_pDlg;
	CListCtrl* m_pList;
};
#endif

 

 

接下来我们针对资源传送过程中限速进行分析和实现。

如果需要将发送速度限制在一个值,我们可以这么理解,单位时间内最多允许发送数据为N,若超过之,就需要降低速度,若不足,则需要提高速度。

如何控制速度?这里我们采用最朴素的方法,sleep,只要将sleep的具体值控制好,我们是可以控制速度的。

 

1. 若超过限制的速度,则增加sleep的时间;

2. 若不足限制的速度,则减少sleep的时间;

 

那么sleep的变化值怎么考虑?我们希望速度变化越快趋近于预期值越好,这里我们采用 计算机网络里提到的网络带宽探测的模型。

假设刚开始速度为0,采用一个指数型计算公式提升速度,当发现有一点超速了,退回到上一个状态,采用直线逼近。

横坐标t为时间轴,纵坐标s为速度,1为起始位置,速度为0,2点之后探测到速度超过限制,退回到2点,然后采用直线逼近,直到逼近速度极限。

 

细心的用户能发现,使用迅雷等下载软件时,也是呈现类似于这种的现象,先速度猛增,然后在到一个值之后,回退一点,开始线性缓慢增长,直到问题。

 

未完待续……

 

分享到:
评论

相关推荐

    基于socket的文件传输

    总的来说,这个项目提供了一个基础的Socket文件传输模型,通过实践可以加深对Java网络编程的理解,为构建更复杂的应用,如聊天室、文件共享系统等打下坚实基础。对于开发者来说,熟练掌握Socket编程不仅可以增强解决...

    Linux上实现基于Socket_的多进程实时通信

    在Linux系统中,实现基于Socket的多进程实时通信是一项重要的技术,它允许不同的进程间高效地交换数据。本文将详细介绍如何创建一个通信服务器(server)来作为中介,处理多个客户端(client)的连接和通信请求。 ...

    用基于Socket做的类似资源管理器的文件共享

    本项目“用基于Socket做的类似资源管理器的文件共享”旨在创建一个简易的文件共享系统,其设计灵感来源于Windows资源管理器,使用户能够方便地进行文件的上传和下载,同时在操作过程中提供进度显示。 首先,Socket...

    基于Socket的IPC消息平台设计

    基于Socket的IPC消息平台为进程间通信提供了一个灵活、高效且易于集成的解决方案。通过线程池、消息队列和消息传输通道等关键技术的应用,不仅提高了系统的性能和吞吐量,还实现了真正的跨平台和跨语言特性,极大地...

    Linux下基于socket多线程并发通信的实现

    总结来说,Linux下的多线程并发通信基于socket实现,结合TCP/IP协议,能够高效地处理多个客户端的并发请求。开发者需要理解套接字的概念、类型以及编程原理,同时掌握多线程编程技巧,包括线程安全、线程池等,以...

    QT实现的基于TCP Socket的共享白板.7z

    在本项目中,“QT实现的基于TCP Socket的共享白板”是一个利用QT库构建的、通过TCP Socket进行通信的多人协作工具。下面将详细介绍QT库、TCP Socket以及共享白板的相关知识点。 1. QT库: - QT库提供了一整套的C++...

    基于SOCKET和多线程的应用程序间通信技术的研究.pdf

    基于SOCKET和多线程的应用程序间通信技术,不仅可以实现跨平台的数据通信,还能在保证数据准确性和完整性的同时,提高系统的响应速度和处理能力。通过对多线程的合理利用,可以有效地分配系统资源,优化数据处理流程...

    基于Socket的多用户网络游戏

    本项目“基于Socket的多用户网络游戏”是一个典型的利用Socket实现的多人在线交互游戏,以井字游戏(Tic-Tac-Toe)为例,展示了如何通过网络连接让多个玩家在同一平台上进行实时对战。 首先,我们要理解什么是...

    Linux下用Socket和多线程实现简单聊天室

    - **线程概念**:线程是进程中的一个执行流,同一进程内的多个线程可以并发执行,共享进程资源,提高程序并发性能。 - **创建线程**:在Linux中,可以使用`pthread_create()`函数创建新的线程,传入线程函数指针和...

    Java基于Socket文件传输示例

    Java基于Socket文件传输示例是一种常见的网络编程应用场景,主要用于实现两个网络节点间的文件共享和交换。Socket编程是Java网络通信的基础,它提供了低级别的、面向连接的、双向的数据传输通道。在这个示例中,我们...

    SOCKET实现远程算术运算(简洁经典实用)

    综合以上,这个项目通过运用SOCKET实现网络通信,利用多线程提升并发处理能力,结合非阻塞模式确保服务的实时性,以及使用STL优化数据管理和算法执行,构建了一个高效且实用的远程算术运算系统。对于想要学习网络...

    Android 基于Socket 的IPC通信

    以上就是基于Socket的Android IPC通信的基本原理和实现步骤。通过这种方式,不同Android进程可以相互通信,实现复杂的功能,例如文件传输、实时消息推送等。但需要注意的是,Socket通信可能会消耗较多的系统资源,...

    C#多线程与Socket聊天室的实现

    本文将深入探讨如何使用C#语言实现一个基于TCP/IP协议的多线程Socket聊天室。这个聊天室功能允许局域网内的多台计算机进行实时信息交互,虽然没有在广域网环境下测试,但在局域网内能有效工作。 首先,我们要理解...

    LabVIEW环境下远程资源共享及Dat Socket实现

    【LabVIEW环境下的远程资源共享与DataSocket实现】 虚拟仪器技术,以其灵活性和用户自定义性,正在逐步替代传统的硬件仪器。随着互联网的普及,虚拟仪器的网络化趋势日益凸显,LabVIEW作为虚拟仪器的重要开发平台,...

    应用SOCKET实现网络通信

    SOCKET编程中,创建SOCKET后需要绑定一个IP地址和端口号,端口号可以唯一标识网络中的一个进程,IP地址则用于标识不同计算机之间的通信。在基于Windows平台的SOCKET编程中,通常会使用Winsock(Windows Sockets)API...

    基于socket聊天工具

    总结来说,这个基于Socket的聊天工具项目涵盖了网络编程的核心概念,如TCP/IP协议、Socket接口、即时通讯实现、文件传输机制和屏幕截图共享。同时,由于提供了源代码,它对于开发者而言,是一个实践和学习网络编程的...

    基于socket并发网络通信

    在IT行业中,网络通信是计算机科学的一个重要领域,特别是在分布式系统和互联网应用中。Socket编程是一种基础且核心的技术,它提供了进程间通信(IPC)的能力,使得运行在不同机器上的程序能够相互通信。本篇将详细...

    Python Tornado实现WEB服务器Socket服务器共存并实现交互的方法

    要实现Web服务器和Socket服务器之间的通信,你可以创建一个共享的数据结构,如全局变量或队列,两者都可以访问。例如,Web服务器接收到一个HTTP请求,处理后将数据放入队列,Socket服务器定期检查队列并发送数据给...

    基于Socket的java网络编程

    Java网络编程的核心是基于Socket进行通信,Socket是TCP/IP协议栈的一种实现,它允许两个网络节点(通常是客户端和服务器)通过TCP或UDP进行数据交换。Socket接口为应用程序提供了低级别的网络通信控制,允许开发者...

Global site tag (gtag.js) - Google Analytics