`
Donald_Draper
  • 浏览: 971662 次
社区版块
存档分类
最新评论

kafka Connect导入/导出数据

阅读更多
Kafka目录结构:http://donald-draper.iteye.com/blog/2396760
Kafka配置文件:http://donald-draper.iteye.com/blog/2397000
Kafka Standy模式、创建主题,生产消费消息:http://donald-draper.iteye.com/blog/2397170
Kafka 集群环境搭建:http://donald-draper.iteye.com/blog/2397276
上面篇文章我们看了kafka集群环境的搭建,今天我们来使用kafka Connect导入/导出数据。
先启动kafka集群:
[donald@Donald_Draper bin]$ ./zookeeper-server-start.sh ../config/zookeeper.properties  &  
[1] 4145  
[donald@Donald_Draper bin]$  ./kafka-server-start.sh ../config/server.properties &  
[2] 4401
[donald@Donald_Draper bin]$ 
[donald@Donald_Draper bin]$  ./kafka-server-start.sh ../config/server1.properties &  
[3] 4947
[donald@Donald_Draper bin]$ 
[donald@Donald_Draper bin]$  ./kafka-server-start.sh ../config/server2.properties &  
[4] 5246
[donald@Donald_Draper bin]$ 
[donald@Donald_Draper ~]$ netstat -ntlp
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp        0      0 192.168.122.1:53        0.0.0.0:*               LISTEN      -                   
...                 
tcp6       0      0 :::9092                 :::*                    LISTEN      4401/java           
tcp6       0      0 :::9093                 :::*                    LISTEN      4947/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      4145/java           
tcp6       0      0 :::9094                 :::*                    LISTEN      5246/java           
[donald@Donald_Draper ~]$ 


从控制台写入和写回数据容易,但你可能想要从其他来源导入或导出数据到其他系统。
对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。Kafka Connect是导入和导出数据的一个工具。它是一个可扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。下面,我们将看到如何运行Kafka Connect用简单的连接器从文件导入数据到Kafka主题,再从Kafka主题导出数据到文件。
我们先简单看一下kafka connect文件source和sink配置:
[donald@Donald_Draper config]$ more connect-file-source.properties 
...
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
[donald@Donald_Draper config]$ more connect-file-sink.properties 
...
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
[donald@Donald_Draper config]$ 

从kafka connect文件source和sink配置可,猜想一下,kafka connect文件导入导出数据,实际上就是写入数据到source文件test,kafka connect将文件数据格式化,写到主题connect-test,kafka connect再讲数据写到sink文件中。


我们首先创建source 连接器的文件数据源test.txt:
[donald@Donald_Draper bin]$ vim test.txt
:wq

接下来,我们在connector standy模式下,启动2个连接器运行,这意味着它们运行在一个单一的,本地的,专用的进程。我们提供3个配置文件作为参数。第一个始终是kafka Connect进程,如kafka broker连接和数据库序列化格式,剩下的配置文件每个指定的连接器来创建,这些文件包括一个唯一的连接器名称,实例化连接器类和任何其他配置要求的。

启动connector standy模式

[donald@Donald_Draper bin]$ ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties &
[1] 7583
[donald@Donald_Draper bin]$ [2017-10-23 09:10:27,387] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-23 09:10:27,390] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,391] INFO Added plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,392] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.ExtractField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.MaskField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.SetSchemaMetadata$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.HoistField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampConverter$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,393] INFO Added plugin 'org.apache.kafka.connect.transforms.Cast$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.InsertField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,394] INFO Added plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,395] INFO Added plugin 'org.apache.kafka.connect.transforms.Flatten$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,395] INFO Added plugin 'org.apache.kafka.connect.transforms.ReplaceField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[2017-10-23 09:10:27,396] INFO Added aliases 'FileStreamSinkConnector' and 'FileStreamSink' to plugin 'org.apache.kafka.connect.file.FileStreamSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'FileStreamSourceConnector' and 'FileStreamSource' to plugin 'org.apache.kafka.connect.file.FileStreamSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'MockConnector' and 'Mock' to plugin 'org.apache.kafka.connect.tools.MockConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'MockSinkConnector' and 'MockSink' to plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,396] INFO Added aliases 'MockSourceConnector' and 'MockSource' to plugin 'org.apache.kafka.connect.tools.MockSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'SchemaSourceConnector' and 'SchemaSource' to plugin 'org.apache.kafka.connect.tools.SchemaSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'VerifiableSinkConnector' and 'VerifiableSink' to plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'VerifiableSourceConnector' and 'VerifiableSource' to plugin 'org.apache.kafka.connect.tools.VerifiableSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'ByteArrayConverter' and 'ByteArray' to plugin 'org.apache.kafka.connect.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'JsonConverter' and 'Json' to plugin 'org.apache.kafka.connect.json.JsonConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,397] INFO Added aliases 'StringConverter' and 'String' to plugin 'org.apache.kafka.connect.storage.StringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
[2017-10-23 09:10:27,398] INFO Added alias 'RegexRouter' to plugin 'org.apache.kafka.connect.transforms.RegexRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-10-23 09:10:27,399] INFO Added alias 'TimestampRouter' to plugin 'org.apache.kafka.connect.transforms.TimestampRouter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
[2017-10-23 09:10:27,399] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
connect standy 配置
[2017-10-23 09:10:27,423] INFO StandaloneConfig values: 
        access.control.allow.methods = 
        access.control.allow.origin = 
        bootstrap.servers = [localhost:9092]
        internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
        internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
        key.converter = class org.apache.kafka.connect.json.JsonConverter
        offset.flush.interval.ms = 10000
        offset.flush.timeout.ms = 5000
        offset.storage.file.filename = /tmp/connect.offsets
        plugin.path = null
        rest.advertised.host.name = null
        rest.advertised.port = null
        rest.host.name = null
        rest.port = 8083
        task.shutdown.graceful.timeout.ms = 5000
        value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:223)
[2017-10-23 09:10:27,684] INFO Logging initialized @5264ms (org.eclipse.jetty.util.log:186)
[2017-10-23 09:10:27,877] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:49)
[2017-10-23 09:10:27,877] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:70)
[2017-10-23 09:10:27,877] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:144)
[2017-10-23 09:10:27,878] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:59)
[2017-10-23 09:10:27,908] INFO Worker started (org.apache.kafka.connect.runtime.Worker:149)
[2017-10-23 09:10:27,908] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:72)
[2017-10-23 09:10:27,908] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2017-10-23 09:10:27,999] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Oct 23, 2017 9:10:28 AM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.
source连接器配置
[2017-10-23 09:10:28,752] INFO Started o.e.j.s.ServletContextHandler@393bd750{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2017-10-23 09:10:28,812] INFO Started ServerConnector@4ff4478{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2017-10-23 09:10:28,812] INFO Started @6392ms (org.eclipse.jetty.server.Server:379)
[2017-10-23 09:10:28,813] INFO REST server listening at http://192.168.126.128:8083/, advertising URL http://192.168.126.128:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2017-10-23 09:10:28,813] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2017-10-23 09:10:28,819] INFO ConnectorConfig values: 
        connector.class = FileStreamSource
        key.converter = null
        name = local-file-source
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:28,819] INFO EnrichedConnectorConfig values: 
        connector.class = FileStreamSource
        key.converter = null
        name = local-file-source
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,819] INFO Creating connector local-file-source of type FileStreamSource (org.apache.kafka.connect.runtime.Worker:204)
[2017-10-23 09:10:28,820] INFO Instantiated connector local-file-source with version 0.11.0.1 of type class org.apache.kafka.connect.file.FileStreamSourceConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-10-23 09:10:28,825] INFO Finished creating connector local-file-source (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-23 09:10:28,826] INFO SourceConnectorConfig values: 
        connector.class = FileStreamSource
        key.converter = null
        name = local-file-source
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:223)
[2017-10-23 09:10:28,826] INFO EnrichedConnectorConfig values: 
        connector.class = FileStreamSource
        key.converter = null
        name = local-file-source
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,829] INFO Creating task local-file-source-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-10-23 09:10:28,829] INFO ConnectorConfig values: 
        connector.class = FileStreamSource
        key.converter = null
        name = local-file-source
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:28,830] INFO EnrichedConnectorConfig values: 
        connector.class = FileStreamSource
        key.converter = null
        name = local-file-source
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,835] INFO TaskConfig values: 
        task.class = class org.apache.kafka.connect.file.FileStreamSourceTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-23 09:10:28,837] INFO Instantiated task local-file-source-0 with version 0.11.0.1 of type org.apache.kafka.connect.file.FileStreamSourceTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-23 09:10:28,874] INFO ProducerConfig values: 
        acks = all
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 9223372036854775807
        max.in.flight.requests.per.connection = 1
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 2147483647
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
 (org.apache.kafka.clients.producer.ProducerConfig:223)
sink 连接器配置
[2017-10-23 09:10:28,947] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-23 09:10:28,948] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-23 09:10:28,975] INFO Created connector local-file-source (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-23 09:10:28,978] INFO Source task WorkerSourceTask{id=local-file-source-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:143)
[2017-10-23 09:10:28,995] INFO ConnectorConfig values: 
        connector.class = FileStreamSink
        key.converter = null
        name = local-file-sink
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:28,996] INFO EnrichedConnectorConfig values: 
        connector.class = FileStreamSink
        key.converter = null
        name = local-file-sink
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:28,997] INFO Creating connector local-file-sink of type FileStreamSink (org.apache.kafka.connect.runtime.Worker:204)
[2017-10-23 09:10:28,997] INFO Instantiated connector local-file-sink with version 0.11.0.1 of type class org.apache.kafka.connect.file.FileStreamSinkConnector (org.apache.kafka.connect.runtime.Worker:207)
[2017-10-23 09:10:28,997] INFO Finished creating connector local-file-sink (org.apache.kafka.connect.runtime.Worker:225)
[2017-10-23 09:10:28,998] INFO SinkConnectorConfig values: 
        connector.class = FileStreamSink
        key.converter = null
        name = local-file-sink
        tasks.max = 1
        topics = [connect-test]
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:223)
[2017-10-23 09:10:28,999] INFO EnrichedConnectorConfig values: 
        connector.class = FileStreamSink
        key.converter = null
        name = local-file-sink
        tasks.max = 1
        topics = [connect-test]
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:29,000] INFO Creating task local-file-sink-0 (org.apache.kafka.connect.runtime.Worker:358)
[2017-10-23 09:10:29,000] INFO ConnectorConfig values: 
        connector.class = FileStreamSink
        key.converter = null
        name = local-file-sink
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig:223)
[2017-10-23 09:10:29,000] INFO EnrichedConnectorConfig values: 
        connector.class = FileStreamSink
        key.converter = null
        name = local-file-sink
        tasks.max = 1
        transforms = null
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:223)
[2017-10-23 09:10:29,001] INFO TaskConfig values: 
        task.class = class org.apache.kafka.connect.file.FileStreamSinkTask
 (org.apache.kafka.connect.runtime.TaskConfig:223)
[2017-10-23 09:10:29,001] INFO Instantiated task local-file-sink-0 with version 0.11.0.1 of type org.apache.kafka.connect.file.FileStreamSinkTask (org.apache.kafka.connect.runtime.Worker:373)
[2017-10-23 09:10:29,027] INFO ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-local-file-sink
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:223)
[2017-10-23 09:10:29,065] INFO Kafka version : 0.11.0.1 (org.apache.kafka.common.utils.AppInfoParser:83)
[2017-10-23 09:10:29,065] INFO Kafka commitId : c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser:84)
[2017-10-23 09:10:29,071] INFO Created connector local-file-sink (org.apache.kafka.connect.cli.ConnectStandalone:91)
[2017-10-23 09:10:29,074] INFO Sink task WorkerSinkTask{id=local-file-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:247)
[2017-10-23 09:10:29,214] INFO Discovered coordinator Donald_Draper.server.com:9092 (id: 2147483647 rack: null) for group connect-local-file-sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:607)
[2017-10-23 09:10:29,217] INFO Revoking previously assigned partitions [] for group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-10-23 09:10:29,217] INFO (Re-)joining group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
[2017-10-23 09:10:29,261] INFO Successfully joined group connect-local-file-sink with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
[2017-10-23 09:10:29,262] INFO Setting newly assigned partitions [connect-test-0] for group connect-local-file-sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)


上述启动kafka连接器的配置文件,使用默认的本地集群配置,并创建了2个连接器:第一个是导入连接器,从source导入文件中按行读书数据,并生成消息发布到Kafka主题,第二个是导出连接器,从kafka主题以行读取消息,并输出到sink外部文件。在启动过程中,你会看到一些日志消息,包括一些连接器实例化的说明。一旦kafka Connect进程已经开始,导入连接器应该从test.txt读取每行数据写入到topic connect-test中,导出连接器从主题connect-test读取消息写入到sink输出文件test.sink.txt中。我们可以通过验证输出文件的内容来验证数据数据已经全部导出。

现在向source连接器的数据源文件test.txt,写入种子数据用来测试:

[donald@Donald_Draper ~]$ echo -e "name\ndonald" > test.txt
[donald@Donald_Draper ~]$ 


这个我们可以看到kafka connect控制有如下输出:
[2017-10-23 09:04:52,885] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 56: {connect-test-0=OffsetAndMetadata{offset=2, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:288)
[2017-10-23 09:04:53,079] INFO Finished WorkerSourceTask{id=local-file-source-0} commitOffsets successfully in 74 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:373)

从输出来看,连接器已经在处理文件数据流:


现在来查看sink输出文件
[donald@Donald_Draper bin]$ ls | grep test
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
test.sink.txt
test.txt
[donald@Donald_Draper bin]$ cat test.sink.txt
name
donald
[donald@Donald_Draper bin]$ 



注意,导入的数据也已经在Kafka主题connect-test中,所以我们可以在运行控制台消息者命令(或用消息者程序代码处理),
查看主题中的相关数据:

[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"string","optional":false},"payload":"name"}
{"schema":{"type":"string","optional":false},"payload":"donald"}
...



连接器在继续处理数据,因此我们可以添加数据到文件,观看文件数据在管道中的移动:

[donald@Donald_Draper bin]$ echo "your name?" >> test.txt


连接器控制台输出:
[donald@Donald_Draper bin]$ [2017-10-23 11:51:59,932] INFO Finished WorkerSourceTask{id=local-file-source-0} commitOffsets successfully in 122 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:373)
[2017-10-23 11:52:09,004] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 970: {connect-test-0=OffsetAndMetadata{offset=3, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:288)


查看sink文件:
[donald@Donald_Draper bin]$ cat test.sink.txt
name
donald
your name?
[donald@Donald_Draper bin]$ 


消费者终端:
[donald@Donald_Draper bin]$ ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
{"schema":{"type":"string","optional":false},"payload":"name"}
{"schema":{"type":"string","optional":false},"payload":"donald"}
{"schema":{"type":"string","optional":false},"payload":"your name?"}


从sink文件和消费者终端输出来看,kafka connect 成功处理文件数据。


再来简单看一下kafka Connect source和sink控制的的配置:
[donald@Donald_Draper config]$ more connect-console-source.properties 
...
name=local-console-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
topic=connect-test
[donald@Donald_Draper config]$ more connect-console-sink.properties 
...
name=local-console-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=connect-test
[donald@Donald_Draper config]$ 


这个与kafka Connect source和sink的文件不同的是,连接器类不同,同时数据源是从控制台输入的数据。

另外,kafka还有一个实时处理存储在kafka集群中的数据客户端库kafka Streams,我们引用官方的功能说明:

Kafka Streams is a client library for building mission-critical real-time applications and microservices, where the input and/or output data is stored in Kafka clusters. Kafka Streams combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's
server-side cluster technology to make these applications highly scalable, elastic, fault-tolerant, distributed, and much more. This quickstart example will demonstrate how to run a streaming application coded in this library.

实例可以在kafka的lib文件夹下的kafka-streams-examples-0.11.0.1.jar包中,有兴趣的可以研究一下。

有一篇中文文章有简单讲解,有兴趣可以查看:http://orchome.com/6



0
0
分享到:
评论

相关推荐

    kafka-connect-jdbc-4.1.1.zip

    JDBC(Java Database Connectivity)是 Kafka Connect 中的一个特定实现,它支持广泛的数据库系统,如 MySQL、PostgreSQL、Oracle 等,使得用户可以方便地将数据流导入或导出到这些数据库。 在 `kafka-connect-jdbc...

    藏经阁-BUILDING REALTIME DATA PIPELINES WITH KAFKA CONNECT AND SPAR

    它允许用户在 Kafka 集群与其他数据存储(如数据库、文件系统或云服务)之间进行大规模的数据导入和导出。Kafka Connect 的核心设计理念是实现关注点分离,确保数据摄入和分发过程的稳定性和可扩展性。它支持两种...

    kafka-connect-tools

    - JDBC 连接器:将 Kafka 与关系型数据库如 MySQL、PostgreSQL 连接,实现数据的导入导出。 - HDFS 连接器:与 Hadoop 分布式文件系统集成,进行大数据处理。 - Elasticsearch 连接器:将 Kafka 数据流实时同步到 ...

    kafka-connect-ftp:FTP服务器的Kafka连接源-监视FTP服务器上的文件并将更改输入到Kafka

    它提供了一种声明式方式来定义连接器(Connectors),这些连接器负责数据的导入和导出。`kafka-connect-ftp`是这样的一个连接器,专门针对FTP服务器的数据源。 在`kafka-connect-ftp`中,主要涉及以下几个关键概念...

    2、Kafka安装配置、快速启动及基本操作教程

    进程守护方法和前后台进程切换3、创建一个主题扩展知识:bootstrap-server和zookeeper使用区别4、删除主题5、发送消息6、消费消息7、设置多个broker集群8、使用Kafka connect来导入/导出数据 1、Kafka下载 下载Kafka...

    kafka-2.13-3.5.0+scala-2.13.2

    6. **Kafka Connect**:Kafka Connect允许轻松地集成其他系统,如数据库或Hadoop,进行数据导入和导出。 7. **Kafka Streams**:Kafka自带的轻量级流处理库,可以在Kafka主题之间转换和聚合数据,实现简单的流处理...

    kafka权威指南(中文版)

    8. **连接器(Connectors)**:Kafka Connect允许用户方便地将Kafka与其他系统(如数据库、日志系统)集成,实现数据的导入和导出。 9. **Zookeeper依赖**:Kafka使用Zookeeper作为其元数据协调器,管理集群配置、...

    kafka使用教程.rar

    Kafka Connect是一个用于将外部系统(如数据库、日志文件等)与Kafka集成的框架,可以方便地实现数据的导入和导出,实现数据湖或者数据管道的构建。 8. **监控与管理** Kafka提供Kafka Monitor、Kafka Connect ...

    kafka 可运行实例

    6. **Kafka Connect**:这是一个用于集成Kafka与其他系统的工具,可以方便地导入和导出数据,比如从数据库到Kafka,或者从Kafka到数据仓库。 7. **Zookeeper**:Kafka依赖Zookeeper进行集群协调,包括管理broker、...

    kafka_2.12-2.0.0

    10. **Kafka Connect**:这是一个用于集成Kafka与其他系统的工具,允许高效地导入和导出数据。 总的来说,"kafka_2.12-2.0.0"压缩包提供了完整的Kafka部署环境,包括可能的ZooKeeper配置,对于开发者和运维人员来说...

    Kafka源码剖析试读文章

    Kafka还支持通过Kafka Connect来导入和导出数据到其他系统。这个工具允许用户将数据轻松地迁移到Kafka中,或者将数据从Kafka导出到外部系统。它支持多种数据源和数据目的地,并且能够自动处理数据转换。 在Kafka的...

    kafka简介与kafka深入浅出两个资料.rar

    4. **Kafka Connect**:Kafka Connect允许无缝集成外部系统,如数据库、HDFS等,实现数据的导入导出。 5. **Kafka Streams**:Kafka Streams是Kafka提供的轻量级流处理库,可以在应用中直接处理数据流。 6. **安全性...

    kafka demo项目.zip

    Kafka Connect 是一个可扩展的框架,用于在 Kafka 与其他系统之间高效地导入导出数据。它提供了预定义的连接器(如 JDBC、HDFS 等),可以方便地将数据流入流出 Kafka,简化数据集成流程。 【Zookeeper 在 Kafka 中...

    Kafka_API_文档

    **3.7 使用kafka-connect导入导出数据** kafka-connect是一个工具,用于方便地导入导出数据到Kafka。 #### 四、Kafka生态系统 Kafka的生态系统包括多种工具和服务,如kafka-connect、Kafka Streams等,这些工具...

    crash-test-source-connector:崩溃控制的Kafka Connect连接器

    Kafka Connect 是Apache Kafka项目的一部分,它提供了一个分布式、高度可扩展的平台,用于在Kafka和其他系统之间高效地导入和导出数据。通过使用预定义的连接器(如`crash-test-source-connector`),可以方便地将...

    kafka 技术内幕 图文详解Kafka源码设计与实现

    11. **Kafka Connect**:允许用户轻松地将Kafka与其他系统(如数据库或Hadoop)集成,实现数据的导入和导出。 12. **Kafka MirrorMaker**:用于在不同的Kafka集群之间复制数据,实现灾难恢复和负载均衡。 13. **...

    Kafka技术内幕(带书签).pdf

    Kafka Connect是Kafka提供的一种用于集成外部系统的工具,它可以方便地导入和导出数据到各种数据源,如数据库、HDFS或ELK(Elasticsearch、Logstash、Kibana)堆栈。Kafka Streams是Kafka内置的轻量级流处理库,用于...

    kafka开发文档

    在使用Kafka时,可以通过Kafka Connect来导入导出数据,以及通过Kafka Streams来实现流处理应用。Kafka Connect是一种用于连接Kafka与外部系统的工具,而Kafka Streams是Kafka提供的客户端库,用于构建流处理应用。 ...

    Kafka全套学习笔记.zip

    1. Kafka Connect:用于简化数据源和Kafka之间的数据导入导出。 2. Kafka Streams:轻量级库,用于在Kafka之上构建复杂的应用程序。 3. KSQL:SQL-like查询语言,使Kafka数据流处理更加直观。 总之,Kafka以其高效...

    kafka-2.13-3.4.0.tgz

    Kafka Connect提供了一种标准接口,用于集成其他系统,如数据库、Hadoop等,实现数据的导入导出。它支持连接器(Connectors)的开发,方便开发者快速构建数据管道。 9. **Schema Registry**: 在Kafka中,消息...

Global site tag (gtag.js) - Google Analytics