PyFlink job runs fine locally but fails when submitted to the cluster. How should I submit it so that it runs on the cluster?

Main error:
ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy

Flink version: 1.17.1
Python version: 3.10

Demo code:

import json

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, FlinkKafkaProducer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.common.typeinfo import Types


if __name__ == '__main__':
    brokers = '172.18.98.96:9092'
    source_topic = "test1"  # source data
    sink_topic = "test3"  # results

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)

    env.add_jars("file:///home/demo/jar/flink-sql-connector-kafka-1.17.1.jar")
    # env.add_jars("file:///usr/local/lib/python3.10/dist-packages/lib/flink-sql-connector-kafka-1.17.1.jar")

    source = KafkaSource.builder() \
        .set_bootstrap_servers(brokers) \
        .set_topics(source_topic) \
        .set_group_id("demo") \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()

    ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

    def str_to_dict(data):
        json_data = json.loads(data)
        action=json_data.get('action')
        is_click=1 if action=='click' else 0
        return json_data.get('name'), is_click

    def format_json(data):
        return json.dumps({'name':data[0] ,'click_num':data[1]},ensure_ascii=False)


    ds = ds.map(str_to_dict,output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    ds = ds.key_by(lambda x: x[0]).sum(1).map(format_json,output_type=Types.STRING())
    
    serialization_schema = SimpleStringSchema()
    kafka_producer = FlinkKafkaProducer(
        topic=sink_topic,
        serialization_schema=serialization_schema,
        producer_config={'bootstrap.servers': brokers, 'group.id': 'my-group'})

    ds.add_sink(kafka_producer)

    env.execute('demo')
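To make clear what the job writes to test3 for each incoming message, here is a plain-Python sketch (no Flink involved) of the same map, key_by(x[0]), sum(1), map chain. It reuses the demo's parsing and formatting logic; the function simulate() is a hypothetical stand-in, and it assumes the keyed rolling sum emits one updated record per input carrying the running total for that key, which is how it behaves in streaming mode (AUTOMATIC resolves to streaming here because the Kafka source is unbounded).

```python
import json
from collections import defaultdict


def str_to_dict(data):
    # Same parsing as the demo: (name, 1 if action == 'click' else 0)
    json_data = json.loads(data)
    is_click = 1 if json_data.get('action') == 'click' else 0
    return json_data.get('name'), is_click


def format_json(data):
    # Same output format as the demo's sink records
    return json.dumps({'name': data[0], 'click_num': data[1]}, ensure_ascii=False)


def simulate(messages):
    """Hypothetical stand-in for map -> key_by(x[0]) -> sum(1) -> map.

    Assumption: the rolling sum emits one record per input message,
    carrying the updated running total for that message's key.
    """
    totals = defaultdict(int)
    out = []
    for raw in messages:
        name, is_click = str_to_dict(raw)
        totals[name] += is_click
        out.append(format_json((name, totals[name])))
    return out


if __name__ == '__main__':
    sample = [
        '{"name": "alice", "action": "click"}',
        '{"name": "bob", "action": "view"}',
        '{"name": "alice", "action": "click"}',
    ]
    for line in simulate(sample):
        print(line)
```

Because every input produces an output, the sink topic receives a stream of updated counts per name rather than one final count.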

Running python demo.py locally works fine.
To submit it to Flink, I run:

flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar

Error:


root@a68045bb7b7a:/home/demo# flink run -m 172.19.98.96:8081 -py demo.py --jarfile /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar
Traceback (most recent call last):
  File "/home/demo/demo.py", line 22, in <module>
    source = KafkaSource.builder() \
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 387, in builder
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 430, in __init__
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.connector.kafka.source.KafkaSource.builder.
: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/OffsetResetStrategy
        at org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.earliest(OffsetsInitializer.java:147)
        at org.apache.flink.connector.kafka.source.KafkaSourceBuilder.<init>(KafkaSourceBuilder.java:106)
        at org.apache.flink.connector.kafka.source.KafkaSource.builder(KafkaSource.java:123)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 14 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
        ... 14 more

Update: the JobManager's class libraries (screenshot not preserved).

1 answer

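First, the likely cause of the error, based on my own test run:
the class org.apache.kafka.clients.consumer.OffsetResetStrategy (part of the Kafka client library) cannot be found, so my guess is that your Flink installation is missing that dependency. The trace narrows it down: the lookup fails in sun.misc.Launcher$AppClassLoader while KafkaSource.builder() is running on the client (the machine where flink run is executed), before anything is sent to the cluster. That suggests KafkaSource itself was loaded from a jar on Flink's own classpath that does not bundle the Kafka client classes, so the jar passed via --jarfile is not consulted for this lookup.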
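To check which jar (if any) in a Flink lib directory actually provides the missing class, a small standard-library script can scan the jars, since a jar is a zip archive and a class is stored as a path/to/Name.class entry. This is a hypothetical diagnostic helper, not part of Flink; the directory you point it at (for example /opt/flink/lib) is an assumption about your installation.

```python
import zipfile
from pathlib import Path


def find_class_in_jars(lib_dir, class_name):
    """Hypothetical helper: return names of jars in lib_dir containing class_name.

    class_name is in dotted form, e.g.
    'org.apache.kafka.clients.consumer.OffsetResetStrategy'.
    """
    # Dotted class name -> entry path inside the jar archive
    entry = class_name.replace('.', '/') + '.class'
    hits = []
    for jar in sorted(Path(lib_dir).glob('*.jar')):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files named *.jar that are not valid zip archives
            continue
    return hits
```

For example, find_class_in_jars('/opt/flink/lib', 'org.apache.kafka.clients.consumer.OffsetResetStrategy') returning an empty list would confirm that nothing in that lib directory supplies the class; running it against the directory holding flink-sql-connector-kafka-1.17.1.jar lets you verify, rather than assume, that this jar bundles it.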
My suggestion is to put flink-sql-connector-kafka-1.17.1.jar into Flink's lib directory, using this command:

cp /home/demo/jar/flink-sql-connector-kafka-1.17.1.jar /opt/flink/lib/

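This copies the jar into Flink's lib directory. Do this on the machine where flink run is executed as well as on the JobManager and TaskManager nodes, then restart the cluster; that should resolve this environment problem.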
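If you also keep env.add_jars() in the script, note that with flink run -py the Python script itself runs on the client, so the hard-coded file:// path has to exist on that machine. A minimal sketch, assuming you want to fail early with a clear message instead of a Java class-loading error; jar_urls() is a hypothetical helper that turns local paths into the file:// URLs add_jars() expects.

```python
from pathlib import Path


def jar_urls(*paths):
    """Hypothetical helper: convert local jar paths to file:// URLs.

    Raises FileNotFoundError if a jar does not exist on the machine
    where the Python script is running (the flink run client).
    """
    urls = []
    for p in paths:
        path = Path(p).resolve()  # absolute path, required by as_uri()
        if not path.is_file():
            raise FileNotFoundError(f"jar not found on this machine: {path}")
        urls.append(path.as_uri())
    return urls
```

Usage in the demo would look like env.add_jars(*jar_urls("/home/demo/jar/flink-sql-connector-kafka-1.17.1.jar")).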
