package com.sohu.babyduncan;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author: guohaozhao
* @since: 13-7-10 21:41
*/
public class ThreadTest {
private static final byte[] lock = new byte[0];
static volatile boolean startProduce = true;
static volatile boolean startConsume = true;
private static final List<String> list = new ArrayList<String>();
private static class Producer implements Runnable {
@Override
public void run() {
while (true) {
if (!startProduce) {
continue;
}
synchronized (lock) {
String s = System.currentTimeMillis() + "";
System.out.println("porduce one " + s);
list.add(s);
if (list.size() > 10) {
startProduce = false;
startConsume = true;
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
if (!startConsume) {
continue;
}
synchronized (lock) {
System.out.println("consume one " + list.get(0));
list.remove(0);
if (list.size() == 0) {
startProduce = true;
startConsume = false;
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String... args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
countDownLatch.await();
}
}
运行结果:
porduce one 1373469039014
porduce one 1373469039014
consume one 1373469039014
consume one 1373469039014
porduce one 1373469039014
consume one 1373469039014
porduce one 1373469039014
porduce one 1373469039014
porduce one 1373469039014
consume one 1373469039014
porduce one 1373469039019
porduce one 1373469039053
porduce one 1373469039065
porduce one 1373469039065
porduce one 1373469040034
porduce one 1373469040034
porduce one 1373469040034
porduce one 1373469040038
porduce one 1373469040038
consume one 1373469039014
consume one 1373469039014
consume one 1373469039019
consume one 1373469039053
consume one 1373469039065
consume one 1373469039065
consume one 1373469040034
consume one 1373469040034
consume one 1373469040034
consume one 1373469040038
分享到:
相关推荐
至此,你已经在Windows上成功搭建了一个Kafka的单机环境,可以进行简单的生产和消费操作。对于更复杂的场景,如多节点集群、配置高可用性和数据持久化等,需要进一步学习和实践。如果你对此感兴趣,可以探索更多关于...
7. **生产与消费消息**:可以使用内置的`kafka-console-producer.sh`和`kafka-console-consumer.sh`命令进行消息生产和消费测试。 **2. Kafka集群搭建** Kafka集群提供高可用性和容错性。搭建步骤与单机类似,但需...
RocketMQ是一个基于发布/订阅模式的消息队列,支持高吞吐量、低延迟、高可用性和可扩展性。它主要包含四个组件:NameServer、Producer、Consumer和Broker。NameServer是服务发现和路由管理的角色,Producer负责生产...
一个 Kafka 的 Message 由一个固定长度的 header 和一个可变长度的消息体 body 组成。header 部分由一个字节的 magic 和四个字节的 CRC32 构成,用于判断 body 消息体是否正常。 Kafka 高吞吐原理 Kafka 实现高...
kafka集群类型: single broker(单节点单boker集群,亦即kafka只启一个broker消息中间件服务,producer、consumer、broker均通过zookeeper集群交换消息,具体可参考:http://exp-blog.com/2018/08/03/pid-2187/
3. **NameServer**:NameServer是一个轻量级的服务注册与发现组件,Producer和Consumer通过NameServer找到对应的Broker地址,不涉及具体的消息存储和传输。 4. **Broker**:Broker是消息的存储和转发节点,负责接收...
在单机测试环境中部署Zookeeper、Kafka和Redis是进行分布式系统开发和测试的基础工作。这里我们将详细介绍这三个组件的安装配置步骤。 首先,我们来看Zookeeper。Zookeeper是一个分布式的,开放源码的分布式应用...
Consumer Group是一种逻辑划分机制,可以确保一个Topic的消息被不同组的Consumer消费,而同一个组内的Consumer通常只消费消息的一部分。 #### 二、Kafka架构 Kafka采用了一种简单的Publisher/Subscriber模型,即...
RocketMQ支持集群模式,这里我们先启动一个简单的单机Broker。 ```bash nohup sh bin/mqbroker -n localhost:9876 & ``` 如果一切顺利,控制台会输出`Broker runing now...`,表示Broker已启动。 **6. 创建Topic**...
在单机部署中,我们通常会在一台机器上设置一个Broker,这台机器既是生产者也是消费者。 1. **配置文件**:Kafka的配置主要通过`server.properties`文件进行,这个文件位于Kafka安装目录的config子目录下。在这个...
1. NameServer:这是一个轻量级的注册中心,负责维护 Topic 与 Broker 的映射关系,不存储任何业务数据,高可用性通过集群部署来实现。 2. Broker:Broker 是 RocketMQ 的核心组件,负责接收 Producer 发送的消息,...
Producer,consumer 实现 Kafka 注册的接口,数据从 producer 发送到 broker,broker 承担一个中间缓存和分发的作用。broker 分发注册到系统中的 consumer。broker 的作用类似于缓存,即活跃的数据和离线处理系统...
Docker Compose是一个工具,用于定义和运行多容器Docker应用。通过YAML文件(docker-compose.yml)配置服务、网络和卷,可以一次性启动整个应用环境。 3. **Kafka与Zookeeper** Apache Kafka是一个分布式流处理...
每个Broker会在启动时向所有的NameServer注册自己的信息,Producer和Consumer在运行时会从NameServer获取Broker的列表,这样可以确保即使某个NameServer宕机,服务也不会中断。 **Broker集群搭建** Broker集群是...
2. 单机只能启动一个进程的问题:阐述解决单机只能启动一个RocketMQ进程的常见问题。 以上指南提供了一套完整的RocketMQ使用和部署的方法论,帮助开发者和运维人员能够快速上手并有效利用RocketMQ构建稳定高效的...
示例中创建了一个名为test的Topic,配置了1个partition和1个replica。 6. Kafka服务的测试:为了验证Kafka服务是否正常工作,可以通过执行bin/kafka-topics.sh脚本列出所有已创建的Topic,确认刚才创建的Topic test...
- 安装指南:详述RocketMQ的部署和配置步骤,包括单机、集群和云环境。 - API详解:介绍Producer、Consumer、Admin等接口的使用方法。 - 高级特性:如消息过滤、消息回溯、延时消息、定时消息等的实现与配置。 3...
- **Consumer Group**:当一个 Topic 有多个 Consumer 时,可以通过 Consumer Group 进行管理,确保每个消息至少被一个 Consumer 处理一次。 ### Kafka 的安装与部署 #### Kafka 安装部署步骤 1. **环境准备**:...
安装环境包括硬件环境和软件环境,硬件环境需要三台服务器,每个服务器拥有2个物理CPU、8个核心和32G内存。软件环境包括Kafka版本为kafka_2.10-0.8.2.0和Spark版本为1.3.0。 在安装Kafka单机环境时,需要解压缩...