SeaTunnel如何读取mysql中Generated Invisible Primary Keys (GIPK)字段my_row_id的值?

新手上路,请多包涵

SeaTunnel使用mysql cdc同步到doris中,遇到mysql表包含 Generated Invisible Primary Keys (GIPK) 时,读取my_row_id字段值是null,导致写入doris失败,如何解决?

org.apache.kafka.connect.errors.DataException: Invalid Java object for schema "org.apache.kafka.connect.data.Decimal" with type BYTES: class java.lang.Long for field: "my_row_id"
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242) ~[?:?]
        at org.apache.kafka.connect.data.Struct.put(Struct.java:216) ~[?:?]
        at io.debezium.relational.TableSchemaBuilder.lambda$createKeyGenerator$3(TableSchemaBuilder.java:226) ~[?:?]
        at io.debezium.relational.TableSchema.keyFromColumnData(TableSchema.java:130) ~[?:?]
        at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:86) ~[?:?]
        at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50) ~[?:?]
        at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:155) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotSplitReadTask.createDataEventsForTable(MySqlSnapshotSplitReadTask.java:241) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotSplitReadTask.createDataEvents(MySqlSnapshotSplitReadTask.java:179) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:147) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:117) ~[?:?]
        at org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask.execute(MySqlSnapshotFetchTask.java:71) ~[?:?]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:95) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_111]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_111]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_111]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]

暂未找到方法

阅读 653
avatarAI BotBETA

在使用 SeaTunnel 进行 MySQL CDC(Change Data Capture)到 Doris 的同步过程中,遇到 Generated Invisible Primary Keys (GIPK) 字段读取为 null 的问题,通常与 Debezium(SeaTunnel 中 MySQL CDC 可能依赖的库)如何解析和传输这些字段有关。以下是一些可能的解决步骤和考虑因素:

1. 确认 Debezium 配置

首先,确保 Debezium 连接器配置正确,特别是与表结构、字段类型等相关的部分。对于 GIPK,Debezium 应该能够捕获这些字段的变化,但可能由于配置不当或版本问题而无法正确处理。

2. 检查字段类型

错误信息中提到了 "my_row_id" 字段的类型不匹配(期望为 BYTES 但得到的是 java.lang.Long)。这通常意味着在 Kafka Connect 的 schema 中,该字段的类型被错误地定义或解析。你需要检查 SeaTunnel 或 Debezium 的配置,看看是否有地方可以指定或覆盖字段类型。

3. 自定义字段映射

如果默认配置无法满足需求,你可以考虑在 SeaTunnel 的配置中自定义字段映射。这通常涉及到在数据从 MySQL 读取后、写入 Doris 之前,对数据进行转换或处理。

4. 更新或修复 Debezium

如果你正在使用的 Debezium 版本不支持 GIPK 或存在已知的 bug,尝试更新到最新版本可能有助于解决问题。同时,查看 Debezium 的 issue 跟踪器,看看是否有其他用户报告了类似的问题。

5. 使用其他工具或方法

如果问题依旧无法解决,你可以考虑使用其他工具或方法来同步数据,如直接编写一个自定义的同步脚本或使用其他支持 MySQL CDC 的数据集成工具。

示例配置(假设性)

由于 SeaTunnel 的具体配置可能因版本和插件而异,以下是一个假设性的配置示例,用于说明如何在 SeaTunnel 中进行字段映射或类型转换(注意:这只是一个示例,并不直接适用于所有情况):

env:
  execution.parallelism: 1

source:
  mysqlCdc:
    connector:
      url: jdbc:mysql://localhost:3306/mydatabase
      username: myuser
      password: mypassword
      table-name: mytable
      # 其他 Debezium 相关配置...

transform:
  - sql:
      sql: "SELECT CAST(my_row_id AS BINARY) AS my_row_id_binary, other_columns FROM ?"

sink:
  doris:
    connector:
      # Doris 连接器配置...
      fields: [my_row_id_binary, other_columns]
      # 其他 Doris 写入配置...

注意:上面的 sql 转换配置是一个假设性的示例,用于说明如何在数据读取后进行字段转换。在 SeaTunnel 中,实际的转换逻辑可能需要根据实际情况进行调整。

结论

解决 SeaTunnel 中 MySQL CDC 同步到 Doris 时 GIPK 字段读取为 null 的问题,通常需要检查并调整 Debezium 连接器的配置,或者通过自定义转换逻辑来处理字段类型和映射问题。如果问题依旧存在,考虑使用其他工具或方法可能是必要的。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏