- 浏览: 70191 次
- 性别:
- 来自: 北京
最新评论
-
onlinetomcat:
创建java工程这个jar可以和冲突的jar使用吗
elasticsearch与spark,hbase等jar包冲突导致报错问题 -
字母哥:
hae 写道你的输入文件是从哪里来的,格式是什么样的。已经上传 ...
hadoop处理手机流量小例子 -
字母哥:
lvwenwen 写道文件格式是什么样。已经上传
hadoop处理手机流量小例子 -
lvwenwen:
文件格式是什么样。
hadoop处理手机流量小例子 -
hae:
你的输入文件是从哪里来的,格式是什么样的。
hadoop处理手机流量小例子
文章列表
//設置sparkconf參數
val sparkConf = new SparkConf() //.setAppName("DirectKafka").set("spark.task.maxFailures", "1")
sparkConf.set("spark.rpc.askTimeout", "1200") //设置20分钟
//spark.network.timeout
sparkConf.set("spark.network.timeout&qu ...
这里说一下spark源码的编译,可以修改一些源码,进行编译,这里我们修改一下spark-shell启动时输出消息的代码,这地方不用多说,使用 idea导入spark官网 下载的spark1.6源码,然后修改,回到spark源码解压目录,这里首先配置maven,jdk等环境变量。./build/mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
先执行上面的命令,会下载各种依赖jar和pom什么的,此过程比较长,大约1小时左右。成功后,再执行下面的命令./make-distribution.sh --na ...
在scala写spark程序的时候,为了方便控制,会调用java的api进行操作elasticsearch,这时候,需要一个map,接收具体的字段,这地方有会一个问题,就是接受的这个map必须是AnyRef类型的,比如我们需要存入一个long类型的值,比如时间是个long,那么直接写入就会有问题,因为scala里面AnyRef不包含数值类型,而是属于AnyVal类型。遇到这种情况,我们使用下面的方式,进行强制转换就能解决这个问题
val map=scala.collection.mutable.Map[String,AnyRef]()
map.put("age" ...
linux解决软件托盘不显示问题
- 博客分类:
- linux
ubuntu利用了crossover安装了qq,需要解决乱码问题
把win7中的宋体文件拷贝到/opt/cxoffice/support/apps.com.qq.im/drive_c/windows/Fonts里面去,重启,解决了乱码问题。
还有一个问题就是最小化后托盘不显示,找不到qq了,解决办法是sudo add-apt-repository ppa:timekiller/unity-systrayfix
sudo apt-get update
sudo apt-get upgrade然后重启系统,再次启动qq就可以最小化了
基于standalone模式
这里,我们主要关注最主要的2个地方的初始化,首先是TaskScheduler的创建初始化。// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean] ...
首先定义一个类,继承Actorclass akka001 extends Actor{
override def preStart() ={
println("this is preStart()")
}
def receive = LoggingReceive{
case "hello" => println("hello world")
}
override def postStop()={
println("this is postStop()&q ...
spark版本里面用到的就是akka通信,2.0版本 已经不再使用akka了。这里写了一个简单的akka程序。
首先定义2个消息类object MyRequest {
var message:String=null
}
object MyResponse {
var message:String=null
}
定义2个通信的actorclass Student(teacherRef:ActorRef) extends Actor{
val log=Logging(context.system,this)
def receive={
case MyReques ...
spark重要的几个算子
- 博客分类:
- spark
spark中有几个算子比较重要,开发中不是很常用,但很多算子的底层都是依靠这几个算子实现的,比如CombineByKey,像reduceByKey底层是combineByKey实现的。
首先介绍combineByKey
这个算子 主要需要三个参数,第一个是对每个分区中每个key的第一个值 进行初始化,也就是每个分区内,有多少个key就会执行多少次这个初始化object CombineByKeyTest01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster(&qu ...
在原有的spark程序中,其中包含hhase,spark等,会出现jar包冲突导致有异常的问题,程序中都是使用maven。
在异常中,包含nosuchmethod的异常错误信息,可以看出是guava版本与hadoop版本不一致导致,Hadoop中使用的是12版本,而es2.3.1默认使用18版本。这样才程序执行的时候使用的是12版本,会导致类中的方法找不到的异常。
解决办法是,es的依赖不使用maven,把es以及依赖搞成一个单独的jar,给程序调用。首先,创建一个单独的maven项目,xml配置文件如下:
<project xmlns="http://maven.apache ...
hadoop1版本中提供了获取文件名的功能,就是在map阶段可以获取每一行记录属于哪个文件,可以得到这个文件名,代码如下://获取文件名
InputSplit inputSplit=(InputSplit)context.getInputSplit();
String filename=((FileSplit)inputSplit).getPath().getName();
这是hadoop1版本提供的方法。
对于spark也可以实现这个功能,使用的方式是本地测试的代码,spark在本地执行的,代码如下:object Mytest3 {
def main(args: Array[St ...
时间同步这个需求在很多地方都有。比如安装cm和cdh的话,需要ntp时间同步,否则会出现红色警告
这里主要是设置一台服务器作为主服务器,让其他机器同步这台机器的时间,而且是配置的本地时间,没有同步internet时间,因为很多时候服务器不能联网
首先我们这里设置2台机器,主机器为192.168.5.102,另外一台为192.168.5.103
首先设置主机器
编辑/etc/ntp.conf文件
在里面加上restrict 127.0.0.1 # 开启内部递归网络接口 lo
restrict 192.168.5.0 mask 255.255.255.0 nomodify #在内部子网里面的 ...
主要是数据从flume进去kafka,然后交给sparkstreaming处理的流程
本文依旧以单词计数例子为例
首先,flume使用1.6版本,如果是1.6以下的话,没带有官方的kafkasink,需要自己实现,自己实现也不难实现,写一个自定义的sink,在里面方法调用kafka生产者代码,把数据发送到指定的kafka的broker的topic即可。
此处使用1.6版本,直接使用kafkaSink即可
agent4.channels.ch1.type = memory
agent4.sources.avro-source1.channels = ch1
agent4.sources. ...
这里写一个flume整合sparkstreaming的例子
我这里使用scala ide和maven的方式
spark用的1.2版本,scala是2.10版本,flume使用最新版的1.6
整合的第一步,要加上flume的maven依赖 <dependency>
<groupId>org.apache.spark</groupId>
<artifac ...
搭建一个flume集群,设置2个节点,如下图这样的集群
2个节点分别为192.168.5.220和192.168.5.221
设置成这样的情景,第一个接受日志传过来的数据(这里使用配置log4j往里面写数据),
第一个节点的sink指向第二个节点的source,第二个节点sink配制成hdfs
首先,配置第一个节点,
这里面source的port设置成41414,log4j输出的port也必须配制成41414,才能把数据传到flume
sink配置的port必须和第二个节点source配置的一样才行
配置第二个节点
source的bind配置本机ip,端口配置和第一个节点的sin ...
hadoop求最大值问题,代码比求最值前N个要简单一些,因为直接使用LongWritable类型,不需要自定义hadoop对象进行比较,所以直接覆盖map和reduce方法,并且覆盖cleanup方法,这是在map和reduce都执行完成之后才会执行的方法,只需要把最大值写入即可
public class MySuper {
public static void main(String[] args) throws Exception {
final String INPUT_PATHs = "hdfs://chaoren:9000/seq100w.txt";
...