前言
最近有个需求,在不同的系统中做数据同步。我们是java+mysql、他们是c#+sqlserver。需求是sqlserver提出的,并且他们提出要实时,并且要我们主动推数据给他们。他们接口都提供好了,说要我们对数据库表操作的时候调用他们的接口把数据传他们。咋看没有什么事,不就是一个接口的调用么。仔细想想,这样对我们的系统影响还是很大的,其他的不说。重要的一点是我们的系统都依赖他们的系统了,如果他们的系统问题或网络问题会影响我们系统的操作,这显然是不可行的。为了保卫我们系统的利益。这种事是绝对不能做的。
讨论了一下了解到,他们的需求无非就是需要实时能得到某个表的数据码。刚开始我提出,我们开一个接口,让你们查看我们从库数据不就好了,这样多省事。可是他们说自己要保存数据到sqlserver(当然还有其他原因)。他们要把事情搞复杂也没办法。当然,我们同样要保护自己的利益啊。这时候就想到了使用 MQ 消息队列的方案。我们只要在数据操作成功后吧数据传到 MQ 中,之后的处理就让他们自己做了。真的是费了好大的力气才说服让他们使用 MQ 啊~~~
下面就使用python来模拟一下我们的方案(希望大家来吐槽 :) )
软件介绍
在这里我们使用 zookeeper + kafka 的方案来做。
软件 |
版本 |
其他 |
zookeeper |
3.4.6 |
|
kafka |
2.10-0.9.0.0 |
|
pykafka |
2.1.2 |
python的kafka API |
zookeeper + kafka 基本使用教程
http://www.linuxidc.com/Linux/2014-07/104470.htm
先决条件
- 使用zookeeper、kafka创建一个topic名为 goods-topic
- 需要安装pykafka一个python的zookeeper、kafka API
- 一个goods示例数据库
|
# 启动zookeeper
/usr/local/zookeeper/bin/zkServer.sh start
# 启动kafka
/usr/local/kafka/bin/kafka-server-start.sh/usr/local/kafka/config/server.properties>/tmp/kafka-logs/kafka.out2>&1&
# 创建 goods-topic
/usr/local/kafka/bin/kafka-topics.sh\
--create\
--zookeeper localhost:2181\
--replication-factor1\
--partitions1\
--topic test
|
官网:http://readthedocs.org/projects/pykafka/
|
CREATE TABLE goods(
goods_id INTNOTNULLAUTO_INCREMENT,
goods_name VARCHAR(30)NOTNULL,
goods_price DECIMAL(13,2)NOTNULLDEFAULT0.00,
create_time DATETIME NOTNULL,
PRIMARY KEY(goods_id)
);
|
伪代码展示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import time,json
from pykafka import KafkaClient
# 相关的mysql操作
mysql_op()
# 可接受多个Client这是重点
client=KafkaClient(hosts="192.168.1.233:9092, \
192.168.1.233:9093, \
192.168.1.233:9094")
# 选择一个topic
topic=client.topics['goods-topic']
# 创建一个生产者
producer=topic.get_producer()
# 模拟接收前端生成的商品信息
goods_dict={
'option_type':'insert'
'option_obj':{
'goods_name':'goods-1',
'goods_price':10.00,
'create_time':time.strftime('%Y-%m-%d %H:%M:%S')
}
}
goods_json=json.dumps(goods_dict)
# 生产消息
producer.produce(msg)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
import time,json
from pykafka import KafkaClient
# 可接受多个Client这是重点
client=KafkaClient(hosts="192.168.1.233:9092, \
192.168.1.233:9093, \
192.168.1.233:9094")
# 选择一个topic
topic=client.topics['goods-topic']
# 生成一个消费者
balanced_consumer=topic.get_balanced_consumer(
consumer_group='goods_group',
auto_commit_enable=True,
zookeeper_connect='localhost:2181'
)
# 消费信息
formessage inbalanced_consumer:
ifmessage isnotNone:
# 解析json为dict
goods_dict=json.loads(message)
# 对数据库进行操作
ifgoods_dict['option_type']=='insert':
mysql_insert()
elif goods_dict['option_type']=='update':
mysql_update()
elif goods_dict['option_type']=='delete':
mysql_delete()
else:
order_option()
|
作者信息
昵称:HH
QQ:275258836
分享到:
相关推荐
基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 基于Kafka的管理系统源码 ...
作为 Kafka 审计系统,Chaperone 监控数据流的完整性和延迟。审计指标持久存储在数据库中,供 Kafka 用户量化其主题的损失。Chaperone 的目标是在数据流经数据管道的每个阶段,能够抓住每个消息,统计一定时间段内的...
**KAFKA分布式消息系统在Windows环境下的搭建与应用** KAFKA是一个高吞吐量的分布式消息系统,由LinkedIn开发并开源,现在是Apache软件基金会的顶级项目。它主要设计用于处理实时流数据,允许应用程序发布和订阅...
本使用kafka,spark,hbase开发日志分析系统。 ![architecture](/docs/images/architecture.png "architecture") ### 软件模块 * Kafka:作为日志事件的消息系统,具有分布式,可分区,可冗余的消息服务功能。...
Kafka 使用手册 Kafka 是一个高性能、分布式的消息队列系统,广泛应用于大数据处理、实时数据处理和流式数据处理等领域。本文档将详细介绍 Kafka 的安装步骤、基本操作命令和配置文件的修改,以便让初学者快速入门...
**Kafka Tool 2.0.7 在 Linux 系统中的使用详解** Kafka Tool 是一款功能强大的 Apache Kafka 管理工具,适用于监控、管理、以及数据迁移等任务。在 Linux 系统中,我们可以方便地利用此工具进行各种 Kafka 相关的...
使用 Kafka-Eagle 可以帮助运维人员更有效地管理 Kafka 集群,及时发现和解决问题,确保 Kafka 的稳定运行。对于大数据系统来说,这样的监控工具至关重要,因为它能够提高系统的可用性和性能,降低故障排查的成本。
(阿里云kafka使用时需要结合阿里云提供的认证部分,另外上传文件时,阿里云kafka还在公测期间);如果你够厉害,而且数据规模足够,可以使用storm+kafka,彼时kafka当然就不会这么简单的使用了。
当我们需要将日志数据发送到Apache Kafka这种分布式消息系统时,就需要配置一个特定的Appender,即Log4jKafkaAppender。这个Appender允许我们将日志事件实时发布到Kafka主题,从而实现日志的高效传输和处理。 为了...
《Kafka部署与使用详解》 Kafka是一种分布式流处理平台,广泛应用于大数据实时处理、日志收集、消息系统等领域。这份详尽的PDF文档详细介绍了如何在Linux环境下部署和使用Kafka,包括单机部署和集群部署。 一、...
在软件版本方面,系统使用了Kafka 3.x版本、Zookeeper 3.x版本、Elasticsearch 7.x版本、Kibana 7.x版本和FileBeat 7.x版本。 在Kafka集群的搭建中,需要关闭防火墙,并安装Kafka和Zookeeper。Zookeeper需要手动...
这个名为"Kafka使用到的所有jar包"的压缩文件包含了运行和开发Kafka应用所需的核心库和依赖。以下是其中可能包含的一些关键jar包及其作用: 1. **kafka_2.13-*.jar**: 这是Kafka的主要客户端库,包含了生产者、消费...
**Kafka使用与安装** Kafka是一款开源的分布式流处理平台,由LinkedIn开发并贡献给Apache软件基金会。它被设计为高吞吐量、低延迟的消息发布和订阅系统,广泛应用于大数据实时处理、日志聚合、用户行为追踪等多个...
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。 Kafka 的特点: 1....
### Kafka使用说明文档知识点梳理 #### 一、Kafka简介 **1.1 Kafka基本介绍** Kafka是一款由LinkedIn开发并捐赠给Apache基金会的顶级开源项目。它是一种分布式消息系统,具备高吞吐量和易扩展性等特点。Kafka的...
Kafka多维度系统精讲,从入门到熟练掌握视频教程,完整版13章,2020新课。本课从搭建开始,全面剖析Kafka,解读和使用核心API,将底层实现和设计原理融合贯穿,同时结合案例,把原理落地。更有凝结老师心血的Kafka...
Kafka 集群搭建与使用详解 Kafka 是一种分布式流媒体平台,由 Apache 开源项目提供。它主要用来构建实时数据管道和流媒体处理系统。本文档将详细介绍 Kafka 集群的搭建和使用,包括创建、删除、生产者、消费者等...
### Kafka使用手册知识点详解 #### 一、Kafka概述与入门 **Kafka** 是一款高性能的分布式消息系统,主要用于处理大规模实时数据流。它基于发布/订阅模型,能够高效地支持消息的发布和订阅。Kafka 在数据处理领域,...
Apache Kafka是一个分布式的流处理平台,广泛应用于大数据实时处理、日志聚合、消息系统等多个领域。在Kafka的实际操作中,管理和监控集群是至关重要的任务,而Kafka Tool就是这样一款专为Kafka设计的图形化管理工具...
标题"使用netty实现TCP长链接消息写入kafka以及kafka批量消费数据"揭示了这个项目的核心内容:通过Netty接收TCP长连接的数据,并将这些数据存储到Kafka中,同时利用Kafka的批量消费功能对数据进行处理。下面我们将...