`
gaojingsong
  • 浏览: 1197426 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
文章分类
社区版块
存档分类
最新评论

Flink操作Hive数据库

 
阅读更多

Flink操作Hive数据库

DataStream<String> text = streamExecutionEnvironment.addSource(flinkKafkaConsume);

text .addSink(new SinkHIve());

 

public class SinkHive extends RichSinkFunction<String> implements SinkFunction<String> {

    private static String driverName = "org.apache.hive.jdbc.HiveDriver";  //驱动名称

    private static String url = "jdbc:hive2://10.10.82.137:10000/xxx";

    private static String user ="";

    private static String password ="";

 

    private Connection connection;

    private PreparedStatement statement;

 

    // 1,初始化

    @Override

    public void open(Configuration parameters) throws Exception {

        super.open(parameters);

        Class.forName(driverName);

        connection = DriverManager.getConnection(url, user, password);

    }

 

    // 2,执行

    @Override

    public void invoke(String value, Context context) throws Exception {

    //####invoke######{"name":"gaojs0","id":"0010"}

        System.out.println("##########invoke#############" + value);

        

        Map<String,Object> map = JSONObject.parseObject(value);

        String tableName = "test";

        String sql = "INSERT INTO " + tableName + "( ";

        String sql2 = " VALUES( ";

        for (Map.Entry entry : map.entrySet()) {

            sql += entry.getKey() + ", ";

            sql2 += cover(entry.getValue()) + ", ";

        }

        

        System.out.println("######sql#######" + sql);

        System.out.println("######sql2#######" + sql2);

        

        String s1 = sql.substring(0, sql.length() - 2);

        String s2 = sql2.substring(0, sql2.length() - 2);

        String s3 = s1 + ") " + s2 + ")";

        System.out.println("######s3#######" + s3);

 

        

        if (value != null && value != "") {

        statement = connection.prepareStatement(s3);

            statement.execute();

        }

    }

 

    // 3,关闭

    @Override

    public void close() throws Exception {

        super.close();

        if (statement != null)

            statement.close();

        if (connection != null)

            connection.close();

    }

    

    public static String cover(Object value) {

        if (value instanceof String || value instanceof Character) {

            return "'" + value + "'";

        } else {

            return value + "";

        }

    }

 

此种方式操作的数据库,Hive会产生很多小文件,需要解决

分享到:
评论

相关推荐

    flink-connector-hive-2.12-1.13.1.jar

    flink-connector-hive_2.12-1.13.1.jar 是 Apache Flink 的一个 Hive 连接器 JAR...元数据同步:Flink 可以读取 Hive 的元数据,包括数据库、表和分区的信息,从而在 Flink SQL 中直接使用这些表。 数据读取和写入:Fl

    基于hadoop平台hive数据库处理电影数据(8965字数32页).doc

    【标题】:“基于Hadoop平台Hive数据库处理电影数据”的文档详细介绍了如何在Hadoop分布式环境中利用Hive进行大规模电影数据的分析。该系统的主要目标是建立一个分布式Hadoop集群,并在此基础上对电影数据进行深入...

    java 大数据 spark flink redis hive hbase kafka 面试题 数据结构 算法 设计模式.zip

    7. **HBase**:HBase是基于Hadoop的分布式列式数据库,适合半结构化数据的存储。面试中会讨论HBase的模型(行、列族、列、版本)、RegionServer、Zookeeper的作用、读写流程以及HBase与Hadoop的集成。 8. **Kafka**...

    flink非官方jar.zip

    6. **MySQL驱动**:`mysql-connector-java-5.1.47.jar` 是MySQL数据库的JDBC驱动,如果Hive Metastore使用MySQL作为后端数据库,这个驱动是必须的。 7. **Jackson库**:`jackson-core-2.10.1.jar` 和 `javax.jdo-...

    字节跳动基于Flink的MQ Hive实时数据集成.pdf

    本文档主要探讨了字节跳动如何使用Apache Flink进行MQ(消息队列)到Hive的实时数据集成。字节跳动在2016年建立了一站式数据中台Dataleap,专注于大规模数据下的分布式计算和存储,提供高效可靠的数据集成解决方案。...

    大数据基础面试题hadoop,zookeeper,hbase,hive,spark,kafka,flink,clickhouse

    例如,Hadoop 的数据块大小设置、Zookeeper 的会话超时机制、HBase 的 Region 分裂、Hive 的分区表设计、Spark 的容错机制、Kafka 的生产者消费者模型、Flink 的状态持久化和ClickHouse 的索引策略等都是常见的面试...

    Hadoop+Spark+Hive+HBase+Oozie+Kafka+Flume+Flink+ES+Redash等详细安装部署

    在大数据领域,构建一个完整的生态系统是至关重要的,其中包括多个组件,如Hadoop、Spark、Hive、HBase、Oozie、Kafka、Flume、Flink、Elasticsearch和Redash。这些组件协同工作,提供了数据存储、处理、调度、流...

    hadoop,hbase,hive版本整合兼容性最全,最详细说明【适用于任何版本】

    Hive是一个数据仓库基础构架,建立在Hadoop之上,可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。ZooKeeper是一个开源的分布式协调服务,它为分布式应用提供一致性服务。 为了整合这些组件,首先...

    Flink 同步数据+mongo<->hive+支持复杂类型

    ### Flink 同步数据+mongo&lt;-&gt;hive+支持复杂类型 #### 一、Flink 简介 Apache Flink 是一个用于处理无界和有界数据流的开源流处理框架。它提供了高吞吐量、低延迟的数据处理能力,并且支持事件时间处理、状态管理等...

    【FlinkSql篇02】FlinkSql之Table操作1

    FlinkSql提供了多种方式来连接外部系统,例如文件系统、Kafka、ElasticSearch、MySql、HBase、Hive等。连接外部系统需要传入一个ConnectorDescriptor,也就是connector描述器。对于文件系统的connector而言,Flink...

    Apache Flink 1.17 完整版资料包,含最新JDBC、Hive、Kafka连接器,加速数据驱动决策

    还汇集了多款热门连接器,如flink-connector-jdbc-1.17针对数据库的高效集成,flink-sql-connector-hive-3.1.3_2.12-1.17.0与Hive的无缝对接,flink-sql-connector-kafka-1.17.0实现Kafka数据流的高效处理,以及...

    Hudi on Flink在顺丰的实践应用.pdf

    在描述中提及了顺丰业务介绍、顺丰大数据业务全景图、顺丰IOT大数据应用全景图、顺丰科技大数据技术矩阵、顺丰数据应用架构、数据库实时化、Hudi数仓宽表方案以及Flink替换Hive演进等内容。这意味着顺丰在运用大数据...

    基于Flink ClickHouse构建实时数据平台.pdf

    Flink-to-Hive是指使用Flink将数据从Kafka流式传输到Hive数据库中。这个过程需要使用Flink的流处理能力和Hive的数据存储能力。 5.Flink-to-ClickHouse Flink-to-ClickHouse是指使用Flink将数据从Kafka流式传输到...

    本项目为大数据基础镜像组件,其中包括Hadoop、Spark、Hive、Tez、Hue、Flink、MySQL等

    3. **Hive**:Hive是基于Hadoop的数据仓库工具,可将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,使得非程序员也能对大数据进行分析。Hive查询语言HQL与SQL类似,降低了大数据处理的门槛。 4. **Tez**...

    大数据hadoop,spark,hive等等面试汇总

    再来看Hive,它是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。Hive的优势在于它的易用性和可扩展性,适合离线分析。面试中可能涉及HiveQL语法、Hive与传统SQL的区别、...

    Flink大数据讲义1

    - OLAP系统如Hadoop、Impala和Hive常用于大数据分析,提供对大量历史数据的快速查询和复杂分析能力。 - 一种常见的OLAP实现方案是结合HBase和HDFS,但这种方案可能面临架构复杂、时效性低以及运维成本高等问题。 ...

Global site tag (gtag.js) - Google Analytics