# troubleshooting
  • n

    Nithin kharvi

    08/22/2022, 9:07 AM
    Hello guys, we are getting the below error on Flink 1.15.1: javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-kafka-sink-0-1. There is only one Kafka producer created in the pipeline and the clientId is auto-generated. We do not see this error on the Kafka consumer side, so why does this error show up in the task manager for the Kafka producer even though there are no multiple Kafka producers with the same clientId?
    s
    • 2
    • 15
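    For reference, a hedged sketch (not a confirmed root cause or fix): the app-info MBean name is derived from the producer client.id, and the exception is logged by Kafka's AppInfoParser when two producers in the same JVM register the same id, which is usually harmless. Pinning an explicit, unique client.id per sink avoids the clash, assuming the connector honors a user-supplied value; broker and topic names below are placeholders.
    Copy code
    // Assumption: the Kafka sink keeps a user-supplied client.id (placeholder names throughout).
    Properties producerProps = new Properties();
    producerProps.setProperty("client.id", "my-pipeline-sink");   // standard Kafka producer property
    
    KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker-1:9092")
        .setKafkaProducerConfig(producerProps)
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .build();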
  • s

    Sylvia Lin

    08/22/2022, 5:31 PM
    Hello folks, is there a way I can disable certain Flink metric tags? It seems we're experiencing a cardinality explosion for the metric
    flink.task.numRecordsOut
    : 19,606 tags were generated for task_attempt_id in the past hour.
    ✅ 1
    c
    h
    • 3
    • 3
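    A heavily hedged pointer for this one: newer Flink versions allow a metrics reporter to exclude individual scope variables, which would drop the high-cardinality task_attempt_id tag. The exact option key and its availability on the version in use are assumptions, "my_reporter" is a placeholder, and the setting normally goes into flink-conf.yaml rather than code.
    Copy code
    // Assumed option key; normally set in flink-conf.yaml for the reporter in use.
    import org.apache.flink.configuration.Configuration;
    
    Configuration conf = new Configuration();
    conf.setString("metrics.reporter.my_reporter.scope.variables.excludes", "task_attempt_id");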
  • j

    Jirawech Siwawut

    08/22/2022, 5:45 PM
    Hello. Has anyone tried to use the Flink SQL Client with a kerberized Hive? I cannot connect to the Hive metastore:
    Copy code
    17:38:15.505 [main] ERROR org.apache.thrift.transport.TSaslTransport - SASL negotiation failure
    javax.security.sasl.SaslException: GSS initiate failed
    It seems the Flink SQL Client does not initialize a Kerberos session?
    p
    • 2
    • 2
  • h

    Hilmi Al Fatih

    08/23/2022, 4:10 AM
    Hi. Does anyone know how to add metrics scoped to the Job/JobManager or any other system scope? As far as I can tell from the docs, they only describe how to add metrics in the user scope.
    c
    • 2
    • 1
  • j

    Jirawech Siwawut

    08/23/2022, 5:18 AM
    Hello. Does the Flink Hive connector support reading this option? https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_read_write/#streaming-source-partition-include I got this error:
    Copy code
    The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'
    I checked the code, but it seems the only supported option is
    all
    at the moment https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flin[…]in/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
  • g

    Giannis Polyzos

    08/23/2022, 6:12 AM
    [Kubernetes Operator] I'm trying to install the Kubernetes Operator (probably missing something simple). The operator needs cert-manager. When I follow the instructions in the official docs using the default namespace, everything runs OK. But I already have a cert-manager installed in the kube-system namespace:
    Copy code
    kubectl get pods -n kube-system | grep cert
    
    cert-manager-cainjector-695476768b-jsk4t                      1/1     Running   0          6d14h
    cert-manager-controller-95d89d7c8-5sr6g                       1/1     Running   0          6d14h
    cert-manager-webhook-66697d8b77-l69sv                         1/1     Running   0          6d14h
    I'm trying to install the flink-operator in a flink namespace, but I'm getting this error:
    Copy code
    Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "flink" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"
    ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "flink" from "": no matches for kind "Issuer" in version "cert-manager.io/v1"
    ensure CRDs are installed first]
    g
    • 2
    • 5
  • m

    Metehan Yıldırım

    08/23/2022, 7:24 AM
    Hello everyone, I have a problem with Flink Python UDFs when I use Numba. Could you check the Stack Overflow question I posted? https://stackoverflow.com/questions/73447232/using-numba-in-flink-python-udfs?noredirect=1#comment129713618_73447232
    x
    h
    • 3
    • 35
  • i

    Ivan M

    08/23/2022, 12:51 PM
    Hey folks! I deploy a Flink job JAR via Ververica and get the
    Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
    error. Any ideas where I should look for the root cause? 🙂
  • p

    Pedro Cunha

    08/23/2022, 3:15 PM
    Hello again guys. After upgrading flink from
    1.14.3
    to
    1.15.1
    I started getting errors of
    Regressing checkpoint IDs
    . It looks like Flink is trying to use old ids to create checkpoints, for some reason.
    Previous checkpointId = 31235, new checkpointId = 504
    . On some deployments, it fails when the ID is the same. Does anyone have a clue how to fix this? Is there a config I'm missing?
    c
    • 2
    • 19
  • w

    Will Norman

    08/23/2022, 3:33 PM
    Hi, I'm working on a POC of using flink for some feature engineering work, and we're interested in testing out HybridSource to bootstrap values. We have a tiered storage model, with event data stored in S3 long term, and streaming through Kinesis. The data is in JSON format. It looks like the current implementation of
    FlinkKinesisConsumer
    extends
    RichParallelSourceFunction
    whereas I need a
    Source
    object, similar to
    KafkaSource
    . Does anyone know if it's possible to read from Kinesis as part of a hybrid source?
    s
    • 2
    • 2
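    For reference, a minimal sketch of the HybridSource pattern with the unified Source API: a bounded FileSource over the S3 archive chained into an unbounded streaming source. Because FlinkKinesisConsumer is still a SourceFunction, the streaming leg below uses KafkaSource purely to illustrate the shape; bucket, broker, topic and format choices are placeholders.
    Copy code
    // Bounded backfill source reading the text/JSON archive from S3.
    FileSource<String> historical =
        FileSource.forRecordStreamFormat(
                new TextLineInputFormat(),            // TextLineFormat in older releases
                new Path("s3://my-bucket/events/"))
            .build();
    
    // Unbounded "live" source; Kafka here only because it already implements the new Source API.
    KafkaSource<String> live =
        KafkaSource.<String>builder()
            .setBootstrapServers("broker-1:9092")
            .setTopics("events")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    
    // HybridSource switches from the bounded source to the streaming source once the backfill ends.
    HybridSource<String> source =
        HybridSource.builder(historical)
            .addSource(live)
            .build();
    
    DataStream<String> events =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "hybrid-events");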
  • h

    Hunter Medney

    08/23/2022, 7:02 PM
    Hi all, I'm having trouble configuring an S3 DataStreamSource inside the IDE (VS Code). I've provided the settings as a Configuration object:
    Copy code
    final Configuration jobConfiguration = new Configuration();
    jobConfiguration.setString("s3.endpoint", "http://localhost:9000");
    jobConfiguration.setString("s3.path.style.access", "true");
    jobConfiguration.setString("s3.access-key", "minio");
    jobConfiguration.setString("s3.secret-key", "minio123");
    jobConfiguration.setString("s3.connection.ssl.enabled", "false");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(jobConfiguration);
    
    DataStreamSource<String> text = env.readTextFile("s3://test/example.csv");
    text.print();
    env.execute();
    and via FLINK_PROPERTIES:
    Copy code
    FLINK_PROPERTIES="s3.access-key: minio\ns3.secret-key: minio123\ns3.endpoint: http://localhost:9000"
    and it keeps behaving as if no endpoint or credentials are defined:
    Copy code
    Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, com.amazonaws.auth.profile.ProfileCredentialsProvider@21c90685: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@574cf28b: Failed to connect to service endpoint: ]
            at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:136)
            at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1257)
            at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:833)
            at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:783)
            at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
            at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
            at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
            at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
            at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
            at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
            at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
            at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:6220)
            at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:6193)
            at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5244)
            at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
            at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1360)
            at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1334)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$5(PrestoS3FileSystem.java:667)
            at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:664)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:648)
            at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353)
            at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1690)
            at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:165)
            at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
            at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
    ...feel like I'm missing something simple but have been wrestling with this for a while. Thank you for any insights!
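    One thing worth trying for the in-IDE case (a hedged suggestion, not verified against this exact setup): register the same settings with Flink's FileSystem layer explicitly before building the job, which is the approach the docs describe for using the S3 filesystems in tests. It assumes flink-s3-fs-presto (or -hadoop) is a regular compile-scope dependency of the project.
    Copy code
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.plugin.PluginUtils;
    
    Configuration conf = new Configuration();
    conf.setString("s3.endpoint", "http://localhost:9000");
    conf.setString("s3.path.style.access", "true");
    conf.setString("s3.access-key", "minio");
    conf.setString("s3.secret-key", "minio123");
    
    // Hand the configuration to Flink's pluggable file systems before the job is built.
    FileSystem.initialize(conf, PluginUtils.createPluginManagerFromRootFolder(conf));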
  • p

    Prasanth Kothuri

    08/23/2022, 8:47 PM
    We are moving to event-time processing and we mostly use tumbling windows, and we need to cater for late-arriving events. I am trying to understand how watermarks work and how they close windows. If I have a tumbling window of 1 hour, does the following close the window after 1 hour 5 minutes, or after 1 hour 10 minutes if there is no input? Is my understanding correct? Thanks a ton
    Copy code
    WatermarkStrategy
      .forBoundedOutOfOrderness[(Long, String)](Duration.ofMinutes(5))
      .withIdleness(Duration.ofMinutes(10))
    d
    • 2
    • 13
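    A small worked example for the strategy above (window bounds and numbers chosen only for illustration):
    Copy code
    // With forBoundedOutOfOrderness(5 min), an event-time tumbling window [12:00, 13:00)
    // fires once the watermark passes 12:59:59.999, i.e. after an event with a timestamp
    // of roughly 13:05 or later arrives. withIdleness(10 min) does not advance the
    // watermark by itself: it only marks a source/partition as idle after 10 minutes
    // without input so that other, active partitions can move the overall watermark
    // forward. With no input at all, neither setting closes the window by wall-clock time.
    WatermarkStrategy<Tuple2<Long, String>> strategy =
        WatermarkStrategy
            .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withIdleness(Duration.ofMinutes(10));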
  • r

    Rafał Trójczak

    08/24/2022, 8:10 AM
    Hello, guys! I would like to run the Flink environment from the IDE:
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    Is there a way to set configuration options (like
    taskmanager.memory.network.min
    ) from command line when running this Flink job? Of course, I can set configuration options using the
    Configuration
    class and
    createLocalEnvironment
    but I don't want to do this because the same job will run on a cluster.
    t
    p
    • 3
    • 8
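    One possible approach, sketched under the assumption that passing the options as program arguments (e.g. --taskmanager.memory.network.min 64mb) is acceptable: fold the arguments into the Configuration used for the local environment. On a real cluster the cluster-level options come from the cluster configuration instead, so the same main() still runs there.
    Copy code
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public static void main(String[] args) throws Exception {
        // e.g. --taskmanager.memory.network.min 64mb (placeholder value)
        Configuration conf = ParameterTool.fromArgs(args).getConfiguration();
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual
    }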
  • f

    Felix Angell

    08/24/2022, 11:28 AM
    Heya, how feasible would it be to backport some of the flink functionality around Sessionised windows to an older version (1.13.2)? We've already done this with a connector API for Kinesis consumption but the Window utilities seem more complicated to backport. Note that we have to do this because we are stuck on this version due to AWS KDA. It looks as though the Sessionised windows are in PyFlink 1.16.x so this would be some pretty recent code to backport. What do you think? Thanks 🙂
    ✅ 1
    g
    • 2
    • 8
  • k

    Kevin L

    08/24/2022, 6:36 PM
    Hello, I have a PyFlink script that creates a table on top of a Kafka topic using the Table API SQL syntax, and then, just for testing, prints out 10 rows. When I try to do this, the application prints 10 rows to the terminal and then hangs. To stop the program I have to force quit with Ctrl-C. Any ideas on how to make the table result end when the query completes? Here is the script that I am running:
    Copy code
    kafka_source_query = """
        CREATE TABLE if not exists kafka_topic (
        `c1` VARCHAR,
        `c2` VARCHAR,
        `c3` VARCHAR
          ) WITH (
            'connector' = 'kafka',
            'topic' = 'kafka-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
          )
    """
    result = t_env.execute_sql(kafka_source_query)
    tbl = t_env.from_path("kafka_topic")
    tbl.select(col("c1"),col("c2"), col("c3")).limit(10).execute().print()
    TL;DR: the last statement never finishes (i.e.
    tbl.select(col("c1")...
    ), it just prints 10 rows to the console and hangs. I would like the script to exit on its own. Any thoughts? Is this expected, or a bug?
    s
    • 2
    • 3
  • k

    Krish Narukulla

    08/24/2022, 7:26 PM
    How do I create a
    LocalTableEnvironment
    for debugging purposes? https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/local_execution/
    s
    d
    • 3
    • 2
  • d

    Darin Amos

    08/24/2022, 9:39 PM
    Hi All! I am trying to understand how aligned (or unaligned) checkpointing works with the
    ContinuousFileReaderOperator
    and an S3 source (a custom FileMonitoringFunction). I have not been able to find an answer on Google or documentation. My understanding was that checkpoint barriers cannot overtake stream records and I considered file splits to be like any other type of record. I also understand that aligned checkpoints are not supposed to persist in-flight data. I don’t think my observations perfectly align with this. Currently my aligned checkpoints run every minute and I have several S3 files that take longer than 1 minute to parse and process. However, when a checkpoint is taken while I am processing one of these large files my checkpoint starts and completes while the file is still being processed. I also notice when this happens that parallel reader operators have a mix of “checkpointed data size” values. I’m curious what is happening behind the scenes here. Is it possible that “transform” operators such as the continuous file reader are treated as a special case where checkpoint barriers are allowed to overtake records (file splits in this case) and the input buffers are persisted in the checkpoint? My guess would have been that the checkpoint barrier needs to wait for all the file splits to be processed.
    d
    • 2
    • 10
  • s

    Sylvia Lin

    08/24/2022, 11:32 PM
    Hey folks, wondering how we can dynamically sink data to different Kafka topics with
    KafkaSink
    ? Previously we use
    FlinkKafkaProducer
    , it serializes record as
    ProducerRecord
    which holds topic info. https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html But we know
    FlinkKafkaProducer
    will be deprecated soon, so we want to switch to
    KafkaSink
    or a similar method.
    d
    • 2
    • 5
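    For reference, a sketch of how this is commonly done with KafkaSink: the destination topic is taken from the ProducerRecord returned by the record serializer, so a custom KafkaRecordSerializationSchema can route each element to its own topic. MyEvent, getTargetTopic() and getPayload() are hypothetical.
    Copy code
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    // MyEvent and its getters are hypothetical placeholders for the actual event type.
    public class DynamicTopicSchema implements KafkaRecordSerializationSchema<MyEvent> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                MyEvent element, KafkaSinkContext context, Long timestamp) {
            return new ProducerRecord<>(
                    element.getTargetTopic(),                                // per-record topic
                    element.getPayload().getBytes(StandardCharsets.UTF_8));  // value bytes
        }
    }
    
    KafkaSink<MyEvent> sink = KafkaSink.<MyEvent>builder()
        .setBootstrapServers("broker-1:9092")
        .setRecordSerializer(new DynamicTopicSchema())
        .build();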
  • k

    Krish Narukulla

    08/25/2022, 12:18 AM
    Unable to do local debugging; I'm running into a dependency error:
    Copy code
    Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:108)
    	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:300)
    	at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:971)
    	at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:943)
    	at com.roku.common.frameworks.flink.execution.ExecutionV1.execute(ExecutionV1.scala:31)
    	at com.roku.common.frameworks.flink.FlinkApp$.main(FlinkApp.scala:27)
    	at com.roku.common.frameworks.flink.FlinkApp.main(FlinkApp.scala)
    Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
    	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:526)
    	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:105)
    	... 6 more
    • 1
    • 1
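    For context, this ValidationException usually points at missing table jars on the runtime classpath rather than at the code itself. A hedged minimal program for sanity-checking a local setup, assuming flink-table-api-java (or the Scala bridge), flink-table-planner-loader, flink-table-runtime and flink-clients are dependencies with default (compile) scope, not provided:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    public class LocalTableSmokeTest {
        public static void main(String[] args) {
            // Fails with "Could not find ... ExecutorFactory" if planner/runtime jars are missing.
            TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
            tEnv.executeSql(
                "CREATE TABLE Numbers (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
            tEnv.executeSql("SELECT COUNT(*) AS cnt FROM Numbers").print();
        }
    }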
  • c

    Carl Choi

    08/25/2022, 2:18 AM
    Hello all! I just have a short question. Is it possible to connect to and select from MySQL every time I want? I mean that I want to select from MySQL every time Flink gets a record from the Kafka stream, like below: kafka --> Debezium CDC --> Flink --> another MySQL table, with Flink also looking up MySQL along the way, so the data read from MySQL must be the most recent data. What method can I apply? Thanks for reading and for your attention!
    s
    • 2
    • 4
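    One common pattern for this, sketched with placeholder table, column and connection names: declare the MySQL table via the JDBC connector and use a processing-time lookup join, so every record arriving from Kafka/Debezium is enriched with whatever MySQL holds at that moment (the JDBC lookup source queries the database per key unless a lookup cache is configured). It assumes flink-connector-jdbc plus the MySQL driver on the classpath and a proc_time AS PROCTIME() column on the Kafka-backed table.
    Copy code
    // All identifiers below (mysql_dim, kafka_events, mysql_output, columns, credentials) are placeholders.
    tableEnv.executeSql(
        "CREATE TEMPORARY TABLE mysql_dim (" +
        "  id BIGINT," +
        "  info STRING," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://mysql-host:3306/mydb'," +
        "  'table-name' = 'dim_table'," +
        "  'username' = 'flink'," +
        "  'password' = 'secret'" +
        ")");
    
    // kafka_events is assumed to declare: proc_time AS PROCTIME()
    tableEnv.executeSql(
        "INSERT INTO mysql_output " +
        "SELECT e.id, e.amount, d.info " +
        "FROM kafka_events AS e " +
        "JOIN mysql_dim FOR SYSTEM_TIME AS OF e.proc_time AS d " +
        "ON e.id = d.id");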
  • r

    Rashmin Patel

    08/25/2022, 5:44 AM
    Hello, guys! I have a very basic doubt about the allowed-lateness concept in Flink. If I am in event-time processing and I set window.allowedLateness(5), which is correct? 1. After the watermark (say cw) passes the end of the window, the window will wait 5 seconds of processing time to allow late events. 2. Or the window will wait for subsequent watermarks to pass cw + 5 seconds (i.e., in event time).
    c
    • 2
    • 2
  • a

    Adesh Dsilva

    08/25/2022, 10:53 AM
    Hi. When using a List or Map in a Java POJO, Flink seems to partially fall back to Kryo serialization. I see from the docs that we need to annotate the class with @TypeInfo and provide a TypeInfoFactory; it doesn't seem very intuitive, though. Another alternative is to use
    String[]
    instead of
    List<String>
    . My question is: why doesn't Flink automatically create this type information? I can understand that List<T> can be difficult, but it should be possible for primitive element types like
    List<String>
    or
    List<Integer>
    . Another approach I see is to use Avro or Protobuf schemas and generate Java classes from them, but I would like to avoid that if possible.
    c
    • 2
    • 4
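    For reference, a hedged sketch of the @TypeInfo / TypeInfoFactory approach mentioned above, with hypothetical class and field names; it keeps the List<String> field on Flink's list type information instead of falling back to Kryo.
    Copy code
    import java.lang.reflect.Type;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.flink.api.common.typeinfo.TypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    
    // MyPojo and its fields are hypothetical placeholders.
    @TypeInfo(MyPojo.Factory.class)
    public class MyPojo {
        public String name;
        public List<String> tags;
    
        public static class Factory extends TypeInfoFactory<MyPojo> {
            @Override
            public TypeInformation<MyPojo> createTypeInfo(
                    Type t, Map<String, TypeInformation<?>> genericParameters) {
                Map<String, TypeInformation<?>> fields = new HashMap<>();
                fields.put("name", Types.STRING);
                fields.put("tags", Types.LIST(Types.STRING));   // explicit list type info, no Kryo
                return Types.POJO(MyPojo.class, fields);
            }
        }
    }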
  • f

    Felix Angell

    08/25/2022, 11:14 AM
    Noob question: I'm trying to understand the Table API after coming from the DataStream API. In the DataStream API you can key a stream and then window each keyed stream (e.g. with a session window); what would the equivalent be in the Table API?
    s
    • 2
    • 1
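    For reference, a sketch of the closest Table API equivalent: a group-window aggregation where the session window sits inside the groupBy together with the key. Column names, the rowtime attribute and the 10-minute gap are placeholders.
    Copy code
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;
    import org.apache.flink.table.api.Session;
    import org.apache.flink.table.api.Table;
    
    // 'input' is assumed to be a Table with columns userId, amount and an event-time attribute 'rowtime'.
    Table sessions = input
        .window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"))
        .groupBy($("userId"), $("w"))                   // "keyed" part plus the window itself
        .select(
            $("userId"),
            $("w").start().as("session_start"),
            $("w").end().as("session_end"),
            $("amount").sum().as("total"));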
  • d

    Dhavan

    08/26/2022, 4:57 AM
    Hey guys, I asked a question on the mailing list earlier and received a reply: https://lists.apache.org/thread/tr3n4r749oxlsjq1dbkxrg76nfq47o06 I thought I'd follow up here for some quick clarifications. What Flink seems to be doing is actually emitting
    DELETE
    statements, even though all my queries are
    INSERT
    queries with a few
    LEFT JOIN
    s. So could it be that my queries are somehow causing records to be deleted?
    s
    t
    • 3
    • 25
  • p

    Prasanth Kothuri

    08/26/2022, 7:26 AM
    Hello, I have managed to write unit tests for a flatMap operator, but I can't figure out how to write unit tests for an aggregate (window) operator. I am using
    KeyedOneInputStreamOperatorTestHarness
    ; are there any examples that I can look at? Thanks a ton. The code I want to unit test is like this - https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/windows/#incremental-window-aggr[…]n-with-aggregatefunction
    r
    • 2
    • 12
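    One pragmatic option, sketched below: unit test the AggregateFunction on its own (here the AverageAggregate shape from the linked docs page, which accumulates a sum and a count) and cover end-to-end window behaviour with a MiniCluster-based test instead of an operator harness.
    Copy code
    import static org.junit.Assert.assertEquals;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.junit.Test;
    
    public class AverageAggregateTest {
        @Test
        public void computesAverageOfValues() {
            AverageAggregate agg = new AverageAggregate();   // the class from the docs example
            Tuple2<Long, Long> acc = agg.createAccumulator();
            acc = agg.add(Tuple2.of("key", 2L), acc);
            acc = agg.add(Tuple2.of("key", 4L), acc);
            assertEquals(3.0, agg.getResult(acc), 0.0);
        }
    }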
  • c

    chunilal kukreja

    08/26/2022, 1:00 PM
    Hi team, as per the latest Flink documentation (1.15.1):
    Since RocksDB is part of the default Flink distribution, you do not need this dependency if you are not using any RocksDB code in your job and configure the state backend via state.backend and further checkpointing and RocksDB-specific parameters in your flink-conf.yaml.
    That means we shouldn't have to include this dependency, *right*?
    Copy code
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>1.15.1</version>
        <scope>provided</scope>
    </dependency>
    But this doesn't work 😞, while if I do it programmatically it works as expected. Can someone who has done this via the config route take a look or suggest something?
    flink-conf.yaml
    h
    c
    • 3
    • 19
  • k

    Krish Narukulla

    08/26/2022, 6:53 PM
    I have tried to run unit tests by following https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/, and I get the following errors:
    Copy code
    src/test/scala/com/xxx/common/frameworks/flink/Test.scala:16: error: Class org.apache.flink.runtime.testutils.MiniClusterResource not found - continuing with a stub.
      val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
                             ^
    src/test/scala/com/xxx/common/frameworks/flink/Test.scala:16: error: Class org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration not found - continuing with a stub.
      val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
                                                               ^
    src/test/scala/com/xxx/common/frameworks/flink/Test.scala:41: error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[Long]
        env.fromElements(1L, 21L, 22L)
                        ^
    s
    • 2
    • 3
  • j

    Jeff Levesque

    08/27/2022, 1:30 AM
    I wrote a PyFlink application that I tested and successfully ran locally. This PyFlink application contains a custom UDAF function, which imports a required
    pip
    package to run. Within my UDAF, I have the following segment to ensure it has the desired
    xxx
    package in order to run:
    Copy code
    try:
        import xxx
    
    except ModuleNotFoundError:
        import sys, subprocess
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'xxx'])
    
    finally:
        import xxx
    However, I don't want to install the
    pip
    package
    xxx
    as performed above. Instead, I'd like it to be downloaded within the PyFlink application directory, and imported via a relative path. This way I could bundle up the build using some CI/CD pipeline. Before I go about this, I wanted to double check on best practices regarding importing pip packages from a local path (preferably within the PyFlink application directory).
  • s

    Sigh

    08/27/2022, 8:30 AM
    Hi, I'm working with Flink 1.15 and I see a lot of modifications in the data formats and deprecated APIs. I mostly use the DataStream API. Would you suggest I use Row, or GenericRowData (which implements RowData), for my data structures?
  • k

    Krish Narukulla

    08/28/2022, 5:01 AM
    Unable to run a sample program using the Table API on a
    LocalEnvironment
    . Resulting exception:
    Copy code
    Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
            at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:526)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276)
            at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93)
            at com.roku.common.frameworks.flink.Sample$.main(Sample.scala:31)
            at com.roku.common.frameworks.flink.Sample.main(Sample.scala)
    Code:
    Copy code
    import org.apache.flink.table.api._
        import org.apache.flink.table.api.bridge.scala._
    
        // environment configuration
        val settings = EnvironmentSettings
          .newInstance()
          .inBatchMode()
          .build();
    
        val tEnv = TableEnvironment.create(settings);
    
        // register Orders table in table environment
        // ...
    
        // specify table program
        val orders = tEnv.from("Orders") // schema (a, b, c, rowtime)
    
    val result = orders
      .groupBy($"a")
      .select($"a", $"b".count as "cnt")

    // print the result table; a plain TableEnvironment has no DataStream conversion
    result.execute().print()
    r
    • 2
    • 3