using System;
using System.Runtime.CompilerServices;
using System.Threading;
namespace Common.Engine
{
/// <summary>
/// 限流器
/// 改写自:hadoop\src\hdfs\org\apache\hadoop\hdfs\server\datanode\BlockTransferThrottler.java
/// </summary>
public class Throttler
{
private readonly int _period; // period over which bw is imposed
private readonly int _periodExtension; // Max period over which bw accumulates.
private int _bytesPerPeriod; // total number of bytes can be sent in each period
private int _curPeriodStart; // current period starting time
private int _curReserve; // remaining bytes can be sent in the period
private int _bytesAlreadyUsed;
/** Constructor
* @param bandwidthPerSec bandwidth allowed in bytes per second.
*/
public Throttler(int bandwidthPerSec): this(500, bandwidthPerSec)
{ // by default throttling period is 500ms
}
/**
* Constructor
* @param period in milliseconds. Bandwidth is enforced over this
* period.
* @param bandwidthPerSec bandwidth allowed in bytes per second.
*/
public Throttler(int period, int bandwidthPerSec)
{
this._curPeriodStart = CurrentTimeMillis();
this._period = period;
this._curReserve = this._bytesPerPeriod = bandwidthPerSec * period / 1000;
this._periodExtension = period * 3;
}
/// <summary>
/// 当前的时间戳
/// </summary>
/// <returns></returns>
[MethodImpl(MethodImplOptions.Synchronized)]
public int CurrentTimeMillis()
{
return Environment.TickCount;
}
/**
* @return current throttle bandwidth in bytes per second.
*/
[MethodImpl(MethodImplOptions.Synchronized)]
public long GetBandwidth()
{
return _bytesPerPeriod * 1000 / _period;
}
/**
* Sets throttle bandwidth. This takes affect latest by the end of current
* period.
*
* @param bytesPerSecond
*/
[MethodImpl(MethodImplOptions.Synchronized)]
public void SetBandwidth(int bytesPerSecond)
{
if (bytesPerSecond <= 0)
{
throw new Exception("" + bytesPerSecond);
}
_bytesPerPeriod = bytesPerSecond * _period / 1000;
}
/** Given the numOfBytes sent/received since last time throttle was called,
* make the current thread sleep if I/O rate is too fast
* compared to the given bandwidth.
*
* @param numOfBytes
* number of bytes sent/received since last time throttle was called
*/
[MethodImpl(MethodImplOptions.Synchronized)]
public void Throttle(int numOfBytes)
{
if (numOfBytes <= 0)
{
return;
}
_curReserve -= numOfBytes;
_bytesAlreadyUsed += numOfBytes;
while (_curReserve <= 0)
{
int now = CurrentTimeMillis();
int curPeriodEnd = _curPeriodStart + _period;
if (now < curPeriodEnd)
{
// Wait for next period so that curReserve can be increased.
try
{
Thread.Sleep(curPeriodEnd - now);
}
catch (Exception) { }
}
else if (now < (_curPeriodStart + _periodExtension))
{
_curPeriodStart = curPeriodEnd;
_curReserve += _bytesPerPeriod;
}
else
{
// discard the prev period. Throttler might not have
// been used for a long time.
_curPeriodStart = now;
_curReserve = _bytesPerPeriod - _bytesAlreadyUsed;
}
}
_bytesAlreadyUsed -= numOfBytes;
}
}
}
分享到:
相关推荐
其中,DataNode 是 HDFS 中负责存储数据块的节点,而 DataXceiver 是 DataNode 上负责处理数据流的组件。本文将对 HDFS 数据流进行深入分析,探讨 DataXceiverServer 和 DataXceiver 的实现机制。 ...
在动手实验“Oracle HDFS直接连接器”中,我们将会了解如何实现Oracle数据库与Apache Hadoop HDFS(Hadoop分布式文件系统)之间的直接连接,从而能够直接访问存储在HDFS中的数据文件。该实验的软件环境主要由Oracle ...
HDFS中对象序列化是指将Java对象转换为字节流的过程,以便于在分布式环境中传输和存储对象。HDFS使用Java的序列化机制来实现对象序列化,包括:Writable、WritableComparable和Serialization等接口。 在HDFS中,...
(1)向HDFS中上传任意文本文件,如果指定的文件在HDFS中已经存在,则由用户来指定是追加到原有文件末尾还是覆盖原有的文件 (2)从HDFS中下载指定文件,如果本地文件与要下载的文件名称相同,则自动对下载的文件...
在HDFS中,基本命令是最基础也是最常用的命令,掌握这些命令是使用HDFS的基础。本节我们将详细介绍HDFS中的基本命令。 -help命令 HDFS中的-help命令用于显示HDFS的帮助信息。该命令可以显示所有可用的命令和参数。...
在Hadoop生态系统中,HDFS(Hadoop Distributed File System)是核心组件之一,它提供了可靠的、可伸缩的分布式存储。在与HDFS交互时,无论是上传文件、创建文件夹,还是从HDFS中下载数据,都需要依赖特定的Java库...
在本文中,我们将探讨一个具体的NIFI应用场景,即如何将从MySQL数据库中查询得到的JSON数据转换成TXT格式,并存储到HDFS(Hadoop分布式文件系统)中。这个场景在大数据处理和分析中非常常见,因为MySQL是常用的关系...
HDFS 中 NameNode 节点的配置、备份和恢复 HDFS(Hadoop Distributed File System)是 Hadoop 生态系统中的分布式文件系统,它提供了高效、可靠、可扩展的文件存储解决方案。 NameNode 是 HDFS 集群中的中心服务器...
hdfs文件的查看 hdfs fs -cat /文件名
HDFS是Hadoop分布式计算的存储基础。HDFS具有高容错性,可以部署在通用硬件设备上,适合数据密集型应用,并且提供对数据读写的高吞 吐量。HDFS能 够提供对数据的可扩展访问,通过简单地往集群里添加节点就可以解决...
- **文件和目录操作**:HDFS Explorer提供类似Windows资源管理器的界面,可以进行文件的上传、下载、删除、重命名、创建目录等基本操作。 - **文件预览**:用户可以直接在浏览器中查看文本文件内容,无需下载到本地...
分布式存储系统:HDFS:HDFS数据流读取流程.docx
在Hadoop框架中,HDFS(Hadoop Distributed File System)是一个关键组件,它提供了一个可靠的、可扩展的分布式文件系统,用于存储大量数据。对于HDFS的管理与操作,shell命令是一个常用且实用的工具,它允许用户在...
自己实现的一个简单的HDFS查看器源码。 运行方式:将2个class文件拷贝到namenode的hadoop bin目录下,然后运行 JAVA Main 就可以打开该查看器。 注意事项: hadoop的dfs服务必须启动 必须放在hadoop的bin目录下运行...
Hadoop 分布式文件系统 (HDFS)是一个设计为用在普通硬件设备上的分布式文件系统。它与现有的分布式文件系统有很多近似的地方,但又和这些文件系统有很明显的不同。HDFS是高容错的,设计为部署在廉价硬件上的。HDFS对...
在这个“HDFS实例基本操作”中,我们将深入探讨如何在已经安装好的HDFS环境中执行基本操作,包括文件上传、下载以及创建文件夹。 一、HDFS的基本架构 HDFS基于主从结构,主要由NameNode和DataNode组成。NameNode...
HDFS(Hadoop Distributed File System)是 Hadoop 项目中的一部分,是一个分布式文件系统。HDFS Java API 是一组 Java 类库,提供了一组接口来操作 HDFS。下面我们将对 HDFS Java API 进行详细的介绍。 HDFS Java ...
分布式存储系统:HDFS:HDFS数据流写入流程技术教程.docx
实验二:“熟悉常用的HDFS操作”旨在帮助学习者深入理解Hadoop分布式文件系统(HDFS)在大数据处理中的核心地位,以及如何通过Shell命令和Java API进行高效操作。HDFS在Hadoop架构中扮演着存储大数据的核心角色,为...