HBase数据同步|HBase表数据导出至Hive

作者:微信小助手

发布时间:2021-12-06T08:55:29

目录导读


  • 1. 前言

  • 2. 环境介绍

  • 3. 核心代码

    • 3.1 执行命令及注意事项

    • 3.2 核心代码

  • 4. 优化建议

  • 5. 参考文章


1. 前言

本篇文章和大家分享使用 Spark 读取 HBase 表快照数据的实现细节,此方案适用于对 HBase 表中的数据进行离线 OLAP 处理或同步至 Hive 中。

2. 环境介绍

  • spark2.4跑在 kerberos 认证下的hadoop2.6.0-cdh5.13.1的 yarn 上,即大集群 A。

  • HBase 在单独的集群上,即 HBase 集群 B,集群版本是hbase2.1.0-cdh6.3.2

3. 核心代码

3.1 执行命令及注意事项

 sh /opt/spark-2.4.0-bin-hadoop2.6/bin/spark-submit \
 --master yarn \
 --queue hbase_to_hive_queue \
 --name hbase_to_hive \
 # driver-memory 可以调大点,否则可能会出现ApplicationMaster UI 打开卡死的情况
 --driver-memory 16g \
 --deploy-mode cluster \
 # 非kerberos集群无需
 --principal {your principal} \
 --keytab /home/{current_user}/{current_user}.keytab \
 --executor-memory 12G \
 --executor-cores 4 \
 --num-executors 20 \
 --conf spark.kryoserializer.buffer.max=512m \
 --conf spark.shuffle.service.enabled=true \
 --conf spark.speculation=true \
 --conf spark.network.timeout=100000000 \
 # 解决读取压缩表,发生sanppy等相关的异常
 --conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/* \
 --conf spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native \
 --conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/* \
 --conf spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native \
 --files /etc/hive/conf/hive-site.xml \
 --class org.bigdata.leo.hbase.bukload.HBaseExportSnapshotToHive /data/hbase1/bigdata-hbase-bukload.jar \
 # HBaseToHive需要的main函数传参,具体作用后文
 -sourceHBaseZkAddress={sourceHBaseZkAddress} \
 -sourceHBaseZkPort={sourceHBaseZkPort} \
 -sourceHBaseTableName={sourceHBaseTableName} \
 -sourceHBaseTableFamilyName={sourceHBaseTableFamilyName} \
 -sourceHBaseFieldNameToLower={sourceHBaseFieldNameToLower} \
 -sourceHBaseTablePerRegionSplitNum={sourceHBaseTablePerRegionSplitNum} \
 -sourceHBaseTableSnapshotName={sourceHBaseTableSnapshotName} \
 -sourceHBaseDefaultFS={sourceHBaseDefaultFS} \
 -sourceHBaseRootDir={sourceHBaseRootDir} \
 -tmpStoreSnapshotPath={tmpStoreSnapshotPath} \
 -targetHiveTableName={targetHiveTableName} \
 -targetHiveTableNamePrimaryKeyName={targetHiveTableNamePrimaryKeyName} \
 -targetHiveTablePartitionOrNot={targetHiveTablePartitionOrNot} \
 -targetHiveTablePartitionFieldName={targetHiveTablePartitionFieldName} \
 -scanHBaseTableStartTimestamp={scanHBaseTableStartTimestamp} \
 -scanHBaseTableEndTimestamp={scanHBaseTableEndTimestamp}

如上命令需要在大集群上提交并运行,然后读取 HBase 独立集群 B 上的数据。

大集群在 Kerberos 环境下,HBase 集群无 Kerberos,所以,Spark 命令提交时需要指定 kerberos 相关的参数,同时,Kerberos 集群访问非 Kerberos 集群的文件时,需要为设置 Hadoop 的参数:ipc.client.fallback-to-simple-auth-allowed=true

但是,这个参数无论指定在 spark-conf 中随命令一块提交,亦或是显式声明在代码里,貌似都不起作用,原因猜测,newAPIHadoopRDD 这个 API 强制加载了本地 hdfs-site.xml 文件,源码如下:

  def newAPIHadoopRDD[KVF <: NewInputFormat[KV]](
      conf: Configuration = hadoopConfiguration,
      fClass: Class[F],
      kClass: Class[K],
      vClass: Class[V]): RDD[(KV)] = withScope {
    assertNotStopped()

    // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
    // newAPIHadoopRDD 这个API强制加载了本地hdfs-site.xml文件
    FileSystem.getLocal(conf)

    // Add necessary security credentials to the JobConf. Required to access secure HDFS.
    val jconf = new JobConf(conf)
    SparkHadoopUtil.get.addCredentials(jconf)
    new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
  }

最终有效的解决办法是,在大集群的客户端hdfs-site.xml高级配置中设置此参数,然后在 yarn 上分发给所有的 node-manager 节点客户端,(此步骤不用重启 hadoop 相关的服务)。

操作 HBase Snappy 压缩的表时,遇到如下报错:

·21/11/25 17:47:16 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
0.0 (TID 0, centos-bigdata-datanode, executor 1): org.apache.hadoop.hbas
e.DoNotRetryIOException: Compression algorithm 'snappy' previously faile
test.

一方面解决 snappy 相关 so 文件的安装部署问题(网上有很多资料可供参考,CDH 集群中默认会部署好),一方面在 Spark 提交命令(或环境变量)中指定 snappy(native)目录: