- 浏览: 693819 次
- 性别:
- 来自: 长沙
文章分类
- 全部博客 (364)
- quick start (57)
- bboss aop (43)
- bboss mvc (48)
- bboss persistent (96)
- bboss taglib (30)
- bboss event (10)
- bbossgroups (52)
- bboss (32)
- bboss会话共享 (17)
- bboss rpc (7)
- bboss 国际化 (5)
- bboss 序列化 (9)
- bboss cxf webservice (8)
- bboss hessian (3)
- bboss 安全认证SSO (15)
- bboss 工作流 (6)
- 平台 (18)
- bboss quartz (3)
- 杂谈 (5)
- 大数据 (1)
- bboss elastic (24)
- bboss http (1)
- bboss kafka (1)
- Elasticsearch Scroll和Slice Scroll查询API使用案例 (1)
最新评论
-
qianhao123:
...
采用gradle构建和发布bboss方法介绍 -
qianhao123:
[img][/img]
采用gradle构建和发布bboss方法介绍 -
yin_bp:
欢迎大家参与working
高性能elasticsearch ORM开发库使用介绍 -
qq641879434:
万分感谢
bboss 持久层sql xml配置文件编写和加载方法介绍 -
yin_bp:
qq641879434 写道怎么设置配置文件 可以查看执行的S ...
bboss 持久层sql xml配置文件编写和加载方法介绍
bboss kafka组件使用介绍
本文使用的实例对应的gradle源码工程git访问地址:
http://git.oschina.net/bboss/bestpractice
testkafka子工程地址
http://git.oschina.net/bboss/bestpractice/tree/master/testkafka
bboss kafka组件作用
1.导入bboss kafka组件
maven坐标
gradle坐标
其中kafka2x可以是1.1.0,2.3.0.2.7.0,2.8.0等,具体根据kafka服务端版本来选择
2.使用kafka producer,发送消息
2.1 kafka producer配置
编写kafka.xml配置文件,放到classpath跟路径下面
相关配置说明:
bootstrap.servers kafka服务器地址配置
value.serializer kafka消息序列化插件配置
key.serializer kafka消息key序列化插件配置
f:sendDatatoKafka="true" 是否启动消息发送功能,false 禁用,true 启用
f:sendAsyn="true" 控制组件是否异步发送消息,默认为true
workerThreadSize 异步发送消息线程池,默认100
workerThreadQueueSize 异步发送消息队列,默认10240
2.2 发送kafka消息
发送kafka消息相关组件:
org.frameworkset.plugin.kafka.KafkaUtil
org.frameworkset.plugin.kafka.KafkaProductor
KafkaUtil组件加载配置文件并获取KafkaProductor ,通过KafkaProductor 发送kafka消息
异步方式发送消息
<property name="workerThreadSize" value="100"/>
<property name="workerThreadQueueSize" value="10240"/>
<property name="kafkaproductor"
class="org.frameworkset.plugin.kafka.KafkaProductor"
init-method="init"
f:sendDatatoKafka="true"
f:sendAsyn="true"
f:productorPropes="attr:productorPropes"/>
通过api控制是否异步发送消息:
//异步方式发送消息
productor.send("blackcat",3l,"aaa",true);
productor.send("blackcat",4l,"bbb",true);
//同步方式发送消息
productor.send("blackcat",5l,"aaa",false);
productor.send("blackcat",6l,"bbb",false);
3.接收和处理kafka消息
3.1 kafka consumer配置
新建kafkaconsumer.xml文件,放到classpath根路径下面
配置说明:
storeService 配置消息处理组件
zookeeper.connect 配置管理kafka服务器和消息的zookeeper集群地址
f:topic="blackcat" 消费的kafka topic
f:partitions="4" topic对应的分区数,决定并行处理消息的工作线程
f:batchsize="-1" 批处理消息条数,-1禁用批处理,>0时按照批处理方式按批次提交消息给storeservice组件
f:checkinterval="10000" 指定批处理消息接收最大等待时间,单位毫秒。按照批处理方式时,如果超过checkinterval指定的时间,到达的消息没有到达batchsize,则强制提交处理当前批次的数据到storeservice组件
3.2 接收和处理消息
接收和处理消息相关组件:
org.frameworkset.plugin.kafka.KafkaConsumer
org.frameworkset.plugin.kafka.StoreService
编写消息处理组件,处理组件需要实现接口
org.frameworkset.plugin.kafka.StoreService
//按条处理数据
public void store(MessageAndMetadata<byte[], byte[]> message) throws Exception ;
public void closeService();
//按批处理消息
public void store(List<MessageAndMetadata<byte[], byte[]>> messages) throws Exception
StoreServiceTest实现:
3.3 加载kafka consumer配置并启动消息接收线程
本文使用的实例对应的gradle源码工程git访问地址:
http://git.oschina.net/bboss/bestpractice
testkafka子工程地址
http://git.oschina.net/bboss/bestpractice/tree/master/testkafka
bboss kafka组件作用
- 快速配置kafka客户端和消费者
- 发送数据到kafka
- 从kafka接收和处理数据(支持批量消息处理和按条处理)
1.导入bboss kafka组件
maven坐标
<dependency> <groupId>com.bbossgroups.plugins</groupId> <artifactId>bboss-plugin-kafka2x</artifactId> <version>6.1.0</version> </dependency> 参考下面gradle补充kafka依赖包...
gradle坐标
api 'com.bbossgroups.plugins:bboss-plugin-kafka:6.1.0' api ( [group: 'org.apache.kafka', name: 'kafka_2.12', version: "${kafka2x}", transitive: true], ){ exclude group: 'log4j', module: 'log4j' exclude group: 'org.slf4j', module: 'slf4j-log4j12' } api ([group: 'org.apache.kafka', name: 'kafka-tools', version: "${kafka2x}", transitive: true],){ exclude group: 'log4j', module: 'log4j' exclude group: 'org.slf4j', module: 'slf4j-log4j12' exclude group: 'org.eclipse.jetty', module: 'jetty-server' exclude group: 'org.eclipse.jetty', module: 'jetty-servlets' exclude group: 'org.eclipse.jetty', module: 'jetty-servlet' exclude group: 'org.glassfish.jersey.containers', module: 'jersey-container-servlet' } api ([group: 'org.apache.kafka', name: 'kafka-clients', version: "${kafka2x}", transitive: true],){ exclude group: 'log4j', module: 'log4j' exclude group: 'org.slf4j', module: 'slf4j-log4j12' } api ([group: 'org.apache.kafka', name: 'kafka-streams', version: "${kafka2x}", transitive: true],){ exclude group: 'log4j', module: 'log4j' exclude group: 'org.slf4j', module: 'slf4j-log4j12' }
其中kafka2x可以是1.1.0,2.3.0.2.7.0,2.8.0等,具体根据kafka服务端版本来选择
2.使用kafka producer,发送消息
2.1 kafka producer配置
编写kafka.xml配置文件,放到classpath跟路径下面
<properties> <property name="productorPropes"> <propes> <property name="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"> <description> <![CDATA[ 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[] ]]></description> </property> <property name="key.serializer" value="org.apache.kafka.common.serialization.LongSerializer"> <description> <![CDATA[ 指定序列化处理类,默认为kafka.serializer.DefaultEncoder,即byte[] ]]></description> </property> <property name="compression.type" value="gzip"> <description> <![CDATA[ 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定]]></description> </property> <property name="bootstrap.servers" value="hadoop85:9092,hadoop86:9092,hadoop88:9092"> <description> <![CDATA[ 指定kafka节点列表,用于获取metadata(元数据),不必全部指定]]></description> </property> </propes> </property> <property name="workerThreadSize" value="100"/> <property name="workerThreadQueueSize" value="10240"/> <property name="kafkaproductor" class="org.frameworkset.plugin.kafka.KafkaProductor" init-method="init" f:sendDatatoKafka="true" f:sendAsyn="true" f:productorPropes="attr:productorPropes"/> </properties>
相关配置说明:
bootstrap.servers kafka服务器地址配置
value.serializer kafka消息序列化插件配置
key.serializer kafka消息key序列化插件配置
f:sendDatatoKafka="true" 是否启动消息发送功能,false 禁用,true 启用
f:sendAsyn="true" 控制组件是否异步发送消息,默认为true
workerThreadSize 异步发送消息线程池,默认100
workerThreadQueueSize 异步发送消息队列,默认10240
2.2 发送kafka消息
发送kafka消息相关组件:
org.frameworkset.plugin.kafka.KafkaUtil
org.frameworkset.plugin.kafka.KafkaProductor
KafkaUtil组件加载配置文件并获取KafkaProductor ,通过KafkaProductor 发送kafka消息
KafkaProductor productor = KafkaUtil.getKafkaProductor("kafkaproductor"); productor.send("blackcat",//kafka topic 1l, //message key "aaa");//message productor.send("blackcat", //kafka topic "bbb"); //message
异步方式发送消息
<property name="workerThreadSize" value="100"/>
<property name="workerThreadQueueSize" value="10240"/>
<property name="kafkaproductor"
class="org.frameworkset.plugin.kafka.KafkaProductor"
init-method="init"
f:sendDatatoKafka="true"
f:sendAsyn="true"
f:productorPropes="attr:productorPropes"/>
通过api控制是否异步发送消息:
//异步方式发送消息
productor.send("blackcat",3l,"aaa",true);
productor.send("blackcat",4l,"bbb",true);
//同步方式发送消息
productor.send("blackcat",5l,"aaa",false);
productor.send("blackcat",6l,"bbb",false);
3.接收和处理kafka消息
3.1 kafka consumer配置
新建kafkaconsumer.xml文件,放到classpath根路径下面
<properties> <property name="consumerPropes"> <propes> <property name="group.id" value="test"> <description> <![CDATA[ 指定kafka group id]]></description> </property> <property name="zookeeper.session.timeout.ms" value="30000"> <description> <![CDATA[ 指定kafkazk会话超时时间]]></description> </property> <property name="auto.commit.interval.ms" value="3000"> <description> <![CDATA[ 指定kafka自动提交时间间隔]]></description> </property> <property name="auto.offset.reset" value="smallest"> <description> <![CDATA[ ]]></description> </property> <property name="zookeeper.connect" value="hadoop85:2181,hadoop86:2181,hadoop88:2181"> <description> <![CDATA[ 指定kafka节点列表,用于获取metadata(元数据),不必全部指定]]></description> </property> </propes> </property> <property name="kafkaconsumer" class="org.frameworkset.plugin.kafka.KafkaBatchConsumer" init-method="init" f:batchsize="-1" f:checkinterval="10000" f:productorPropes="attr:consumerPropes" f:topic="blackcat" f:storeService="attr:storeService" f:partitions="4" /> <property name="storeService" class="org.frameworkset.plugin.kafka.StoreServiceTest" /> </properties>
配置说明:
storeService 配置消息处理组件
zookeeper.connect 配置管理kafka服务器和消息的zookeeper集群地址
f:topic="blackcat" 消费的kafka topic
f:partitions="4" topic对应的分区数,决定并行处理消息的工作线程
f:batchsize="-1" 批处理消息条数,-1禁用批处理,>0时按照批处理方式按批次提交消息给storeservice组件
f:checkinterval="10000" 指定批处理消息接收最大等待时间,单位毫秒。按照批处理方式时,如果超过checkinterval指定的时间,到达的消息没有到达batchsize,则强制提交处理当前批次的数据到storeservice组件
3.2 接收和处理消息
接收和处理消息相关组件:
org.frameworkset.plugin.kafka.KafkaConsumer
org.frameworkset.plugin.kafka.StoreService
编写消息处理组件,处理组件需要实现接口
org.frameworkset.plugin.kafka.StoreService
//按条处理数据
public void store(MessageAndMetadata<byte[], byte[]> message) throws Exception ;
public void closeService();
//按批处理消息
public void store(List<MessageAndMetadata<byte[], byte[]>> messages) throws Exception
StoreServiceTest实现:
package org.frameworkset.plugin.kafka; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import kafka.message.MessageAndMetadata; public class StoreServiceTest extends BaseStoreService { StringDeserializer sd = new StringDeserializer(); LongDeserializer ld = new LongDeserializer(); @Override public void store(List<MessageAndMetadata<byte[], byte[]>> messages) throws Exception { for(MessageAndMetadata<byte[], byte[]> message:messages){ String data = sd.deserialize(null,message.message()); long key = ld.deserialize(null, message.key()); System.out.println("key="+key+",data="+data); } } @Override public void closeService() { sd.close(); ld.close(); } @Override public void store(MessageAndMetadata<byte[], byte[]> message) throws Exception { String data = sd.deserialize(null,message.message()); long key = ld.deserialize(null, message.key()); System.out.println("key="+key+",data="+data); } }
3.3 加载kafka consumer配置并启动消息接收线程
BaseApplicationContext context = DefaultApplicationContext.getApplicationContext("kafkaconfumer.xml"); KafkaListener consumer = context.getTBeanObject("kafkaconsumer", KafkaListener.class); Thread t = new Thread(consumer); t.start();
发表评论
-
bboss ioc快速入门教程
2017-08-13 11:28 1273bboss是一个非常不错的ioc框架,功能类似于spr ... -
bboss log4j滚动日志文件扩展插件使用介绍
2017-06-25 11:05 1438bboss扩展了log4j滚动切割文件插件org.apache ... -
bboss redis组件使用实例
2017-03-04 16:57 1183在工程中导入bboss redis组件 gradle comp ... -
bboss 与ecipse gradle buildship插件结合使用方法
2017-02-23 09:48 1193本文介绍bboss 与ecipse gradle builds ... -
如何快速高效地开发和调试基于gradle管理的web应用
2016-09-28 23:39 1471本文探讨如何高效快速地开发和调试基于gradle管理的web应 ... -
bboss wordpdf构建部署介绍
2016-09-02 15:47 568bboss wordpdf构建部署介绍 下载 源码下载地址: ... -
bboss gradle工程导入eclipse介绍
2016-07-24 14:01 2932bboss gradle工程导入eclipse介绍(本文适用于 ... -
采用gradle构建和发布bboss方法介绍
2016-05-01 23:23 4650采用gradle构建和发布bboss版本及从maven中央库下 ... -
bboss文件缓存组件FileContentCache介绍
2016-02-27 15:23 700bboss文件缓存组件FileContentCache介绍 ... -
bboss自动代码生成工具使用指南
2015-11-15 21:09 10136本文介绍bboss自动代码生成工具使用方法 工具在线试用: ... -
bboss应用程序运行容器使用介绍
2015-06-22 16:15 1935bboss微服务运行容器使 ... -
bboss框架配置监控介绍
2015-01-12 14:26 1668bboss框架配置监控介绍 ... -
bboss开发、模块工程目录结构及功能说明
2014-10-15 19:46 4859基于bboss开发项目说明 ... -
bboss最佳实践gradle工程清单及其作用介绍
2014-09-27 09:13 2269基于bboss开发项目说明 ... -
bboss版activiti 5.12扩展动态
2014-09-20 18:19 5260继扩展Activiti-5.12轻松实 ... -
bboss跨站攻击白名单和脚本攻击防火墙配置
2014-08-01 09:56 1105本文详细介绍bboss跨站攻击白名单和跨站脚本攻击防火墙配置 ... -
bboss会话共享demo使用指南
2014-07-01 17:57 5559为了方便应用系统集成b ... -
bboss序列化cglib代理对象方法介绍
2014-06-22 15:14 1153本文介绍bboss序列化cglib代理对象方法。经过cglib ... -
bboss session共享架构及特点
2014-06-14 12:33 6412bboss会话共享架构 bboss session共 ... -
bboss自定义类对象序列化机制介绍
2014-05-26 23:32 1468bboss自定义类对象序列 ...
相关推荐
本文将详细介绍如何在BBoss的IOC配置文件中引用外部属性文件,以便更好地管理和动态配置应用。 首先,BBoss的IOC配置文件通常是一个XML文件,比如`bboss-ioc.xml`,在这个文件中我们可以声明并配置各种bean。当需要...
在“bboss mvc开发手册.doc”中,我们可能会找到关于如何使用BBoss MVCDemo进行开发的详细指南。 首先,让我们深入了解BBoss MVCDemo的核心特性: 1. **模型(Model)**:BBoss MVCDemo支持多种数据访问方式,如JDBC...
BBoss安全认证过滤器是Java Web开发中一种用于实现用户身份验证和权限控制的重要组件。在Web应用程序中,过滤器(Filter)是Servlet规范的一部分,它允许开发者在请求到达目标Servlet或JSP之前进行预处理,以及在...
1. **组件化开发**:BBoss支持模块化和组件化的开发模式,允许开发者根据项目需求选择必要的功能组件,避免了传统框架中“大而全”的问题,降低了系统的复杂度。 2. **数据库操作**:BBoss提供了便捷的数据访问层...
例如,在一个域下的多个子应用之间,使用bboss可以共享认证信息,用户登录其中一个应用后,无需再次登录即可访问其他应用。 文档中提到了会话共享需要解决的五大技术难题: 1. session数据序列化问题,即如何有效...
本实例是一个基于bboss es spring boot starter的demo maven工程,可供spring boot项目集成bboss elasticsearch rest client参考 展示了通过spring boot管理单集群功能和管理多集群功能 单集群测试用例:...
http负载均衡组件-HttpRequestProxy 使用参考文档 负载均衡组件特点: 1.服务负载均衡(目前提供RoundRobin负载算法) 2.服务健康检查 3.服务容灾故障恢复 4.服务自动发现(zk,etcd,consul,eureka,db,其他第三...
2. **BBoss核心组件讲解**:详细解析BBoss的各个模块,如DAO、Service、权限控制等的使用方法。 3. **EasyUI组件使用**:讲解如何在页面中引入EasyUI,以及各种组件的属性设置和事件绑定。 4. **实战示例**:提供...
Elasticsearch rest client bboss介绍-Elastic2018中国开发者大会演讲稿
bboss-datatran 支持多种数据源的接入,如关系型数据库(MySQL、Oracle等)、NoSQL数据库(HBase、MongoDB等)、消息队列(Kafka、RabbitMQ等)以及文件系统(HDFS、FTP等)。通过内置的适配器,用户可以方便地将...
3. `bboss-util-5.5.0.jar`和`bboss-persistent-5.5.0.jar`:这是BBoss框架的核心组件,提供了许多实用工具类和持久化操作支持,如数据库连接、事务管理等,为Elasticsearch的数据导入提供底层支持。 4. `...
**二、BBoss介绍** BBoss 是一个针对 Elasticsearch 的企业级开发框架,它提供了丰富的 Java API,使得开发者可以更简单地进行索引管理、查询构建、结果分析等操作。BBoss 还支持多版本兼容,方便应对 Elasticsearch...
在`demo`这个文件中,可能包含了使用bboss操作Elasticsearch的基本示例代码。例如,创建索引、插入文档、执行查询等。通常,这些示例会展示如何初始化bboss客户端,如何构造和执行DSL查询,以及如何处理返回结果。 ...
springboot集成ElasticsearchBboss调用Elasticsearch的案例分享
bboss,全称为Business Basic Open Source,是一个开源的企业级开发框架,它为构建企业级应用提供了丰富的功能和组件。 首先,我们来详细探讨环境搭建的过程。bboss新版平台的开发环境搭建通常包括以下几个关键步骤...
压缩包中的`db-elasticsearch-tool`可能包含了框架的核心库、示例代码、文档和配置文件,帮助用户快速理解和使用这个框架。通过阅读源码、查看示例和文档,开发人员可以深入理解BBoss-DB-Elasticsearch-Tool的工作...
本培训文档将介绍集群session管理的发展历史、bbosssession的架构及特点、性能指标、与应用的集成方式、部署模式以及场景演示等,旨在帮助技术人员理解并掌握bbosssession的使用方法和优势。 一、集群session管理...
BBoss Persistent是一个轻量级的持久层框架,它提供了方便的方式来处理这些大字段。本文将深入探讨在BBoss Persistent 1.0.2中如何高效地管理CLOB和BLOB数据。 首先,CLOB用于存储大量的文本数据,如长篇文章或XML...