flinkcdc mysql DataStream API问题错在哪?

xiaofanku
  • 55
山东

Flink CDC 版本是 2.1.1,MySQL 版本是 5.7,Flink 版本是 1.15.1。

照着官方示例依葫芦画瓢写了一份代码,在 IDEA 中运行没有任何反应;打成 jar 提交到 Flink 上运行时提示:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

java代码:

import java.util.Properties;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.example.utils.TransformUtil;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Flink job that reads change events from a MySQL table through the Flink CDC
 * {@code MySqlSource} and prints each event (as a JSON string) to standard output.
 */
public class ApplyFundExecutives {
    /**
     * Builds the MySQL CDC source, wires it into a streaming environment with
     * checkpointing enabled, attaches a print sink and runs the job.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Debezium-level overrides. They only take effect once they are handed to the
        // source builder below; previously this object was created but never used.
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("snapshot.locking.mode", "none");

        MySqlSource<String> applyFundSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(33061)
                .databaseList("已存在的数据") // set captured database
                // NOTE(review): Flink CDC documents table names as "database.table";
                // confirm the real value is qualified with the database name.
                .tableList("某张表") // set captured table
                .username("root")
                .password("root")
                .debeziumProperties(debeziumProperties) // apply the Debezium overrides defined above
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 3000 milliseconds (3 s).
        env.enableCheckpointing(3000);

        // NOTE(review): a source parallelism of 4 needs at least 4 free task slots;
        // verify the target cluster provides them, otherwise scheduling can fail
        // with NoResourceAvailableException.
        env.fromSource(
                applyFundSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL ApplyFund Source")
                .setParallelism(4)
                .print()
                .setParallelism(1); // a single printer keeps the output in one stream

        try {
            env.execute("Print MySQL Snapshot + Binlog");
        } catch (Exception e) {
            // Report the failure on stderr; the job name above identifies the run.
            e.printStackTrace();
        }
    }
}

pom:

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Maven build descriptor for the etl-table module.
  Declares the Aliyun public mirror as a repository, targets Java 8, and pulls in
  the Flink CDC MySQL connector, the Flink 1.15.1 runtime modules, JDBC drivers
  and fastjson.
  NOTE(review): flink-connector-mysql-cdc 2.1.1 is reported to target Flink 1.13.x,
  while the Flink artifacts below are 1.15.1; confirm the versions are compatible.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>etl-table</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Extra artifact repository: Aliyun public Maven mirror. -->
    <repositories>
        <repository>
            <id>alibaba</id>
            <name>ali Repository</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>
    <!-- Compile for Java 8 source and bytecode level. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Source-side jars: MySQL CDC connector and the MySQL JDBC driver. -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.1</version>

        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>

        </dependency>
        <!-- Flink Connectors: Stream || DataSet || Table -->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.15.1</version>

        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.15.1</version>

        </dependency>
        <!-- Sink-side jars: SQL Server JDBC driver and the Flink JDBC connector. -->
        <!-- https://docs.microsoft.com/zh-cn/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver15#using-the-jdbc-driver-with-maven-central -->
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>10.2.0.jre8</version>

        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.15.1</version>

        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>

        </dependency>
        <!-- Presumably added while chasing a ClassNotFoundException (CNFE) for
             org.apache.flink.connector.base.source.reader.RecordEmitter; verify these are still needed. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
            <version>1.15.1</version>

        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.15.1</version>

        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.15.1</version>

        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.15.1</version>

        </dependency>
    </dependencies>
</project>
回复
阅读 649
1 个回答
✓ 已被采纳

找到问题了:
1)版本不匹配:Flink 的版本要与所使用的 Flink CDC 声明支持的版本一致。我使用的 Flink CDC 是 2.1.1,对应的 Flink 版本是 1.13.6。
2)相关 jar:把 IDE 中依赖的非 Flink 的 jar 放到 Flink 的 lib 目录下:%FLINK_HOME%/lib。除了数据库驱动之外,Flink CDC 所依赖的这些 jar 的版本也各不相同,不可以混用。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏