作者:微信小助手
发布时间:2021-12-06T08:55:29
1. 前言 2. 环境介绍 3. 核心代码 3.1 执行命令及注意事项 3.2 核心代码 4. 优化建议 5. 参考文章 本篇文章和大家分享使用 Spark 读取 HBase 表快照数据的实现细节,此方案适用于对 HBase 表中的数据进行离线 OLAP 处理或同步至 Hive 中。 HBase 在单独的集群上,即 HBase 集群 B,集群版本是 如上命令需要在大集群上提交并运行,然后读取 HBase 独立集群 B 上的数据。 大集群在 Kerberos 环境下,HBase 集群无 Kerberos,所以,Spark 命令提交时需要指定 kerberos 相关的参数,同时,Kerberos 集群访问非 Kerberos 集群的文件时,需要为设置 Hadoop 的参数: 但是,这个参数无论指定在 spark-conf 中随命令一块提交,亦或是显式声明在代码里,貌似都不起作用,原因猜测,newAPIHadoopRDD 这个 API 强制加载了本地 hdfs-site.xml 文件,源码如下: 最终有效的解决办法是,在大集群的客户端 操作 HBase Snappy 压缩的表时,遇到如下报错: 一方面解决 snappy 相关 so 文件的安装部署问题(网上有很多资料可供参考,CDH 集群中默认会部署好),一方面在 Spark 提交命令(或环境变量)中指定 snappy(native)目录:目录导读
1. 前言
2. 环境介绍
spark2.4
跑在 kerberos 认证下的hadoop2.6.0-cdh5.13.1
的 yarn 上,即大集群 A。hbase2.1.0-cdh6.3.2
。3. 核心代码
3.1 执行命令及注意事项
sh /opt/spark-2.4.0-bin-hadoop2.6/bin/spark-submit \
--master yarn \
--queue hbase_to_hive_queue \
--name hbase_to_hive \
# driver-memory 可以调大点,否则可能会出现ApplicationMaster UI 打开卡死的情况
--driver-memory 16g \
--deploy-mode cluster \
# 非kerberos集群无需
--principal {your principal} \
--keytab /home/{current_user}/{current_user}.keytab \
--executor-memory 12G \
--executor-cores 4 \
--num-executors 20 \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.shuffle.service.enabled=true \
--conf spark.speculation=true \
--conf spark.network.timeout=100000000 \
# 解决读取压缩表,发生sanppy等相关的异常
--conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/* \
--conf spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native \
--conf spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/* \
--conf spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native \
--files /etc/hive/conf/hive-site.xml \
--class org.bigdata.leo.hbase.bukload.HBaseExportSnapshotToHive /data/hbase1/bigdata-hbase-bukload.jar \
# HBaseToHive需要的main函数传参,具体作用后文
-sourceHBaseZkAddress={sourceHBaseZkAddress} \
-sourceHBaseZkPort={sourceHBaseZkPort} \
-sourceHBaseTableName={sourceHBaseTableName} \
-sourceHBaseTableFamilyName={sourceHBaseTableFamilyName} \
-sourceHBaseFieldNameToLower={sourceHBaseFieldNameToLower} \
-sourceHBaseTablePerRegionSplitNum={sourceHBaseTablePerRegionSplitNum} \
-sourceHBaseTableSnapshotName={sourceHBaseTableSnapshotName} \
-sourceHBaseDefaultFS={sourceHBaseDefaultFS} \
-sourceHBaseRootDir={sourceHBaseRootDir} \
-tmpStoreSnapshotPath={tmpStoreSnapshotPath} \
-targetHiveTableName={targetHiveTableName} \
-targetHiveTableNamePrimaryKeyName={targetHiveTableNamePrimaryKeyName} \
-targetHiveTablePartitionOrNot={targetHiveTablePartitionOrNot} \
-targetHiveTablePartitionFieldName={targetHiveTablePartitionFieldName} \
-scanHBaseTableStartTimestamp={scanHBaseTableStartTimestamp} \
-scanHBaseTableEndTimestamp={scanHBaseTableEndTimestamp}ipc.client.fallback-to-simple-auth-allowed=true
。 def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): RDD[(K, V)] = withScope {
assertNotStopped()
// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
// newAPIHadoopRDD 这个API强制加载了本地hdfs-site.xml文件
FileSystem.getLocal(conf)
// Add necessary security credentials to the JobConf. Required to access secure HDFS.
val jconf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jconf)
new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
}hdfs-site.xml
高级配置中设置此参数,然后在 yarn 上分发给所有的 node-manager 节点客户端,(此步骤不用重启 hadoop 相关的服务)。·21/11/25 17:47:16 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
0.0 (TID 0, centos-bigdata-datanode, executor 1): org.apache.hadoop.hbas
e.DoNotRetryIOException: Compression algorithm 'snappy' previously faile
d test.