实例代码:
package com.bigdata.spark.hbase;
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import com.bigdata.spark.SparkManager;
import com..bigdata.spark.pojo.Customer;
import com.bigdata.spark.sql.DataFrameManger;
public class SparkHbaseTest {
private static final Pattern SPACE = Pattern.compile("\\|");
/**
* @param args
*/
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("sparkHbase");
sparkConf.setMaster("local");
try {
String tableName="customer";
//获取 JavaSparkContext
JavaSparkContext jsc = SparkManager.getInstance().getJavaSparkContxt(sparkConf);
//使用HBaseConfiguration.create()生成Configuration
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort", "4180");
conf.set("hbase.zookeeper.quorum", "192.168.68.84");
//conf.set("hbase.master", "192.168.68.84:60000");
conf.set(TableInputFormat.INPUT_TABLE, tableName);
createTable(conf,tableName);
final Broadcast<String> broadcastTableName = jsc.broadcast(tableName);
SQLContext sqlContext = SparkManager.getInstance().getSQLContext(jsc);
System.out.println("=== Data source: RDD ===");
//将文件内容信息转换为 java bean
JavaRDD<String> jrdd = SparkManager.getInstance().getJavaRDD(jsc,"hdfs://192.168.68.84:9000/storm/app_hdfsBolt-5-1-1434442141499.log");
JavaRDD<Customer> customerRdd = jrdd.map(
new Function<String, Customer>() {
@Override
public Customer call(String line) {
String[] parts = line.split("\\|");
Customer customer = new Customer();
customer.setId(parts[0]);
customer.setCustomcode(parts[1]);
customer.setCode(parts[2]);
customer.setPrice(Double.parseDouble(parts[3]));
return customer;
}
});
//将 JavaRDD<Customer> 对象 序列化一个 tcustomer 表
DataFrame schemaCustomer = DataFrameManger.getInstance().getDataFrameByFile(sqlContext, customerRdd, Customer.class);
schemaCustomer.registerTempTable("tcustomer");
// 从表 tcustomer获取 符合条件的数据
DataFrame teenagers = sqlContext.sql("SELECT id,customcode,code,price FROM tcustomer WHERE price >= 110 AND price <= 500");
//从结果做获取数据,
List<Object> clist = teenagers.toJavaRDD().map(new Function<Row, Object>() {
@Override
public Object call(Row row) throws Exception {
Customer cust = new Customer();
cust.setId(row.getString(0));
cust.setCustomcode(row.getString(1));
cust.setCode(row.getString(2));
cust.setPrice(row.getDouble(3));
return cust;
}
}).collect();
//将数据转换为 javapairRDD 对象
JavaPairRDD<String, Customer> jpairCust = teenagers.toJavaRDD().mapToPair(new PairFunction<Row, String, Customer>() {
@Override
public Tuple2<String, Customer> call(Row row) throws Exception {
Customer cust = new Customer();
cust.setId(row.getString(0));
cust.setCustomcode(row.getString(1));
cust.setCode(row.getString(2));
cust.setPrice(row.getDouble(3));
return new Tuple2<String, Customer>(row.getString(0), cust);
}
});
jpairCust.foreach(new VoidFunction<Tuple2<String,Customer>>() {
@Override
public void call(Tuple2<String, Customer> t) throws Exception {
insertData(broadcastTableName.value(), t._2());
System.out.println("key:"+t._1()+"==code:"+t._2().getCustomcode());
}
});
for (Object obj: clist) {
Customer ct = (Customer) obj;
System.out.println(ct.getId()+" "+ct.getCustomcode()+" "+ct.getCode()+" "+ct.getPrice());
}
jsc.stop();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 创建表
* @param conf
* @param tableName
*/
private static void createTable(Configuration conf,String tableName){
System.out.println("start create table ......");
try {
HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
if (hBaseAdmin.tableExists(tableName)) {// 如果存在要创建的表,那么先删除,再创建
hBaseAdmin.disableTable(tableName);
hBaseAdmin.deleteTable(tableName);
System.out.println(tableName + " is exist,detele....");
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
tableDescriptor.addFamily(new HColumnDescriptor("column1"));
tableDescriptor.addFamily(new HColumnDescriptor("column2"));
tableDescriptor.addFamily(new HColumnDescriptor("column3"));
tableDescriptor.addFamily(new HColumnDescriptor("column4"));
hBaseAdmin.createTable(tableDescriptor);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("end create table ......");
}
/**
* 插入数据
* @param tableName
* @throws IOException
*/
public static void insertData(String tableName,Customer cust) throws IOException {
System.out.println("start insert data ......");
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort", "4180");
conf.set("hbase.zookeeper.quorum", "192.168.68.84");
conf.set(TableInputFormat.INPUT_TABLE, tableName);
HTable table = new HTable(conf,tableName);
Put put = new Put("Customer".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值
put.add("column1".getBytes(), null, cust.getId().getBytes());// 本行数据的第一列
put.add("column2".getBytes(), null, cust.getCustomcode().getBytes());// 本行数据的第二列
put.add("column3".getBytes(), null, cust.getCode().getBytes());// 本行数据的第三列
put.add("column4".getBytes(), null, Double.toString(cust.getPrice()).getBytes());// 本行数据的第四列
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("end insert data ......");
}
}
相关推荐
**Kafka、Spark Streaming与HBase的集成**...6. **数据写入HBase**:将处理后的数据转换为Put操作,然后使用`hbaseContext.bulkPut`将数据批量写入HBase表。 **参考链接** 对于更详细的实现步骤,可以参考以下链接: ...
使用spark读取hbase中的数据,并插入到mysql中
这个示例代码是用 Scala 编写的,用于演示如何使用 Spark Streaming 消费来自 Kafka 的数据,并将这些数据存储到 HBase 数据库中。Kafka 是一个分布式流处理平台,而 HBase 是一个基于 Hadoop 的非关系型数据库,...
本篇文章将详细探讨如何使用 Scala 和 Spark 的 Resilient Distributed Datasets (RDDs) 与 HBase 进行交互,包括读取、写入以及删除数据。 首先,我们需要理解 Spark RDD。RDD 是 Spark 的基本数据抽象,它是不可...
在IT行业中,Java、Hive、HBase以及Spark是大数据处理和分析领域的重要工具。本压缩包"javaApi_sparkhiveAPI_hbaseAPI.zip"包含了2019年8月至10月期间针对这些技术的Java版API实现,以及与Spark相关的Hive和HBase ...
处理后的结果将被写入HBase。Spark提供了与HBase的连接库(spark-hadoop-hbase),使得我们可以直接将DataFrame或RDD的数据保存到HBase表中。这一步骤通常涉及将数据转化为HBase的行键(RowKey)和列族(Column ...
例如,使用PySpark创建DataFrame并写入HBase: ```python from pyspark.sql import SparkSession from pyspark.sql.functions import input_file_name spark = SparkSession.builder \ .appName("HBase ...
4. 使用`DataFrameWriter`的`format("org.apache.spark.sql.execution.datasources.hbase")`方法写入HBase数据。 在实际应用中,可能还需要考虑数据转换、数据清洗、数据分析等操作。例如,使用Spark SQL进行查询,...
在Java API方面,Spark提供了丰富的类库,使得开发者能够利用Java语言便捷地构建分布式数据处理应用。本篇文章将深入探讨Spark 2.1.0中的Java API及其关键知识点。 1. **SparkContext**: Spark的核心组件,它是所有...
本项目 "mongo-all-hbase" 提供了一个解决方案,使用 Apache Spark 来批量读取 MongoDB 中的所有数据库和集合,并将数据全量导入到 HBase 中。这个工具对于需要在大数据场景下进行数据迁移或整合的用户来说非常实用...
在进行数据写入HBase的操作时,首先需要创建一个SparkSession,然后通过SparkSession创建DataFrame或Dataset。接着,你可以使用HBase的连接器将DataFrame或Dataset转化为HBase表。数据读取的过程类似,先从HBase中...
然后,使用 HBase API 将处理后的数据写入表中。 5. **提交与运行**:将 Spark Streaming 应用打包成 JAR 文件,提交到 Spark 集群执行。 项目中可能包含以下关键类和方法: - `KafkaInputDStream`: Spark ...
在这篇文章中,我们使用 Spark 中的数据读取和写入方式来读取数据和写入数据。 代码实现: ```scala object Word2VecDemo { def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) ...
Spark 1.0.0支持多种数据源,如HDFS、Cassandra、HBase等。Java API提供`SparkContext`的`textFile()`方法读取文本文件。 ```java JavaRDD<String> textFile = sc.textFile("hdfs://path/to/input"); ``` ### 5. ...
Spark可以方便地与Hbase集成,将处理后的数据直接写入或读出,实现高效的数据存储和查询。 在Java编程环境下,这些组件可以无缝协作,提供一个完整的实时日志分析流程。开发人员可以通过Java API来编写Spark作业,...
Hbase提供了实时读写的能力,使得Spark可以快速将结果数据写入并检索,从而满足了系统的实时性要求。 5. **Java编程语言**:由于所有这些组件都提供了Java API,因此本项目使用Java进行开发,使得代码的编写和维护...
4. **Spark Streaming**:Apache Spark的实时处理模块,用于消费Kafka中的消息,对数据进行处理和转换,然后写入HBase。 5. **HBase**:一个分布式的、面向列的NoSQL数据库,适合大规模数据存储和快速随机访问。 6. ...
本文将深入探讨一个经过严格测试的HBase 1.2.2 Java测试最小依赖包,该包适用于进行基本的数据写入和读取操作。 首先,我们来理解为什么需要最小依赖包。在开发过程中,往往为了减少环境配置的复杂性,开发者会寻找...
- **数据写入**:Spark提供了多种数据输出方式,例如保存到HDFS、HBase、Cassandra等。Java版本会模拟这些操作,实现数据的持久化。 - **任务调度**:在核心模块中,调度器(Scheduler)是关键。Java实现可能包括...