# troubleshooting
  • w

    Wei Wang

    11/08/2022, 7:40 PM
Hello, has anyone used Beam + Flink? I am using Beam 2.40 and Flink 1.14 and got the following errors. I tried downgrading the dill version but it did not work.
  • w

    Wei Wang

    11/08/2022, 7:40 PM
    Copy code
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1596, in _create_pardo_operation
        dofn_data = pickler.loads(serialized_fn)
      File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 52, in loads
        encoded, enable_trace=enable_trace, use_zlib=use_zlib)
      File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
        return dill.loads(s)
      File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
        return load(file, ignore, **kwds)
      File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
        return Unpickler(file, ignore=ignore, **kwds).load()
      File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
        obj = StockUnpickler.load(self)
    TypeError: code() takes at most 15 arguments (16 given)
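A note on the trace above: "code() takes at most 15 arguments (16 given)" is the usual symptom of a Python minor-version mismatch between the process that pickles the DoFns and the worker that unpickles them; CPython 3.8 added an argument to code(), and the worker in this trace runs 3.7. A hedged sketch of keeping the two sides aligned, assuming a Docker-based Beam environment (the flink_master address and image tag are only illustrative; the image name follows Beam's standard apache/beam_pythonX.Y_sdk naming):

import sys
from apache_beam.options.pipeline_options import PipelineOptions

# Keep the submitting client on the same Python minor version as the SDK
# harness image that will unpickle the DoFns on the Flink workers.
assert sys.version_info[:2] == (3, 7), "client Python should match the harness"

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=DOCKER",
    "--environment_config=apache/beam_python3.7_sdk:2.40.0",
])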
  • k

    Krish Narukulla

    11/08/2022, 8:46 PM
    I am planning to add a Table API connector for ScyllaDB. Should I go the route of JDBC driver support, or a custom connector like HBase? https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/ https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/
    m
    • 2
    • 1
  • s

    Simi Ily

    11/09/2022, 3:25 AM
    Hi, I am getting the following error when creating a table on a Pulsar topic in PyFlink. Can I know what dependencies need to be provided? I am using the PyCharm IDE, PyFlink 1.16 and Pulsar 2.10.
    org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
    Please let me know where I am going wrong. See the Flink code below. Thanks.
    --------------------------------------------------
    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.table import StreamTableEnvironment, Schema, DataTypes, EnvironmentSettings


    def main():
        # Create streaming environment
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        env.set_parallelism(1)

        settings = EnvironmentSettings.new_instance() \
            .in_streaming_mode() \
            .build()

        # create table environment
        tbl_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                                environment_settings=settings)

        # tbl_env.get_config().get_configuration().set_string(
        tbl_env.get_config().set(
            "pipeline.jars",
            "file:///downloads/flink/flink-sql-connector-pulsar-1.16.0.jar"
        )
        # tbl_env.get_config().get_configuration().set_string(
        tbl_env.get_config().set(
            "pipeline.classpaths",
            "file:///downloads/flink/flink-sql-connector-pulsar-1.16.0.jar"
        )

        source_ddl1 = f"""
            create table t1 (
                message STRING
            ) with (
                'connector' = 'pulsar',
                'service-url' = 'pulsar://192.168.2.121:6650',
                'admin-url' = 'http://192.168.2.121:8080',
                'topics' = 'persistent://public/default/RawDataStream',
                'source.subscription-name' = 'mySubscription',
                'source.subscription-type' = 'Exclusive',
                'source.start.message-id' = 'earliest',
                'format' = 'json',
                'json.ignore-parse-errors' = 'true'
            )
        """

        tbl_env.execute_sql(source_ddl1)

        raw_data_tbl = tbl_env.from_path("t1")
        raw_data_tbl.print_schema()

        t2 = raw_data_tbl.select()
        t2.execute().print()

        env.execute()


    if __name__ == '__main__':
        main()
    ✅ 1
    x
    d
    m
    • 4
    • 8
  • e

    Emmanuel Leroy

    11/09/2022, 5:22 AM
    FileSink is supposed to achieve exactly-once semantics, but I find duplicates in my Avro files. How is this possible? I am streaming from a Kafka source of Avro records and just dumping them to an S3-compatible filesystem. (Note this is with Flink v1.14.6.)
    m
    • 2
    • 3
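A note on the exactly-once question above: FileSink only commits part files when a checkpoint completes, so its guarantee assumes checkpointing is enabled and that downstream readers ignore in-progress/pending part files; reading uncommitted part files, or running without checkpoints, can look like duplicates. A minimal sketch of the checkpointing prerequisite only (an assumed setup, not the poster's job):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# FileSink moves part files from in-progress/pending to finished when a
# checkpoint completes (exactly-once is the default checkpointing mode),
# so exactly-once output depends on this being switched on.
env.enable_checkpointing(60_000)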
  • e

    Echo Lee

    11/09/2022, 8:54 AM
    Our Flink version is 1.14.0, and we use the temporal join operator with the RocksDB state backend. A thread is stuck in RocksDB's seek for too long. Does anyone have any advice for me?
  • c

    ConradJam

    11/09/2022, 10:23 AM
    Hello community: I have a question about Flink K8s high availability, the HA ConfigMap, and the operator. When I use the community's K8s operator, it can automatically delete the high-availability ConfigMap used for HA. I checked the data inside, and it holds all of the JobManager's leader address information. I would like to know when this ConfigMap can be removed and when it cannot. I have also listed some scenarios I am unsure about:
    • If I don't need to restore from the latest checkpoint, and I manually trigger a savepoint on the job (Session/Application mode), does that mean I can safely delete this ConfigMap? I saw in the Flink K8s operator code (AbstractFlinkService.cancelJob) that it is only not deleted for LAST_STATE.
    • Does this leader information mean that, as long as I stop the cluster correctly, I can manually delete the ConfigMap that is left behind and not cleaned up? (When I don't manage the cluster with the operator there are leftovers, which is annoying; the official docs https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/ha/kubernetes_ha/ also describe this.)
    Would be very grateful if someone could answer my question.
  • v

    Victor Costa

    11/09/2022, 12:03 PM
    Hello 🙂 trying to get
    PulsarSource
    working but i’m getting this error
    Copy code
    Caused by: java.lang.ClassNotFoundException: org.apache.pulsar.client.api.MessageId
    More details in the 🧵
    ✅ 1
    c
    • 2
    • 31
  • s

    Sahak Maloyan

    11/09/2022, 1:40 PM
    Hello Flink community, I have an issue with executing both the streaming and table environments. Execution from the StreamTableEnvironment blocks the execution of the StreamExecutionEnvironment. My use case is getting messages via Kafka topics, sinking them to S3, and passing the stream to my next pipeline, which creates tables and executes inserts. I am getting this exception: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. How can I combine the execution so that both will work? If I comment out the line StreamExecutionEnvironment.execute("start env"); then my Kafka stream works but the table inserts via statementSet.execute() don't, and vice versa. Do you have any idea how I can combine both of them so that they work together? Thanks in advance
    s
    • 2
    • 3
  • a

    Andy Mei

    11/09/2022, 5:42 PM
    Hi Flink peeps, we are testing a Flink cluster Kubernetes DR setup across multiple AWS regions according to this doc. We have successfully set up Flink clusters in both the primary (us-east-1) and DR (us-east-2) regions with the proper S3 bucket replication (for both the JobManager and the job RocksDB state). After we shut down the primary Flink cluster and bring up the DR Flink cluster, we are not seeing the new DR JobManager pick up or restart any jobs. I really appreciate any pointers or hints on how to troubleshoot this. TIA. Andy
  • r

    RICHARD JOY

    11/09/2022, 8:18 PM
    Good day everyone. Can someone shed some light on the built-in webhooks in the Flink operator? From the docs, these admission controllers are meant to reload the keystore file. I'm struggling to understand in which use cases it is useful to enable them and when it is not. Any help is appreciated. Thanks!
    j
    g
    • 3
    • 12
  • n

    Nick

    11/09/2022, 9:26 PM
    Can I limit the number of windows in my flink job so that I can read a subset of the data in my stream?
  • n

    Nick

    11/09/2022, 9:29 PM
    and then write the results
  • n

    Nick

    11/09/2022, 9:34 PM
    after n windows have been generated
  • d

    Dan Andreescu

    11/09/2022, 9:47 PM
    hi all. I'm having a really hard time just configuring a pom.xml for a Flink project. Curious if there are examples and best practices that I'm missing. We use Kerberos and we're on an older version of Hadoop (2.10.2). What I'm trying to do is use Flink to read from Kafka and write to Iceberg, stored in Parquet format on HDFS. What seems to happen is: I include all the jars I need, set up all the classpaths, configure flink-conf.yaml with what it needs, the job is successfully submitted, and I get an endless stream of "class not found" or "no such method" types of errors. These seem to be services that should register with the Service Provider Interface but, for some reason, don't. I'm just putting this out there in case anyone has experience or links for me to check out. I'll write some more details if it helps; I just didn't want to overwhelm to start.
    c
    • 2
    • 1
  • e

    Emmanuel Leroy

    11/10/2022, 12:45 AM
    is it possible to scale / change parallelism of a job with the operator in session mode? I tried changing the parallelism on a job and it just got cancelled; the job is then in suspended mode, but the operator errors saying it can’t find the job. The job status goes like this:
    Copy code
    Normal  JobStatusChanged  5m42s                Job                   Job status changed from CREATED to RUNNING
      Normal  JobStatusChanged  2m33s                Job                   Job status changed from RUNNING to FINISHED
      Normal  SpecChanged       77s (x6 over 2m38s)  JobManagerDeployment  SCALE change(s) detected (FlinkSessionJobSpec[job.parallelism=2] differs from FlinkSessionJobSpec[job.parallelism=4]), starting reconciliation.
      Normal  Suspended         77s (x6 over 2m38s)  JobManagerDeployment  Suspending existing deployment.
    and the operator logs say:
    Copy code
    org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (ffffffff9751e3e50000000000000001)
    is this expected?
    g
    • 2
    • 8
  • m

    Mingliang Liu

    11/10/2022, 1:31 AM
    Hi team! I have recently been adding Flink code to my existing application, which is written in Scala 2.13. Flink 1.15 has this in its release notes:
    Copy code
    For Table / SQL users, the new module flink-table-planner-loader replaces flink-table-planner_2.12 and avoids the need for a Scala suffix. For backwards compatibility, users can still swap it with flink-table-planner_2.12 located in opt/. flink-table-uber has been split into flink-table-api-java-uber, flink-table-planner(-loader), and flink-table-runtime. Scala users need to explicitly add a dependency to flink-table-api-scala or flink-table-api-scala-bridge.
    It makes me think I can add
    flink-table-api-scala
    and/or
    flink-table-api-scala-bridge
    to my application, which already pulls in Scala 2.13. However, I cannot get the dependency resolved, as the above two packages do not exist in the Maven Central repository. Did I miss anything? Is anyone using Scala 2.13? Thanks
    s
    c
    • 3
    • 4
  • t

    Tan Trinh

    11/10/2022, 4:15 AM
    Hi team, I am using the Flink operator to deploy Flink in a k8s environment. My Flink job is a batch job, and I think I would use a k8s CronJob to schedule it. But I don't know how to combine a k8s CronJob with the FlinkDeployment CRD, because in a k8s CronJob we can only provide a pod template. So, is there any solution for running a Flink batch job on a schedule in k8s? Thanks in advance!
    👀 1
    g
    • 2
    • 2
  • o

    Owen Lee

    11/10/2022, 9:10 AM
    Hi team, in flink-connector-kafka 1.16, FlinkKafkaInternalProducer tries to fetch the topicPartitionBookkeeper field from org.apache.kafka.clients.producer.internals.TransactionManager; however, that field doesn't seem to exist in the class. This causes an infinite restart loop.

    package org.apache.flink.connector.kafka.sink;

    class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
        ...
        public void resumeTransaction(long producerId, short epoch) {
            Object topicPartitionBookkeeper =
                getField(transactionManager, "topicPartitionBookkeeper");
            ...
        }
    }
    ✅ 1
    c
    • 2
    • 5
  • s

    Sandeep Kongathi

    11/10/2022, 3:12 PM
    Hi team, when using Flink SQL in 1.16.0 I am getting this error for Kafka:
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    
    Available factory identifiers are:
    
    blackhole
    datagen
    filesystem
    print
    python-input-format
    It was working fine in 1.15.0. Is there any external jar I should include?
    j
    • 2
    • 15
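A note on the missing 'kafka' factory above: the Kafka connector is not part of the Flink distribution, and the SQL client only lists the factories it actually finds on the classpath, so the usual fix is to drop flink-sql-connector-kafka-1.16.0.jar into the client's lib/ directory (or otherwise add it to the session) and restart. For reference, a rough Table API equivalent of putting the jar on the classpath (the local path is a placeholder):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Point pipeline.jars at the bundled Kafka SQL connector so the 'kafka'
# factory becomes discoverable; adjust the path to wherever the jar lives.
t_env.get_config().set(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka-1.16.0.jar",
)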
  • d

    Dylan Fontana

    11/10/2022, 4:45 PM
    Hi Folks 👋 I think I have a related question to this thread. The docs mention adding user variables is done via
    addGroup(key, value)
    . Similarly the same method is used for adding user scope. It's also mentioned that user variables affect the outcome of getScopeComponents. Is it possible to add a user variable without affecting the scope components of a metric? I'm finding (when trying to register counters for different categories) each counter reports as a different metric (due to the scope component including the variable) and gets tagged with the variable. I instead want them to be the same metric but different tags. For example:
    Copy code
    Map<Integer, Counter> counters = new HashMap<>();
    for (int i = 0; i < 10; i++) {
      String category = String.format("cat-%d", i);
      counters.put(
        i, 
        context.getMetricGroup()
            .addGroup("category", category)
            .counter("category_sums")
      );
    }
    
    // Results in 10 metrics named:
    // flink.operator.category.cat-{i}.category_sums
    // each with tag {category: cat-i}
    c
    • 2
    • 2
  • k

    Kevin Lam

    11/10/2022, 7:03 PM
    👋 Does anyone know if KafkaSink defaults to distributing records to Kafka partitions in a round-robin fashion?
    s
    • 2
    • 1
  • u

    张思航

    11/10/2022, 8:10 PM
    Hey guys, I am implementing Flink on Amazon Kinesis Data Analytics using the Table API (with Table API connectors on both the source and sink). With the DataStream API, I can log data as it is processed. But with the Table API, I couldn't find a good way to log the data rows while the application is running. Can anyone provide some suggestions? Or is there a better way to verify the input and output data?
    d
    • 2
    • 2
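One common way to peek at Table API data is to fan a copy of it out to the built-in 'print' connector, which writes every row it receives to the TaskManager logs. A hedged sketch with placeholder table and column names (a datagen source stands in for the real Kinesis-backed table so the snippet runs on its own):

from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Stand-in source so the sketch is self-contained; swap in the real source table.
table_env.execute_sql("""
    CREATE TABLE demo_source (message STRING)
    WITH ('connector' = 'datagen', 'rows-per-second' = '1')
""")

# Every row inserted here is printed to the TaskManager logs.
table_env.execute_sql("""
    CREATE TABLE debug_sink (message STRING)
    WITH ('connector' = 'print')
""")

table_env.execute_sql("INSERT INTO debug_sink SELECT message FROM demo_source").wait()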
  • s

    Slackbot

    11/10/2022, 11:58 PM
    This message was deleted.
  • v

    Victor Costa

    11/11/2022, 12:32 AM
    Does anyone have an example of a Python client (with PyFlink) consuming messages in Avro format? I'm trying to figure out which jars I should include in my Flink job.
    Copy code
    TypeError: Could not found the Java class 'org.apache.flink.avro.shaded.org.apache.avro.Schema.Parser'
    More details in the 🧵
    j
    x
    u
    • 4
    • 55
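A note on the missing class above: the shaded org.apache.flink.avro.shaded.* classes live in the bundled flink-sql-avro jar (the plain flink-avro artifact does not relocate Avro), so that jar, with a version matching the Flink runtime, needs to be reachable by the PyFlink job. A sketch with a placeholder path and version:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# The path and version are placeholders; point this at the flink-sql-avro
# bundle that matches the Flink version in use.
env.add_jars("file:///path/to/flink-sql-avro-1.16.0.jar")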
  • j

    Jackwangcs

    11/11/2022, 3:24 AM
    Hey, guys. I am migrating our SQL job from 1.13.2 to 1.16.0. The job works well on 1.13.2 but fails on 1.16.0 with the following exception
    Copy code
    INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Join[292] -> Calc[293] -> ConstraintEnforcer[294] (13/48) (0e3956d4cb2a2fb37397f401d50eec99_084fe6556ee4c166eb77ae58879c63be_12_0) switched from RUNNING to FAILED on container_1667978922771_0006_01_000004 @ host-name (dataPort=33409).
    java.lang.NullPointerException: null
    	at StreamExecCalc$18110.processElement_split890(Unknown Source) ~[?:?]
    	at StreamExecCalc$18110.processElement(Unknown Source) ~[?:?]
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:334) ~[flink-table-runtime-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:219) ~[flink-table-runtime-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:124) ~[flink-table-runtime-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
    	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
    Do you guys have any suggestions about how to debug it? Any help is appreciated.
    • 1
    • 1
  • g

    Gaurav Miglani

    11/11/2022, 5:00 AM
    Facing an out-of-memory issue with Parquet writes from Kafka. My source topic has 160 partitions and the input rate is 2M JSON records/sec. I have started my simple Flink job with
    taskmanager.numberOfTaskSlots: "16"
    and
    parallelism: 80,
    each TM has 32 GB of memory and 12 CPUs (c5n.4xlarge). I have tried tuning the parallelism, but it is not working. I followed https://docs.immerok.cloud/docs/how-to-guides/development/read-from-apache-kafka-write-to-parquet-files-with-apache-flink/
    s
    • 2
    • 21
  • j

    Jim

    11/11/2022, 6:44 AM
    Hi, for an HA Flink cluster, how many resources should the JobManager actually have? I guess the master can run on much smaller compute than the task managers? Are there any reference architectures/recommendations for how to set instance sizes?
  • j

    Jirawech Siwawut

    11/11/2022, 12:09 PM
    Hi. I would like to know how Flink HA on Kubernetes works in detail. Assume that I follow this example: https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-checkpoint-ha.yaml Will it create two JobManager pods and then use a leader-follower approach to achieve HA, similar to ZooKeeper HA?
  • j

    Jaume

    11/11/2022, 2:15 PM
    👋🏼 Hi fellows, we're getting a
    java.lang.RuntimeException: Can not retract a non-existent record. This should never happen
    error in a Flink job using Table SQL. Our Flink version is 1.13.6. ❓ Does anyone know under which conditions this exception can happen? 🧵 More info about the job & exception in the thread ⤵️
    m
    • 2
    • 5