一、HBaseAPI重要的概念
1.Scan
HBase中的数据表通过划分成一个个的Region来实现数据的分片,每一个Region关联一个RowKey的范围区间,而每一个Region中的数据,按RowKey的字典顺序进行组织。
正是基于这种设计,使得HBase能够轻松应对这类查询:“指定一个RowKey的范围区间,获取该区间的所有记录”, 这类查询在HBase被称之为Scan,当然了如果不指定就是全表扫描,下面是一次查询就是一次RPC访问,返回结果集给客户端。
1 . 构建Scan,指定startRow与stopRow,如果未指定的话会进行全表扫描
2 . 获取ResultScanner
3 . 遍历查询结果
4 . 关闭ResultScanner
2.Result
将 Scan 的 封装为 Result 对象 返回给 客户端。
3.Filter的Scan
Filter可以在Scan的结果集基础之上,对返回的记录设置更多条件值,这些条件可以与RowKey有关,可以与列名有关,也可以与列值有关,还可以将多个Filter条件组合在一起,一般组合在一起会是 FilterList ,但是一般不建议,可能存在漏数据的风险。
- Client每一次往RegionServer发送scan请求,都会批量拿回一批数据(由Caching决定过了每一次拿回的Results数量),然后放到本次的Result Cache中
- 应用每一次读取数据时,都是从本地的Result Cache中获取的。如果Result Cache中的数据读完了,则Client会再次往RegionServer发送scan请求获取更多的数据
二、案例分析---HBaseAPI学习
1.需求
解析如下表中的数据,定时上报的数据是按照分钟数上报,分钟后的 value=2#1#0 是要解析汇总的数据,现在就是想把 20200706 这一天的数据汇总,前提是增量的读取数据解析,会在 LastJobTime 表维护时间戳,表中的 modifyTime 大于每次解析后记录的时间戳 就读取新的 数据。
2.思路
- 遍历 Result 结果集,从 Result 中 根据 CF 和 column 直接拿出确定的字段值,比如上面的 c , ci,ct 等字段
- 将不确定的字段也就是按照时间 动态写进表中的字段 d:1300, d:1305等,首先遍历 cell ,根据cell 取出 Qualify ,根据字段的长度 大于等于4 全部读取解析,至此我们就可以 拿到按照时间列动态写进的value数据
如果大家有幸读到这里,理解我的思路就行,上面业务场景不必深究。
3.代码
package com.kangll.hbaseapi
import java.util
import com.winner.utils.KerberosUtil
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{Cell, CellUtil, CompareOperator, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Result, Scan, Table}
import org.apache.hadoop.hbase.filter.{SingleColumnValueFilter, SubstringComparator}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ListBuffer
/** ******************************************
*
* @AUTHOR kangll
* @DATE 2020/8/11 14:47
* @DESC:
* *******************************************
*/
// 将表中解析出的数据封装为样例类
case class InOutDataHBaseTest(rowkey: String, channel: String, counterid: String, countertype: String, devicesn: String,
datatype: String, hostname: String, modifytime: String, datatime: String, inNum: Int, outNum: Int)
object HBaseAPI_Test_One {
// Kerberos认证
KerberosUtil.kerberosAuth()
private val spark: SparkSession = SparkSession
.builder()
.master("local[2]")
.appName("spark-hbase-read")
.getOrCreate()
private val sc: SparkContext = spark.sparkContext
private val hbaseConf: Configuration = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "hdp301")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
def getOriginalData() = {
import spark.implicits._
import collection.mutable._
// HBase 源数据库表
val HBASE_TAG_TABLE = "trafficData"
// 维护的时间戳,增量读取解析数据
val HBASE_LAST_JOBTIME = "LastJobTime"
// 创建连接对象
val conn: Connection = ConnectionFactory.createConnection()
val tag_table: Table = conn.getTable(TableName.valueOf(HBASE_TAG_TABLE))
val time_table: Table = conn.getTable(TableName.valueOf(HBASE_LAST_JOBTIME))
// 通过 rowkey 查询 HBase 表的 lastjobtime
val get = new Get("TrafficDateTime".getBytes())
val mdResult: Result = time_table.get(get)
// get 直接拿到 时间戳
val modifyTime: String = Bytes.toString(mdResult.getValue(Bytes.toBytes("t"), Bytes.toBytes("m")))
// 查询原始数据
val scan = new Scan()
// 单列值过滤器 当 表中的 modifyTime 大于时间戳时 增量读取解析
val mdValueFilter = new SingleColumnValueFilter(
"d".getBytes(),
"t".getBytes(),
CompareOperator.GREATER_OR_EQUAL,
new SubstringComparator(modifyTime) // 大于等于增量的时间戳
)
// scan 的条数,默认为100 扫描100 返给 客户端 Result 缓存读取
scan.setCaching(200)
// 设置过滤,下推到服务器 ,减少返回给客户端的数据量和 rowkey 指定范围结合更佳
scan.setFilter(mdValueFilter)
import collection.JavaConversions._
val iter: util.Iterator[Result] = tag_table.getScanner(scan).iterator()
// 存放定义的样例类
val basicListTmp = new ListBuffer[InOutDataHBaseTest]()
while (iter.hasNext) {
var rowkey = ""
var datatime = ""
var inNum = 0
var outNum = 0
val result: Result = iter.next()
val channel = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("c")))
val counterid = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("ci")))
val countertype = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("ct")))
val devicesn = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("ds")))
val datatype = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("dt")))
val hostname = Bytes.toString(result.getValue(Bytes.toBytes("d"), Bytes.toBytes("h")))
val modifytime = Bytes.toString(result.getValue(Bytes.toBytes("t"), Bytes.toBytes("md")))
rowkey = Bytes.toString(result.getRow)
// 拿到 Result 的cell ,遍历 cell 拿到 columnName后判断列名取出 3#2#1 value 值
val cells = result.listCells()
for (cell <- cells) {
// Cell工具类 获取到 列名
var cname = Bytes.toString(CellUtil.cloneQualifier(cell))
if (cname.length >= 4) {
datatime = rowkey.split("#")(1)+cname
val cvalue = Bytes.toString(CellUtil.cloneValue(cell))
val arr = cvalue.split("#")
inNum = arr(0).toInt
outNum = arr(1).toInt
println(datatime + "--------" + inNum + "------" + outNum)
// 将解析出的 cell 数据 放到 List 的样例类中
basicListTmp += InOutDataHBaseTest(rowkey, channel, counterid, countertype,
devicesn, datatype, hostname, modifytime, datatime, inNum, outNum)
}
}
}
// 获取 返回的 basicListTmp 并且返回
val basicList: ListBuffer[InOutDataHBaseTest] = basicListTmp.map(x => InOutDataHBaseTest(x.rowkey, x.channel, x.counterid, x.countertype,
x.devicesn, x.datatype, x.hostname, x.modifytime, x.datatime ,x.inNum, x.outNum))
basicList.toSet
}
def main(args: Array[String]): Unit = {
getOriginalData().foreach(println(_))
}
}
以上就是 对公司表数据的读取解析示例,当然了 读取还可以根据 rowkey 优化,因为 rowkey 是自定义设计的 ,hostname+ channel md5 加密 散列而成,可以根据 指定 rowkey 的扫描范围---- withStartRow()和withStopRow(),再加上 增量的解析数据速度就比较完美了。
参考:https://www.sohu.com/a/284932698_100109711