diff --git a/fire-enhance/flink-json/pom.xml b/fire-enhance/flink-json/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..347979954acbaa7573b37280311762b862daa029
--- /dev/null
+++ b/fire-enhance/flink-json/pom.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>fire-enhance</artifactId>
+        <groupId>com.zto.fire</groupId>
+        <version>2.4.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-json</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>${maven.scope}</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java
new file mode 100644
index 0000000000000000000000000000000000000000..11febc526dc4dfc25f7340d4e26dbe9e09795193
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java
@@ -0,0 +1,215 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import com.zto.fire.flink.formats.json.ZDTPJsonDeserializationSchema.MetadataConverter;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+public class ZDTPJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+    private List<String> metadataKeys;
+ private final boolean ignoreParseErrors;
+
+ private final boolean cdcWriteHive;
+
+ private final TimestampFormat timestampFormatOption;
+
+ public ZDTPJsonDecodingFormat(boolean ignoreParseErrors, boolean cdcWriteHive, TimestampFormat timestampFormatOption) {
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.cdcWriteHive = cdcWriteHive;
+ this.timestampFormatOption = timestampFormatOption;
+ this.metadataKeys = Collections.emptyList();
+ }
+
+ @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType physicalDataType) {
+
+        final List<ReadableMetadata> readableMetadata =
+                Stream.of(ReadableMetadata.values())
+                        .filter(m -> metadataKeys.contains(m.key))
+                        .collect(Collectors.toList());
+
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+
+ return new ZDTPJsonDeserializationSchema(
+ physicalDataType, readableMetadata, producedTypeInfo, ignoreParseErrors, timestampFormatOption);
+ }
+
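+    /**
+     * When 'zdtp-json.cdc-write-hive' is enabled the format declares itself insert-only,
+     * so insert-only sinks such as Hive accept it; otherwise the full changelog
+     * (INSERT / UPDATE_BEFORE / UPDATE_AFTER / DELETE) is declared.
+     */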
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return cdcWriteHive ? ChangelogMode.insertOnly() : ChangelogMode.all();
+ }
+
+ @Override
+    public Map<String, DataType> listReadableMetadata() {
+ final Map metadataMap = new LinkedHashMap<>();
+ Stream.of(ReadableMetadata.values())
+ .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+ return metadataMap;
+ }
+
+    // all metadata keys requested from this format in the table DDL
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ }
+
+    /**
+     * Metadata carried by a ZDTP message, e.g.:
+     * {
+     *     "schema":"route_order",
+     *     "table":"route_order_001",
+     *     "gtid":"ldjfdlfj-ldfjdl",
+     *     "logFile":"0002334.bin",
+     *     "offset":"dfle-efe",
+     *     "pos":12384384,
+     *     "when":123943434383,
+     *     "before":{},
+     *     "after":{}
+     * }
+     */
+ enum ReadableMetadata {
+ SCHEMA(
+ "schema",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("schema", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ TABLE(
+ "table",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("table", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ GTID(
+ "gtid",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("gtid", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ LOGFILE(
+ "logFile",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("logFile", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ OFFSET(
+ "offset",
+ DataTypes.STRING().nullable(),
+ DataTypes.FIELD("offset", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getString(pos);
+ }
+ }),
+ POS(
+ "pos",
+ DataTypes.BIGINT().nullable(),
+ DataTypes.FIELD("pos", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ return row.getLong(pos);
+ }
+ }),
+ WHEN(
+ "when",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+ DataTypes.FIELD("when", DataTypes.BIGINT()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+ if (row.isNullAt(pos)) return null;
+ return TimestampData.fromEpochMillis(row.getLong(pos));
+ }
+ }),
+        // expose the RowKind of the change-log record as a metadata column
+ ROW_KIND(
+ "row_kind",
+ DataTypes.STRING().notNull(),
+ DataTypes.FIELD("row_kind", DataTypes.STRING()),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Object convert(GenericRowData row, int pos) {
+
+ return StringData.fromString(row.getRowKind().toString());
+ }
+ }
+ )
+ ;
+
+ final String key;
+
+ final DataType dataType;
+
+ final DataTypes.Field requiredJsonField;
+
+ final MetadataConverter converter;
+
+ ReadableMetadata(String key, DataType dataType, DataTypes.Field requiredJsonField, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.requiredJsonField = requiredJsonField;
+ this.converter = converter;
+ }
+ }
+}
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java
new file mode 100644
index 0000000000000000000000000000000000000000..13e3318a061697670dd76ac5649da7111f883dd5
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java
@@ -0,0 +1,214 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+
+public class ZDTPJsonDeserializationSchema implements DeserializationSchema<RowData> {
+ private static final long serialVersionUID = 1L;
+ private static final String OP_TYPE = "op_type";
+ private static final String BEFORE = "before";
+ private static final String AFTER = "after";
+ private static final String OP_INSERT = "I";
+ private static final String OP_UPDATE = "U";
+ private static final String OP_DELETE = "D";
+
+ /** Flag that indicates that an additional projection is required for metadata. */
+ private final boolean hasMetadata;
+
+ private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+ /** Metadata to be extracted for every record. */
+ private final MetadataConverter[] metadataConverters;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    private final List<String> fieldNames;
+
+ private final boolean ignoreParseErrors;
+
+
+ private final int fieldCount;
+
+    public ZDTPJsonDeserializationSchema(
+            DataType physicalDataType,
+            List<ZDTPJsonDecodingFormat.ReadableMetadata> readableMetadata,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormatOption) {
+ final RowType jsonRowType = createJsonRowType(physicalDataType, readableMetadata);
+ this.jsonDeserializer =
+ new JsonRowDataDeserializationSchema(
+ jsonRowType,
+ producedTypeInfo,
+ false,
+ ignoreParseErrors,
+ timestampFormatOption);
+ this.hasMetadata = readableMetadata.size() > 0;
+
+ this.metadataConverters = createMetadataConverters(jsonRowType, readableMetadata);
+
+ this.producedTypeInfo = producedTypeInfo;
+
+ this.ignoreParseErrors = ignoreParseErrors;
+
+ final RowType physicalRowType = (RowType) physicalDataType.getLogicalType();
+
+ this.fieldNames = physicalRowType.getFieldNames();
+
+ this.fieldCount = physicalRowType.getFieldCount();
+ }
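+
+    /**
+     * Not used: one ZDTP message can unfold into multiple changelog rows, so only the
+     * Collector-based variant below is supported.
+     */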
+ @Override
+ public RowData deserialize(byte[] message) throws IOException {
+ throw new RuntimeException(
+ "Please invoke DeserializationSchema#deserialize(byte[], Collector) instead.");
+ }
+
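+    /**
+     * Flattens one ZDTP envelope into changelog rows: op_type "I" emits the after row
+     * as INSERT, "U" emits the before row as UPDATE_BEFORE followed by the after row
+     * as UPDATE_AFTER, and "D" emits the before row as DELETE. For example (illustrative
+     * values), {"op_type":"U","before":{"id":1,"name":"a"},"after":{"id":1,"name":"b"}}
+     * produces an UPDATE_BEFORE row (1, a) and an UPDATE_AFTER row (1, b).
+     */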
+ @Override
+    public void deserialize(byte[] message, Collector<RowData> collector) throws IOException {
+ try {
+ GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+ String type = row.getString(0).toString();
+            GenericRowData before = (GenericRowData) row.getField(1); // "before" field
+            GenericRowData after = (GenericRowData) row.getField(2); // "after" field
+            if (OP_INSERT.equals(type)) {
+                // the "after" field holds the inserted row
+ after.setRowKind(RowKind.INSERT);
+ emitRow(row, after, collector);
+ } else if (OP_UPDATE.equals(type)) {
+ before.setRowKind(RowKind.UPDATE_BEFORE);
+ after.setRowKind(RowKind.UPDATE_AFTER);
+ emitRow(row, before, collector);
+ emitRow(row, after, collector);
+ } else if (OP_DELETE.equals(type)) {
+ before.setRowKind(RowKind.DELETE);
+ emitRow(row, before, collector);
+ } else {
+ if (!ignoreParseErrors) {
+                    throw new IOException(
+                            format(
+                                    "Unknown \"op_type\" value \"%s\". The ZDTP JSON message is '%s'",
+                                    type, new String(message)));
+ }
+ }
+        } catch (Throwable t) {
+ if (!ignoreParseErrors) {
+ throw new IOException(
+ format("Corrupt ZDTP JSON message '%s'.", new String(message)), t);
+ }
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+    public TypeInformation<RowData> getProducedType() {
+ return producedTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+
+ if (obj == null || getClass() != obj.getClass()) return false;
+
+ ZDTPJsonDeserializationSchema that = (ZDTPJsonDeserializationSchema) obj;
+ return ignoreParseErrors == that.ignoreParseErrors
+ && fieldCount == that.fieldCount
+ && Objects.equals(jsonDeserializer, that.jsonDeserializer)
+ && Objects.equals(producedTypeInfo, that.producedTypeInfo);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ jsonDeserializer,
+ producedTypeInfo,
+ ignoreParseErrors,
+ fieldCount);
+ }
+
+    private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> collector) {
+ if (!hasMetadata) {
+ collector.collect(physicalRow);
+ return;
+ }
+ int physicalArity = physicalRow.getArity();
+ int metadataArity = metadataConverters.length;
+
+ GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+        RowKind rootRowRowKind = rootRow.getRowKind();
+        // When converting metadata such as `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL,
+        // the rootRow's RowKind is always INSERT, which is not the expected value, so it is
+        // temporarily set to the physicalRow's RowKind before the converters run.
+        rootRow.setRowKind(physicalRow.getRowKind());
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+        }
+        // restore the rootRow's original RowKind
+        rootRow.setRowKind(rootRowRowKind);
+ collector.collect(producedRow);
+ }
+
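+    /**
+     * Builds the row type used to parse the raw ZDTP envelope. For a physical type
+     * ROW<id INT, name STRING> with requested metadata 'schema' and 'pos' (an
+     * illustrative example), the JSON row type becomes:
+     * ROW<op_type STRING, before ROW<id INT, name STRING>, after ROW<id INT, name STRING>, schema STRING, pos BIGINT>
+     */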
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ZDTPJsonDecodingFormat.ReadableMetadata> readableMetadata) {
+
+ DataType root =
+ DataTypes.ROW(
+ DataTypes.FIELD(OP_TYPE, DataTypes.STRING()),
+ DataTypes.FIELD(BEFORE, physicalDataType),
+ DataTypes.FIELD(AFTER, physicalDataType));
+ // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+ readableMetadata.stream()
+ .map(m -> m.requiredJsonField)
+ .distinct()
+ .collect(Collectors.toList());
+ return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
+ }
+
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ZDTPJsonDecodingFormat.ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(m -> convert(jsonRowType, m))
+                .toArray(MetadataConverter[]::new);
+    }
+
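+    /**
+     * Pre-binds the JSON field position for a metadata key so the per-record converter
+     * does not need to look it up; the 'pos' argument of the returned converter is unused.
+     */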
+ private static MetadataConverter convert(RowType jsonRowType, ZDTPJsonDecodingFormat.ReadableMetadata metadata) {
+ final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+ return (root, unused) -> metadata.converter.convert(root, pos);
+ }
+
+ interface MetadataConverter extends Serializable {
+ default Object convert(GenericRowData row) {
+ return convert(row, -1);
+ }
+
+ Object convert(GenericRowData row, int pos);
+ }
+}
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..b2b04f57dce509ab2642842f54091d8dca84a5e7
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java
@@ -0,0 +1,69 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptionsUtil;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.zto.fire.flink.formats.json.ZDTPJsonOptions.*;
+
+
+/**
+ * Description: Flink only ships three CDC formats out of the box (canal, maxwell and
+ * debezium). Since the binlog here is collected by ZTO's in-house ZDTP component, a
+ * dedicated format is needed to parse its CDC messages, e.g.:
+ * create table rocket_mq_table(
+ *   `bill_code` string,
+ *   `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL, -- RowKind extracted from the binlog
+ *   `mq_topic` STRING METADATA FROM 'topic' VIRTUAL,          -- topic
+ *   `mq_broker` STRING METADATA FROM 'broker' VIRTUAL,        -- broker
+ *   `id` bigint,
+ *   customize_id string
+ * ) WITH (
+ *   'connector' = '...',
+ *   'format' = 'zdtp-json',
+ *   'zdtp-json.cdc-write-hive' = 'true', -- with this option the downstream can insert directly into Hive
+ *   ......
+ * )
+ */
+public class ZDTPJsonFormatFactory implements DeserializationFormatFactory {
+ private static final String IDENTIFIER = "zdtp-json";
+ @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+ final boolean cdcWriteHive = formatOptions.get(CDC_WRITE_HIVE);
+ final TimestampFormat timestampFormatOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+
+ return new ZDTPJsonDecodingFormat(ignoreParseErrors, cdcWriteHive, timestampFormatOption);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+ return Collections.emptySet();
+ }
+
+ @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(IGNORE_PARSE_ERRORS);
+        options.add(TIMESTAMP_FORMAT);
+        options.add(JSON_MAP_NULL_KEY_MODE);
+        options.add(JSON_MAP_NULL_KEY_LITERAL);
+        // CDC_WRITE_HIVE is read in createDecodingFormat, so it must be declared here
+        // as well, otherwise format option validation rejects it
+        options.add(CDC_WRITE_HIVE);
+        return options;
+    }
+}
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java
new file mode 100644
index 0000000000000000000000000000000000000000..d3dcbd972a236eefabe1f7d05ca76972420b2792
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java
@@ -0,0 +1,25 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.formats.json.JsonFormatOptions;
+
+
+public class ZDTPJsonOptions {
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
+
+    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE = JsonFormatOptions.MAP_NULL_KEY_MODE;
+
+    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL = JsonFormatOptions.MAP_NULL_KEY_LITERAL;
+
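+    /**
+     * A minimal usage sketch (connector options elided):
+     * CREATE TABLE t (...) WITH (
+     *     'format' = 'zdtp-json',
+     *     'zdtp-json.cdc-write-hive' = 'true'
+     * )
+     */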
+    public static final ConfigOption<Boolean> CDC_WRITE_HIVE =
+            ConfigOptions.key("cdc-write-hive")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The built-in maxwell-json, canal-json and debezium-json formats return " +
+                                    "ChangelogMode.all() from getChangelogMode(), so when the sink is Hive the " +
+                                    "Flink planner rejects the job because Hive only supports insert-only mode. " +
+                                    "This option targets the binlog-to-Hive scenario.");
+}
diff --git a/fire-enhance/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fire-enhance/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000000000000000000000000000000..bd1d676c5d0600cda0549f6ad34166df50c71646
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,3 @@
+## Parses the binlog collected by ZTO's ZDTP component; exposes the binlog RowKind as
+## metadata and adds a table property so the binlog can be written downstream to Hive
+com.zto.fire.flink.formats.json.ZDTPJsonFormatFactory
\ No newline at end of file
diff --git a/fire-enhance/pom.xml b/fire-enhance/pom.xml
index 8142d02adcee91a20b9027c8d2a372b551ffbc14..a99bd26db8f1660883e47171306da50693afc877 100644
--- a/fire-enhance/pom.xml
+++ b/fire-enhance/pom.xml
@@ -33,6 +33,7 @@
         <module>apache-spark</module>
         <module>apache-flink</module>
         <module>apache-arthas</module>
+        <module>flink-json</module>
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/sql/ZDTPTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/sql/ZDTPTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8dd1662c8ffab475dbeb67e1bb1cf939b273526e
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/sql/ZDTPTest.scala
@@ -0,0 +1,47 @@
+package com.zto.fire.examples.flink.sql
+
+// import paths below follow fire's example conventions and are assumed here
+import com.zto.fire._
+import com.zto.fire.common.anno.lifecycle.{Step1, Step3}
+import com.zto.fire.flink.FlinkStreaming
+
+object ZDTPTest extends FlinkStreaming {
+  @Step1("define the RocketMQ source table")
+ def source: Unit = {
+ sql("""
+ |CREATE table source (
+        | row_kind STRING METADATA FROM 'value.row_kind' VIRTUAL, -- RowKind of the before/after row from the binlog
+ | mq_topic STRING METADATA FROM 'topic' VIRTUAL, -- rocket mq topic
+ | mq_offset BIGINT METADATA FROM 'offset' VIRTUAL, -- rocket mq offset
+ | mq_timestamp TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
+ | id int,
+ | name string,
+ | age int,
+ | length double,
+ | data DECIMAL(10, 5)
+        |) with (
+        |  'connector'='fire-rocketmq',
+        |  'rocket.brokers.name'='bigdata_test',
+        |  'rocket.topics'='fire',
+        |  'rocket.group.id'='fire',
+        |  'rocket.consumer.tag'='*',
+        |  'format' = 'zdtp-json', -- custom JSON format that also exposes the binlog RowKind
+        |  'zdtp-json.cdc-write-hive' = 'true' -- required when the sink is Hive
+        |)
+ |""".stripMargin)
+ }
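+
+  // An illustrative ZDTP message consumed by the source table above (field values are made up):
+  // {"schema":"route_order","table":"route_order_001","op_type":"U",
+  //  "before":{"id":1,"name":"a","age":1,"length":1.0,"data":1.00000},
+  //  "after":{"id":1,"name":"b","age":1,"length":1.0,"data":1.00000}}
+  // zdtp-json turns it into an UPDATE_BEFORE and an UPDATE_AFTER row, with row_kind
+  // 'UPDATE_BEFORE' / 'UPDATE_AFTER' respectively.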
+
+  @Step3("sink the data")
+ def insert: Unit = {
+ sql("""
+ |insert into
+ | st_znlyhb.zto_route_bill_change_log
+ |select
+ | row_kind,
+ | mq_topic,
+ | mq_offset,
+ | id,
+ | name,
+ | length,
+ | data,
+        |  DATE_FORMAT(mq_timestamp, 'yyyyMMdd') ds -- Hive partition column
+ |from source
+ |""".stripMargin)
+ }
+}
\ No newline at end of file