`
xiangdong_li
  • 浏览: 7436 次
  • 性别: Icon_minigender_2
  • 来自: 南京
社区版块
存档分类
最新评论

spring cloud stream kafka

阅读更多

pom.xml

<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<spring-cloud.version>Dalston.SR1</spring-cloud.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

yml文件

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        issueInPutMessage:
          destination: devops-cca
          content-type: application/json
          group: default
        issueOutPutMessage:
          destination: devops-cca
          content-type: application/json
          producer:
            partitionKeyExpression: payload.id
            partitionCount: 2
      kafka:
        binder:
          brokers: 10.40.64.53
          zkNodes: 10.40.64.53
          autoAddPartitions: true
server:
  port: 9050

SenderApplication.java

@SpringBootApplication
public class SenderApplication {
	public static void main(String[] args) {
		SpringApplication.run(SenderApplication.class, args);
	}
}

  MainController.java

@RestController
@EnableBinding({IssueInputMessage.class, IssueOutputMessage.class})
public class MainController {

    @Autowired
    IssueOutputMessage issueOutputMessage;

    @Value("${spring.cloud.stream.bindings.issueOutPutMessage.producer.partitionCount}")
    private String partitionCount;

    private void sendMore() {
        for (int _index = 0; _index < 10; _index++) {
            int index = Integer.parseInt(String.valueOf(Math.round(Math.random() * Math.pow(10, partitionCount.length())) / Integer.parseInt(partitionCount)));
            IssueMessage chatMessage = new IssueMessage();
            chatMessage.setMessage(String.format("message %s", _index));
            chatMessage.setId(index);
            issueOutputMessage.issueOutPutMessage().send(MessageBuilder.withPayload(chatMessage).build());
        }
    }

    private void sendOnlyOne() {
        int index = Integer.parseInt(String.valueOf(Math.round(Math.random() * Math.pow(10, partitionCount.length())) / Integer.parseInt(partitionCount)));
        IssueMessage chatMessage = new IssueMessage();
        chatMessage.setMessage(String.format("message %s", Math.round(Math.random() * 10)));
        chatMessage.setId(index);
        issueOutputMessage.issueOutPutMessage().send(MessageBuilder.withPayload(chatMessage).build());
    }

    @RequestMapping("/")
    public String index() {
        sendOnlyOne();
        return "ok";
    }


    @StreamListener(IssueInputMessage.ISSUE_INPUT_MESSAGE)
    public void analyzingCancel(Message<IssueMessage> message) {
        IssueMessage chatMessage = message.getPayload();
        System.out.println(chatMessage.getMessage());
    }

}

 

public interface IssueInputMessage {

    String ISSUE_INPUT_MESSAGE = "issueInPutMessage";

    @Input
    SubscribableChannel issueInPutMessage();

}

 

public class IssueMessage<T> {

    private int id;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    private T message;

    public T getMessage() {
        return message;
    }

    public void setMessage(T message) {
        this.message = message;
    }
}

 

public interface IssueOutputMessage {

    @Output
    MessageChannel issueOutPutMessage();

}

 

 

分享到:
评论

相关推荐

    spring cloud stream kafka 消息驱动集成

    Spring Cloud Stream 和 Apache Kafka 的集成是构建分布式系统中消息驱动架构的一个重要实践。Spring Cloud Stream 是 Spring 生态系统中的一个模块,它为构建输入/输出绑定到消息中间件的微服务应用提供了抽象层,...

    spring-cloud-stream结合kafka dome

    标题 "spring-cloud-stream结合kafka dome" 描述的是一个关于如何使用Spring Cloud Stream与Apache Kafka进行集成并创建示例应用的教程。在这个场景中,我们首先启动两个消费者(标记为kafka和kafka1),然后启动名...

    spring-cloud-stream-kafka:Spring Cloud Streams Kafka Avro

    《Spring Cloud Stream Kafka Avro深度解析》 在现代微服务架构中,数据交换与处理扮演着至关重要的角色。Spring Cloud Stream作为一个强大的框架,为构建消息驱动的应用提供了便利,而Kafka作为分布式流处理平台,...

    kafka-spring-cloud-stream:Apache Kafka的Spring Cloud Stream展示

    在"Kafka-Spring Cloud Stream"项目中,我们将探讨如何利用Spring Cloud Stream来简化与Kafka的交互。 1. **Spring Cloud Stream介绍** Spring Cloud Stream提供了一个编程模型,允许开发者定义输入和输出绑定,将...

    spring-cloud-stream结合kafka使用详解

    Spring Cloud Stream 结合 Kafka 使用详解 Spring Cloud Stream 是一个基于 Spring Boot 的消息驱动微服务框架,它提供了统一的消息处理模型,可以轻松地与各种消息中间件集成。在本文中,我们将详细介绍 Spring ...

    Spring Cloud系列教程 Spring Boot Spring Cloud Stream 和 Kafka案例教程

    Spring Cloud系列教程 Spring Boot Spring Cloud Stream 和 Kafka案例教程 SpringCloud系列教程、SpringBoot、 Stream、Kafka、案例教程

    spring-cloud-stream-app-descriptor-Celsius.SR3

    spring-cloud-stream-app-descriptor-Celsius.SR3.stream-apps-kafka-10-docker

    Spring Cloud Stream 体系及原理.zip

    Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它为开发者提供了在Spring Boot应用中构建消息处理管道的能力。这个框架充分利用了Java的强大力量,使得开发人员能够轻松地处理分布式系统中的输入和输出...

    springcloud-stream-demo-master.zip

    2. 配置消息代理:SpringCloud Stream默认支持RabbitMQ和Kafka,但也可以通过配置支持其他消息中间件。配置信息通常在`application.yml`或`application.properties`中定义。 3. 编写消息处理器:SpringCloud Stream...

    SpringCloud项目实战各组件源代码案例

    包含内容: Spring Cloud系列教程 Spring Boot Spring Cloud Stream...springcloud-config-oracle-bus-kafka.zipspringcloud-feign.zip springcloud-producer.zip springcloud-producer-consumer.zip springcloudstudy.

    SpringCloud与Kafka消息中间件集成教程

    ### SpringCloud与Kafka消息中间件集成教程 #### 一、SpringCloud概述 Spring Cloud 是一个基于Spring Boot的框架,旨在提供一系列开箱即用的工具和服务,帮助开发者轻松构建和部署微服务架构的应用程序。它集合了...

    springCloud集成kafak

    接下来,让我们看看如何在Spring Boot应用中配置Spring Cloud Stream以支持Kafka。首先,在`pom.xml`中添加Spring Cloud Stream和Kafka的依赖: ```xml &lt;groupId&gt;org.springframework.cloud &lt;artifactId&gt;spring-...

    基于Spring Cloud Stream和Apache Kafka的云端消息系统.zip

    这是一个基于Spring Cloud Stream和Apache Kafka的云端应用,作为一个消息系统,用于实现云端的实时消息传输和处理。项目利用Spring Cloud Stream进行微服务间的消息通信,并结合Apache Kafka实现高吞吐量、高可靠性...

    ddd-springcloud-stream.zip

    在描述中没有提供具体信息,因此我们将基于标签 "springcloudstream" 进行详细的知识点讲解。 **Spring Cloud Stream** Spring Cloud Stream 是一个轻量级的框架,它允许开发者轻松地创建消息处理应用程序。它提供...

    spring cloud finchely eureka ,message服务

    spring cloud finchely euraka discovery ,message服务,注册到注册中心,集成spring cloud stream kafka,提供可供其他服务访问的接口,并且划分为不同的模块,欢迎下载学习和交流。

    Spring boot,springCloud精选视频教程

    .使用Spring Cloud搭建服务注册中心 2.使用Spring Cloud搭建高可用服务注册中心 ...28.Spring Cloud Bus整合Kafka 29.Spring Cloud Stream初窥 30.Spring Cloud Stream使用细节 31.Spring Cloud系列勘误

    SpringCloud微服务架构笔记(四

    Spring Cloud Stream支持多种Binder实现,包括RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs等。 3. 发布/订阅模型 Spring Cloud Stream采用发布/订阅模式进行消息...

    Spring Cloud 中文文档.pdf

    - **Apache Kafka Binder**:Apache Kafka 是一种流行的分布式流处理平台,Spring Cloud Stream 提供了 Kafka Binder 的实现。 - **Apache Kafka Binder 概述**:Kafka Binder 的实现基于 Apache Kafka。 - **配置...

    SpringCloud.pdf

    6. **分布式消息传递**:Spring Cloud Stream 支持集成消息中间件(如 RabbitMQ、Kafka),实现服务间的异步通信和解耦。 云原生应用程序是Spring Cloud 支持的一种开发范式,鼓励采用持续交付和以价值为导向的开发...

    SpringCloud 15个完整例子

    10. **Spring Cloud Stream**:Stream用于构建消息驱动的微服务,支持RabbitMQ或Kafka等消息中间件,实现服务间的数据通信。 11. **Spring Boot**:Spring Boot是构建微服务的基础,它简化了Spring应用的初始搭建...

Global site tag (gtag.js) - Google Analytics