
Hadoop 2.7.1 + HBase 1.2.1 Cluster Setup (9): spring-hadoop in Practice

(1) Building Hadoop 2.7.1 from source http://aperise.iteye.com/blog/2246856
(2) Hadoop 2.7.1 installation prerequisites http://aperise.iteye.com/blog/2253544
(3) Cluster installation that works for both 1.x and 2.x http://aperise.iteye.com/blog/2245547
(4) HBase installation prerequisites http://aperise.iteye.com/blog/2254451
(5) HBase installation http://aperise.iteye.com/blog/2254460
(6) Snappy installation http://aperise.iteye.com/blog/2254487
(7) HBase performance tuning http://aperise.iteye.com/blog/2282670
(8) Benchmarking HBase with Yahoo YCSB http://aperise.iteye.com/blog/2248863
(9) spring-hadoop in practice http://aperise.iteye.com/blog/2254491
(10) ZooKeeper-based Hadoop HA cluster installation http://aperise.iteye.com/blog/2305809

 

     1.http://spring.io/blog/2015/02/09/spring-for-apache-hadoop-2-1-released

      2.http://docs.spring.io/spring-hadoop/docs/current/reference/html/

      

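The two links above are good references. The project is not finished yet; I will post the full source code once it is. For now, here are two screenshots: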


 



 

1. Add the spring-data-hadoop dependencies to the Maven project

 

		<!-- spring -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>4.1.6.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-beans</artifactId>
			<version>4.1.6.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>4.1.6.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>4.1.6.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context-support</artifactId>
			<version>4.1.6.RELEASE</version>
		</dependency>

		<!-- spring-hadoop -->
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-hadoop</artifactId>
			<version>2.2.0.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-hadoop-store</artifactId>
			<version>2.2.0.RELEASE</version>
			<exclusions>
				<exclusion>
					<groupId>javax.servlet</groupId>
					<artifactId>servlet-api</artifactId>
				</exclusion>
				<exclusion>
					<artifactId>netty</artifactId>
					<groupId>io.netty</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.xerial.snappy</groupId>
			<artifactId>snappy-java</artifactId>
			<version>1.1.0</version>
			<scope>runtime</scope>
		</dependency>

		<!-- hadoop -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.6.0</version>
			<scope>compile</scope>
			<exclusions>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jetty</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jetty-util</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jsp-2.1</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jsp-api-2.1</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>servlet-api-2.1</artifactId>
				</exclusion>
				<exclusion>
					<groupId>javax.servlet</groupId>
					<artifactId>servlet-api</artifactId>
				</exclusion>
				<exclusion>
					<groupId>javax.servlet.jsp</groupId>
					<artifactId>jsp-api</artifactId>
				</exclusion>
				<exclusion>
					<groupId>tomcat</groupId>
					<artifactId>jasper-compiler</artifactId>
				</exclusion>
				<exclusion>
					<groupId>tomcat</groupId>
					<artifactId>jasper-runtime</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-auth</artifactId>
			<version>2.6.0</version>
			<scope>compile</scope>
		</dependency>

		<!-- hbase -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-server</artifactId>
			<version>0.98.5-hadoop2</version>
			<exclusions>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jetty</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jetty-util</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jsp-2.1</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>jsp-api-2.1</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.mortbay.jetty</groupId>
					<artifactId>servlet-api-2.1</artifactId>
				</exclusion>
				<exclusion>
					<groupId>tomcat</groupId>
					<artifactId>jasper-compiler</artifactId>
				</exclusion>
				<exclusion>
					<groupId>tomcat</groupId>
					<artifactId>jasper-runtime</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>0.98.5-hadoop2</version>
			<scope>compile</scope>
			<exclusions>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.slf4j</groupId>
					<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-common</artifactId>
			<version>0.98.5-hadoop2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-protocol</artifactId>
			<version>0.98.5-hadoop2</version>
		</dependency>

		<!--zookeeper -->
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
			<exclusions>
				<exclusion>
					<artifactId>netty</artifactId>
					<groupId>io.netty</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<!--log -->
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
 

2. spring-data-hadoop configuration for a Hadoop 1.x-style deployment (NameNode + SecondaryNameNode):

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:hdp="http://www.springframework.org/schema/hadoop"
	xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/hadoop 
    http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">

	<!-- Default Hadoop configuration. Its bean id defaults to hadoopConfiguration; elements such as file-system pick it up automatically, so no explicit ref is needed -->
	<hdp:configuration>
		fs.defaultFS=hdfs://192.168.202.131:9000/
		dfs.replication=3
		dfs.client.socket-timeout=600000
	</hdp:configuration>

	<!-- FileSystem bean for HDFS operations; used to read and write HDFS files -->
	<hdp:file-system id="hadoop-cluster" uri="hdfs://192.168.202.131:9000/" />

	<!-- ZooKeeper quorum hosts and client port -->
	<hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="192.168.202.131,192.168.202.132,192.168.202.133" zk-port="2181">
		hbase.rootdir=hdfs://192.168.202.131:9000/hbase
		dfs.replication=3
		dfs.client.socket-timeout=600000
	</hdp:hbase-configuration>

	<!-- HbaseTemplate configuration -->
	<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
		<property name="configuration" ref="hbaseConfiguration" />
	</bean>
</beans>
 

3. spring-data-hadoop configuration for Hadoop 2.x HA:

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xmlns:hdp="http://www.springframework.org/schema/hadoop"
	xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/hadoop 
    http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">

	<!-- Default Hadoop configuration. Its bean id defaults to hadoopConfiguration; elements such as file-system pick it up automatically, so no explicit ref is needed -->
	<hdp:configuration>
		fs.defaultFS=hdfs://hadoop-ha-cluster
		dfs.client.socket-timeout=600000
		ha.zookeeper.quorum=zk1:2181,zk2:2181,zk3:2181,zk4:2181,zk5:2181
		ha.zookeeper.session-timeout.ms=300000
		dfs.nameservices=hadoop-ha-cluster
		dfs.ha.namenodes.hadoop-ha-cluster=namenode1,namenode2
		dfs.namenode.rpc-address.hadoop-ha-cluster.namenode1=hadoop31:9000
		dfs.namenode.http-address.hadoop-ha-cluster.namenode1=hadoop31:50070
		dfs.namenode.rpc-address.hadoop-ha-cluster.namenode2=hadoop32:9000
		dfs.namenode.http-address.hadoop-ha-cluster.namenode2=hadoop32:50070
		dfs.client.failover.proxy.provider.hadoop-ha-cluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
	</hdp:configuration>

	<!-- FileSystem bean for HDFS operations; used to read and write HDFS files -->
	<hdp:file-system id="hadoop-cluster" configuration-ref="hadoopConfiguration" />

	<!-- ZooKeeper quorum hosts and client port -->
	<hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="zk1,zk2,zk3,zk4,zk5" zk-port="2181">
		hbase.rootdir=hdfs://hadoop-ha-cluster/hbase
		hbase.cluster.distributed=true
		zookeeper.session.timeout=30000
		hbase.hregion.majorcompaction=0
		hbase.regionserver.regionSplitLimit=1
		dfs.client.socket-timeout=600000
	</hdp:hbase-configuration>

	<!-- HbaseTemplate configuration -->
	<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
		<property name="configuration" ref="hbaseConfiguration" />
	</bean>
</beans>
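
The HA settings above only work if the names line up: the nameservice in fs.defaultFS has to be declared in dfs.nameservices, and every NameNode id listed under dfs.ha.namenodes.<nameservice> needs its own dfs.namenode.rpc-address.<nameservice>.<id> entry. The following is a minimal sketch of such a consistency check in plain Java, assuming the body of hdp:configuration is read as java.util.Properties-style key=value lines. HaConfigCheck and its methods are hypothetical helpers written for this illustration; they are not part of spring-hadoop or Hadoop.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: sanity-checks the HDFS HA client keys before they go into the Spring XML.
class HaConfigCheck {

	/** Parses key=value lines into Properties. */
	static Properties parse(String body) {
		Properties props = new Properties();
		try {
			props.load(new StringReader(body));
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}
		return props;
	}

	/** Returns a list of problems; an empty list means the HA keys are consistent. */
	static List<String> validate(Properties props) {
		List<String> problems = new ArrayList<String>();
		List<String> nameservices = splitCsv(props.getProperty("dfs.nameservices"));
		if (nameservices.isEmpty()) {
			problems.add("dfs.nameservices is not set");
		}
		// fs.defaultFS must name one of the declared nameservices, not a single host
		String authority = URI.create(props.getProperty("fs.defaultFS", "")).getAuthority();
		if (!nameservices.isEmpty() && !nameservices.contains(authority)) {
			problems.add("fs.defaultFS does not point at a declared nameservice");
		}
		for (String ns : nameservices) {
			List<String> ids = splitCsv(props.getProperty("dfs.ha.namenodes." + ns));
			if (ids.isEmpty()) {
				problems.add("missing dfs.ha.namenodes." + ns);
			}
			for (String id : ids) {
				String key = "dfs.namenode.rpc-address." + ns + "." + id;
				if (props.getProperty(key) == null) {
					problems.add("missing " + key);
				}
			}
			String provider = "dfs.client.failover.proxy.provider." + ns;
			if (props.getProperty(provider) == null) {
				problems.add("missing " + provider);
			}
		}
		return problems;
	}

	/** Splits a comma-separated value, dropping blanks; null yields an empty list. */
	static List<String> splitCsv(String value) {
		List<String> out = new ArrayList<String>();
		if (value == null) {
			return out;
		}
		for (String part : value.split(",")) {
			if (!part.trim().isEmpty()) {
				out.add(part.trim());
			}
		}
		return out;
	}

	public static void main(String[] args) {
		String body = "fs.defaultFS=hdfs://hadoop-ha-cluster\n"
				+ "dfs.nameservices=hadoop-ha-cluster\n"
				+ "dfs.ha.namenodes.hadoop-ha-cluster=namenode1,namenode2\n"
				+ "dfs.namenode.rpc-address.hadoop-ha-cluster.namenode1=hadoop31:9000\n"
				+ "dfs.namenode.rpc-address.hadoop-ha-cluster.namenode2=hadoop32:9000\n"
				+ "dfs.client.failover.proxy.provider.hadoop-ha-cluster="
				+ "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider\n";
		System.out.println(validate(parse(body)));
	}
}
```

Running a check like this against the key=value body before pasting it into the XML catches the most common HA mistake, a NameNode id that is listed but has no RPC address.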
 

4. A utility class for obtaining the Spring context in a J2EE project

    1) Make sure the Spring listener is configured in web.xml, as follows:

 

	<!-- location of the Spring configuration file -->
	<context-param>
		<param-name>contextConfigLocation</param-name>
		<param-value>classpath*:/applicationContext.xml</param-value>
	</context-param>
	<!-- listener -->
	<listener>
		<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
	</listener>
     2) The utility class SpringContextHolder

 

 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

/**
 * Keeps the Spring ApplicationContext in a static field so that it can be retrieved
 * from any code, anywhere, at any time.
 * 
 * @author calvin
 */
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {

	private static ApplicationContext applicationContext = null;

	private static Logger logger = LoggerFactory.getLogger(SpringContextHolder.class);

	/**
	 * Implements ApplicationContextAware: stores the injected context in the static field.
	 */
	public void setApplicationContext(ApplicationContext applicationContext) {
		logger.debug("Injecting ApplicationContext into SpringContextHolder: " + applicationContext);

		if (SpringContextHolder.applicationContext != null) {
			logger.warn("The ApplicationContext in SpringContextHolder is being overwritten; the previous ApplicationContext was: "
					+ SpringContextHolder.applicationContext);
		}

		SpringContextHolder.applicationContext = applicationContext; //NOSONAR
	}

	/**
	 * Implements DisposableBean: clears the static field when the context is closed.
	 */
	public void destroy() throws Exception {
		SpringContextHolder.clear();
	}

	/**
	 * Returns the ApplicationContext stored in the static field.
	 */
	public static ApplicationContext getApplicationContext() {
		assertContextInjected();
		return applicationContext;
	}

	/**
	 * Looks up a bean by name in the stored ApplicationContext and casts it to the type of the variable it is assigned to.
	 */
	@SuppressWarnings("unchecked")
	public static <T> T getBean(String name) {
		assertContextInjected();
		return (T) applicationContext.getBean(name);
	}

	/**
	 * Looks up a bean by type in the stored ApplicationContext.
	 */
	public static <T> T getBean(Class<T> requiredType) {
		assertContextInjected();
		return applicationContext.getBean(requiredType);
	}

	/**
	 * Resets the ApplicationContext held by SpringContextHolder to null.
	 */
	public static void clear() {
		logger.debug("Clearing the ApplicationContext in SpringContextHolder: " + applicationContext);
		applicationContext = null;
	}

	/**
	 * Checks that the ApplicationContext has been injected.
	 */
	private static void assertContextInjected() {
		if (applicationContext == null) {
			throw new IllegalStateException("applicationContext has not been injected; define SpringContextHolder in applicationContext.xml");
		}
	}
}
     3) The utility class must be declared in the Spring configuration file

 

	<!-- SpringContext Holder -->
	<bean id="springContextHolder" class="com.xxx.xxx.xxx.SpringContextHolder" lazy-init="false" />

 

5. Using HDFS in a J2EE project

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import com.besttone.spring.SpringContextHolder;

public class FileSystemUtil {
	private static FileSystem fs = (FileSystem) SpringContextHolder.getBean("hadoop-cluster");

	public void mkdirs() throws Exception { // create an HDFS folder
		Path path = new Path("/test");
		fs.mkdirs(path);
	}

	public void create() throws Exception { // create a file
		Path path = new Path("/test/a.txt");
		// close the stream so the written bytes are flushed to HDFS
		try (FSDataOutputStream out = fs.create(path)) {
			out.write("hello hadoop".getBytes("UTF-8"));
		}
	}

	public void rename() throws Exception { // rename a file
		Path path = new Path("/test/a.txt");
		Path newPath = new Path("/test/b.txt");
		System.out.println(fs.rename(path, newPath));
	}

	public void copyFromLocalFile() throws Exception { // upload a local file
		Path src = new Path("/home/hadoop/hadoop-1.2.1/bin/rcc");
		Path dst = new Path("/test");
		fs.copyFromLocalFile(src, dst);
	}

	// upload a local file by streaming its bytes
	public void uploadLocalFile2() throws Exception {
		// try-with-resources closes both streams even if the copy fails
		try (InputStream in = new BufferedInputStream(new FileInputStream(new File(
				"/home/hadoop/hadoop-1.2.1/bin/rcc")));
				FSDataOutputStream out = fs.create(new Path("/test/rcc1"))) {
			IOUtils.copyBytes(in, out, 4096);
		}
	}

	public void listFiles() throws Exception { // list the files under a folder
		Path dst = new Path("/test");
		FileStatus[] files = fs.listStatus(dst);
		for (FileStatus file : files) {
			System.out.println(file.getPath().toString());
		}
	}

	public void getBlockInfo() throws Exception { // list the hosts holding each block of a file
		Path dst = new Path("/test/rcc");
		FileStatus fileStatus = fs.getFileStatus(dst);
		BlockLocation[] blkloc = fs.getFileBlockLocations(fileStatus, 0,
				fileStatus.getLen()); // locate the file's data blocks
		for (BlockLocation loc : blkloc) {
			for (int i = 0; i < loc.getHosts().length; i++)
				System.out.println(loc.getHosts()[i]);
		}
	}
}
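
One point about the class above: the static field fs is resolved when FileSystemUtil is first loaded. If that happens before Spring has injected the context into SpringContextHolder, the lookup throws IllegalStateException and the class fails to initialize. A possible alternative is to defer the lookup until first use. The sketch below shows the idea in plain Java (Java 8 or later, no Spring or Hadoop on the classpath). LazyBean is a hypothetical helper, and lookupBean() merely stands in for a call such as SpringContextHolder.getBean("hadoop-cluster").

```java
import java.util.function.Supplier;

// Hypothetical illustration of deferring a bean lookup until it is actually needed.
class LazyBeanDemo {

	/** Resolves a value on first successful use and caches it afterwards. */
	static final class LazyBean<T> {
		private final Supplier<T> lookup;
		private volatile T value;

		LazyBean(Supplier<T> lookup) {
			this.lookup = lookup;
		}

		T get() {
			T result = value;
			if (result == null) {
				synchronized (this) {
					result = value;
					if (result == null) {
						// if the lookup throws, nothing is cached and the next call retries
						result = lookup.get();
						value = result;
					}
				}
			}
			return result;
		}
	}

	// Stand-in for the static ApplicationContext: null until "Spring" injects it.
	static String context = null;
	static int lookups = 0;

	// Stand-in for SpringContextHolder.getBean(...): fails while the context is missing.
	static String lookupBean() {
		lookups++;
		if (context == null) {
			throw new IllegalStateException("context not injected yet");
		}
		return "bean-from-" + context;
	}

	public static void main(String[] args) {
		// Creating the holder does not touch the context, so class loading cannot fail here.
		LazyBean<String> fs = new LazyBean<String>(LazyBeanDemo::lookupBean);
		context = "applicationContext";
		System.out.println(fs.get());
		System.out.println(fs.get());
		System.out.println("lookups=" + lookups);
	}
}
```

In a Spring-managed class, injecting the FileSystem bean with @Autowired would avoid the static lookup altogether; the lazy holder is only useful where static access is unavoidable.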

 

6. Using HBase in a J2EE project

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.RowMapper;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;

@Component
public class HbaseService {
	private static final Logger logger = Logger.getLogger(HbaseService.class);
	private static final int FETCH_HBASE_SIZE = 15000;
	@Autowired
	HbaseTemplate hbaseTemplate;

	/**
	 * Fetches a single row by table name and row key.
	 * 
	 * @param tableName
	 * @param rowKey
	 * @return
	 */
	public Map<String, Object> get(String tableName, String rowKey) {
		return hbaseTemplate.get(tableName, rowKey, new RowMapper<Map<String, Object>>() {
			public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
				List<Cell> ceList = result.listCells();
				Map<String, Object> map = new HashMap<String, Object>();
				if (ceList != null && ceList.size() > 0) {
					for (Cell cell : ceList) {
						map.put(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
								+ "_"
								+ Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
										cell.getQualifierLength()),
								Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
					}
				}
				return map;
			}
		});
	}

	/**
	 * Fetches rows by table name and row-key range; keys are matched on their leading characters (prefix match).
	 * 
	 * @param tableName
	 * @param startRow
	 * @param stopRow
	 * @return
	 */
	public List<Map<String, Object>> find(String tableName, String startRow, String stopRow) {
		logger.info("----------------------------------------------------------------------------------------------------------");
		logger.info("hbaseTemplate.getConfiguration().iterator start-----------------------------------------------------------");
		Iterator<Map.Entry<String, String>> iterator = hbaseTemplate.getConfiguration().iterator();
		while (null != iterator && iterator.hasNext()) {
			Map.Entry<String, String> entry = iterator.next();
			logger.info("key=" + entry.getKey() + ",value=" + entry.getValue());
		}
		logger.info("hbaseTemplate.getConfiguration().iterator end  -----------------------------------------------------------");
		logger.info("----------------------------------------------------------------------------------------------------------");

		if (startRow == null) {
			startRow = "";
		}
		if (stopRow == null) {
			stopRow = "";
		}
		Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
		PageFilter filter = new PageFilter(5000);
		scan.setFilter(filter);
		return hbaseTemplate.find(tableName, scan, new RowMapper<Map<String, Object>>() {
			public Map<String, Object> mapRow(Result result, int rowNum) throws Exception {
				List<Cell> ceList = result.listCells();
				Map<String, Object> map = new HashMap<String, Object>();
				String row = "";
				if (ceList != null && ceList.size() > 0) {
					for (Cell cell : ceList) {
						row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
						String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
								cell.getValueLength());
						// String family = Bytes.toString(cell.getFamilyArray(),
						// cell.getFamilyOffset(),cell.getFamilyLength());
						String quali = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
								cell.getQualifierLength());
						// map.put(family + ":" + quali, value);
						map.put(quali, value);
					}
					map.put("rowKey", row);
				}
				return map;
			}
		});
	}

	public boolean batchExcuteInsert(final TableData tableData) {
		return hbaseTemplate.execute(tableData.getTable(), new TableCallback<Boolean>() {
			public Boolean doInTable(HTableInterface table) throws Throwable {
				logger.info("into batchExcuteInsert");
				// table.setAutoFlushTo(false);
				// The write buffer is set to 10 MB for all HTables in /opt/hbase-1.1.2/conf/hbase-site.xml on the server, so it does not need to be set here
				// table.setWriteBufferSize(10*1024*1024); // commit only once the buffer reaches 10 MB
				boolean flag = false;
				if (null != tableData && null != tableData.getRows() && 0 < tableData.getRows().size()) {
					List<Put> putList = new ArrayList<Put>();
					for (RowData row : tableData.getRows()) {
						if (null == row.getColumns() || 0 == row.getColumns().size())
							continue;
						Put put = new Put(row.getRowKey());
						for (ColumnData column : row.getColumns()) {
							put.add(column.getFamily(), column.getQualifier(), column.getValue());
						}
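						// SKIP_WAL trades durability for write speed: edits not yet flushed to disk are lost if a region server crashes
						put.setDurability(Durability.SKIP_WAL);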
						putList.add(put);
					}
					logger.info("batchExcuteInsert size=" + putList.size());
					table.put(putList);
					// table.flushCommits();
					flag = true;
				}
				logger.info("out batchExcuteInsert");
				return flag;
			}
		});
	}

	private String fillZero(String src, int length) {
		StringBuilder sb = new StringBuilder();
		if (src.length() < length) {
			for (int count = 0; count < (length - src.length()); count++) {
				sb.append("0");
			}
		}
		sb.append(src);
		return sb.toString();
	}

	/**
	 * 
	 * @param table
	 * @param called
	 * @param startTime
	 * @param endTime
	 * @param fromWeb
	 *            true if the query comes from the web UI, false otherwise
	 * @return
	 */
	public List<Map<String, Object>> querySignalList(String table, String called, String startTime, String endTime,
			boolean fromWeb) {
		String tableName = table;
		String startRow = "";
		String stopRow = "";
		String timeFormat = fromWeb ? webQueryTimeFormat : interfaceTimeFormat;
		if (null == called || called.equals("")) {
			startRow = "";
			stopRow = "";
		} else {
			if (null == startTime || startTime.equals("")) {
				startRow = new StringBuffer(fillZero(called, 16)).reverse().toString();
			} else {
				String timeKey = fromTimeStr2TimeStr(timeFormat, startTime, hbaseTimeFormat_signal);
				startRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey;
			}
			if (null == endTime || endTime.equals("")) {
				String timeKey = date2Str(hbaseTimeFormat_signal, new Date());
				stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey;
			} else {
				String timeKey = fromTimeStr2TimeStr(timeFormat, endTime, hbaseTimeFormat_signal);
				stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey;
			}
		}
		return this.find(tableName, startRow, stopRow);
	}

	String hbaseTimeFormat_signal = "yyyyMMddHHmmssSSS";
	String hbaseTimeFormat_sms = "yyyyMMddHHmmss";
	String webQueryTimeFormat = "yyyy-MM-dd HH:mm:ss";
	String interfaceTimeFormat = "yyyyMMddHHmmss";

	private String date2Str(String timeFormatStr, Date date) {
		DateFormat sdf = new SimpleDateFormat(timeFormatStr);
		return sdf.format(date);
	}

	private Date str2Date(String timeFormatStr, String dateStr) {
		DateFormat sdf = new SimpleDateFormat(timeFormatStr);
		try {
			return sdf.parse(dateStr);
		} catch (ParseException e) {
			logger.error(e.getMessage(), e);
			return null;
		}
	}

	private String fromTimeStr2TimeStr(String srcTimeFormat, String srcDate, String desTimeFormat) {
		return date2Str(desTimeFormat, str2Date(srcTimeFormat, srcDate));
	}

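	/**
	 * 
	 * @param table
	 *            the table to query
	 * @param called
	 *            the called number to query
	 * @param startTime
	 *            start time of the query
	 * @param endTime
	 *            end time of the query
	 * @param page
	 *            paging information for the query
	 * @param fromWeb
	 *            whether the query comes from the admin web page; the admin page and the interface use different time formats
	 * @return
	 */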
	public Page querySignalByPage(String table, String called, String startTime, String endTime, Page page,
			boolean fromWeb) {
		String tableName = table;
		String startRow = "";
		String stopRow = "";
		String timeFormat = fromWeb ? webQueryTimeFormat : interfaceTimeFormat;
		if (null == called || called.equals("")) {
			startRow = "";
			stopRow = "";
		} else {
			if (null == startTime || startTime.equals("")) {
				startRow = new StringBuffer(fillZero(called, 16)).reverse().toString();
			} else {
				String timeKey = fromTimeStr2TimeStr(timeFormat, startTime, hbaseTimeFormat_signal);
				startRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey;
			}
			if (null == endTime || endTime.equals("")) {
				String timeKey = date2Str(hbaseTimeFormat_signal, new Date());
				stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey;
			} else {
				String timeKey = fromTimeStr2TimeStr(timeFormat, endTime, hbaseTimeFormat_signal);
				stopRow = new StringBuffer(fillZero(called, 16)).reverse().toString() + timeKey;
			}
		}
		Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
		PageFilter filter = new PageFilter(FETCH_HBASE_SIZE);
		scan.setFilter(filter);
		PageRowMapper pageRowMapper = new PageRowMapper(page);
		hbaseTemplate.find(tableName, scan, pageRowMapper);
		if(null!=pageRowMapper&&pageRowMapper.getPage().getTotal()>=FETCH_HBASE_SIZE){
			PageFilter filter2 = new PageFilter(FETCH_HBASE_SIZE*2);
			scan.setFilter(filter2);
			PageRowMapper pageRowMapper2 = new PageRowMapper(page);
			hbaseTemplate.find(tableName, scan, pageRowMapper2);
			return pageRowMapper2.getPage();
		}
		return pageRowMapper.getPage();
	}

	public Page querySmsSendResultByPage(String table, String sender, String startTime, String endTime, Page page,
			boolean fromWeb) {
		String tableName = table;
		String startRow = "";
		String stopRow = "";
		String timeFormat = fromWeb ? webQueryTimeFormat : interfaceTimeFormat;
		if (null == sender || sender.equals("")) {
			startRow = "";
			stopRow = "";
		} else {
			if (null == startTime || startTime.equals("")) {
				startRow = new StringBuffer(fillZero(sender, 25)).reverse().toString();
			} else {
				String timeKey = fromTimeStr2TimeStr(timeFormat, startTime, hbaseTimeFormat_sms);
				startRow = new StringBuffer(fillZero(sender, 25)).reverse().toString() + timeKey;
			}
			if (null == endTime || endTime.equals("")) {
				String timeKey = date2Str(hbaseTimeFormat_sms, new Date());
				stopRow = new StringBuffer(fillZero(sender, 25)).reverse().toString() + timeKey;
			} else {
				String timeKey = fromTimeStr2TimeStr(timeFormat, endTime, hbaseTimeFormat_sms);
				stopRow = new StringBuffer(fillZero(sender, 25)).reverse().toString() + timeKey;
			}
		}
		Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
		PageFilter filter = new PageFilter(10000);
		scan.setFilter(filter);
		PageRowMapper pageRowMapper = new PageRowMapper(page);
		hbaseTemplate.find(tableName, scan, pageRowMapper);
		logger.info("tableName:" + tableName + ", startRow:" + startRow + ", stopRow:" + stopRow);
		logger.info("page:" + JSON.toJSONString(pageRowMapper.getPage()));
		return pageRowMapper.getPage();
	}
}
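
The query methods above all build their scan boundaries the same way: left-pad the number with zeros to a fixed width, reverse it, then append a timestamp. Below is a self-contained sketch of that construction in plain Java. RowKeySketch and buildRowKey are hypothetical names, and the phone number and dates are made up for the example. The post does not say why the number is reversed; spreading sequential numbers across regions is a common reason, but that is an assumption.

```java
// Hypothetical stand-alone illustration of the row-key layout used by querySignalList.
class RowKeySketch {

	/** Left-pads src with '0' up to length; longer inputs are returned unchanged. */
	static String fillZero(String src, int length) {
		StringBuilder sb = new StringBuilder();
		for (int i = src.length(); i < length; i++) {
			sb.append('0');
		}
		return sb.append(src).toString();
	}

	/** Builds reversed(zero-padded number) + timeKey, e.g. a yyyyMMddHHmmssSSS timestamp. */
	static String buildRowKey(String number, int width, String timeKey) {
		return new StringBuilder(fillZero(number, width)).reverse().append(timeKey).toString();
	}

	public static void main(String[] args) {
		// 16 is the width used for the signal table in the service above
		String start = buildRowKey("13800138000", 16, "20160101000000000");
		String stop = buildRowKey("13800138000", 16, "20160102000000000");
		System.out.println(start); // prefix is 0008310083100000, followed by the start timestamp
		System.out.println(stop);
		// with a fixed-width prefix, keys for one number sort chronologically
		System.out.println(start.compareTo(stop) < 0);
	}
}
```

Because the reversed prefix has a fixed width, all rows for one number are contiguous and ordered by time, so a start/stop row pair selects a time window. Note that Scan treats the stop row as exclusive.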

 
