From b92c27e5c30c828dde7a901a10fb96fbcf10f90b Mon Sep 17 00:00:00 2001 From: tongxiaotian Date: Thu, 21 Mar 2024 13:54:19 +0800 Subject: [PATCH] =?UTF-8?q?[fire-1110]Hudi=E6=B3=A8=E8=A7=A3=E6=94=AF?= =?UTF-8?q?=E6=8C=81HBase=E7=B4=A2=E5=BC=95=E4=BE=BF=E6=8D=B7=E5=8C=96?= =?UTF-8?q?=E9=85=8D=E7=BD=AE-=E5=A2=9E=E5=8A=A0hoodie.index.hbase.user?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../zto/fire/core/anno/connector/Hudi.java | 4 +++ .../zto/fire/core/anno/connector/Hudi10.java | 5 +++- .../zto/fire/core/anno/connector/Hudi11.java | 5 +++- .../zto/fire/core/anno/connector/Hudi2.java | 5 +++- .../zto/fire/core/anno/connector/Hudi3.java | 4 +++ .../zto/fire/core/anno/connector/Hudi4.java | 5 +++- .../zto/fire/core/anno/connector/Hudi5.java | 5 +++- .../zto/fire/core/anno/connector/Hudi6.java | 5 +++- .../zto/fire/core/anno/connector/Hudi7.java | 5 +++- .../zto/fire/core/anno/connector/Hudi8.java | 5 +++- .../zto/fire/core/anno/connector/Hudi9.java | 5 +++- .../com/zto/fire/core/conf/AnnoManager.scala | 26 ++++++++++--------- 12 files changed, 58 insertions(+), 21 deletions(-) diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java index f07107b..1f2b3c1 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi.java @@ -216,5 +216,9 @@ public @interface Hudi { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java index 10cf733..3a32544 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi10.java @@ -216,5 +216,8 @@ public @interface Hudi10 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java index 98b3f05..32c7c0d 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi11.java @@ -216,5 +216,8 @@ public @interface Hudi11 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java index 23efe7c..3ba0706 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi2.java @@ -217,5 +217,8 @@ public @interface Hudi2 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java index 9406016..c78eecc 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi3.java @@ -217,4 +217,8 @@ public @interface Hudi3 { */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java index 6f8e0f1..662243a 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi4.java @@ -216,5 +216,8 @@ public @interface Hudi4 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java index bc1cb35..35360bf 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi5.java @@ -216,5 +216,8 @@ public @interface Hudi5 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java index eecff2b..5f4f9ce 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi6.java @@ -216,5 +216,8 @@ public @interface Hudi6 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java index 0e30544..9849c8e 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi7.java @@ -216,5 +216,8 @@ public @interface Hudi7 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java index 7c86516..7038ce7 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi8.java @@ -216,5 +216,8 @@ public @interface Hudi8 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java index 6e378b1..624d0ff 100644 --- a/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java +++ b/fire-core/src/main/java/com/zto/fire/core/anno/connector/Hudi9.java @@ -216,5 +216,8 @@ public @interface Hudi9 { * @return */ String hbaseQpsAllocatorClass() default "org.apache.hudi.index.hbase.DefaultHBaseQPSResourceAllocator"; - + /** + * hbase Index User + */ + String hbaseIndexUser() default ""; } \ No newline at end of file diff --git a/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala b/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala index f4715c2..92a1bbc 100644 --- a/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala +++ b/fire-core/src/main/scala/com/zto/fire/core/conf/AnnoManager.scala @@ -306,7 +306,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._1) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._1) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._1) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._1) @@ -334,7 +334,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._2) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._2) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._2) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._2) @@ -362,7 +362,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._3) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._3) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._3) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._3) @@ -390,7 +390,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._4) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._4) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._4) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._4) @@ -418,7 +418,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._5) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._5) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._5) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._5) @@ -446,7 +446,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._6) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._6) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._6) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._6) @@ -474,7 +474,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._7) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._7) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._7) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._7) @@ -502,7 +502,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._8) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._8) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._8) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._8) @@ -530,7 +530,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._9) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._9) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._9) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._9) @@ -558,7 +558,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._10) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._10) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._10) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._10) @@ -586,7 +586,7 @@ private[fire] trait AnnoManager extends Logging { // hbase索引相关设置 this.hudiHbaseIndexConf(hudi.useHbaseIndex(), hudi.hbaseZkQuorum(), hudi.hbasePort(), hudi.hbaseTable(), hudi.hbaseZkNodePath(), hudi.hbaseRollbackSync() , hudi.hbaseUpdatePartitionPath(), hudi.hbaseGetBatchSize(), hudi.hbasePutBatchSize(), hudi.hbasePutBatchSizeAutoCompute(), hudi.hbaseMaxQpsPerRegionServer() - , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(), KeyNum._11) + , hudi.hbaseQpsFraction(), hudi.hbaseQpsAllocatorClass(),hudi.hbaseIndexUser(), KeyNum._11) // clean相关设置 this.hudiCleanConf(hudi.cleanerAsync(), hudi.cleanerPolicy(), hudi.cleanerCommitsRetained(), KeyNum._11) this.hudiCompactConf(hudi.compactCommits(), hudi.compactSchedule(), KeyNum._11) @@ -669,8 +669,9 @@ private[fire] trait AnnoManager extends Logging { , hbaseUpdatePartitionPath: Boolean, hbaseGetBatchSize: Long , hbasePutBatchSize: Long, hbasePutBatchSizeAutoCompute: Boolean , hbaseMaxQpsPerRegionServer: Long, hbaseQpsFraction: Float - , hbaseQpsAllocatorClass: String, keyNum: Int) { + , hbaseQpsAllocatorClass: String,hbaseIndexUser:String, keyNum: Int) { if (useHbaseIndex) { + requireNonEmpty(hbaseZkQuorum,hbaseTable,hbaseIndexUser){"Hudi Hbase Index需要指定zk集群地址和对应的hbase表名"} this.toHudiConf(("hoodie.index.type", "HBASE"), keyNum) this.toHudiConf(("hoodie.index.hbase.zkport", hbasePort.toString), keyNum) this.toHudiConf(("hoodie.index.hbase.zkquorum", hbaseZkQuorum), keyNum) @@ -684,6 +685,7 @@ private[fire] trait AnnoManager extends Logging { this.toHudiConf(("hoodie.index.hbase.qps.fraction", hbaseQpsFraction.toString), keyNum) this.toHudiConf(("hoodie.index.hbase.max.qps.per.region.server", hbaseMaxQpsPerRegionServer.toString), keyNum) this.toHudiConf(("hoodie.index.hbase.qps.allocator.class", hbaseQpsAllocatorClass), keyNum) + this.toHudiConf(("hoodie.index.hbase.user", hbaseIndexUser), keyNum) } } -- Gitee