- 浏览: 720005 次
- 性别:
- 来自: 大连
最新评论
-
lixuanbin:
iteye已经快要tj了吧。。
iteye为什么不支持markdown? -
haorengoodman:
Tachyon 能在做数据分类吗?例如我有一坨hdfs文件,将 ...
tachyon与hdfs,以及spark整合 -
lee3836:
求源码,大牛
clover分布式任务调度系统 -
cfan37:
...
sparksql与hive整合 -
greemranqq:
9.9 送上,希望博客长久~。~
【【【【【#####>>>>>【关于我】【您·的·支·持·是·我·最·大·的·动·力】<<<<<#####】】】】】
文章列表
、下载scala2.11.5版本,下载地址为:http://www.scala-lang.org/download/2.11.5.html
2、安装和配置scala:
第一步:上传scala安装包 并解压
第二步 配置SCALA_HOME环境变量到bash_profile
第三步 source 使配置环境变量生效:
第四步 验证scala:
由于个人需要在自己的笔记本上搭建hadoop伪分布环境,为了方便自己使用,如想看机器也看之前的一篇博客:hadoop2.6.0版本集群环境搭建
一台虚拟机,配置信息如下:
内存:1G,cpu:一个core,硬盘:15G
1、修改下主机名为master
sudo vi /etc/sysconfig/network
修改结果后:
重启电脑后再查看结果:
我们在hdfs的/data/join创建两个文件:
上传第一个文件名称为1.txt
内容第一列是日期,第二列uid(普通用户id)
上传第二个文件名称为2.txt
内容第一列是日期,第二列uid(普通用户id)
执行上传到hdfs:
hdfs命令行查询:
1.临时修改主机名
显示主机名:spark@master:~$ hostnamemaster修改主机名:spark@master:~$ sudo hostname hadoopspark@master:~$ hostnamehadoop
PS:以上的修改只是临时修改,重启后就恢复原样了。
2.永久修改主机名
redhat/centos上永久修改
[root@localhost ~]# cat /etc/sysconfig/networkNETWORKING=yesHOSTNAME=localhost.localdomainGATEWAY=19 ...
1、clover分布式调度介绍
clover分布式任务调度是完全使用Java技术自主开发
特点如下:
1、防单点故障
2、job可部署多台,但任务调度时,只有一台参执行。如果一台下线,
clover选择其他已在zookeeper注册job来执行。
3、可管理监控程序 ,相关负责人的job不可用会发送邮件通知
4、提供
本节中所用到的内容是来自搜狗实验室,网址为:http://www.sogou.com/labs/dl/q.html
我们使用的是迷你版本的tar.gz格式的文件,其大小为87K,下载后如下所示:
上传到服务器后,解压并查看:
查看Sogou文件内容:
该文件的格式如下所示:访问时间 \t 用户ID \t 查询词 \t 该URL在返回结果中的
.tar解包:tar xvf FileName.tar打包:tar cvf FileName.tar DirName(注:tar是打包,不是压缩!)———————————————.gz解压1:gunzip FileName.gz解压2:gzip -d FileName.gz压缩:gzip FileName.tar.gz 和 .tgz解压:tar zxvf FileName.tar.gz压缩:tar zcvf FileName.tar.gz DirName———————————————.bz2解压1:bzip2 -d FileName.bz2解压2:bunzip2 FileName.bz2压缩: ...
这次 我们以指定executor-memory参数的方式来启动spark-shell:
启动成功了
在命令行中我们指定了spark-shell运行暂用的每个机器上的executor的内存为1g大小,启动成功后参看web页面:
从hdfs上读取文件:
在命令行中返 ...
下面看下union的使用:
使用collect操作查看一下执行结果:
再看下groupByKey的使用:
执行结果:
join操作就是一个笛卡尔积操作的过程,如下示例:
对rdd3和rdd4执行join操作:
从前一篇文章中的wordcount的输出结果可以看出来结果是未经排序的,如何对spark的输出结果进行排序呢?
先对reduceByKey的结果进行key,value位置置换(数字,字符),然后再进行数字排序,再将key,value位置置换后就是排序后的结果了,最终将结果存储到HDFS中
可以发现我们成功对输出结果进行排序!
操作HDFS:先要保证HDFS启动了:
启动spark集群:
以spark-shell运行在spark集群上:
查看下之前上传到HDFS上的”LICENSE.txt“文件:
用spark读取这个文件:
使用count统计该文件的行数:
我们可以看到count 耗时为0.239708s
对该RDD进行cache操作并执行count使得缓存生效:
首先以spark的本地模式测试spark API,以local的方式运行spark-shell:
先从parallelize入手吧:
map操作后结果:
下面看下 filter操作:
filter执行结果:
我们用
问题1:reduce task数目不合适
解决方案:
需要根据实际情况调整默认配置,调整方式是修改参数spark.default.parallelism。通常的,reduce数目设置为core数目的2-3倍。数量太大,造成很多小任务,增加启动任务的开销;数目太小,任务运行缓慢。所以要合理修改reduce的task数目即spark.default.parallelism
问题2:shuffle磁盘IO时间长
解决方案:
设置spark.local.dir为多个磁盘,并设置磁盘的IO速度快的磁盘,通过增加IO来优化shuffle性能;
问题3:map|reduce数量大,造成shuff ...
进入Worker类源码:
可以看出Worker本身是Akka中的一个Actor。
进入Worker类的LaunchExecutor:
从源代码可以看出Worker节点上要分配CPU和Memory给新的Executor,首先需要创建一个ExecutorRunner:
ExecutorRunner是用于维护executor进程的:
1、进入ExecutorRunner 的start方法:
1.1、进入fetchAndRunExecutor()方法(核心方法):
注册Master有两种,一种是registerWithMaster方法,一种是tryRegisterAllMasters方法,前者是单Master的情况,后者是多Master,一般情况下是满足HA机制,我们看一下registerWithMaster方法:
此时会调用tryRegisterAllMasters方法:
此时通过Akka通过消息机制发送消息给Master来注册程序,RegisterApplication是一个case class,来封装消息:
我们进入Master的源代码: