`

Databus架构分析与初步实践

阅读更多

1. 简介

Databus是一个低延迟、可靠的、支持事务的、保持一致性的数据变更抓取系统。由LinkedIn于2013年开源。Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更并进行其他业务逻辑。

Databus有以下特点:

  • 数据源和消费者之间的隔离。
  • 数据传输能保证顺序性和至少一次交付的高可用性。
  • 从变化流的任意时间点进行消费,包括通过bootstrap获取所有数据。
  • 分区消费
  • 源一致性保存,消费不成功会一直消费直到消费成功

2. 功能&特性

  • 来源独立:Databus支持多种数据来源的变更抓取,包括Oracle和MySQL。
  • 可扩展、高度可用:Databus能扩展到支持数千消费者和事务数据来源,同时保持高度可用性。
  • 事务按序提交:Databus能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺寻交付变更事件。
  • 低延迟、支持多种订阅机制:数据源变更完成后,Databus能在毫秒级内将事务提交给消费者。同时,消费者使用Databus中的服务器端过滤功能,可以只获取自己需要的特定数据。
  • 无限回溯:对消费者支持无限回溯能力,例如当消费者需要产生数据的完整拷贝时,它不会对数据库产生任何额外负担。当消费者的数据大大落后于来源数据库时,也可以使用该功能。

3. 使用场景举例

BUSSINESS1 和 BUSSINESS2 是两个不同的业务逻辑,他们的变更需要同时写入到 DB 和 CACHE ,那么当他们同时修改同一个数据的时候是否能保证数据的一致性呢?可以发现如果按照下图标明的顺序进行操作并不能保证数据的一致性!

1-1

还有一个问题是变更完DB之后,更新CACHE失败怎么办?如果忽略的话,会造成后续读取到CACHE中旧的数据,如果重试的话,业务代码会写得更加复杂。针对这些场景,如果没有一个强一致协议是很难解决掉的。如果要业务逻辑去实现这些晦涩的一致性协议,却又是不现实的。

现在,有了Databus,上面提到的这些一致性问题就都没有了,并且那些冗长的双写逻辑也可以去掉了,如下图所示:

1-1

4. 系统整体架构与主要组件

4.1 系统整体架构

1-1

上图中介绍了Databus系统的构成,包括Relays、bootstrap服务和Client lib等。Bootstrap服务中包括Bootstrap Producer和Bootstrap Server。快速变化的消费者直接从Relay中取事件。如果一个消费者的数据更新大幅落后,它要的数据就不在Relay的日志中,而是需要请求Bootstrap服务,返回的将会是自消费者上次处理变更之后的所有数据变更快照。

  • Source Databases:MySQL以及Oracle数据源
  • Relays:负责抓取和存储数据库变更,全内存存储,也可配置使用mmap内存映射文件方式
  • Schema Registry:数据库数据类型到Databus数据类型的一个转换表
  • Bootstrap Service:一个特殊的客户端,功能和Relays类似,负责存储数据库变更,主要是磁盘存储
  • Application:数据库变更消费逻辑,从Relay中拉取变更,并消费变更
  • Client Lib:提供挑选关注变更的API给消费逻辑
  • Consumer Code:变更消费逻辑,可以是自身消费或者再将变更发送至下游服务

4.2 主要组件及功能

上图系统整体架构图画的比较简略,下载源码观察项目结构后不难发现databus的主要由以下四个组件构成:

  • Databus Relay
    • 从源数据库中的Databus源中读取变化的行并序列化为Databus变化事件保存到内存缓冲区中。
    • 监听Databus客户端的请求(包括引导程序的请求)并传输Databus数据变化事件。
  • Databus Client
    • 在Relay上检查新的数据变化事件和处理特定的业务逻辑的回调。
    • 如果它们在relay后面落下太远,到引导程序服务运行一个追溯查询。
    • 单独的客户端可以处理全部的Databus流,它们也可以作为集群的一部分而每个客户端处理一部分流。
  • Databus Bootstrap Producer
    • 只是一个特殊的客户端。
    • 检查Relay上的新的数据变化事件。
    • 保存数据变化事件到Mysql数据库,Mysql数据库用于引导程序和为了客户端追溯数据。
  • Databus Bootstrap Server
    • 监听来自Databus客户端的请求并为了引导和追溯返回一个超长的回溯的数据变化事件。

5. Databus Relay和Databus Client详细分析

5.1 Databus Relay

5.1.1 架构与组件功能

1-1
  • Databus Event Producer(DBEP):定期从数据库中查询变更,如果检测到变更,它将读取数据库中的所有已更改的行,并将其转换为Avro记录。因为数据库数据类型和Databus数据类型不一致,因此需要 Schema Registry 做转换。

  • SCN(System Change Number):系统改变号,是数据库中非常重要的一个数据结构。SCN用以标识数据库在某个确切时刻提交的版本。在事务提交时,它被赋予一个唯一的标识事务的SCN。

  • Event Buffers:按照SCN的顺序存储databus事件,buffer可以是纯内存的,也可以是mmap到文件系统的。每个buffer在内存中还有一个对应的SCN Index和一个MaxSCN reader/writer,SCN Index可以加快查询指定事件的速度。

  • Request Processor:通过监听Netty的channel,实现收发client的请求。

  • MaxSCN Reader/Writer:用于跟踪DBEP的处理进度;Reader在Databus启动的时候会读取存储的文件上一次DBEP处理的位置,当Databus从DBEP中读取变更存储到Event Buffers时,Writer就会最后一个SCN写入到文件中存储,这样就能保证下次启动可以从正确的位置读取数据库变更。

  • JMX(Java Management Extensions):支持标准的Jetty容器,databus提供了多个Mbean来监控relay

    • ContainerStatsMBean
    • DbusEventsTotalStatsMBean
    • DbusEventsStatisticsCollectorMBean
  • RESTFul Interface:Realy提供了相关http接口供外部调用,Client与Relay建立http长连接,并从Relay拉取Event。

5.1.2 源码分析

  • ServerContainer._globalStatsThread:统计信息的线程

  • OpenReplicatorEventProducer.EventProducerThread:针对mysql binlog日志的Event生产者线程,每个source一个线程,持有_orListener,管理和数据库的连接,将变更写入到Event Buffer里。

  • EventProducerThread启动后会初始化类型为OpenReplicator的日志解析对象开始解析日志,同时初始化类型为ORListener的_orListener开始监听,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    @Override
    public void run()
    {
    _eventBuffer.start(_sinceScn);
    _startPrevScn.set(_sinceScn);
     
    initOpenReplicator(_sinceScn);
    try
    {
    boolean started = false;
    while (!started) {
    try {
    _or.start();
    started = true;
    }
    catch (Exception e) {
    _log.error("Failed to start OpenReplicator: " + e);
    _log.warn("Sleeping for 1000 ms");
    Thread.sleep(1000);
    }
    }
    _orListener.start();
    } catch (Exception e)
    {
    _log.error("failed to start open replicator: " + e.getMessage(), e);
    return;
    }
    }

    初始化方法如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void initOpenReplicator(long scn)
    {
    int offset = offset(scn);
    int logid = logid(scn);
     
    String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid);
    // we should use a new ORListener to drop the left events in binlogEventQueue and the half processed transaction.
    _orListener = new ORListener(_sourceName, logid, _log, _binlogFilePrefix, _producerThread, _tableUriToSrcIdMap,
    _tableUriToSrcNameMap, _schemaRegistryService, 200, 100L);
     
    _or.setBinlogFileName(binlogFile);
    _or.setBinlogPosition(offset);
    _or.setBinlogEventListener(_orListener);
     
    //must set transport and binlogParser to null to drop the old connection environment in reinit case
    _or.setTransport(null);
    _or.setBinlogParser(null);
     
    _log.info("Connecting to OpenReplicator " + _or.getUser() + "@" + _or.getHost() + ":" + _or.getPort() + "/"
    + _or.getBinlogFileName() + "#" + _or.getBinlogPosition());
    }

    EventProducerThread._orListener:监听数据库变更,将变更转换为Avro记录,写入到transaction里面,最终调用_producerThread的onEndTransaction()方法将事务里的事件写入到Event Buffer里,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Override
    public void onEndTransaction(Transaction txn) throws DatabusException
    {
    try
    {
    addTxnToBuffer(txn);
    _maxSCNReaderWriter.saveMaxScn(txn.getIgnoredSourceScn()!=-1 ? txn.getIgnoredSourceScn() : txn.getScn());
    }
    catch (UnsupportedKeyException e)
    {
    _log.fatal("Got UnsupportedKeyException exception while adding txn (" + txn + ") to the buffer", e);
    throw new DatabusException(e);
    }
    catch (EventCreationException e)
    {
    _log.fatal("Got EventCreationException exception while adding txn (" + txn + ") to the buffer", e);
    throw new DatabusException(e);
    }
    }

    FileMaxSCNHandler负责读写SCN,注意在写入文件时会将原有文件重命名为XXX.temp,原因是为了防止在更新文件的时候发生错误,导致SCN丢失,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    private void writeScnToFile() throws IOException
    {
    long scn = _scn.longValue();
     
    File dir = _staticConfig.getScnDir();
    if (! dir.exists() && !dir.mkdirs())
    {
    throw new IOException("unable to create SCN file parent:" + dir.getAbsolutePath());
    }
     
    // delete the temp file if one exists
    File tempScnFile = new File(_scnFileName + TEMP);
    if (tempScnFile.exists() && !tempScnFile.delete())
    {
    LOG.error("unable to erase temp SCN file: " + tempScnFile.getAbsolutePath());
    }
     
    File scnFile = new File(_scnFileName);
    if (scnFile.exists() && !scnFile.renameTo(tempScnFile))
    {
    LOG.error("unable to backup scn file");
    }
     
    if (!scnFile.createNewFile())
    {
    LOG.error("unable to create new SCN file:" + scnFile.getAbsolutePath());
    }
    FileWriter writer = new FileWriter(scnFile);
    writer.write(Long.toString(scn));
    writer.write(SCN_SEPARATOR + new Date().toString());
    writer.flush();
    writer.close();
    LOG.debug("scn persisted: " + scn);
    }
  • 以源码例子中PersonRelayServer的主类启动为起点,大致的启动流程如下:

    PersonRelayServer主方法 -> new DatabusRelayMain实例 -> 调用initProducers方法初始化生产者->根据配置调用addOneProducer增加生产者->new DbusEventBufferAppendable获得Event Buffer->new EventProducerServiceProvider实例->
    调用createProducer获得OpenReplicatorEventProducer->OpenReplicatorEventProducer中包含
    EventProducerThread->启动线程开始获取Event

5.2 Databus Client

5.2.1 架构与组件功能

1-1
  • Relay Puller:负责从relay拉取数据,具体工作有挑选relay,请求source,请求Register,校验schema,设置dispatcher等。

  • Dispatcher:从event buffers中读取事件,调用消费逻辑的回调,主要职责有:

    • 判断回调是否正确,回调失败后会进行重试,重试次数超限后抛出异常
    • 监控错误和超时
    • 持久化checkpoint
  • Checkpoint persistence Provider:checkpoint是消费者消费变更记录点的位置,负责将checkpoint持久化到本地,保证下次启动后可以从正常的位置pull event。

  • Event Callback:调用消费者自定义业务逻辑代码。

  • Bootstrap Puller:负责从Bootstrap servers拉取数据,功能类似Relay Puller。

5.2.2 源码分析

执行Client的启动脚本后会调用main方法,main方法会根据命令行参数中指定的属性文件创建StaticConfig类,然后配置类创建dbusHttpClient实例来与Relay进行通信,参数defaultConfigBuilder为默认配置类信息,可以为空,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static DatabusHttpClientImpl createFromCli(String[] args, Config defaultConfigBuilder) throws Exception
{
Properties startupProps = ServerContainer.processCommandLineArgs(args);
if (null == defaultConfigBuilder) defaultConfigBuilder = new Config();
 
ConfigLoader<StaticConfig> staticConfigLoader =
new ConfigLoader<StaticConfig>("databus.client.", defaultConfigBuilder);
 
StaticConfig staticConfig = staticConfigLoader.loadConfig(startupProps);
 
DatabusHttpClientImpl dbusHttpClient = new DatabusHttpClientImpl(staticConfig);
 
return dbusHttpClient;
}

设置要连接的Relay信息,然后通过参数defaultConfigBuilder传递给dbusHttpClient,代码如下:

1
2
3
4
5
DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();
configBuilder.getRuntime().getRelay("1").setHost("localhost");
configBuilder.getRuntime().getRelay("1").setPort(11115);
configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);
}

启动databus client过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
protected void doStart()
{
_controlLock.lock();
try
{
// 绑定并开始接收来到的连接
int portNum = getContainerStaticConfig().getHttpPort();
_tcpChannelGroup = new DefaultChannelGroup();
_httpChannelGroup = new DefaultChannelGroup();
 
_httpServerChannel = _httpBootstrap.bind(new InetSocketAddress(portNum));
InetSocketAddress actualAddress = (InetSocketAddress)_httpServerChannel.getLocalAddress();
_containerPort = actualAddress.getPort();
 
// 持久化端口号 (文件名对容器来说必须唯一)
File portNumFile = new File(getHttpPortFileName());
portNumFile.deleteOnExit();
try {
FileWriter portNumFileW = new FileWriter(portNumFile);
portNumFileW.write(Integer.toString(_containerPort));
portNumFileW.close();
LOG.info("Saving port number in " + portNumFile.getAbsolutePath());
} catch (IOException e) {
throw new RuntimeException(e);
}
 
_httpChannelGroup.add(_httpServerChannel);
LOG.info("Serving container " + getContainerStaticConfig().getId() +
" HTTP listener on port " + _containerPort);
 
if (_containerStaticConfig.getTcp().isEnabled())
{
int tcpPortNum = _containerStaticConfig.getTcp().getPort();
_tcpServerChannel = _tcpBootstrap.bind(new InetSocketAddress(tcpPortNum));
_tcpChannelGroup.add(_tcpServerChannel);
 
LOG.info("Serving container " + getContainerStaticConfig().getId() +
" TCP listener on port " + tcpPortNum);
}
 
_nettyShutdownThread = new NettyShutdownThread();
Runtime.getRuntime().addShutdownHook(_nettyShutdownThread);
 
// 5秒后开始producer线程
if (null != _jmxConnServer && _containerStaticConfig.getJmx().isRmiEnabled())
{
try
{
_jmxShutdownThread = new JmxShutdownThread(_jmxConnServer);
Runtime.getRuntime().addShutdownHook(_jmxShutdownThread);
 
_jmxConnServer.start();
LOG.info("JMX server listening on port " + _containerStaticConfig.getJmx().getJmxServicePort());
}
catch (IOException ioe)
{
if (ioe.getCause() != null && ioe.getCause() instanceof NameAlreadyBoundException)
{
LOG.warn("Unable to bind JMX server connector. Likely cause is that the previous instance was not cleanly shutdown: killed in Eclipse?");
if (_jmxConnServer.isActive())
{
LOG.warn("JMX server connector seems to be running anyway. ");
}
else
{
LOG.warn("Unable to determine if JMX server connector is running");
}
}
else
{
LOG.error("Unable to start JMX server connector", ioe);
}
}
}
 
_globalStatsThread.start();
}
catch (RuntimeException ex)
{
LOG.error("Got runtime exception :" + ex, ex);
throw ex;
}
finally
{
_controlLock.unlock();
}
}

6. Databus for Mysql实践

6.1 相关解释

  • 实现原理:通过解析mysql的binlog日志来获取变更事件,解析过程利用Java开源工具OpenReplicator,Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用,所有的Event实现了BinlogEventV4接口。

  • binlog 格式:Databus设计为针对Row格式日志进行解析

    • Statement:基于SQL语句的复制(statement-based replication,SBR)
    • Row:基于行的复制(row-based replication,RBR)
    • Mixed:混合模式复制(mixed-based replication,MBR)
  • SCN的确定:64bits组成,高32位表示binlog的文件序号,低32位代表event在binlog文件的offset,例如在 mysql-bin.000001文件中 offset为 4的scn表示为(1 << 32) | 4 = 4294967300

6.2 数据库环境配置

  • 安装mysql数据库,本次使用mysql-5.5.56版本。

  • 查看数据库是否开启binlog,如果binlog没有开启,可以通过set sql_log_bin=1命令来启用;如果想停用binlog,可以使用set sql_log_bin=0。

    1-1

  • 配置数据库binlog_format=ROW, show variables like ‘binlog_format‘可查看日志格式, set globle binlog_format=ROW’可设置,通过修改my.cnf文件也可以,增加或修改行binlog_format=ROW即可。

    1-1

  • binlog_checksum设置为空,show global variables like ‘binlog_checksum’命令可查看,set binlog_checksum=none可设置。

    1-1

  • 在mysql上创建名为or_test的数据库,or_test上创建表名为person的表,定义如下:

    1-1

6.3 Demo配置与运行

6.3.1 下载源码

  • Databus官网下载源码,下载地址https://github.com/linkedin/databus.git,我们需要用到databus目录下的databus2-example文件夹,在此基础上改造并运行,目录结构及介绍如下:

    1-1

    • database:数据库模拟相关的脚本和工具
    • databus2-example-bst-producer-pkg:bootstrap producer的属性配置文件夹,包括bootstrap producer和log4j属性文件,build脚本以及bootstrap producer的启动和停止脚本。
    • databus2-example-client-pkg:client的属性配置文件夹,包括各种属性文件和启动和停止脚本。
    • databus2-example-client:client源代码,包含启动主类和消费者代码逻辑。
    • databus2-example-relay-pkg:relay的属性配置文件夹,包含监控的表的source信息和Avro schema。
    • databus2-example-relay:relay的启动主类。
    • schemas_registry:存放表的avsc文件。

6.3.2 Relay端的操作

  • 配置Relay属性文件:databus2-example-relay-pkg/conf/relay-or-person.properties的内容如下配置,包括端口号,buffer存储策略,maxScn存放地址等信息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    databus.relay.container.httpPort=11115
    databus.relay.container.jmx.rmiEnabled=false
    databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY
    databus.relay.eventBuffer.queuePolicy=OVERWRITE_ON_WRITE
    databus.relay.eventLogReader.enabled=false
    databus.relay.eventLogWriter.enabled=false
    databus.relay.schemaRegistry.type=FILE_SYSTEM
    databus.relay.schemaRegistry.fileSystem.schemaDir=./schemas_registry
    databus.relay.eventBuffer.maxSize=1024000000
    databus.relay.eventBuffer.readBufferSize=10240
    databus.relay.eventBuffer.scnIndexSize=10240000
    databus.relay.physicalSourcesConfigsPattern=../../databus2-example/databus2-example-relay-pkg/conf/sources-or-person.json
    databus.relay.dataSources.sequenceNumbersHandler.file.scnDir=/tmp/maxScn
    databus.relay.startDbPuller=true
  • 配置被监控表的source信息:databus2-example-relay-pkg/conf/sources-or-person.json的内容如下配置,其中URI format:mysql://username/password@mysql_host[:mysql_port]/mysql_serverid/binlog_prefix,注意%2F为转义字符,用户名为root,数据库密码为123。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    {
    "name" : "person",
    "id" : 1,
    "uri" : "mysql://root%2F123@localhost:3306/1/mysql-bin",
    "slowSourceQueryThreshold" : 2000,
    "sources" :
    [
    {
    "id" : 40,
    "name" : "com.linkedin.events.example.or_test.Person",
    "uri": "or_test.person",
    "partitionFunction" : "constant:1"
    }
    ]
    }
  • databus2-example-relay-pkg/schemas_registry/下定义person的Avro schema文件
    com.linkedin.events.example.or_test.Person.1.avsc,其中1表示版本(Databus目前没有针对mysql提供生成Avro schema文件的工具,所以只能手工编写)具体内容如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    {
    "name" : "Person_V1",
    "doc" : "Auto-generated Avro schema for sy$person. Generated at Dec 04, 2012 05:07:05 PM PST",
    "type" : "record",
    "meta" : "dbFieldName=person;pk=id;",
    "namespace" : "com.linkedin.events.example.or_test",
    "fields" : [ {
    "name" : "id",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=ID;dbFieldPosition=0;"
    }, {
    "name" : "firstName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=FIRST_NAME;dbFieldPosition=1;"
    }, {
    "name" : "lastName",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=LAST_NAME;dbFieldPosition=2;"
    }, {
    "name" : "birthDate",
    "type" : [ "long", "null" ],
    "meta" : "dbFieldName=BIRTH_DATE;dbFieldPosition=3;"
    }, {
    "name" : "deleted",
    "type" : [ "string", "null" ],
    "meta" : "dbFieldName=DELETED;dbFieldPosition=4;"
    } ]
    }
  • 注册Avro schema到index.schemas_registry文件,databus2-example-relay-pkg/schemas_registry/index.schemas_registry文件中添加行com.linkedin.events.example.or_test.Person.1.avsc ,每定义一个Avro schema都需要添加进去,relay运行时会到此文件中查找表对应的定义的Avro schema。

6.3.3 Client端的操作

  • 配置Client属性文件:databus2-example-client-pkg/conf/client-person.properties的内容如下配置,包括端口号,buffer存储策略,checkpoint持久化等信息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    databus.relay.container.httpPort=11125
    databus.relay.container.jmx.rmiEnabled=false
    databus.relay.eventBuffer.allocationPolicy=DIRECT_MEMORY
    databus.relay.eventBuffer.queuePolicy=BLOCK_ON_WRITE
    databus.relay.schemaRegistry.type=FILE_SYSTEM
    databus.relay.eventBuffer.maxSize=10240000
    databus.relay.eventBuffer.readBufferSize=1024000
    databus.relay.eventBuffer.scnIndexSize=1024000
    databus.client.connectionDefaults.pullerRetries.initSleep=1
    databus.client.checkpointPersistence.fileSystem.rootDirectory=./personclient-checkpoints
    databus.client.checkpointPersistence.clearBeforeUse=false
    databus.client.connectionDefaults.enablePullerMessageQueueLogging=true
  • databus2-example-client/src/main/java下的PersonConsumer类是消费逻辑回调代码,主要是取出每一个event后依次打印每个字段的名值对,主要代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private ConsumerCallbackResult processEvent(DbusEvent event, DbusEventDecoder eventDecoder)
    {
    GenericRecord decodedEvent = eventDecoder.getGenericRecord(event, null);
    try {
    Utf8 firstName = (Utf8)decodedEvent.get("firstName");
    Utf8 lastName = (Utf8)decodedEvent.get("lastName");
    Long birthDate = (Long)decodedEvent.get("birthDate");
    Utf8 deleted = (Utf8)decodedEvent.get("deleted");
     
    LOG.info("firstName: " + firstName.toString() +
    ", lastName: " + lastName.toString() +
    ", birthDate: " + birthDate +
    ", deleted: " + deleted.toString());
    } catch (Exception e) {
    LOG.error("error decoding event ", e);
    return ConsumerCallbackResult.ERROR;
    }
     
    return ConsumerCallbackResult.SUCCESS;
    }
  • databus2-example-client/src/main/java下的PersonClient类是relay的启动主类,主要是设置启动Client的配置信息,将消费者实例注册到监听器中,后续可对其进行回调,主要代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public static void main(String[] args) throws Exception
    {
    DatabusHttpClientImpl.Config configBuilder = new DatabusHttpClientImpl.Config();
     
    //Try to connect to a relay on localhost
    configBuilder.getRuntime().getRelay("1").setHost("localhost");
    configBuilder.getRuntime().getRelay("1").setPort(11115);
    configBuilder.getRuntime().getRelay("1").setSources(PERSON_SOURCE);
     
    //Instantiate a client using command-line parameters if any
    DatabusHttpClientImpl client = DatabusHttpClientImpl.createFromCli(args, configBuilder);
     
    //register callbacks
    PersonConsumer personConsumer = new PersonConsumer();
    client.registerDatabusStreamListener(personConsumer, null, PERSON_SOURCE);
    client.registerDatabusBootstrapListener(personConsumer, null, PERSON_SOURCE);
     
    //fire off the Databus client
    client.startAndBlock();
    }

6.3.4 build-启动-测试

  • Build:Databus采用gradle进行编译,所以需要安装gradle环境,安装安成后进入databus根目录执行命令gradle -Dopen_source=true assemble即可完成build,成功后在databus根目录下生成名为build的文件夹

  • 启动Relay:

    1. cd build/databus2-example-relay-pkg/distributions
    2. tar -zxvf databus2-example-relay-pkg.tar.gz解压
    3. 执行启动脚本 ./bin/start-example-relay.sh or_person -Y ./conf/sources-or-person.json
    4. 执行命令 curl -s http://localhost:11115/sources返回如下内容说明启动成功:

      1-1

  • 启动Client:

    1. cd build/databus2-example-client-pkg/distributions
    2. tar -zxvf databus2-example-client-pkg.tar.gz解压
    3. 执行启动脚本 ./bin/start-example-client.sh person
    4. 执行命令 curl http://localhost:11115/relayStats/outbound/http/clients返回如下内容说明启动成功:

      1-1

  • 测试:

    Relay和Client启动成功后,就已经开始对person表进行数据变更捕获了,现在向person表插入一条如下记录:

    1-1

    databus2-example-relay-pkg/distributions/logs下的relay.log记录如下:

    1-1

    databus2-example-client-pkg/distributions/logs下的client.log记录如下:

    1-1

    可以看到已经可以抓取到改变的数据了!

7. 总结

遇到的问题

  • 主要是属性文件的配置问题,包括source-or-person.json, schemas_registry的文件缺失或配置错误。
  • 脚本方式启动时JVM无法创建,由于脚本启动时包含了自定义的JVM参数,与系统环境不符导致启动失败,去掉相关参数后正常启动。
  • Relay可以获取增删改查的Event,但Client只能解析到更新操作的Event,主要原因是Mysql默认的binlog_format=MIXED,而databus的设计是针对ROW格式的binlog,修改格式后可正常解析。
  • Windows平台无法使用,启动方式是用脚本启动,脚本启动时包含命令行参数较多,启动后无法进行调试,只能通过对日志观察的方式来进行。

需要进一步实验

  • 使用bootstrap produces和bootstrap servers模式来进行大批量事件的获取
  • 配置多个relay进行事件抓取
  • 结合zookeeper来配置客户端集群进行消费

 

 

 

http://tech.lede.com/2017/05/24/rd/server/databus/

分享到:
评论

相关推荐

    databus-master.zip

    3. **高可用与一致性**:通过消息队列和分布式架构,Databus确保在节点故障时数据不丢失,保持服务的连续性。 4. **易于集成**:Databus提供了丰富的API和SDK,方便与其他系统或应用集成,构建数据管道。 5. **灵活...

    databus数据库

    4. **微服务间的数据同步**:在微服务架构中,Databus可以作为数据同步的桥梁,确保各个服务之间数据的一致性。 #### 五、总结 通过对Databus及其与MySQL二进制日志之间的关系进行深入探讨,我们可以看到,Databus...

    服务集成总线,DataBus

    下面将详细阐述与这个主题相关的知识点。 首先,服务集成是现代软件系统中的关键部分,尤其是对于大型企业级应用。它涉及到将来自不同来源、具有不同接口和协议的服务连接在一起,以实现更高效的数据交换和业务流程...

    实时数据分析服务的架构与实践.pptx

    以下是该架构的主要组成部分及其实践: 1. **数据分发**:日志数据按照不同的维度进行分拆,确保数据的有效组织和管理,方便后续的处理和分析。这通常通过消息队列(如Kafka集群)来实现,它可以提供高吞吐量和低...

    分布式数据同步系统Databus.zip

    LinkedIn最近发布了一套源无关的分布式数据同步系统Databus。简单说,就是把交易数据同步到各个不同的应用中。一个大的特色是这系统采用pull模式从log中取得数据,以达到对生产系统最小影响。 标签:...

    数据库服务化平台架构实践.pptx

    综上所述,汽车之家的数据库服务化平台架构实践涵盖了从资源管理、运维自动化、数据安全到智能分析的全方位解决方案,为企业提供了高效、智能的数据管理服务。这种架构可以为其他企业借鉴,以优化自身的数据库管理和...

    3-5+美团外卖实时数仓建设实践.pdf

    该实践涵盖了实时场景、实时技术、业务痛点、数据特点与应用场景、实时数仓架构设计、实时平台化建设等方面。 实时场景中,美团外卖涵盖了搜索推荐、用户标签、实时交互、客服、风险识别、异常交易、商家异常行为、...

    著名社交网站LinkedIn的Java架构技术.doc

    【LinkedIn的Java架构技术详解】 LinkedIn是全球知名的社交网络平台,尤其在专业领域具有广泛的影响力。该平台在2008年的JavaOne会议上展示了其基于Java技术构建的架构设计。以下是对LinkedIn架构技术的深入解析: ...

    4-8+流式数据的平台化实践与挑战.pdf

    1. **总体架构**:这部分可能介绍了贝壳找房的数据处理系统是如何设计和组织的,包括不同的组件如ETL(提取、转换、加载)过程,数据流的路径,以及与API和OLAP(在线分析处理)的交互。 2. **数据流流程**:显示了...

    DMETL_Databus

    与发布流程类似,用户也需要指定数据集和数据集列信息。 #### 2.4 数据总线路由 - **数据总线配置文件**:数据总线服务器的配置文件 (`databus.conf`) 包含了所有必要的路由信息。这些信息定义了如何将一个发布队列...

    流式数据的平台化实践与挑战.pdf

    流式数据平台化实践与挑战涉及大数据技术在大型互联网公司中的应用。流式数据处理指的是对实时产生的连续数据流进行连续处理,以达到快速响应和即时分析的目的。本文档通过分享某大厂(暂用“贝壳”指代)的实际案例...

    流式数据的平台化实践与挑战(18页).pdf

    流式数据的平台化实践与挑战是现代大数据处理领域中的一个重要议题,特别是在互联网巨头如贝壳找房这样的企业中,高效处理流式数据对于业务决策、实时监控和用户体验有着至关重要的作用。本文将从贝壳找房的实践案例...

    数据处理架构

    LinkedIn的数据处理架构围绕着三个主要方面展开:产品与服务、数据生态系统以及数据基础设施解决方案。 ### 产品与服务范围 LinkedIn提供广泛的产品和服务,覆盖了从用户个人资料到企业招聘的各个方面。例如: - ...

    Elysium-DataBus

    极速数据总线执照Copyright 2021 ZhupfLicensed under the Apache License, Version 2.0 (the "License");... softwaredistributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES O

    LinkedIn开源低延时变化数据捕获系统Databus源代码

    LinkedIn开源了其低延时变化数据捕获系统Databus,该系统可以在MySQL以及Oracle数据源上捕获数据,当下LinkedIn只开源了Oracle上的连接器。Databus作为LinkedIn生态系统中的一致性保障组件,在低延时的情况下仍然...

    SnappyData在美团酒店实时数据分析中的应用.pdf

    - 在情报分析领域,美团需要对比自身与竞争对手的数据,进行实时分析和自动跟进。初始方案是通过批处理Job结合MySQL、Hive和Kylin,但这种方法的计算逻辑复杂,且存在15分钟的滑动窗口,无法满足实时性要求。 - ...

    美团技术沙龙04-Kv Tair best practise .72d12e50-4fe7-11e6-ae32-999541cb

    本次技术沙龙主要探讨了Kv Tair的最佳实践,涵盖了从数据库领域到物理架构设计,再到SLA(服务级别协议)优化等多个方面。 1. NoSQL领域与CAP理论: - NoSQL数据库通常选择在ACID(原子性、一致性、隔离性、持久性...

    搜索实时更新系统架构介绍.pptx

    - 实时更新则是在全量更新基础上,对新增或修改的数据进行即时处理,如使用Databus和DataService来处理实时流数据,保证信息的时效性。 3. **数据融合与去重**: - 数据融合是将来自不同来源的数据整合在一起,...

    孙英男-B站大规模计算负载云原生化实践1

    此外,还集成了各种中间件,包括Discovery服务发现、Paladin配置中心、Databus消息队列、Notify消息投递、Overlord缓存代理和Akso数据库代理,以实现高效、统一的PaaS服务。 2. **微服务生命周期管理**: 微服务...

Global site tag (gtag.js) - Google Analytics