`
kane_xie
  • 浏览: 145011 次
社区版块
存档分类
最新评论
文章列表
5月23日Confluent官方宣布Apache Kafka 0.10正式发布。该版本包含了很多新功能和优化,这里列出比较重要的几项:   Streams 如果你有这样的需求,从Kafka拉取数据进行流处理然后再推送回Kafka,那么你会喜欢0.10的Kafka Streams。Kafka Streams是一个类库,它实现了一系列流处理动作(例如join,filter,aggregate等),能够帮助你构建一个功能齐全的低延迟的流处理系统。它支持有状态或无状态的处理,并且能够被部署在各种框架和容器中(例如YARN,Mesos,Docker),也可以集成在Java应用里。   机 ...
最近准备使用Kafka Mirrormaker做两个数据中心的数据同步,以下是一些要点:   mirrormaker必须提供一个或多个consumer配置,一个producer配置,一个whitelist或一个blacklist(支持java正则表达式) 启动多个mirrormaker进程,单个进程启动多个consuemr streams, 可以提高吞吐量和提供容 mirrormaker部署在destination datacenter,这样如果kafka集群之间发生网络问题,也不至于从src cluster拿到了数据但发不到dest cluster导致数据丢失 mirrormak ...
  Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); p ...
先看一个简单的KafkaConsumer例子:   Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms&quo ...
Kafka0.9发布了新版consumer client。它与旧版本最大的区别是移除了基于zookeeper的high-level consumer和low-level SimpleConsumer,而代之于一个统一的consumer API,它集成了之前high-level consumer的group管理功能和low-level consumer的offset控制功能。   新的consumer实现了一套新的group管理机制,它使得consumer clients变得更简洁(真的比以前简洁很多。。。),并且获得更快的rebalancing。同时这个版本也完全解除了consumer c ...
Kafka Connect是Kafka0.9新增的模块。可以从名称看出,它可以和外部系统、数据集建立一个数据流的连接,实现数据的输入、输出。有以下特性:   使用了一个通用的框架,可以在这个框架上非常方面的开发、管理Kafka Connect接 ...
原生ES,只需要在启动时加上'-Xmx1g -Xms1g' Elasticsearch Docker,参数为 '-eES_MIN_MEM=1g-eES_MAX_MEM=1g' 注意: 1. mx和ms最好设为一样,避免GC 2. 为了保证最大效率,ES内存设为预留内存的一半,另外一半留给Lucene。
项目中碰到一个elasticsearch的purge需求,就是说在不删除index和type的前提下,清除其中的所有数据。   用es的delete by query api可以做到,尽管官方声明deprecated in 1.5.3,但是经过测试1.8还是可以用的(真不敢想象要是不能用了怎么办。。。连个purge的api都没有。。),语法如下:   DELETE /<index>/<type>/_query -d '{     "query" : {          "match_all" : {}     } ...
  最近在做一个spark项目,顺便分享一下我的Scala入门过程。这一系列文章假定读者有一定的java或者其他面向对象编程语言基础。本文主要简单介绍文件的操作。   按惯例先上代码 val file = Source.fromFile("/Users/xiejing/Desktop/javascript") for (line <- file.getLines()) { println(line) } file.close() val webFile = Source.fromURL("https://www.baidu.com/ ...
Elasticsearch Bulk API允许批量提交index和delete请求。   BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("index1", "type1", "id1").setSource(source); bulkRequest.add(client.prepareIndex("index2", "type2", "id2 ...
最近尝试用Proguard来混淆代码,以增加发布出去的代码安全性。今天运行混淆后的jar包发生如下异常 Caused by: org.springframework.beans.factory.BeanCreationException: Could not autowire field: public abc.service.Service test.rest.controller.Controller.service; neste ...
  最近在做一个spark项目,顺便分享一下我的Scala入门过程。这一系列文章假定读者有一定的java或者其他面向对象编程语言基础。本文主要简单介绍Scala的集合类型,包括Array,Tuple和Map。   照例先上一段代码 注:ITEYE不支持Sc ...
  最近在做一个spark项目,顺便分享一下我的Scala入门过程。这一系列文章假定读者有一定的java或者其他面向对象编程语言基础。本文主要介绍Scala的流程控制和异常处理。   先上一段代码 注:ITEYE不支持Scala语法高亮,所以为了看起来方便,我选择作为Java来展示,但这是Scala代码,这点不用怀疑。。   object TestScala2 { def main(args: Array[String]): Unit = { //test 'for' for (i <- 1 to 10) println(i) //test ...
最近在做一个spark项目,顺便分享一下我的scala入门过程。这一系列文章假定读者有一定的java或者其他面向对象编程语言基础。本文主要介绍scala的函数定义。   国际惯例,先来一段HelloWorld object TestScala1 { def main(args: Array[String]): Unit = { val s = "Hello World!"; show(s); } def show(s: String): Unit = { println(s) } }     函数的定义需 ...
在本地提交一个spark job,出现如下错误   WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory   在网上搜了一下,出现这种错误一般有两种原因, 1.内存不足。 2.主机名和IP配置不正确。   检查了一下,我只是跑一个简单的测试程序,内存是完全够用的,我也在spark web console上确认了内存是足够的。 ...
Global site tag (gtag.js) - Google Analytics