# troubleshooting
  • chunilal kukreja (11/01/2022, 11:10 AM)
    hi Team, I have a use case that seems to be solved by the hybrid source, but I'm a bit concerned/confused about the "switchTimestamp" implementation. Basically, the historic data needs to load first; after that the Kafka source comes into action. If loading the historic data takes longer than the Kafka retention period, I will lose the delta events. How can I take care of this edge case?
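    A hedged sketch of the usual mitigation: capture the Kafka start position before the backfill begins (by timestamp or snapshotted offsets), so the switch point is fixed no matter how long the historic load takes; if retention can still expire that position, only longer retention or an earlier copy of the topic helps. Class, path, and topic names below are illustrative (Flink 1.15-era APIs):
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.source.hybrid.HybridSource;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.core.fs.Path;

    // capture the switch position up front, before the (possibly slow) backfill
    long switchTimestamp = System.currentTimeMillis();

    FileSource<String> historic = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/history"))
            .build();

    KafkaSource<String> live = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("events")
            // resume from the captured timestamp rather than "latest"
            .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    HybridSource<String> source = HybridSource.builder(historic).addSource(live).build();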
  • Tiansu Yu (11/01/2022, 11:16 AM)
    What happens when you connect two streams, stream A with WatermarkStrategy.noWatermarks() and stream B with boundedOutOfOrderness()? Does the result stream inherit timestamps and watermarks entirely from stream B, or will the watermark not advance because stream A has no watermark?
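    For reference, a minimal sketch of that setup (Event and its timestamp field are illustrative). A multi-input operator tracks the minimum of its input watermarks, so with noWatermarks() on stream A the combined watermark stays at Long.MIN_VALUE and event-time timers never fire; withIdleness() on A is one common way around that:
    Copy code
    DataStream<Event> a = sourceA.assignTimestampsAndWatermarks(
            WatermarkStrategy.noWatermarks());

    DataStream<Event> b = sourceB.assignTimestampsAndWatermarks(
            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((e, ts) -> e.timestamp));

    // the combined stream advances its watermark as min(wm(a), wm(b));
    // a emits no watermarks, so the result never advances
    a.union(b);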
  • Jin S (11/01/2022, 3:16 PM)
    Hi all, will there be no minor version between Flink 1.17 and Flink 2.0? Is Flink 2.0 planned for the end of 2023, or by 2024 at the latest? I can see Flink 1.17 is scheduled for March 2023 from this page, but I'm unsure about Flink 2.0. I'm trying to encourage teams who are using the Scala API to switch to the Java API, since there's already a consensus to remove the former in Flink 2.0. 🙂 Thank you.
  • Alexander Fedulov (11/01/2022, 3:24 PM)
    1.16 introduced quite a lot of changes with respect to classloading in the Table API. The way UDFs could previously be loaded is now broken. Doing
    Copy code
    Class<? extends UserDefinedFunction> clazz = (Class<? extends UserDefinedFunction>) Class.forName(f.getClassName(), true, loader);
    tableEnvironment.createTemporarySystemFunction(f.getName(), clazz);
    now leads to java.lang.ClassNotFoundException. I tried passing the same loader while initializing the TableEnvironment using the newly added method, but it did not help. Any hints?
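    Assuming the "newly added method" is EnvironmentSettings.Builder#withClassLoader (new in 1.16), a sketch of that wiring for comparison; the loader passed in must be the one that can actually see the UDF classes:
    Copy code
    // the classloader that contains the UDF jars (illustrative)
    ClassLoader loader = Thread.currentThread().getContextClassLoader();

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .withClassLoader(loader) // added in Flink 1.16
            .build();
    TableEnvironment tableEnvironment = TableEnvironment.create(settings);

    Class<? extends UserDefinedFunction> clazz =
            (Class<? extends UserDefinedFunction>) Class.forName(f.getClassName(), true, loader);
    tableEnvironment.createTemporarySystemFunction(f.getName(), clazz);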
  • Sachin Saikrishna Manikandan (11/01/2022, 4:51 PM)
    Hello team, when upgrading from Flink 1.15.2 to 1.16.0, we see that metrics are no longer being exposed via port 9250. Is that a known problem, and is there a fix for it?
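    One plausible cause, hedged: Flink 1.16 removed instantiating metric reporters via metrics.reporter.<name>.class and requires the factory style, so a 1.15-era configuration can leave the reporter silently unstarted. The 1.16-style equivalent for Prometheus:
    Copy code
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9250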
  • Adrian Chang (11/01/2022, 7:03 PM)
    Hello team, I am using Flink 1.15.2 on Flink Kubernetes Operator 1.2 and trying to expose metrics to Prometheus by setting
    Copy code
    defaultConfiguration:
      create: true
      append: true
      flink-conf.yaml: |+
        # Prometheus
        metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
        metrics.reporter.prom.port: 9250-9260
    In the logs I see these lines
    Copy code
    Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
    Loading configuration property: $internal.application.main, org.apache.flink.client.python.PythonDriver
    Loading configuration property: metrics.reporter.prom.port, 9250-9260
    Started PrometheusReporter HTTP server on port 9250.
    Reporting metrics for reporter prom of type org.apache.flink.metrics.prometheus.PrometheusReporter.
    However, the JobManager and TaskManager pods are not exposing port 9250. Do I need to set something else to open the port from which Prometheus will read the metrics?
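    The log shows the reporter's HTTP server is up inside the container, so the missing piece is likely declaring the port on the pods (and pointing Prometheus discovery at it). A sketch using the operator's podTemplate; flink-main-container is the container name the operator expects:
    Copy code
    spec:
      podTemplate:
        spec:
          containers:
            - name: flink-main-container
              ports:
                - name: metrics
                  containerPort: 9250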
  • Jason Politis (11/01/2022, 8:05 PM)
    Good evening everyone. Our client's data has some bad dates that break our SQL queries when we try to DATE_FORMAT them. We are getting a DayOfYear error:
    Copy code
    java.time.DateTimeException: Field DayOfYear cannot be printed as the value 269 exceeds the maximum print width of 2
    We are trying to identify this bad data. I'm in Ververica now, running a SQL preview. There are only 2k records in this table. Is there an easy way to export the results of the SQL preview so I can analyze the data in Excel or something?
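    As an aside, the exception itself points at a DATE_FORMAT pattern using 'DD' (two-character day-of-year) applied to dates past day 99; a hedged query to surface those rows in the SQL preview (table and column names are made up):
    Copy code
    -- rows whose day-of-year needs three digits, which is what breaks
    -- a two-character 'DD' pattern
    SELECT *
    FROM some_table
    WHERE DAYOFYEAR(CAST(event_date AS DATE)) > 99;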
  • Jirawech Siwawut (11/02/2022, 2:59 AM)
    Hello. I would like to do a batch operation after a tumbling window. It seems that flatMap works at the record level after the window aggregation. What I am trying to do is flush all records of a window to external storage together when the window closes, not a single record at a time. How can I do that?
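    One way to see the whole window at once is a ProcessWindowFunction instead of a flatMap after the aggregation: its process() call receives every element of the closed window. A sketch with made-up types:
    Copy code
    stream
        .keyBy(e -> e.key)
        .window(TumblingEventTimeWindows.of(Time.hours(1)))
        .process(new ProcessWindowFunction<Event, Batch, String, TimeWindow>() {
            @Override
            public void process(String key, Context ctx,
                                Iterable<Event> elements, Collector<Batch> out) {
                // all records of the just-closed window arrive here together,
                // so they can be flushed to external storage as one batch
                List<Event> batch = new ArrayList<>();
                elements.forEach(batch::add);
                out.collect(new Batch(key, ctx.window().getEnd(), batch));
            }
        });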
  • Sachin Saikrishna Manikandan (11/02/2022, 10:26 AM)
    Hello all, I am trying to understand how watermarks break with unaligned checkpoints. I did go through the docs but I'm still having trouble understanding. Let's say I have a TumblingWindowOperator that works with a watermark: in which scenarios, and how, will unaligned checkpoints break the semantics?
  • Aviv Dozorets (11/02/2022, 11:05 AM)
    Hi, I hope it's not a stupid question. Let's say I have a Flink application (1.14) using the DataStream API, unbounded (Kafka source/sink), with 5 operators that all share the same parallelism across multiple TM nodes, with forwarding between them and no rebalancing or shuffling. Is there a way to ensure that all tasks run on the same TM node, or to understand how tasks are spread between TMs? Thanks.
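    For context, a hedged sketch: operators joined by forward edges at equal parallelism chain into a single task, and a chain always executes inside one slot (hence one TM); the web UI shows the resulting task layout per subtask. The knobs that influence it:
    Copy code
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> s = env.fromElements("a", "b");

    // with equal parallelism and forward edges these operators chain into
    // a single task, so each parallel instance runs entirely in one slot
    s.map(String::toUpperCase)
            .startNewChain()               // opt out: break the chain at this operator
            .slotSharingGroup("isolated")  // opt out: separate slot sharing group
            .print();

    // env.disableOperatorChaining();      // global switch to turn chaining off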
  • Raghunadh Nittala (11/02/2022, 11:20 AM)
    Hello Team, we have a DataStream created from a Kafka topic and we are planning to use the TableStream API for tumbling windows of 1 hour, as we don't have complex logic. We finally want to sink the windowed data to S3. We are referring to https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem and the doc says sink.rolling-policy.file-size is of type MemorySize. My question is: while the data is buffered during the one-hour window, do the part files get saved to RocksDB when RocksDBStateBackend is enabled?
  • Matteo De Martino (11/02/2022, 3:05 PM)
    Hello all, I am sure this is a dumb question, but I am a bit stuck. I am trying to read a simple CSV file containing DB rows using Flink SQL. This is the simplified CREATE TABLE:
    Copy code
    CREATE TABLE SOME_TABLE
      (
          id                    INT NOT NULL,
          type                  STRING,
          ...
          ...
    )
    WITH(
        'connector' = 'filesystem',
        'path' = 'file:///path/to/file.csv',
        'format' = 'csv',
        'csv.ignore-parse-errors' = 'true',
        'csv.allow-comments' = 'true'
    );
    I also added flink-csv as a dependency. But when I try to executeSql the above, I get:
    Copy code
    org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    What am I missing? My understanding is that there is no explicit dependency to add for the filesystem connector to work. Am I wrong? Thanks 🙇
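    For what it's worth, one common cause when running from the IDE: the 'filesystem' table factory lives in flink-connector-files, which the distribution ships in lib/ but an IDE classpath does not include by default. A sketch, assuming Maven:
    Copy code
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>${flink.version}</version>
    </dependency>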
  • Yaroslav Bezruchenko (11/02/2022, 4:21 PM)
    Hey, I have a quite big data stream with 10+ functions. The question is: what is the best approach to write different types of output to different topics within one stream? Can I add multiple Kafka sinks in one stream? If not, how can I add a separate Kafka sink to a side output tag?
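    Multiple Kafka sinks in one job are fine; a minimal sketch routing a side output to its own sink (the tag, topic names, and the kafkaSink(...) helper are illustrative):
    Copy code
    final OutputTag<String> errorsTag = new OutputTag<String>("errors") {};

    SingleOutputStreamOperator<String> main = input.process(
            new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    if (value.startsWith("ERR")) {
                        ctx.output(errorsTag, value); // route to the side output
                    } else {
                        out.collect(value);           // main output
                    }
                }
            });

    main.sinkTo(kafkaSink("main-topic"));                      // first sink
    main.getSideOutput(errorsTag).sinkTo(kafkaSink("errors")); // second sink, same job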
  • Rashmin Patel (11/02/2022, 4:23 PM)
    Hello All, we have observed an issue with the Flink Kafka connector: if the source Kafka topic gets deleted, the Flink streaming job keeps on running with the warning messages below.
    Copy code
    2022-11-02 10:21:38,370 WARN  org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=dp_flink_streaming_1664860215310-0, groupId=dp_flink_streaming_1664860215310] Error while fetching metadata with correlation id 5244076 : {loan_account_service.public.loan_accounts=UNKNOWN_TOPIC_OR_PARTITION}
    Is this the intended behaviour, or should we expect the job to fail?
  • Jeesmon Jacob (11/02/2022, 4:32 PM)
    Hi team, is it possible to use kubernetes.operator.job.restart.failed only for certain FlinkDeployments, or is it a global flag that applies to all deployments? https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#restart-failed-job-deployments
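    A hedged sketch of the per-resource override, assuming the option behaves like other operator settings that can be set under spec.flinkConfiguration:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: my-job   # illustrative
    spec:
      flinkConfiguration:
        kubernetes.operator.job.restart.failed: "true"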
  • Krish Narukulla (11/02/2022, 5:49 PM)
    Has anyone integrated Flink with DataHub for data lineage? Does it work seamlessly? Spark: https://datahubproject.io/docs/metadata-integration/java/spark-lineage/
  • Amir Halatzi (11/02/2022, 10:41 PM)
    Hey all! I have a question about autoscaling Flink in K8s: is the Flink job supposed to restart every time I scale in/out? Am I missing some configuration here? I was expecting more of a "scale and rebalance" kind of flow, not a complete restart…
  • Huib (11/02/2022, 11:02 PM)
    Hi! I'm playing with Flink to see if it could be a match made in heaven for our (mostly ML-related) use cases. We're currently using Spark and home-grown solutions in Python and C#, but that stuff gets annoying, slow, and expensive quickly. So far I'm really digging the speed and resource usage of Flink, and the ability to write simple SQL to express complex ideas. Very nice. With that out of the way: what would be the easiest way to do the equivalent of max(value) over (partition by window(time, 15 minutes), some_id order by time asc)? The goal is to get a "rolling max" that updates whenever new data comes in, but limits itself to 15-minute windows. Data will arrive mostly in order per key, but there are more keys than partitions in Kafka (Eventhub). Data that arrives more than (say) 5 minutes too late isn't relevant any more, but I can't really delay the results. Is this even possible with SQL?
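    One possible Flink SQL formulation, hedged: an OVER window gives a per-row running max over the preceding 15 minutes (a sliding range rather than aligned 15-minute buckets). Table and column names are assumed, and `time` must be an event-time attribute:
    Copy code
    SELECT
      some_id,
      `time`,
      MAX(`value`) OVER (
        PARTITION BY some_id
        ORDER BY `time`
        RANGE BETWEEN INTERVAL '15' MINUTE PRECEDING AND CURRENT ROW
      ) AS rolling_max
    FROM events;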
  • Shubho (11/03/2022, 6:07 AM)
    Hi All, I am getting an error when using a TCP startup probe on port 6123. Error message: "Startup probe failed: dial tcp xx.x.x.x:6123 connect: connection refused". Please let us know if we can use port 6123 for the JobManager and 6122 for the TaskManager startup probes; if not, which port can we use for a TCP startup probe? Thanks in advance.
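    For reference, the usual shape of a TCP startup probe against those ports (6123 is the JobManager RPC port and 6122 the TaskManager RPC port in the default configuration); whether the dial succeeds depends on the process having bound the port before the probe gives up:
    Copy code
    startupProbe:
      tcpSocket:
        port: 6123          # JobManager RPC; use 6122 on TaskManager pods
      periodSeconds: 10
      failureThreshold: 30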
  • Abhinav sharma (11/03/2022, 2:12 PM)
    Hi, I am using a Spring Boot application and configured an API endpoint which triggers a Flink program to aggregate the data. It's a DataStream API job, so I want a way to get the aggregated results whenever I call the API. The job should keep running in the background, with Flink continuously aggregating the results, and whenever I hit the endpoint I should get the current results. Is that possible?
  • Raghunadh Nittala (11/03/2022, 2:28 PM)
    Hello Team, I'm trying to sink my Kafka stream to the filesystem using the TableStream API. I'm using the below SQL:
    Copy code
    val createDDL = "CREATE TABLE sink_table (" +
    " sampleId STRING NOT NULL," +
    " location STRING NOT NULL, " +
    " dt STRING NOT NULL, " +
    " hour STRING NOT NULL)" +
    " PARTITIONED BY (sampleId, location, dt, hour)" +
    " WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '$filePath'," +
    "  'format' = 'parquet'," +
    "  'partition.time-extractor.class' = 'HourPartitionTimeExtractor::class.java'," +
    "  'sink.rolling-policy.rollover-interval' = '5 m', " +
    "  'sink.partition-commit.trigger'='partition-time', " +
    "  'sink.partition-commit.policy.kind' = 'success-file' " +
    ")"
    tableEnv.executeSql(createDDL)
    I’m using a query from another table to populate this using
    INSERT INTO.. SELECT
    syntax. I’m trying to use the example given in Flink doc for HourPartTimeExtractor:
    Copy code
    public class HourPartTimeExtractor implements PartitionTimeExtractor {
        @Override
        public LocalDateTime extract(List<String> keys, List<String> values) {
            String dt = values.get(0);
            String hour = values.get(1);
            return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
        }
    }
    However, I'm not able to generate the partitions in the desired way. I think the class itself is not being called. Do I need to register this as a UDF, using something like createTemporarySystemFunction?
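    A hedged observation: 'HourPartitionTimeExtractor::class.java' is a Kotlin expression inside a string literal, whereas the option expects a fully qualified class name as a plain string, and a custom extractor also requires partition.time-extractor.kind to be set. A sketch of the relevant options (package name assumed):
    Copy code
    -- inside the WITH (...) clause of the sink table:
    'partition.time-extractor.kind' = 'custom',
    'partition.time-extractor.class' = 'com.example.HourPartTimeExtractor'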
  • Sevvy Yusuf (11/03/2022, 3:56 PM)
    Hi Team, I noticed per these docs that we should be using numRestarts instead of fullRestarts. I don't see these metrics being exported by default in our jobs; do they have to be enabled separately, similar to the job state metrics?
  • Sachin Saikrishna Manikandan (11/03/2022, 4:03 PM)
    Are there any good examples of using Async IO to make database calls and use a Guava/Caffeine cache on top of that?
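    Not a canonical example, but a minimal sketch of the shape, hedged: a RichAsyncFunction that consults a Caffeine cache before issuing the async database call (DbClient and its queryAsync returning a CompletableFuture are hypothetical stand-ins for a real async driver):
    Copy code
    import java.time.Duration;
    import java.util.Collections;
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.types.Row;

    public class CachedLookupFunction extends RichAsyncFunction<String, Row> {

        private transient Cache<String, Row> cache;
        private transient DbClient client; // hypothetical async DB client

        @Override
        public void open(Configuration parameters) {
            cache = Caffeine.newBuilder()
                    .maximumSize(10_000)
                    .expireAfterWrite(Duration.ofMinutes(5))
                    .build();
            client = new DbClient();
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<Row> resultFuture) {
            Row cached = cache.getIfPresent(key);
            if (cached != null) {
                resultFuture.complete(Collections.singleton(cached)); // cache hit, no DB call
                return;
            }
            client.queryAsync(key).thenAccept(row -> { // hypothetical CompletableFuture<Row>
                cache.put(key, row);
                resultFuture.complete(Collections.singleton(row));
            });
        }
    }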
  • Hannah Hagen (11/03/2022, 10:57 PM)
    hello, I just upgraded to Flink 1.16 and am running into the error ModuleNotFoundError: No module named '_lzma' whenever I run simple scripts. Is anyone else encountering this? I get this error when running a tutorial job: /opt/flink/bin/flink run --python examples/python/table/word_count.py. I am running the code in a Docker container using an image nearly the same as this (mine just installs a few additional things). I'm running Flink 1.16.0 and Python 3.7.9. Thanks in advance for any help!
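    A hedged note: _lzma is a standard-library module that is only built when the liblzma headers are present while the Python interpreter itself is compiled, so a source-built 3.7.9 in the image may simply be missing it. A sketch of the usual remedy, assuming a Debian-based image where Python is built via pyenv:
    Copy code
    RUN apt-get update && apt-get install -y --no-install-recommends liblzma-dev \
     && pyenv uninstall -f 3.7.9 \
     && pyenv install 3.7.9   # rebuild so the stdlib _lzma extension is compiled in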
  • Maryam (11/04/2022, 12:03 AM)
    Hi 👋, my team is exploring the Table and SQL APIs and noticed a discrepancy in the Optimized Execution Plan when writing a job that uses a tumbling window aggregate query, following the examples in the Table API (ref) and SQL (ref). The Table API execution plan uses GroupWindowAggregate, which is deprecated, but the execution plan for SQL uses GlobalWindowAggregate. I am using Flink 1.15. Is there a reason the Table API is using the deprecated feature? Here are the queries and their plans:
    Copy code
    val windowAgg = jTable
          .window(Tumble over 60.seconds() on $"rowtime" as "w") 
          .groupBy($"currency", $"w") 
          .select(
            $"currency", $"totalPrice".sum as "totalSales"
          )
    
    //    == Optimized Execution Plan ==
    //    Calc(select=[currency, EXPR$0 AS totalSales])
    //    +- GroupWindowAggregate(groupBy=[currency], window=[TumblingGroupWindow('w, rowtime, 60000)], select=[currency, SUM(totalPrice) AS EXPR$0])
    Copy code
    val windowAggSQL = tableEnv.sqlQuery(
          s"""
             |Select currency, SUM(totalPrice) AS totalSales
             |From TABLE(
             | TUMBLE(TABLE jtable, DESCRIPTOR(rowtime), INTERVAL '60' SECONDS))
             |Group By currency, window_start, window_end
             |""".stripMargin
        )
    
    //    == Optimized Execution Plan ==
    //    Calc(select=[currency, totalSales])
    //    +- GlobalWindowAggregate(groupBy=[currency], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[currency, SUM(sum$0) AS totalSales, start('w$) AS window_start, end('w$) AS window_end])
  • shuaiqi xu (11/04/2022, 3:22 AM)
    hi, I use RocksDB as the state backend for checkpoints in my Flink application. After running for more than ten days, it failed and reported the error below. What should be done about it?
  • shuaiqi xu (11/04/2022, 3:22 AM)
    Copy code
    Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /tmp/flink-io-75b93805-1057-4177-842a-2279ead561a8/job_118acccbcd2559d9acdfc8be402e693a_op_IntervalJoinOperator_10766d48b60612eb868124fba19e114a__1_1__uuid_2eae3845-9d46-4c66-9b8c-3359bb9b17bd/db/OPTIONS-000014: No such file or directory
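    A hedged aside: the missing file lives under /tmp, Flink's default io.tmp.dirs, and periodic /tmp cleaners (e.g. systemd-tmpfiles) are a known way for these local RocksDB files to vanish on long-running jobs. Pointing the temp directories elsewhere is one mitigation (path illustrative):
    Copy code
    io.tmp.dirs: /data/flink-tmp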
  • Krish Narukulla (11/04/2022, 4:16 AM)
    Unit tests are failing on master after importing the codebase into IntelliJ (https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/). The configured JDK version is 8.
    Copy code
    [ERROR] org.apache.flink.core.fs.LimitedConnectionsFileSystemDelegationTest.testDelegateOutStreamMethods  Time elapsed: 0.013 s  <<< ERROR!
    org.mockito.exceptions.base.MockitoException: 
    
    Mockito cannot mock this class: class org.apache.flink.core.fs.FSDataOutputStream.
    
    If you're not sure why you're getting this error, please report to the mailing list.
    
    
    Java               : 17
JVM vendor name    : Amazon.com Inc.
    JVM vendor version : 17.0.4.1+9-LTS
    JVM name           : OpenJDK 64-Bit Server VM
    JVM version        : 17.0.4.1+9-LTS
    JVM info           : mixed mode, sharing
    OS name            : Mac OS X
    OS version         : 12.6.1
    
    
    You are seeing this disclaimer because Mockito is configured to create inlined mocks.
    You can learn about inline mocks and their limitations under item #39 of the Mockito class javadoc.
    
    Underlying exception : org.mockito.exceptions.base.MockitoException: Could not modify all classes [interface java.io.Closeable, class java.lang.Object, class org.apache.flink.core.fs.FSDataOutputStream, interface java.io.Flushable, class java.io.OutputStream, interface java.lang.AutoCloseable]
            at org.apache.flink.core.fs.LimitedConnectionsFileSystemDelegationTest.testDelegateOutStreamMethods(LimitedConnectionsFileSystemDelegationTest.java:168)
            at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
            at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
            at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
            at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
            at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
            at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
            at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
            at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
            at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
            at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
            at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
            at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
            at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
            at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
            at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
            at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
            at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
            at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
            at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
            at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
            at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
            at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
            at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
            at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
            at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
            at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
            at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
            at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
            at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
            at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
            at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
            at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
            at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.lambda$execute$1(JUnitPlatformProvider.java:199)
            at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
            at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:193)
            at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
            at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:120)
            at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
            at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
            at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
            at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
    Caused by: org.mockito.exceptions.base.MockitoException: Could not modify all classes [interface java.io.Closeable, class java.lang.Object, class org.apache.flink.core.fs.FSDataOutputStream, interface java.io.Flushable, class java.io.OutputStream, interface java.lang.AutoCloseable]
            at net.bytebuddy.TypeCache.findOrInsert(TypeCache.java:152)
            at net.bytebuddy.TypeCache$WithInlineExpunction.findOrInsert(TypeCache.java:365)
            at net.bytebuddy.TypeCache.findOrInsert(TypeCache.java:174)
            at net.bytebuddy.TypeCache$WithInlineExpunction.findOrInsert(TypeCache.java:376)
            ... 42 more
  • Sumit Nekar (11/04/2022, 5:20 AM)
    Hello Folks, we got hit by the following issue with Flink 1.13.6: https://issues.apache.org/jira/browse/FLINK-22014. Is this still open? The bug is marked as closed. Any suggestions to avoid this in the future?