diff --git a/fire-enhance/flink-json/pom.xml b/fire-enhance/flink-json/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..347979954acbaa7573b37280311762b862daa029
--- /dev/null
+++ b/fire-enhance/flink-json/pom.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>fire-enhance</artifactId>
+        <groupId>com.zto.fire</groupId>
+        <version>2.4.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-json</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>${maven.scope}</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java
new file mode 100644
index 0000000000000000000000000000000000000000..11febc526dc4dfc25f7340d4e26dbe9e09795193
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDecodingFormat.java
@@ -0,0 +1,215 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import com.zto.fire.flink.formats.json.ZDTPJsonDeserializationSchema.MetadataConverter;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+
+public class ZDTPJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> {
+    private List<String> metadataKeys;
+
+    private final boolean ignoreParseErrors;
+
+    private final boolean cdcWriteHive;
+
+    private final TimestampFormat timestampFormatOption;
+
+    public ZDTPJsonDecodingFormat(boolean ignoreParseErrors, boolean cdcWriteHive, TimestampFormat timestampFormatOption) {
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.cdcWriteHive = cdcWriteHive;
+        this.timestampFormatOption = timestampFormatOption;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType physicalDataType) {
+
+        final List<ReadableMetadata> readableMetadata =
+                Stream.of(ReadableMetadata.values())
+                        .filter(m -> metadataKeys.contains(m.key))
+                        .collect(Collectors.toList());
+
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType = DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new ZDTPJsonDeserializationSchema(
+                physicalDataType, readableMetadata, producedTypeInfo, ignoreParseErrors, timestampFormatOption);
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return cdcWriteHive ? ChangelogMode.insertOnly() : ChangelogMode.all();
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    // the metadata keys requested from the JSON
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    /**
+     * Metadata carried by a ZDTP record:
+     * {
+     *   "schema":"route_order",
+     *   "table":"route_order_001",
+     *   "gtid":"ldjfdlfj-ldfjdl",
+     *   "logFile":"0002334.bin",
+     *   "offset":"dfle-efe",
+     *   "pos":12384384,
+     *   "when":123943434383,
+     *   "before":{},
+     *   "after":{}
+     * }
+     */
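+    // Illustrative DDL sketch (not part of the original change): each key below can be declared
+    // as a virtual metadata column; the 'value.' prefix is an assumption taken from the example
+    // table further down, where the connector scopes format metadata under 'value.':
+    //   `table` STRING METADATA FROM 'value.table' VIRTUAL,
+    //   `pos`   BIGINT METADATA FROM 'value.pos'   VIRTUAL,
+    //   `when`  TIMESTAMP_LTZ(3) METADATA FROM 'value.when' VIRTUAL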
+    enum ReadableMetadata {
+        SCHEMA(
+                "schema",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("schema", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        TABLE(
+                "table",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("table", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        GTID(
+                "gtid",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("gtid", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        LOGFILE(
+                "logFile",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("logFile", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        OFFSET(
+                "offset",
+                DataTypes.STRING().nullable(),
+                DataTypes.FIELD("offset", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return row.getString(pos);
+                    }
+                }),
+        POS(
+                "pos",
+                DataTypes.BIGINT().nullable(),
+                DataTypes.FIELD("pos", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        // guard against a missing "pos" field to avoid unboxing a null Long
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return row.getLong(pos);
+                    }
+                }),
+        WHEN(
+                "when",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.FIELD("when", DataTypes.BIGINT()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        return TimestampData.fromEpochMillis(row.getLong(pos));
+                    }
+                }),
+        // expose the RowKind from the binlog as metadata
+        ROW_KIND(
+                "row_kind",
+                DataTypes.STRING().notNull(),
+                DataTypes.FIELD("row_kind", DataTypes.STRING()),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int pos) {
+                        return StringData.fromString(row.getRowKind().toString());
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final DataTypes.Field requiredJsonField;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, DataTypes.Field requiredJsonField, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.requiredJsonField = requiredJsonField;
+            this.converter = converter;
+        }
+    }
+}
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java
new file mode 100644
index 0000000000000000000000000000000000000000..13e3318a061697670dd76ac5649da7111f883dd5
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonDeserializationSchema.java
@@ -0,0 +1,214 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+
+public class ZDTPJsonDeserializationSchema implements DeserializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+    private static final String OP_TYPE = "op_type";
+    private static final String BEFORE = "before";
+    private static final String AFTER = "after";
+    private static final String OP_INSERT = "I";
+    private static final String OP_UPDATE = "U";
+    private static final String OP_DELETE = "D";
+
+    /** Flag that indicates that an additional projection is required for metadata. */
+    private final boolean hasMetadata;
+
+    private final JsonRowDataDeserializationSchema jsonDeserializer;
+
+    /** Metadata to be extracted for every record. */
+    private final MetadataConverter[] metadataConverters;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    private final List<String> fieldNames;
+
+    private final boolean ignoreParseErrors;
+
+    private final int fieldCount;
+
+    public ZDTPJsonDeserializationSchema(
+            DataType physicalDataType,
+            List<ZDTPJsonDecodingFormat.ReadableMetadata> readableMetadata,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormatOption) {
+        final RowType jsonRowType = createJsonRowType(physicalDataType, readableMetadata);
+        this.jsonDeserializer =
+                new JsonRowDataDeserializationSchema(
+                        jsonRowType,
+                        producedTypeInfo,
+                        false,
+                        ignoreParseErrors,
+                        timestampFormatOption);
+        this.hasMetadata = readableMetadata.size() > 0;
+        this.metadataConverters = createMetadataConverters(jsonRowType, readableMetadata);
+        this.producedTypeInfo = producedTypeInfo;
+        this.ignoreParseErrors = ignoreParseErrors;
+        final RowType physicalRowType = (RowType) physicalDataType.getLogicalType();
+        this.fieldNames = physicalRowType.getFieldNames();
+        this.fieldCount = physicalRowType.getFieldCount();
+    }
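+
+    // Sketch, assuming a physical schema (id INT, name STRING) and requested metadata
+    // [table, pos]: createJsonRowType(...) below yields the runtime JSON row type
+    //   ROW<op_type STRING, before ROW<id INT, name STRING>, after ROW<id INT, name STRING>,
+    //       table STRING, pos BIGINT>
+    // so the deserializer parses the op type, both row images and the root-level metadata in one pass.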
+
+    @Override
+    public RowData deserialize(byte[] message) throws IOException {
+        throw new RuntimeException(
+                "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> collector) throws IOException {
+        try {
+            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
+            String type = row.getString(0).toString();
+            GenericRowData before = (GenericRowData) row.getField(1); // "before" field
+            GenericRowData after = (GenericRowData) row.getField(2);  // "after" field
+            if (OP_INSERT.equals(type)) {
+                // the "after" field contains the inserted row
+                after.setRowKind(RowKind.INSERT);
+                emitRow(row, after, collector);
+            } else if (OP_UPDATE.equals(type)) {
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                after.setRowKind(RowKind.UPDATE_AFTER);
+                emitRow(row, before, collector);
+                emitRow(row, after, collector);
+            } else if (OP_DELETE.equals(type)) {
+                before.setRowKind(RowKind.DELETE);
+                emitRow(row, before, collector);
+            } else {
+                if (!ignoreParseErrors) {
+                    throw new IOException(
+                            format(
+                                    "Unknown \"op_type\" value \"%s\". The ZDTP JSON message is '%s'",
+                                    type, new String(message)));
+                }
+            }
+        } catch (Throwable t) {
+            if (!ignoreParseErrors) {
+                throw new IOException(
+                        format("Corrupt ZDTP JSON message '%s'.", new String(message)), t);
+            }
+        }
+    }
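+
+    // Walk-through (illustrative message): {"op_type":"U","before":{"id":1,"name":"a"},
+    // "after":{"id":1,"name":"b"},...} is emitted as two rows, -U(1, a) followed by +U(1, b);
+    // an "I" message emits a single +I row from "after", a "D" message a single -D row from "before".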
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        ZDTPJsonDeserializationSchema that = (ZDTPJsonDeserializationSchema) obj;
+        return ignoreParseErrors == that.ignoreParseErrors
+                && fieldCount == that.fieldCount
+                && Objects.equals(jsonDeserializer, that.jsonDeserializer)
+                && Objects.equals(producedTypeInfo, that.producedTypeInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                jsonDeserializer,
+                producedTypeInfo,
+                ignoreParseErrors,
+                fieldCount);
+    }
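+
+    // emitRow appends the converted metadata behind the physical columns: with physical
+    // columns (id, name) and requested metadata [table, pos], the produced row layout is
+    // [id, name, table, pos], carrying the physical row's RowKind.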
+
+    private void emitRow(GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> collector) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+        int physicalArity = physicalRow.getArity();
+        int metadataArity = metadataConverters.length;
+
+        GenericRowData producedRow = new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
+
+        for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
+            producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
+        }
+        RowKind rootRowRowKind = rootRow.getRowKind();
+        // When `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL is converted from the
+        // root row, the root row's RowKind is always INSERT and would not yield the expected
+        // value, so it is temporarily set to the physical row's RowKind here.
+        rootRow.setRowKind(physicalRow.getRowKind());
+        for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+            producedRow.setField(physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
+        }
+        // restore the root row's original RowKind
+        rootRow.setRowKind(rootRowRowKind);
+        collector.collect(producedRow);
+    }
+
+    private static RowType createJsonRowType(
+            DataType physicalDataType, List<ZDTPJsonDecodingFormat.ReadableMetadata> readableMetadata) {
+
+        DataType root =
+                DataTypes.ROW(
+                        DataTypes.FIELD(OP_TYPE, DataTypes.STRING()),
+                        DataTypes.FIELD(BEFORE, physicalDataType),
+                        DataTypes.FIELD(AFTER, physicalDataType));
+        // append fields that are required for reading metadata in the root
+        final List<DataTypes.Field> rootMetadataFields =
+                readableMetadata.stream()
+                        .map(m -> m.requiredJsonField)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
+    }
+
+    private static MetadataConverter[] createMetadataConverters(
+            RowType jsonRowType, List<ZDTPJsonDecodingFormat.ReadableMetadata> requestedMetadata) {
+        return requestedMetadata.stream()
+                .map(m -> convert(jsonRowType, m))
+                .toArray(MetadataConverter[]::new);
+    }
+
+    private static MetadataConverter convert(RowType jsonRowType, ZDTPJsonDecodingFormat.ReadableMetadata metadata) {
+        final int pos = jsonRowType.getFieldNames().indexOf(metadata.requiredJsonField.getName());
+        return (root, unused) -> metadata.converter.convert(root, pos);
+    }
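+
+    // Note: convert(...) above resolves the JSON field index once and captures it in the
+    // returned lambda, so the position argument passed at runtime is ignored; e.g. metadata
+    // 'pos' resolved to JSON index 5 always reads index 5, regardless of the caller-supplied position.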
+
+    interface MetadataConverter extends Serializable {
+        default Object convert(GenericRowData row) {
+            return convert(row, -1);
+        }
+
+        Object convert(GenericRowData row, int pos);
+    }
+}
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..b2b04f57dce509ab2642842f54091d8dca84a5e7
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonFormatFactory.java
@@ -0,0 +1,69 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptionsUtil;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.zto.fire.flink.formats.json.ZDTPJsonOptions.*;
+
+
+/**
+ * Description: Flink only ships CDC formats for canal, maxwell and debezium. The binlog here is
+ * collected by ZTO's in-house ZDTP component, so a dedicated format is needed to parse ZDTP CDC
+ * messages:
+ *
+ * create table rocket_mq_table(
+ *   `bill_code` string,
+ *   `row_kind` STRING METADATA FROM 'value.row_kind' VIRTUAL, -- RowKind extracted from the binlog
+ *   `mq_topic` STRING METADATA FROM 'topic' VIRTUAL,          -- topic
+ *   `mq_broker` STRING METADATA FROM 'broker' VIRTUAL,        -- broker
+ *   `id` bigint,
+ *   `customize_id` string
+ * ) WITH (
+ *   'connector' = '...',
+ *   'format' = 'zdtp-json',
+ *   'zdtp-json.cdc-write-hive' = 'true', -- with this option set, the downstream can insert directly into Hive
+ *   ......
+ * )
+ */
+public class ZDTPJsonFormatFactory implements DeserializationFormatFactory {
+    private static final String IDENTIFIER = "zdtp-json";
+
+    @Override
+    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+        final boolean cdcWriteHive = formatOptions.get(CDC_WRITE_HIVE);
+        final TimestampFormat timestampFormatOption = JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
+
+        return new ZDTPJsonDecodingFormat(ignoreParseErrors, cdcWriteHive, timestampFormatOption);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(IGNORE_PARSE_ERRORS);
+        options.add(TIMESTAMP_FORMAT);
+        options.add(JSON_MAP_NULL_KEY_MODE);
+        options.add(JSON_MAP_NULL_KEY_LITERAL);
+        // declare cdc-write-hive as well, so option validation accepts 'zdtp-json.cdc-write-hive'
+        options.add(CDC_WRITE_HIVE);
+        return options;
+    }
+}
diff --git a/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java
new file mode 100644
index 0000000000000000000000000000000000000000..d3dcbd972a236eefabe1f7d05ca76972420b2792
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/java/com/zto/fire/flink/formats/json/ZDTPJsonOptions.java
@@ -0,0 +1,25 @@
+package com.zto.fire.flink.formats.json;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.formats.json.JsonFormatOptions;
+
+
+public class ZDTPJsonOptions {
+    public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS = JsonFormatOptions.IGNORE_PARSE_ERRORS;
+
+    public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonFormatOptions.TIMESTAMP_FORMAT;
+
+    public static final ConfigOption<String> JSON_MAP_NULL_KEY_MODE = JsonFormatOptions.MAP_NULL_KEY_MODE;
+
+    public static final ConfigOption<String> JSON_MAP_NULL_KEY_LITERAL = JsonFormatOptions.MAP_NULL_KEY_LITERAL;
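+
+    // Usage sketch: with 'zdtp-json.cdc-write-hive' = 'true' in the table's WITH clause,
+    // ZDTPJsonDecodingFormat#getChangelogMode() reports insert-only, so the planner accepts an
+    // insert-only sink such as Hive; the original change type stays available to the query
+    // through the 'row_kind' metadata column.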
+
+    public static final ConfigOption<Boolean> CDC_WRITE_HIVE =
+            ConfigOptions.key("cdc-write-hive")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "The built-in maxwell-json/canal-json/debezium-json formats return "
+                                    + "ChangelogMode.all() from getChangelogMode(), so the Flink planner "
+                                    + "rejects such a pipeline when the sink is Hive, which only supports "
+                                    + "the insert mode. This option exists for the binlog-to-Hive scenario.");
+}
diff --git a/fire-enhance/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/fire-enhance/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000000000000000000000000000000..bd1d676c5d0600cda0549f6ad34166df50c71646
--- /dev/null
+++ b/fire-enhance/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,3 @@
+# Parses the binlog collected by ZTO's ZDTP component; exposes the binlog RowKind as metadata
+# and adds a table property that lets the binlog stream be written to Hive downstream.
+com.zto.fire.flink.formats.json.ZDTPJsonFormatFactory
\ No newline at end of file
diff --git a/fire-enhance/pom.xml b/fire-enhance/pom.xml
index 8142d02adcee91a20b9027c8d2a372b551ffbc14..a99bd26db8f1660883e47171306da50693afc877 100644
--- a/fire-enhance/pom.xml
+++ b/fire-enhance/pom.xml
@@ -33,6 +33,7 @@
         <module>apache-spark</module>
         <module>apache-flink</module>
         <module>apache-arthas</module>
+        <module>flink-json</module>
     </modules>
 </project>
diff --git a/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/sql/ZDTPTest.scala b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/sql/ZDTPTest.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8dd1662c8ffab475dbeb67e1bb1cf939b273526e
--- /dev/null
+++ b/fire-examples/flink-examples/src/main/scala/com/zto/fire/examples/flink/sql/ZDTPTest.scala
@@ -0,0 +1,47 @@
+package com.zto.fire.examples.flink.sql
+
+// NOTE: the import paths below are assumed from the fire framework's usual layout
+import com.zto.fire._
+import com.zto.fire.common.anno.lifecycle.{Step1, Step3}
+import com.zto.fire.flink.FlinkStreaming
+
+object ZDTPTest extends FlinkStreaming {
+
+  @Step1("define the RocketMQ source table")
+  def source: Unit = {
+    sql("""
+          |CREATE table source (
+          |  row_kind STRING METADATA FROM 'value.row_kind' VIRTUAL, -- RowKind of the binlog before/after row
+          |  mq_topic STRING METADATA FROM 'topic' VIRTUAL,          -- RocketMQ topic
+          |  mq_offset BIGINT METADATA FROM 'offset' VIRTUAL,        -- RocketMQ offset
+          |  mq_timestamp TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
+          |  id int,
+          |  name string,
+          |  age int,
+          |  length double,
+          |  data DECIMAL(10, 5)
+          |) with (
+          |  'connector' = 'fire-rocketmq',
+          |  'rocket.brokers.name' = 'bigdata_test',
+          |  'rocket.topics' = 'fire',
+          |  'rocket.group.id' = 'fire',
+          |  'rocket.consumer.tag' = '*',
+          |  'format' = 'zdtp-json',             -- custom JSON format that also exposes the RowKind
+          |  'zdtp-json.cdc-write-hive' = 'true' -- required when writing to Hive
+          |)
+          |""".stripMargin)
+  }
+
+  @Step3("sink the data")
+  def insert: Unit = {
+    sql("""
+          |insert into
+          |  st_znlyhb.zto_route_bill_change_log
+          |select
+          |  row_kind,
+          |  mq_topic,
+          |  mq_offset,
+          |  id,
+          |  name,
+          |  length,
+          |  data,
+          |  DATE_FORMAT(mq_timestamp, 'yyyyMMdd') ds -- Hive partition column
+          |from source
+          |""".stripMargin)
+  }
+}
\ No newline at end of file