`

flink入门

 
阅读更多

main方法

 

ConfigManager.initProp(configFilePath);

        topName = ConfigManager.initKafkaConfigV2By(args);
        // 初始化kafka信息
        // 初始化配置
        Map<String, KafkaWrite2SrConfig> configs = KafkaWrite2SrUtils.initConfigs2(topName);
        logger.info("当前环境" + ConfigManager.getProperty("hbase.keyPrefix"));

        ParameterTool parameter = ParameterTool.fromMap((Map) ConfigManager.getProp());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(parameter);
        // 默认4线程对齐消费者
        env.setParallelism(4);
        // 开启检查点,每10秒缓存一次
        env.enableCheckpointing(1000 * 60 * 1);// start a checkpoint every 1000 ms
        // 至少处理一次(满足条件)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 60);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointParamUtil.getCheckpointTimeout(topName) * 60 * 1000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 在job终止后任然保留缓存
        env.getCheckpointConfig()
                .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 当程序关闭的时候,会触发额外的checkpoints
        String evn = parameter.get("hbase.keyPrefix");
        if (!"dev".equals(evn)) {
			env.setStateBackend(new FsStateBackend(System.getProperty("ckDir"), true));// checkpoint保存地址
        }

        String jstormEnv = ConfigManager.getProperty("jstorm.env");

        DataStreamSource<String> xxxSource = FlinkCalcUtil.getNewKafkaDataStreamSource(env, "OrderInfoDetailCkSource", new SimpleStringSchema());
        SingleOutputStreamOperator<String> ckProcess = xxxSource
        		.flatMap(new CkOrderInfoFaltMap()).setParallelism(orderInfoDetailCkSource.getParallelism())
                .keyBy(k -> k.getShipmentId()).process(new KeyedProcessFunction<String, JsonToCkOrderInfoDetailResult, String>() {
					@Override
					public void processElement(JsonToCkOrderInfoDetailResult jsonToCkOrderInfoDetailResult, KeyedProcessFunction<String, JsonToCkOrderInfoDetailResult, String>.Context ctx, Collector<String> out)
							throws Exception {
						OrderInfoDetailCkResult result = new OrderInfoDetailCkResult();
						BeanCopyUtil.copyPropertyWithSourceLongToDateAndStringToNull(jsonToCkOrderInfoDetailResult, result);
                       // 将订单状态转换为对应的各个环节字段
                        OrderStatusHandlerManager.fillOrderStatusCalc(result);
						result.setRedisTm(result.getEffectiveTime());
						out.collect(JSON.toJSONString(result));
					}
				}).name("ckProcess").setParallelism(OrderCalcUtils.loadParallelism("ckProcess", 32));
        // 算子压力比较大,资源隔离
        if (!StrUtil.equals(jstormEnv, "sit")) {
            ckProcess.slotSharingGroup("process");
        }

        ckProcess.rescale()
        .addSink(StarRocksSinkHelper.getJsonSink("ads_index_order_info_detail_new_pk", OrderInfoDetailCkResult.class,"4", false))
        .name("ckSink")
        .setParallelism(OrderCalcUtils.loadParallelism("ckSink", 8));

        DataStreamSource<String> orderInfoDetailZlSource = FlinkCalcUtil.getNewKafkaDataStreamSource(env, "OrderInfoDetailZlSource", new SimpleStringSchema());
        SingleOutputStreamOperator<String> zlProcess = orderInfoDetailZlSource
		        .flatMap(new ZlOrderInfoFaltMap()).setParallelism(orderInfoDetailZlSource.getParallelism())
		        .keyBy(k -> k.getShipmentId()).process(new KeyedProcessFunction<String, JsonToZlOrderInfoDetailResult, String>() {
					@Override
					public void processElement(JsonToZlOrderInfoDetailResult jsonToZlOrderInfoDetailResult, KeyedProcessFunction<String, JsonToZlOrderInfoDetailResult, String>.Context ctx, Collector<String> out)
							throws Exception {
						OrderInfoDetailZlResult result = new OrderInfoDetailZlResult();
				        BeanCopyUtil.copyPropertyWithSourceLongToDateAndStringToNull(jsonToZlOrderInfoDetailResult, result);
						result.setRedisTm(result.getEffectiveTime());
						out.collect(JSON.toJSONString(result));
					}
				}).name("zlProcess").setParallelism(OrderCalcUtils.loadParallelism("zlProcess", 32));
        // 算子压力比较大,资源隔离
        if (!StrUtil.equals(jstormEnv, "sit")) {
            zlProcess.slotSharingGroup("process");
        }

        zlProcess.rescale()
                .addSink(StarRocksSinkHelper.getJsonSink("xxx_pk", XXX.class,"4", false))
                .name("xxx")
                .setParallelism(OrderCalcUtils.loadParallelism("xxx", 8));;

        env.execute();

  kafka source

 

private static <T> FlinkKafkaConsumer<T> getNewConsumer(String SourceName, DeserializationSchema<T> valueDeserializer,boolean ifRerun){
        String url = ConfigManager.getProperty(getKafkaConfigKey(SourceName, Constants.KAFKA_MONITORURL));
        //<groupId:clientToken>
        String clientAndToken = ConfigManager.getProperty(getKafkaConfigKey(SourceName, Constants.KAFKA_CLIENT_ID));
        String cluster = ConfigManager.getProperty(getKafkaConfigKey(SourceName, Constants.KAFKA_CLUSTERNAME));
        //Topic token
        String tokens = ConfigManager.getProperty(getKafkaConfigKey(SourceName, Constants.KAFKA_TOKENS));
        String topic = ConfigManager.getProperty(getKafkaConfigKey(SourceName, Constants.KAFKA_TOPIC));
        String brokers = ConfigManager.getProperty(getKafkaConfigKey(SourceName, Constants.KAFKA_BROKERS));

        logger.info("kafka config: url:"+url+"---clientAndToken:"+clientAndToken+"---cluster:"+cluster+"---tokens:"+tokens+"---topic:"+topic);
        //需要topic token
        String topicToken = getTopicToken(topic, tokens);
        //获取client的groupId和token
        String[] clientSplit = clientAndToken.split(Constants.COLON);
//        String clientToken = clientSplit[1];
        String groupId = clientSplit[0];

//        String zkhost = AuthUtil.getZkServers(cluster, clientToken,groupId, url);
        // 如果配置了brokers直接使用,没有配置通过topicToken获取
        brokers = getBrokers(brokers, cluster, topicToken, url, topic);

        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //自动发现新分区的间隔时间
        kafkaProperties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "12000");
        // 这里设置为 earliest,当发现新分区时,从新分区的最早位置开始消费, 建议结合业务需要合理配置,可选值有 (earliest、latest、none)
        kafkaProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        kafkaProperties.setProperty("client.dns.lookup","resolve_canonical_bootstrap_servers_only");
//        kafkaProperties.setProperty("zookeeper.connect", zkhost);
        FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<>(topic, valueDeserializer, kafkaProperties);
        /*if(ifRerun){ //重跑时从头消费
            kafkaConsumer.setStartFromEarliest();
        }else{ //根据消费者组的offset消费
            kafkaConsumer.setStartFromGroupOffsets();
        }*/
        // kafkaConsumer.setStartFromEarliest()已被禁止,现在只允许setStartFromGroupOffsets
        String starFlag = ConfigManager.getProperty("starFlag");
        if("earliest".equals(starFlag)) {// 从头消费
            kafkaConsumer.setStartFromEarliest();
        } else if("lastest".equals(starFlag)) { // 从最后消费
            kafkaConsumer.setStartFromLatest();
        } else { // 按偏移量消费
            kafkaConsumer.setStartFromGroupOffsets();
        }
        //提交偏移量开启缓存
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        return kafkaConsumer;
    }

 sink sr

public static SinkFunction<String> getJsonSink(String tableName, Class clazz, String parallelism, boolean ifTranslate) {
        String jdbcUrl = ConfigManager.getProperty("starrocks.jdbc.url");
        String loadUrl = ConfigManager.getProperty("starrocks.load.url");
        String userName = ConfigManager.getProperty("starrocks.username");
        String passWord = ConfigManager.getProperty("starrocks.password");
        String databaseName = ConfigManager.getProperty("starrocks.database.name");
        String flushInterval = ConfigManager.getProperty("starrocks.flush.interval");
        String maxFilterRatio = ConfigManager.getProperty("starrocks.max.filter.ratio", "0.2");
        List<String> orderedFiledNames = getOrderedFiledNames(clazz);
        String columns = getOrderedColumns(orderedFiledNames, ifTranslate);
        String jsonPaths = getOrderedJsonPaths(orderedFiledNames);
        StarRocksSinkOptions starRocksSinkBuilder = StarRocksSinkOptions.builder().withProperty("jdbc-url", jdbcUrl).withProperty("load-url", loadUrl).withProperty("username", userName).withProperty("password", passWord).withProperty("table-name", tableName).withProperty("database-name", databaseName).withProperty("sink.properties.format", "json").withProperty("sink.properties.columns", columns).withProperty("sink.properties.jsonpaths", jsonPaths).withProperty("sink.properties.strip_outer_array", "true").withProperty("sink.properties.ignore_json_size", "true").withProperty("sink.buffer-flush.max-bytes", "94371840").withProperty("sink.properties.max_filter_ratio", maxFilterRatio).withProperty("sink.parallelism", parallelism).withProperty("sink.semantic", StarRocksSinkSemantic.AT_LEAST_ONCE.getName()).withProperty("sink.buffer-flush.interval-ms", flushInterval).withProperty("sink.properties.column_separator", "\u0001").withProperty("sink.properties.row_delimiter", "\u0002").build();
        return StarRocksSink.sink(starRocksSinkBuilder);
    }

 

 

分享到:
评论

相关推荐

    flink入门文档.pdf

    Flink 入门文档 Flink 是一个大数据处理框架,具有批流一体、高容错、高吞吐、低延迟、大规模计算、多平台部署等核心特点。本文档将对 Flink 的核心概念、架构、API 层次进行详细介绍。 1. 核心概念 Flink 是一个...

    Flink入门及实战V1.6.1-2018最新

    Flink入门及实战最新内容分享,包含Flink基本原理及应用场景、Flink vs storm vs sparkStreaming、Flink入门案例-wordCount、Flink集群安装部署standalone+yarn、Flink-HA高可用、Flink scala shell代码调试

    最全面的flink入门编程案例

    【标题】:“最全面的Flink入门编程案例” 在大数据处理领域,Apache Flink是一个流行的开源流处理框架,它提供了一种低延迟、高吞吐量的数据处理能力,支持批处理和流处理两种模式。本篇文章将深入浅出地引导初学...

    Flink示例源码-Flink入门

    "Flink示例源码-Flink入门"是一个旨在帮助初学者理解并掌握Flink核心概念和功能的资源包。在这个压缩包中,我们可能会找到一些精心设计的Flink示例代码,这些代码将有助于我们了解如何使用Flink进行数据处理。 首先...

    Flink入门教程

    ### Flink入门教程知识点解析 #### 一、Flink开发环境搭建 **1. 必备工具** - **Java1.8版本**:Apache Flink 支持 Java 和 Scala 编程语言,并且需要 Java 环境来运行。官方推荐使用 Java 1.8 或更高版本。 - *...

    Flink入门宝典

    ### Flink入门宝典知识点详解 #### 一、Flink开发环境搭建 **Apache Flink** 是一个开源的流处理框架,它支持批量处理和事件驱动的实时数据流处理。为了能够有效地学习并使用Flink,搭建一个合适的开发环境至关...

    flink入门到精通视频和课件.txt

    flink入门到精通视频和课件,欢迎下载 flink入门到精通视频和课件,欢迎下载 flink入门到精通视频和课件欢迎下载

    Apache Flink 入门及进阶 (1).pdf

    Apache Flink 是一个开源的流处理框架,专为实时大数据分析设计。它的核心特性包括对事件流的处理、状态管理、事件时间和快照,能够高效地执行有状态的复杂计算,适用于事件驱动型、数据分析型以及数据管道(ETL)型...

    Flink入门及实战-上.pptx

    一份关于Flink的经典教程,对flink技术做全面详细的讲解,对Flink讲解深入,是难得一度的好资料

    Flink入门案例程序(中华石杉的课堂代码)

    **Flink入门案例程序概述** 本教程主要针对的是Apache Flink初学者,通过中华石杉提供的课堂代码,我们将深入理解Flink的核心概念和技术。Apache Flink是一个开源的流处理和批处理框架,它以强大的实时数据处理能力...

    2020版Flink入门到实战.txt

    教程主要分为两大部分:Flink理论基础和基于Flink的电商用户行为分析项目实战。 第一部分,是Flink基础理论的讲解,以Flink 1.10.1版本为例,涉及到各种重要概念、原理和API的用法,并且会有大量的示例代码实现; 第...

    flink零基础入门.pdf

    Apache Flink 进阶(一):Runtime 核心机制剖析 4 Apache Flink 进阶(二):时间属性深度解析 18 Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 30 Apache Flink 进阶(四):Flink on Yarn/K8s 原理...

    flink入门文档.docx

    《Flink入门详解》 Apache Flink是一款强大的实时大数据计算框架,因其强大的批流一体、高容错性、高吞吐、低延迟以及多平台部署等特性,成为了新一代的流处理首选。本文主要针对Flink的核心特点、容错机制、高吞吐...

    Flink入门与实战配套java源码

    《Flink入门与实战配套Java源码》是一个针对Apache Flink初学者和实践者的宝贵资料,它包含了使用Java编程语言实现的各种Flink示例。Apache Flink是一个流行的开源流处理和批处理框架,专为实时数据处理而设计,提供...

    Flink入门及实战教程(上+下).txt

    上集:课程主要讲述了Flink实时框架的基本操作使用以及案例实战开发。通过本课程的教学,使学生既能学习Flink的理论知识,也能掌握Flink的实战内容。为后期工作中的实时处理业务提供多一种技术选择 下集:课程主要...

    Flink入门及实战-下.pptx

    ### Flink 入门及实战知识点详解 #### 一、Flink API 抽象级别概述 Flink 提供了多种 API 抽象级别,以适应不同的应用场景和需求。主要包括: - **DataStream API**:适用于批处理和流处理,提供丰富的转换操作。 ...

    Flink 全网最全资源(视频、博客、PPT、入门、原理、实战、性能调优、源码解析、问答等持续更新)

    Flink入门首先要理解其基本概念,如DataStream API、批处理与流处理的统一模型、事件时间与处理时间等。Flink的核心在于它的DataStream API,它提供了丰富的操作符来处理数据流,包括转换、过滤、分组、窗口等。此外...

    flink入门资料

    flink基本概念与部署,DataStreamAPI,Druid数据存储,Druid基本概念。Flink应用案例教程,Flink状态管理与恢复及容错设计与指标设计。

    flink入门到精通视频教程

    10. Flink运行架构-Flink并行数据流 11. Flink运行架构-Task和Operator chain 12. Flink运行架构-任务调度与执行 13. Flink运行架构-任务槽与槽共享 第四章 Dataset开发 01.入门案例 02.入门案例-构建工程、log4j....

    Anthony-Dong#note#flink 入门1

    Flink 入门public static void main(String[] args) throws Exception {如何启动呢?首先termina

Global site tag (gtag.js) - Google Analytics