--国庆净玩了,没有好好想这个问题.周一来了之后整理了一下思路,并参考(抄袭)了网上的一些实现,现在把完整的代码给贴一下. 附上参考的链接
http://www.zhyea.com/2017/06/21/visit-hbase-with-custom-spark-rdd.html--感谢 4 楼的兄弟提供思路
提出这个问题主要是 Hbase 的 rowkey 设计为 B+ tree,Hbase 的 scan 操作性能极高.
在 Hbase 建表的时候,预分区是必要的,但是 Hbase 的数据插入分区的时候,又是和 rowkey 的初始几位密切相关
比如,我的 splitkeys 是 Array("0001|","002|","003|","004|","005|","006|","007|","008|")
我在生成 rowkey 的时候,rowkey 的前缀从上面这个数据里随机取一个,如 006|,那么与这个 rowkey 相关的数据一定会插入 start 006| end 007| 这个分区里,给 scan 操作带来很大便利.
但是随之产生的问题就是我主楼里提到的.以下代码解决了这个问题
具体的实现过程主要是两个类,一个重写了 RDD 的实现,一个用于从 hbase 拉取数据
--重写 RDD
class QueryRDD(sc: SparkContext, tableName: String, startRow: String, endRow: String, splitKeys: Array[String]) extends RDD[Map[String,String]](sc, Nil)
{
#重写该方法用于计算每一个 partition
override def compute(split: Partition, context: TaskContext): Iterator[Map[String,String]] =
{
val part = split.asInstanceOf[QueryPartition]
val results = query(part)
new InterruptibleIterator(context, results.iterator)
}
#重写该方法用于获取 partition
override protected def getPartitions: Array[Partition] =
{
val partitions = ArrayBuffer[Partition]()
for (splitKey <- splitKeys)
{
partitions += new QueryPartition(splitKey)
}
partitions.toArray
}
private def query(partition: QueryPartition) =
{
val splitKey = partition.split
val filter = null #该参数可以不为 null,即可在 scan 的同时进行 filter
val start = splitKey + startRow
val end = splitKey + endRow
HBaseClient.scan(tableName, filter, start, end)
}
}
#实现自己的 partition
class QueryPartition(splitKey: String) extends Partition
{
def split: String = splitKey
override def index: Int = splitKey.substring(0, 3).toInt
override def hashCode(): Int = index
}
以上是重写 RDD,hbase 的具体 scan 操作,在我上面的链接里可以找到,我照搬了过来.但是要注意他的 58 行,要把 startRow 改成 stopRow,不然的话其他代码写得再好都白费啦