Flink操作Hive数据库
DataStream<String> text = streamExecutionEnvironment.addSource(flinkKafkaConsume);
text .addSink(new SinkHIve());
public class SinkHive extends RichSinkFunction<String> implements SinkFunction<String> {
private static String driverName = "org.apache.hive.jdbc.HiveDriver"; //驱动名称
private static String url = "jdbc:hive2://10.10.82.137:10000/xxx";
private static String user ="";
private static String password ="";
private Connection connection;
private PreparedStatement statement;
// 1,初始化
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName(driverName);
connection = DriverManager.getConnection(url, user, password);
}
// 2,执行
@Override
public void invoke(String value, Context context) throws Exception {
//####invoke######{"name":"gaojs0","id":"0010"}
System.out.println("##########invoke#############" + value);
Map<String,Object> map = JSONObject.parseObject(value);
String tableName = "test";
String sql = "INSERT INTO " + tableName + "( ";
String sql2 = " VALUES( ";
for (Map.Entry entry : map.entrySet()) {
sql += entry.getKey() + ", ";
sql2 += cover(entry.getValue()) + ", ";
}
System.out.println("######sql#######" + sql);
System.out.println("######sql2#######" + sql2);
String s1 = sql.substring(0, sql.length() - 2);
String s2 = sql2.substring(0, sql2.length() - 2);
String s3 = s1 + ") " + s2 + ")";
System.out.println("######s3#######" + s3);
if (value != null && value != "") {
statement = connection.prepareStatement(s3);
statement.execute();
}
}
// 3,关闭
@Override
public void close() throws Exception {
super.close();
if (statement != null)
statement.close();
if (connection != null)
connection.close();
}
public static String cover(Object value) {
if (value instanceof String || value instanceof Character) {
return "'" + value + "'";
} else {
return value + "";
}
}
此种方式操作的数据库,Hive会产生很多小文件,需要解决
相关推荐
flink-connector-hive_2.12-1.13.1.jar 是 Apache Flink 的一个 Hive 连接器 JAR...元数据同步:Flink 可以读取 Hive 的元数据,包括数据库、表和分区的信息,从而在 Flink SQL 中直接使用这些表。 数据读取和写入:Fl
【标题】:“基于Hadoop平台Hive数据库处理电影数据”的文档详细介绍了如何在Hadoop分布式环境中利用Hive进行大规模电影数据的分析。该系统的主要目标是建立一个分布式Hadoop集群,并在此基础上对电影数据进行深入...
7. **HBase**:HBase是基于Hadoop的分布式列式数据库,适合半结构化数据的存储。面试中会讨论HBase的模型(行、列族、列、版本)、RegionServer、Zookeeper的作用、读写流程以及HBase与Hadoop的集成。 8. **Kafka**...
6. **MySQL驱动**:`mysql-connector-java-5.1.47.jar` 是MySQL数据库的JDBC驱动,如果Hive Metastore使用MySQL作为后端数据库,这个驱动是必须的。 7. **Jackson库**:`jackson-core-2.10.1.jar` 和 `javax.jdo-...
本文档主要探讨了字节跳动如何使用Apache Flink进行MQ(消息队列)到Hive的实时数据集成。字节跳动在2016年建立了一站式数据中台Dataleap,专注于大规模数据下的分布式计算和存储,提供高效可靠的数据集成解决方案。...
例如,Hadoop 的数据块大小设置、Zookeeper 的会话超时机制、HBase 的 Region 分裂、Hive 的分区表设计、Spark 的容错机制、Kafka 的生产者消费者模型、Flink 的状态持久化和ClickHouse 的索引策略等都是常见的面试...
在大数据领域,构建一个完整的生态系统是至关重要的,其中包括多个组件,如Hadoop、Spark、Hive、HBase、Oozie、Kafka、Flume、Flink、Elasticsearch和Redash。这些组件协同工作,提供了数据存储、处理、调度、流...
Hive是一个数据仓库基础构架,建立在Hadoop之上,可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。ZooKeeper是一个开源的分布式协调服务,它为分布式应用提供一致性服务。 为了整合这些组件,首先...
### Flink 同步数据+mongo<->hive+支持复杂类型 #### 一、Flink 简介 Apache Flink 是一个用于处理无界和有界数据流的开源流处理框架。它提供了高吞吐量、低延迟的数据处理能力,并且支持事件时间处理、状态管理等...
FlinkSql提供了多种方式来连接外部系统,例如文件系统、Kafka、ElasticSearch、MySql、HBase、Hive等。连接外部系统需要传入一个ConnectorDescriptor,也就是connector描述器。对于文件系统的connector而言,Flink...
还汇集了多款热门连接器,如flink-connector-jdbc-1.17针对数据库的高效集成,flink-sql-connector-hive-3.1.3_2.12-1.17.0与Hive的无缝对接,flink-sql-connector-kafka-1.17.0实现Kafka数据流的高效处理,以及...
在描述中提及了顺丰业务介绍、顺丰大数据业务全景图、顺丰IOT大数据应用全景图、顺丰科技大数据技术矩阵、顺丰数据应用架构、数据库实时化、Hudi数仓宽表方案以及Flink替换Hive演进等内容。这意味着顺丰在运用大数据...
Flink-to-Hive是指使用Flink将数据从Kafka流式传输到Hive数据库中。这个过程需要使用Flink的流处理能力和Hive的数据存储能力。 5.Flink-to-ClickHouse Flink-to-ClickHouse是指使用Flink将数据从Kafka流式传输到...
3. **Hive**:Hive是基于Hadoop的数据仓库工具,可将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,使得非程序员也能对大数据进行分析。Hive查询语言HQL与SQL类似,降低了大数据处理的门槛。 4. **Tez**...
再来看Hive,它是基于Hadoop的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能。Hive的优势在于它的易用性和可扩展性,适合离线分析。面试中可能涉及HiveQL语法、Hive与传统SQL的区别、...
- OLAP系统如Hadoop、Impala和Hive常用于大数据分析,提供对大量历史数据的快速查询和复杂分析能力。 - 一种常见的OLAP实现方案是结合HBase和HDFS,但这种方案可能面临架构复杂、时效性低以及运维成本高等问题。 ...