# troubleshooting
  • Emile Alberts
    06/21/2022, 11:51 AM
    Hi, I'm looking at using the Flink Kubernetes Operator to manage Flink deployments. I want to use the StatsD metrics reporter. The following configuration works:
    Copy code
    defaultConfiguration: 
      create: true
      append: true
      flink-conf.yaml: |+
        kubernetes.operator.metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
        kubernetes.operator.metrics.reporter.stsd.host: localhost
        kubernetes.operator.metrics.reporter.stsd.port: 8125
    However, I want to dynamically resolve the host using the value from the field path `status.hostIP`. Any suggestions about how this can be achieved?
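    One possible direction, sketched under assumptions (the pod template structure below is illustrative, and it still needs to be confirmed that the operator substitutes environment variables into its configuration): expose the node IP to the operator container via the Kubernetes Downward API, which supports `fieldRef: status.hostIP`, and then point the reporter host at that value, e.g. by templating `flink-conf.yaml` in a custom entrypoint.
    Copy code
    # Hedged sketch: plain Kubernetes Downward API usage; the surrounding
    # podTemplate structure is an assumption, not the operator's documented schema.
    podTemplate:
      spec:
        containers:
          - name: flink-kubernetes-operator   # hypothetical container name
            env:
              - name: HOST_IP                 # IP of the node the pod runs on
                valueFrom:
                  fieldRef:
                    fieldPath: status.hostIP
    With `HOST_IP` in the environment, `kubernetes.operator.metrics.reporter.stsd.host` could then be filled in at container start, for example by an entrypoint that runs `envsubst` over the config file.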
  • Zain Haider Nemati
    06/21/2022, 12:38 PM
    Hi, I am working on submitting a Flink job on a YARN application cluster on AWS EMR. I am running this step:
    Copy code
    aws emr add-steps --cluster-id j-XXXXXX --steps Type=CUSTOM_JAR,Name=Name,Jar=<s3://abc.jar>,Args="bash","-c"," /usr/lib/flink/bin/flink run-application -t yarn-application",MainClass=org.main.classname
    However it gives the following error :
    Copy code
    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
    	at java.lang.Class.forName0(Native Method)
    	at java.lang.Class.forName(Class.java:348)
    	at org.apache.hadoop.util.RunJar.run(RunJar.java:237)
    	at org.apache.hadoop.util.RunJar.main(RunJar.java:158)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.functions.source.SourceFunction
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    	... 4 more
    Can someone point me in the right direction or tell me what is wrong here?
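    One hedged observation (the exact EMR setup isn't shown, so treat this as a guess): with `Type=CUSTOM_JAR` pointing at the job jar itself, EMR runs the jar through Hadoop's `RunJar`, so Flink classes such as `SourceFunction` that are normally `provided` by the Flink distribution are not on the classpath. A common pattern is to make the `flink` CLI the real entrypoint via `command-runner.jar`; the bucket path and main class below are placeholders.
    Copy code
    # Sketch: submit through the flink CLI instead of running the jar with Hadoop RunJar.
    aws emr add-steps --cluster-id j-XXXXXX \
      --steps 'Type=CUSTOM_JAR,Name=FlinkRunApplication,Jar=command-runner.jar,Args=["bash","-c","aws s3 cp s3://my-bucket/abc.jar /tmp/abc.jar && /usr/lib/flink/bin/flink run-application -t yarn-application -c org.main.classname /tmp/abc.jar"]'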
  • Chengxuan Wang
    06/21/2022, 8:43 PM
    Hi, is it possible to have state in `RichAsyncFunction`?
  • Jeff Levesque
    06/21/2022, 11:06 PM
    I got the following SQL syntax working:
    Copy code
    sliding_window_table = table_env.sql_query('''
            SELECT
                ticker,
                MIN(price) as min_price,
                MAX(price) as max_price,
                window_start,
                window_end
            FROM TABLE(
                HOP(TABLE {1}, DESCRIPTOR(utc), INTERVAL {3}, INTERVAL {4}))
            GROUP BY
                ticker,
                window_start,
                window_end
        '''.format(
            sliding_window_alias,
            input_table_name,
            sliding_window_on,
            sliding_window_every,
            sliding_window_over
        ))
    Which produces the following output:
    Copy code
    +I[AAPL, 0.64, 99.77, 2022-06-21T10:19, 2022-06-21T18:19]
    +I[TSLA, 0.33, 99.5, 2022-06-21T10:19, 2022-06-21T18:19]
    +I[MSFT, 0.33, 99.66, 2022-06-21T10:19, 2022-06-21T18:19]
    +I[AMZN, 0.5, 99.78, 2022-06-21T10:19, 2022-06-21T18:19]
    +I[AMZN, 0.12, 99.83, 2022-06-21T10:20, 2022-06-21T18:20]
    +I[AAPL, 0.07, 99.96, 2022-06-21T10:20, 2022-06-21T18:20]
    +I[TSLA, 0.13, 99.88, 2022-06-21T10:20, 2022-06-21T18:20]
    +I[MSFT, 0.05, 99.91, 2022-06-21T10:20, 2022-06-21T18:20]
    An alternative syntax also produces identical results:
    Copy code
    sliding_window_table = table_env.sql_query("""
            SELECT
                ticker,
                MIN(price) as min_price,
                MAX(price) as max_price,
                HOP_START({2}, INTERVAL {3}, INTERVAL {4}) AS utc_start,
                HOP_END({2}, INTERVAL {3}, INTERVAL {4}) AS utc_end
            FROM {1}
            GROUP BY
                HOP({2}, INTERVAL {3}, INTERVAL {4}),
                ticker
        """.format(
            sliding_window_alias,
            input_table_name,
            sliding_window_on,
            sliding_window_every,
            sliding_window_over
        ))
    I would like to integrate some syntax to get the first and last `price` within the `HOP` range. When I try to expand either of the above sliding windows (requires a minor sink DDL adjustment) by adding `FIRST_VALUE()`:
    Copy code
    sliding_window_table = table_env.sql_query('''
            SELECT
                ticker,
                FIRST_VALUE(price) OVER (ORDER BY utc ASC) as first_price,
                ...
            GROUP BY
                ...
        '''.format(
            sliding_window_alias,
            input_table_name,
            sliding_window_on,
            sliding_window_every,
            sliding_window_over
        ))
    Both approaches arrive at the same error:
    Copy code
    pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
    	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:159)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:104)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
    	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
    	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
    	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    
    
    Process finished with exit code 1
    Just tried the following:
    Copy code
    sliding_window_table = table_env.sql_query('''
            SELECT
                ticker,
                FIRST_VALUE(price) as first_price,
                MIN(price) as min_price,
                MAX(price) as max_price,
                window_start,
                window_end
            FROM TABLE(
                HOP(TABLE {1}, DESCRIPTOR(utc), INTERVAL {3}, INTERVAL {4}))
            GROUP BY
                ticker,
                window_start,
                window_end
        '''.format(
            sliding_window_alias,
            input_table_name,
            sliding_window_on,
            sliding_window_every,
            sliding_window_over
        ))
    Got a `merge` method error:
    Copy code
    py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
    : org.apache.flink.table.api.ValidationException: Could not find an implementation method 'merge' in class 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction' for function 'FIRST_VALUE' that matches the following signature:
    void merge(org.apache.flink.table.data.RowData, java.lang.Iterable)
    	at org.apache.flink.table.functions.UserDefinedFunctionHelper.validateClassForRuntime(UserDefinedFunctionHelper.java:280)
    	at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:464)
    	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1222)
    	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1222)
    	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1221)
    	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:987)
    	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:615)
    	at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:587)
    	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate.createAggsHandler(StreamExecWindowAggregate.java:217)
    	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate.translateToPlanInternal(StreamExecWindowAggregate.java:153)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
    	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:104)
    	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
    	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
    	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
    	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    
    
    Process finished with exit code 1
    The above is interesting, since I've used SQL syntax yet received a Table API error.
  • licho
    06/22/2022, 2:59 AM
    Why must a primaryKey be a physical column?
    Copy code
    Table taskTable =
        tableEnv.fromDataStream(
            taskDS,
            Schema.newBuilder()
                .columnByExpression("taskId", "JSON_VALUE(f0, '$.taskId')")
                .columnByExpression("taskStatus", "JSON_VALUE(f0, '$.taskStatus')")
                .columnByExpression("proc_time", "PROCTIME()")
                .primaryKey("taskId")
                .build());
    I want to define a table whose columns are computed from JSON struct data and set one column as the primary key (only for a temporal table join), but this exception shows up:
    Copy code
    Exception in thread "main" org.apache.flink.table.api.ValidationException: Invalid primary key 'PK_taskId'. Column 'taskId' is not a physical column.
    	at org.apache.flink.table.catalog.DefaultSchemaResolver.validatePrimaryKey(DefaultSchemaResolver.java:372)
    	at org.apache.flink.table.catalog.DefaultSchemaResolver.resolvePrimaryKey(DefaultSchemaResolver.java:340)
    	at org.apache.flink.table.catalog.DefaultSchemaResolver.resolve(DefaultSchemaResolver.java:89)
    	at org.apache.flink.table.api.Schema.resolve(Schema.java:121)
    	at org.apache.flink.table.catalog.CatalogManager.resolveCatalogTable(CatalogManager.java:919)
    	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.fromStreamInternal(AbstractStreamTableEnvironmentImpl.java:144)
    	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:180)
    	at zdpx.Simulate2.sqlStream(Simulate2.java:69)
    	at zdpx.Simulate2.main(Simulate2.java:45)
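    A hedged workaround (a sketch, not a confirmed fix, assuming `taskDS` carries the raw JSON string): the schema resolver only accepts physical columns for a primary key, so the key field has to exist in the stream itself rather than be computed in the schema. Parsing the JSON in a map step first, so that `taskId` arrives as a physical field, could look roughly like this; the JSON helper methods are hypothetical, and the primary key column may additionally need to be declared NOT NULL.
    Copy code
    // Sketch: make taskId/taskStatus physical fields before fromDataStream,
    // so the primary key can be declared on a physical column.
    DataStream<Row> parsed = taskDS
        .map(json -> Row.of(
                extractTaskId(json),        // hypothetical JSON helpers
                extractTaskStatus(json)))
        .returns(Types.ROW_NAMED(
                new String[] {"taskId", "taskStatus"},
                Types.STRING, Types.STRING));

    Table taskTable = tableEnv.fromDataStream(
        parsed,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            .primaryKey("taskId")           // now refers to a physical column
            .build());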
  • Maciej Obuchowski
    06/22/2022, 9:42 AM
    Is there a list of which dependencies are actually provided by the Flink docker images? I could only find this note: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/configuration/overview/#running-and-packaging
    Copy code
    As a rule of thumb, we suggest packaging the application code and all its required dependencies into one fat/uber JAR. This includes packaging connectors, formats, and third-party dependencies of your job. This rule does not apply to Java APIs, DataStream Scala APIs, and the aforementioned runtime modules, which are already provided by Flink itself and should not be included in a job uber JAR. This job JAR can be submitted to an already running Flink cluster, or added to a Flink application container image easily without modifying the distribution.
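    As a concrete illustration of that note rather than an authoritative list of image contents: the core APIs are marked `provided` in the job pom, while connectors and formats get bundled into the fat jar. The artifact ids below are the usual Flink 1.15 ones and should be checked against the image actually in use.
    Copy code
    <!-- Sketch of the packaging rule quoted above (Flink 1.15.x assumed). -->
    <dependencies>
      <!-- Already shipped in the Flink distribution / docker image: do not bundle. -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.15.0</version>
        <scope>provided</scope>
      </dependency>
      <!-- Connectors and formats are NOT in the image: bundle them in the uber JAR. -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.15.0</version>
      </dependency>
    </dependencies>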
  • Jacob Tolar
    06/22/2022, 7:28 PM
    Hi! I want to disable Kryo in all cases except those for which I've registered a custom serializer (i.e. `addDefaultKryoSerializer`). Would `disableGenericTypes()` do that, or does it disable Kryo completely?
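    For context, a small sketch of the two knobs being discussed (not an answer to the semantics question itself); `MyType` and `MyTypeKryoSerializer` are placeholders:
    Copy code
    // Sketch of the relevant ExecutionConfig calls.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Fail at job submission instead of silently falling back to Kryo for types
    // Flink's own serializers cannot handle.
    env.getConfig().disableGenericTypes();

    // Register a custom Kryo serializer for a specific type; this only matters
    // for types that are still serialized via Kryo.
    env.getConfig().addDefaultKryoSerializer(MyType.class, MyTypeKryoSerializer.class);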
  • Eric Sammer
    06/23/2022, 12:20 AM
    Think we found a Flink SQL planner bug. Still researching, but it appears a `lag(x) over` can cause an NPE when accessing RowData. It's not immediately clear to me what the smallest reproducible case is just yet. Here's a query:
    Copy code
    select
        window_time,
        window_start,
        window_end,
        req_path,
        cnt,
        lag(cnt) over ( order by window_time ) as cnt_prev -- <-- problem
    from (
        select
            window_time,
            window_start,
            window_end,
            req_path,
            count(1) cnt
        from table(
            tumble(table http_events, descriptor(_time), interval '1' second)
        )
        group by window_time, window_start, window_end, req_path
    )
    Exception:
    Copy code
    java.lang.NullPointerException: null
    	at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
    	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:249)
    	at org$apache$flink$table$planner$functions$aggfunctions$LagAggFunction$LagAcc$8$Converter.toExternal(Unknown Source)
    	at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
    	at UnboundedOverAggregateHelper$1757.setAccumulators(Unknown Source)
    	at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:219)
    	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
    	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
    	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
    	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:603)
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239)
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Unknown Source)
    I'll try and come up with a simpler repro. Just a heads up. This is Flink 1.14.2 I think. cc/ @Timo Walther
  • Francis Conroy
    06/23/2022, 6:27 AM
    Hi all, I've managed to get myself into some trouble upgrading from the Flink k8s operator 0.1.0 to 1.0.0. I tried to remove everything but made a mistake somewhere with the order. I keep seeing:
    Error from server: request to convert CR from an invalid group/version: flink.apache.org/v1alpha1
    Do I need to re-install the old CRD?
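    A hedged recovery path (generic Kubernetes/Helm steps, not operator-specific guidance): back up the existing resources, check which versions the CRD still advertises, remove the stale CRD, and let the 1.0.0 chart re-create it. Repo and release names below are assumptions.
    Copy code
    # Sketch only: deleting a CRD deletes its resources, so back them up first.
    kubectl get flinkdeployments -A -o yaml > flinkdeployments-backup.yaml
    kubectl get crd flinkdeployments.flink.apache.org -o yaml | grep -A5 versions

    kubectl delete crd flinkdeployments.flink.apache.org
    kubectl delete crd flinksessionjobs.flink.apache.org   # if present

    # Re-install the 1.0.0 operator chart so the CRDs come back at the new version.
    helm upgrade --install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator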
  • Yoel Benharrous
    06/23/2022, 8:29 AM
    Hi folks, I would like to implement a stateless function that accumulates records in a transient in-memory list and flushes every X records; the motivation is to create batches without needing a `keyBy`. How can I flush records when a checkpoint barrier is received and guarantee exactly-once semantics?
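    A common pattern for this, sketched with assumptions (batch size, types) and without claiming end-to-end exactly-once on its own (that still depends on the sink): since a plain function cannot emit on the barrier itself, keep whatever has not been flushed yet in operator state via `CheckpointedFunction`, so a restore does not lose buffered records.
    Copy code
    // Sketch: buffer records per subtask; unflushed records go into operator
    // state when the checkpoint barrier triggers snapshotState().
    // (Imports from org.apache.flink.api.common.*, org.apache.flink.runtime.state.*,
    //  org.apache.flink.streaming.api.checkpoint.*, org.apache.flink.util.Collector
    //  omitted to match the snippets above.)
    public class BatchingFunction extends RichFlatMapFunction<String, List<String>>
            implements CheckpointedFunction {

        private static final int BATCH_SIZE = 500;            // "X records", assumed
        private final List<String> buffer = new ArrayList<>();
        private transient ListState<String> checkpointedBuffer;

        @Override
        public void flatMap(String value, Collector<List<String>> out) {
            buffer.add(value);
            if (buffer.size() >= BATCH_SIZE) {
                out.collect(new ArrayList<>(buffer));
                buffer.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
            // Records not yet flushed survive a failure between checkpoints.
            checkpointedBuffer.update(new ArrayList<>(buffer));
        }

        @Override
        public void initializeState(FunctionInitializationContext ctx) throws Exception {
            checkpointedBuffer = ctx.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("buffered-records", String.class));
            if (ctx.isRestored()) {
                for (String record : checkpointedBuffer.get()) {
                    buffer.add(record);
                }
            }
        }
    }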
  • Ananya Goel
    06/23/2022, 1:03 PM
    Hi All, I am facing an issue while trying to read array fields from parquet files which were generated via Flink. I am using Kafka as a source to consume protobuf messages, and then windowing the messages to generate another protobuf (with a few array fields). After processing, I am using `ParquetProtoWriters` to write to an S3 sink. I am now querying S3 via Athena and am able to query all the fields except the array fields, getting a `org.apache.parquet.io.PrimitiveColumnIO cannot be cast to org.apache.parquet.io.GroupColumnIO` message. Any idea how I can resolve this?
  • Omar Izhar
    06/23/2022, 5:13 PM
    Flink version = 1.13.1
    Copy code
    "data":	{
    		"id":	107707,
    		"shop_name":	"redacted_shopname",
    		"shop_location":	"{\"latitude\":33.6992544,\"longitude\":73.0651125}",
    		"shop_type_id":	1,
    		"created_at":	"2022-06-10T14:57:03Z",
    		"updated_at":	"2022-06-10T14:57:03Z"
    	}
    Hello all, I am trying to read messages in the above format from a Kafka source using the Flink Table API. I am having difficulty parsing the above JSON structure to extract `latitude` and `longitude`. I am using the following code to try to fetch the id and latitude/longitude from this message.
    Copy code
    tEnv.executeSql("CREATE TABLE customer_retailer_shop_details (\n" +
            "    data ROW<`customer_id` BIGINT, `shop_location` ROW<latitude FLOAT, longitude FLOAT>>\n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic'     = 'customer-retailer-shop-details',\n" +
            "    'properties.bootstrap.servers' = 'redacted',\n" +
            "    'scan.startup.mode' = 'earliest-offset',\n" +
            "    'format'    = 'json',\n" +
            "    'json.ignore-parse-errors' = 'true'\n" +
            ")");
    The problem is that when I print out the result, `customer_id` is correctly populated but `latitude` and `longitude` are null. Could this be happening due to the escape characters in the nested latitude/longitude field? If so, how would we deal with this in the Table API?
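    One hedged reading of the payload above: `shop_location` arrives as a JSON string (hence the escaped quotes), not as a nested object, so declaring it as a `ROW` yields NULL. A sketch of declaring it as `STRING` and extracting the coordinates afterwards; `JSON_VALUE` may not exist in 1.13, in which case a small scalar UDF wrapping a JSON parser would take its place.
    Copy code
    -- Sketch: treat shop_location as a string and parse it in the query.
    CREATE TABLE customer_retailer_shop_details (
        `data` ROW<`customer_id` BIGINT, `shop_location` STRING>
    ) WITH (
        'connector' = 'kafka',
        'topic'     = 'customer-retailer-shop-details',
        'properties.bootstrap.servers' = 'redacted',
        'scan.startup.mode' = 'earliest-offset',
        'format'    = 'json',
        'json.ignore-parse-errors' = 'true'
    );

    -- JSON_VALUE is from newer Flink releases; replace with a UDF on 1.13 if needed.
    SELECT
        `data`.`customer_id`,
        CAST(JSON_VALUE(`data`.`shop_location`, '$.latitude')  AS FLOAT) AS latitude,
        CAST(JSON_VALUE(`data`.`shop_location`, '$.longitude') AS FLOAT) AS longitude
    FROM customer_retailer_shop_details;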
  • Jeff Levesque
    06/23/2022, 7:53 PM
    I was able to write general SQL syntax that obtains the equivalent of the earlier desired `FIRST_VALUE` and `LAST_VALUE`:
    Copy code
    CREATE TABLE test
    	(id integer, session_ID integer, value integer, utc timestamp)
    ;
    	
    INSERT INTO test
    	(id, session_ID, value, utc)
    VALUES
    	(0, 2, 100, '2022-06-23 16:38:00'),
    	(1, 2, 120, '2022-06-20 16:38:00'),
    	(2, 2, 140, '2022-06-21 16:38:00'),
    	(3, 1, 900, '2022-06-11 16:38:00'),
    	(4, 1, 800, '2022-06-29 16:38:00'),
    	(5, 1, 500, '2022-06-23 12:38:00')
    ;
    
    SELECT
      jeff.first_value,
      jeff2.last_value
    FROM (
      SELECT
        id,
        value AS first_value,
        NULL AS last_value
      FROM (
        SELECT
          1 AS id,
          value,
          dense_rank() OVER (ORDER BY utc) AS first_value
        FROM test
      ) inner_jeff
      WHERE first_value = 1
    ) AS jeff
    INNER JOIN (
      SELECT
        id,
        NULL AS first_value,
        value AS last_value
      FROM (
        SELECT
          1 AS id,
          value,
          dense_rank() OVER (ORDER BY utc DESC) AS last_value
        FROM test
      ) inner_jeff2
    ) AS jeff2
    ON jeff.id = jeff2.id
    LIMIT 1;
    However, this is contingent on `DENSE_RANK` support in PyFlink 1.13 (when I attempt to migrate the above syntax to be streaming-based). When I checked the documentation, it seems to show the same "N/A" pattern as `FIRST_VALUE` and `LAST_VALUE`:
    Copy code
    DENSE_RANK()	N/A	Returns the rank of a value in a group of values. The result is one plus the previously assigned rank value. Unlike the function rank, dense_rank will not produce gaps in the ranking sequence.
    My hunch is that it is not supported in PyFlink 1.13. Further, alternative approaches, namely `RANK` and `ROW_NUMBER`, each seem to have "N/A" in the same documentation. If the above general SQL syntax doesn't work in PyFlink, does anyone have an alternative approach they could recommend?
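    If the ranking functions turn out to be unavailable in PyFlink 1.13, a hedged alternative worth testing (window Top-N over the windowing TVF is documented for later releases, so version support needs checking) is to rank rows inside each hop window by the time attribute and keep the first and last row. A sketch with placeholder table and interval values mirroring the snippets above:
    Copy code
    -- Sketch: first price per hop window via ROW_NUMBER over the window TVF.
    SELECT ticker, window_start, window_end, price AS first_price
    FROM (
        SELECT
            ticker, window_start, window_end, price,
            ROW_NUMBER() OVER (
                PARTITION BY ticker, window_start, window_end
                ORDER BY utc ASC) AS rn
        FROM TABLE(
            HOP(TABLE source_table, DESCRIPTOR(utc),
                INTERVAL '1' MINUTES, INTERVAL '8' HOURS))
    )
    WHERE rn = 1;
    -- last_price: the same query with ORDER BY utc DESC.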
  • Gaurav Miglani
    06/24/2022, 7:16 AM
    Getting the below error while using the new planner loader in Flink 1.15:
    Copy code
    {"errors":["Internal server error.","<Exception on server side:\norg.apache.flink.client.program.ProgramInvocationException: The program caused an error: \n\nClasspath: [file:/tmp/flink-web-8bc46ccd-f545-474c-8605-d084950afed1/flink-web-upload/38118da6-41f1-4e0b-9bb1-c69ee9662f3a_data-streamverse-core-1.0-SNAPSHOT-jar-with-dependencies.jar]\n\nSystem.out: (none)\n\nSystem.err: (none)
    at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:159)
    at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:107)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NoClassDefFoundError: org/apache/commons/math3/util/ArithmeticUtils
    at org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil$.mergeMiniBatchInterval(FlinkRelOptUtil.scala:439)
    at org.apache.flink.table.planner.plan.rules.physical.stream.MiniBatchIntervalInferRule.onMatch(MiniBatchIntervalInferRule.scala:81)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
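    The missing class is `org.apache.commons.math3.util.ArithmeticUtils`, so one hedged reading is that `commons-math3` is simply not on the classpath when the planner loader is used; bundling it in the job jar (or dropping it into the distribution's `lib/`) is worth trying. The version below is an assumption.
    Copy code
    <!-- Sketch: add commons-math3 to the fat jar so the planner rule can load it. -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-math3</artifactId>
      <version>3.6.1</version>
    </dependency>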
  • Emile Alberts
    06/24/2022, 8:38 AM
    Hi all, I'm using the Flink Delta connector to write to a Delta table. However, I'm having an issue assuming an AWS role. I've added the credentials provider to the Hadoop configuration:
    Copy code
    val hadoopConfig = Configuration()
    
            hadoopConfig.set(
                "fs.s3a.aws.credentials.provider",
                "com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
            )
            hadoopConfig.set("fs.s3a.assumed.role.arn", "<role-arn>")
    
            val deltaSink = DeltaSink.forRowData(
                Path(sinkConfiguration.deltaTablePath),
                hadoopConfig,
                rowType,
            ).withPartitionColumns("column1", "column2")
                .build()
    
            dataStream.sinkTo(deltaSink)
    However, the role is not assumed. The `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE` environment variables are set. It works fine when I'm using AWS credentials directly. I'm using Flink `1.15.0` and version `0.4.1` of the Flink Delta connector. I've also added `flink-s3-fs-hadoop-1.15.0.jar` to `$FLINK_HOME/plugins/s3/`. What am I missing to be able to use the `WebIdentityTokenCredentialsProvider`? Thank you in advance for the assistance.
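    One hedged thing to check: when `flink-s3-fs-hadoop` runs as a plugin it builds its own Hadoop configuration from `flink-conf.yaml` (keys prefixed with `s3.` or `fs.s3a.` are forwarded), so a provider set only on the `Configuration` handed to the Delta sink may never reach the S3 filesystem that does the actual writes. Mirroring the setting in `flink-conf.yaml` would look like:
    Copy code
    # Sketch: forward the credentials provider to the s3 filesystem plugin.
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider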
  • Emily Morgan
    06/24/2022, 8:57 AM
    Hey there 🙂 I have Flink running locally, and I had one Job Manager running one job, with one Task Manager slot occupied, and a default parallelism on the Job Manager set to 1. The job was consuming some input from kafka (also running locally), processing that and outputting it to another topic on the same kafka instance. The job worked perfectly for the first 6 hours, and then I got:
    The heartbeat of JobManager with id [] timed out.
    Disconnect job manager [] for job [] from the resource manager.
    The heartbeat of TaskManager with id [] timed out.
    Closing TaskExecutor connection [] because: The heartbeat of TaskManager with id [] timed out.
    and finally
    Could not fulfill resource requirements of job []. Free slots: 0
    and the job fails. Note that I don't have checkpointing enabled, and I can't see any out of memory errors in the logs either. Any ideas on what the issue might be? Thanks 🙏
  • Jeesmon Jacob
    06/24/2022, 8:09 PM
    In the Kubernetes operator, how do I override the command and args of the jobManager if I'm not using the stock Flink image? I tried to use podTemplate to override it at the top level as well as inside jobManager, but it doesn't seem to work. So I wanted to check. Thanks.
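    For reference, a hedged sketch of what such an override usually looks like: the operator merges pod templates by container name, so the entry has to target `flink-main-container`; whether the operator later overwrites `command`/`args` for the JobManager is the part worth verifying. The entrypoint script is a placeholder.
    Copy code
    # Sketch: FlinkDeployment fragment overriding the main container's command.
    spec:
      jobManager:
        podTemplate:
          spec:
            containers:
              - name: flink-main-container          # the operator's main container name
                command: ["/custom-entrypoint.sh"]  # hypothetical script baked into the image
                args: ["jobmanager"]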
  • shikai ng
    06/26/2022, 3:34 PM
    Hi all, has anyone tried to use Flink reactive scaling with ContinuousFileMonitoring before? It seems that on scaling and job restart, the ContinuousFileMonitoring task always fails to restart, receiving this exception:
    Copy code
    INITIALIZING to FAILED with failure cause: java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction retrieved invalid state.
    	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    I assumed this was a parallelism issue and have set maxParallelism to 1 (which reactive mode should respect), but to no avail. Does anyone have any suggestions? 🙂 🙏
  • czchen
    06/27/2022, 2:11 PM
    We found that flink-kubernetes-operator v1.0.0 does not reload the new certificate when it is updated by cert-manager, which causes the following error when updating the FlinkDeployment CRD:
    Copy code
    Failed sync attempt to 597d35a7434bede526f526852c33a65262765219: one or more objects failed to apply, reason: Internal error occurred: failed calling webhook "flinkoperator.flink.apache.org": Post "https://flink-operator-webhook-service.flink-operator.svc:443/validate?timeout=10s": x509: certificate signed by unknown authority (possibly because of "x509: invalid signature: parent certificate cannot sign this kind of certificate" while trying to verify candidate authority certificate "FlinkDeployment Validator") (retried 3 times).
    This problem can be solved by doing a rollout restart of the webhook service. However, it would be better if the operator itself reloaded the certificate automatically.
  • Aitozi
    06/27/2022, 2:12 PM
    Hi community, in the doc https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/deduplication/, it shows that deduplication (using `ROW_NUMBER()`) supports both batch and stream modes, but my job using the same query with `"execution.runtime-mode": "batch"` does not translate to a Deduplicate operator but to a `Sort` operator. After checking the code, there is only a `StreamPhysicalDeduplicateRule` and no similar rule for batch. Is the doc misleading?
  • Krishna Chaithanya M A
    06/27/2022, 8:57 PM
    Hi #C03G7LJTS2G, we are unable to delete a deployment and are seeing the below error. I tried to check online but couldn't find a solution.
    Copy code
    2022-06-27 19:15:21,587 o.a.f.k.o.c.FlinkDeploymentController [INFO ][supplychain-nwsc/nwsc-stream-processor-dev01] Deleting FlinkDeployment
    2022-06-27 19:15:21,602 i.j.o.p.e.ReconciliationDispatcher [ERROR][supplychain-nwsc/nwsc-stream-processor-dev01] Error during event processing ExecutionScope{ resource id: CustomResourceID{name='nwsc-stream-processor-dev01', namespace='supplychain-nwsc'}, version: 53209979} failed.
    java.lang.RuntimeException: Cannot create observe config before first deployment, this indicates a bug.
    	at org.apache.flink.kubernetes.operator.config.FlinkConfigManager.getObserveConfig(FlinkConfigManager.java:137)
    	at org.apache.flink.kubernetes.operator.service.FlinkService.cancelJob(FlinkService.java:357)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.shutdown(ApplicationReconciler.java:327)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:56)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:37)
    	at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:107)
    	at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:59)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:68)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:50)
    	at io.javaoperatorsdk.operator.api.monitoring.Metrics.timeControllerExecution(Metrics.java:34)
    	at io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:49)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleCleanup(ReconciliationDispatcher.java:252)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:72)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:50)
    	at io.javaoperatorsdk.operator.processing.event.EventProcessor$ControllerExecution.run(EventProcessor.java:349)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Can somebody please help me here?
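    Not operator-specific advice, but a generic (and fairly blunt) Kubernetes escape hatch when a delete hangs on a failing cleanup hook is to clear the resource's finalizers so garbage collection can proceed; any Flink resources the operator would have cleaned up then have to be removed manually.
    Copy code
    # Sketch: clear finalizers on the stuck FlinkDeployment (use with care).
    kubectl -n supplychain-nwsc patch flinkdeployment nwsc-stream-processor-dev01 \
      --type=merge -p '{"metadata":{"finalizers":null}}'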
  • Kishore Pola
    06/28/2022, 4:13 AM
    Hi all, I see that a couple of folks have worked on similar use cases. Any help is highly appreciated. We have a Beam (Java) job which is currently running on Dataflow, and we are doing a PoC to migrate it to Flink. There are no code modifications to the bundled jar (the only additional dependency added was beam-runners-flink-1.13 alongside beam-runners-google-cloud-dataflow-java). Using the Flink Kubernetes Operator, I tried deploying the job with the yaml below. The JobManager pod starts, but the TaskManager pods are not coming up and there is no job submission from the JobManager. There are no errors/warnings in the JobManager or operator logs.
    flink-test-deployment-outside.yaml
  • Theo Diefenthal
    06/28/2022, 7:02 AM
    In my application logs (reading from and writing to Kafka with exactly-once enabled), after my upgrade to Flink 1.14 I find a lot of these `INFO` log messages: `ProducerId set to -1 with epoch -1` and `ProducerId set to 2359014 with epoch 0`. Investigating a bit into this shows me that these log events are not errors, can be ignored, and are indeed polluting my logs on purpose 🙂 In an old Flink mailing list post, Tzu-Li (Gordon) wrote:
    With that in mind, when using exactly-once semantics for the FlinkKafkaProducer, there is a fixed-sized pool of short-living Kafka producers that are created for each concurrent checkpoint. When a checkpoint begins, the FlinkKafkaProducer creates a new producer for that checkpoint. Once said checkpoint completes, the producer for that checkpoint is attempted to be closed and recycled. So, it is normal to see logs of Kafka producers being closed if you're using an exactly-once transactional FlinkKafkaProducer.
    which more or less describes what I see in the Flink 1.14 codebase despite being an older post: for each new checkpoint, Flink reuses a producer but assigns a new transactional.id, leading to re-initializing the transaction (a producer-broker roundtrip) and assigning a new producerId and epoch. My question is: why does Flink do this? From a Confluent blog post about transactions, the "default" way to use transactions seems to be to assign a transactional.id once at application start and keep it from then on. I understand that the pattern used by Flink works as well, but is there a specific purpose in creating brand-new transactional ids with each checkpoint while the application keeps running? Was it easier to implement? Or are there details requiring Flink to work that way? Is there a design doc or something where the thoughts around all the internals of the Flink Kafka source/writer are shared?
  • Gaurav Miglani
    06/28/2022, 10:57 AM
    Does the Flink Kubernetes Operator support a jar from S3? The documentation is a bit confusing; there is no example of submitting a custom jar (docker image): https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/. Can anyone help here?
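    For the custom-image route specifically, a hedged sketch (field names follow the operator's FlinkDeployment CRD; image, paths, and sizes are placeholders): the jar is baked into the image and referenced with a `local://` URI. Whether S3 URIs are accepted for `jarURI` in application mode is exactly the part the docs leave unclear, so treat `local://` as the known-good path.
    Copy code
    # Sketch: application deployment using a jar shipped inside a custom image.
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: my-job                              # placeholder
    spec:
      image: my-registry/my-flink-job:latest    # custom image containing the jar
      flinkVersion: v1_15
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: local:///opt/flink/usrlib/my-job.jar   # path inside the image
        parallelism: 2
        upgradeMode: stateless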
  • Trystan
    06/28/2022, 6:13 PM
    hi everyone! we have a job that writes many files to s3 for a sink, some of which may be large. on checkpoint, we often see throughput drop to a bare minimum for several minutes. the checkpoint itself (according to flink metrics) is reasonably fast - 20-30s. the job is kafka -> some stateless transformations -> s3 sink. we use presto for the checkpoint storage and hadoop/s3a for the s3 sink as suggested. swapping the sink to a local filesystem completely eliminates the stall. we've tried tuning a bunch of s3a settings, as well as different checkpoint configurations, but so far haven't had much luck. we are suspicious that, during checkpoint, the filesystem sink which promotes the in-progress files to finished files relies on the s3 "rename" mechanism… which is not so much a rename as a copy & delete, and that this mechanism is slow in s3 itself. are there any obvious metrics to confirm this? any configurations that might speed this up, or anything else to look for? at this point, we are leaning towards not using flink to sink to s3 at any sort of scale … but surely we are missing something 🙂
  • Kishore Pola
    06/28/2022, 8:07 PM
    Hi everyone, with the operator, when I specify my GCP service account in the spec as
    serviceAccount: stream-test-sa@test-project.iam.gserviceaccount.com
    the deployment fails with this error:
    Copy code
    org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://x.y.z.1/apis/apps/v1/namespaces/default/deployments. Message: Deployment.apps "flink-cluster-test-097" is invalid: spec.template.spec.serviceAccountName: Invalid value: "stream-test-sa@test-project.iam.gserviceaccount.com": a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*').
    I think serviceAccount is copied to serviceAccountName
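    That matches the error text: the value is copied into `spec.template.spec.serviceAccountName`, which expects a Kubernetes ServiceAccount name, not a GCP service account email. A hedged sketch of the usual GKE Workload Identity wiring (names are placeholders): reference a plain KSA in the FlinkDeployment and bind it to the GCP SA through an annotation.
    Copy code
    # Sketch: Kubernetes ServiceAccount bound to the GCP service account via
    # GKE Workload Identity; the FlinkDeployment would then use serviceAccount: flink-sa.
    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink-sa
      annotations:
        iam.gke.io/gcp-service-account: stream-test-sa@test-project.iam.gserviceaccount.com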
  • Sergey Postument
    06/29/2022, 8:43 AM
    Hi, I noticed some unexpected behavior when using side outputs:
    Copy code
    // SIDE OUTPUT DOESNT WORK
    
    val enrichedPacketData: DataStream[EnrichedData] = rawPackets
      .keyBy(SOME_KEY_HERE)
      .connect(SOME_CONNECT_EXP_HERE)
      .process(new AssetDataEnrichment)
      .map(_.packetAssetData.get)
    
    val packetsNoStateData = enrichedPacketData
          .getSideOutput(new OutputTag[AssetStatusUpdateStreamMessage]("no-state-tags-output"))
    
    
    // WORKS GOOD
    
    val enrichedPacketData: DataStream[EnrichedData] = rawPackets
      .keyBy(SOME_KEY_HERE)
      .connect(SOME_CONNECT_EXP_HERE)
      .process(new AssetDataEnrichment)
    
    val packetsNoStateData = enrichedPacketData
          .getSideOutput(new OutputTag[AssetStatusUpdateStreamMessage]("no-state-tags-output"))
    The only diff is having `.map(_.packetAssetData.get)`. I would be very grateful if someone could explain why this happens 🙏 I did not find any information in the documentation.
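    A hedged explanation, based on how side outputs generally behave rather than on this specific job: `getSideOutput` has to be called on the stream produced directly by the operator that declares the tag, so calling it after `.map(...)` asks the map operator (which has no side output) instead of `AssetDataEnrichment`. Keeping a reference to the processed stream and mapping separately usually restores the behaviour:
    Copy code
    // Sketch: call getSideOutput on the process() result, then map the main output.
    val processed = rawPackets
      .keyBy(SOME_KEY_HERE)
      .connect(SOME_CONNECT_EXP_HERE)
      .process(new AssetDataEnrichment)

    // The side output comes from the operator that declared the tag.
    val packetsNoStateData = processed
      .getSideOutput(new OutputTag[AssetStatusUpdateStreamMessage]("no-state-tags-output"))

    // The main output can still be mapped afterwards.
    val enrichedPacketData: DataStream[EnrichedData] = processed
      .map(_.packetAssetData.get)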
  • Saket Sinha
    06/29/2022, 1:40 PM
    Hi team, looking for some help on integrating the Flink Stateful Functions DataStream SDK with a Python remote stateful function.
  • Gaurav Miglani
    06/29/2022, 2:06 PM
    Can we communicate with the Flink Kubernetes Operator via an API, or is kubectl the only option? I was not able to find documentation on this. Flink internally uses a Kubernetes client, but is that only for the native Kubernetes integration?