# troubleshooting
  • Rashmin Patel

    11/30/2022, 11:07 AM
    Hello All, I want to use
    org.apache.flink.streaming.api.operators.TimestampedCollector
    in a KeyedCoProcessFunction. How can I achieve this? The use case is that I am joining two streams, L and R, and I always want the joined record to be emitted with the timestamp of the left record. Is there an alternative way to achieve this?
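    One possible approach (a minimal sketch, not Flink's only option): rather than reaching for TimestampedCollector directly, carry the left record's timestamp inside the joined record and re-assign timestamps downstream. Left, Right and Joined below are placeholder types standing in for the actual stream elements.
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class LeftTimestampJoin extends KeyedCoProcessFunction<String, Left, Right, Joined> {

        private transient ValueState<Left> lastLeft;
        private transient ValueState<Long> lastLeftTimestamp;

        @Override
        public void open(Configuration parameters) {
            lastLeft = getRuntimeContext().getState(new ValueStateDescriptor<>("lastLeft", Left.class));
            lastLeftTimestamp = getRuntimeContext().getState(new ValueStateDescriptor<>("lastLeftTs", Long.class));
        }

        @Override
        public void processElement1(Left left, Context ctx, Collector<Joined> out) throws Exception {
            // Remember the left record together with its event timestamp.
            lastLeft.update(left);
            lastLeftTimestamp.update(ctx.timestamp());
        }

        @Override
        public void processElement2(Right right, Context ctx, Collector<Joined> out) throws Exception {
            Left left = lastLeft.value();
            if (left != null) {
                // The joined record carries the left timestamp as an explicit field, so it can be
                // re-applied downstream with assignTimestampsAndWatermarks and a timestamp
                // assigner that reads that field.
                out.collect(new Joined(lastLeftTimestamp.value(), left, right));
            }
        }
    }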
  • Ian Raievskyi

    11/30/2022, 2:47 PM
    Hello folks. We encountered the following issue while loading a job from a savepoint using the Flink Operator. In short, it starts to GET the jar from our Nexus and then fails with an exception (see exception_log.md):
    Copy code
    2022-11-29 15:33:57,428 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Unhandled exception.
    java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /tmp/flink-web-78e41ccf-c3c0-4b5e-b812-818ebab6fde9/flink-web-upload/050ac65e-2aa1-43b5-8ccc-18b7756d7a08_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar does not exist
    Then it retries a few times with no result, and finally it downloads the JAR again and starts fine, but without restoring from the savepoint (it starts reprocessing all events). We have two JobManagers deployed, and on them we found the following: • Manager A has this list of files in folder `/flink-web-78e41ccf-c3c0-4b5e-b812-818ebab6fde9`:
    Copy code
    -rw------- 1 flink flink 123914717 Nov 29 15:33 2b0e0bc2-c602-4e0c-b6e9-8b0c477d885f_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
    -rw------- 1 flink flink 123914717 Nov 29 15:34 5966eb7a-47f4-4ea2-bcd5-87c32f9f9da4_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
    -rw------- 1 flink flink 123914717 Nov 29 15:43 adcfa6f7-c47a-452e-a163-1358568f83b6_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
    • Manager B has this list of files in folder `/flink-web-613318f8-ee25-4b67-890d-6f95db4dee18`:
    Copy code
    -rw------- 1 flink flink 123914717 Nov 29 15:33 050ac65e-2aa1-43b5-8ccc-18b7756d7a08_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
    It seems like it tries to load the JAR file using Job Manager B's path from Job Manager A and fails. After that it downloads the JAR again (
    5966eb7a-47f4-4ea2-bcd5-87c32f9f9da4_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
    ) and starts normally, but without loading from the savepoint. After that we just redeployed the job and it went smoothly, loading from the savepoint on the first run. Thank you in advance for your help.
    exception_log.md
  • Krish Narukulla

    11/30/2022, 3:11 PM
    Does Flink SQL support UPDATE statements?
    Copy code
    Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: UPDATE  `catalog.test1` set `some_int` = 6 
            at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:107)
            at java.util.Optional.orElseThrow(Optional.java:290)
            at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:107)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
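    The parser in this version rejects UPDATE, so row-level updates are not available here. One workaround (a sketch only, with illustrative column names c1 and c2 and a hypothetical target table; whether it fits depends on the catalog/connector) is to rewrite the data in batch mode with an INSERT ... SELECT that applies the change:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpdateWorkaround {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Select every column, replacing some_int with the new value, into a new table.
            tEnv.executeSql(
                    "INSERT INTO `catalog`.`test1_updated` "
                            + "SELECT c1, c2, 6 AS some_int FROM `catalog`.`test1`");
        }
    }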
  • Bhupendra Yadav

    11/30/2022, 3:30 PM
    Hi everyone. Background: We are exploring the Flink SQL API to JOIN two tables and do a GROUP BY. Query below (simplified):
    Copy code
    SELECT t1.c1, JSON_ARRAYAGG(t2.c2) FROM t1 LEFT OUTER JOIN t2 GROUP BY t1.c1;
    Tables t1 and t2 are read using the
    filesystem
    connector from S3 files in CSV format. Column t1.c1 is a primary key and has a one-to-many relationship with table t2. Issue: One of the tables (t2) is large (more than 4 GB) and can't fit into TaskManager memory. When we try to JOIN, we get a heap out-of-memory error. If we give the TaskManager enough memory (say 8 GB) the join works, but increasing TaskManager memory as our tables grow is not feasible for us. OOM stack trace for reference:
    Copy code
    java.lang.OutOfMemoryError: Java heap space
    Dumping heap to /opt/tmp/heapdump.bin ...
    2022-11-30 11:34:30,200 [HashJoin[5] -> Sort[6] -> Calc[7] -> SortAggregate[8] -> ConstraintEnforcer[9] -> Sink: Filesystem (1/1)#0] WARN  [o.a.f.r.t.Task] {} - HashJoin[5] -> Sort[6] -> Calc[7] -> SortAggregate[8] -> ConstraintEnforcer[9] -> Sink: Filesystem (1/1)#0 (7a97e08f5d024c5c720da026005697e2_4bf7c1955ffe56e2106d666433eaf137_0_0) switched from RUNNING to FAILED with failure cause: java.lang.OutOfMemoryError: Java heap space
    	at java.base/java.lang.AbstractStringBuilder.<init>(Unknown Source)
    	at java.base/java.lang.StringBuilder.<init>(Unknown Source)
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:455)
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:85)
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3748)
    	at org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson(SqlJsonUtils.java:104)
    	at org.apache.flink.table.runtime.functions.aggregate.JsonArrayAggFunction.getValue(JsonArrayAggFunction.java:137)
    	at SortAggregateWithKeys$58.processElement(Unknown Source)
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    So I was wondering: is there a way to join the tables without loading everything into memory upfront? Is there a way I can optimize my query? I would appreciate any suggestions or docs/reference articles. Thank you!
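    One thing worth checking (a sketch, not a full answer): make sure the query runs in batch execution mode, where the HashJoin and Sort operators use managed memory and can spill to disk instead of holding everything on the heap. Note that JSON_ARRAYAGG still materializes each group's JSON string on the heap, so a single very wide group can still blow up even with spilling joins.
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    // Bounded/batch mode: joins and sorts spill to managed memory rather than the heap.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // taskmanager.memory.managed.size / .fraction (flink-conf.yaml) control how much memory
    // those spilling operators get; the heap then mostly holds the aggregate output.
    // The INSERT/SELECT is submitted with tEnv.executeSql(...) exactly as before.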
  • Momir Beljic

    11/30/2022, 4:10 PM
    Hi, I wonder how it would be possible to group a data stream on a key that is not event- or time-based and average over it. I would like a window of size three rows that slides every three rows. Then I would group by the name and average the price values associated with that name in the collection below. This is the code:
    Copy code
    ds = env.from_collection(
            collection=[
                (Instant.of_epoch_milli(1000), 'Alice', 110.1),
                (Instant.of_epoch_milli(4000), 'Bob', 30.2),
                (Instant.of_epoch_milli(3000), 'Alice', 20.0),
                (Instant.of_epoch_milli(2000), 'Bob', 53.1),
                (Instant.of_epoch_milli(5000), 'Alice', 13.1),
                (Instant.of_epoch_milli(3000), 'Bob', 3.1),
                (Instant.of_epoch_milli(7000), 'Bob', 16.1),
                (Instant.of_epoch_milli(10000), 'Alice', 20.1)
            ],
            type_info=Types.ROW([Types.INSTANT(), Types.STRING(), Types.FLOAT()]))
    table = t_env.from_data_stream(
            ds,
            Schema.new_builder()
                  .column_by_expression("ts", "CAST(f0 AS TIMESTAMP(3))")
                  .column("f1", DataTypes.STRING())
                  .column("f2", DataTypes.FLOAT())
                  .watermark("ts", "ts - INTERVAL '3' SECOND")
                  .build()
        ).alias("ts", "name", "price")
    
        # define the sink
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                   .column('name', DataTypes.STRING())
                                   .column('price', DataTypes.FLOAT())
                                   .build())
                           .build())
    table = table.window(
            Slide.over("3.rows").every("3.rows").on(col("name")).alias("w")) \
            .group_by(col("w")) \
            .select(col('name'), col('price').avg)
        
    
        # submit for execution
        table.execute_insert('sink') \
             .wait()
    But this throws the following exception:
    Copy code
    table = table.window(
    py4j.protocol.Py4JJavaError: An error occurred while calling o121.select.
    : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
            at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:327)
            at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:307)
            at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:300)
            at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:265)
            at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:257)
            at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:823)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:566)
            at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
            at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
            at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
            at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
            at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Would you please help me to find the solution? Thank you!
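    The exception is because a row-count window can only be defined over a processing-time attribute, and after group_by only the grouping keys and aggregates can be selected. A minimal sketch of the fix in the Java Table API (PyFlink mirrors the same structure; tEnv and ds are the environment and stream from the snippet above, f1/f2 are the name and price columns):
    Copy code
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.rowInterval;

    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;

    Table result = tEnv.fromDataStream(
                    ds,
                    Schema.newBuilder()
                            // Count-based windows need a time attribute; use processing time.
                            .columnByExpression("proctime", "PROCTIME()")
                            .build())
            .window(Slide.over(rowInterval(3L)).every(rowInterval(3L)).on($("proctime")).as("w"))
            .groupBy($("w"), $("f1"))                         // group by the window and the name
            .select($("f1"), $("f2").avg().as("avg_price"));  // non-key columns go through an aggregate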
  • Prasanth Kothuri

    11/30/2022, 4:11 PM
    Hi All, I am trying to update our code for Flink 1.16. I am mainly looking at moving from FlinkKafkaConsumer011 to KafkaSource. The messages in the Kafka topic are JSON; how can I properly deserialise them and apply a schema? So far I have tried the following with no luck:
    Copy code
    val scanSource = KafkaSource.builder[scanInput]
          .setProperties(properties)
          .setTopics(config.getInputScanTopic)
          .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))
          .build()
     
        val scanStream = env
          .fromSource(
            scanSource,
            WatermarkStrategy
            .forBoundedOutOfOrderness[scanInput](Duration.ofSeconds(60))
            .withIdleness(Duration.ofSeconds(300))
            .withTimestampAssigner(new SerializableTimestampAssigner[scanInput] {
              override def extractTimestamp(element: scanInput, recordTimestamp: Long): Long = fixDate.makeInstant(element.timestamp).toEpochMilli
            }), "ReadScanEvents")
          .filter(_!= null )
          .filter(x => x != null && x.ooiInstanceIdentifier != null && x.fullQualifiedName != null && x.timestamp != null && fixDate.isInstant(x.timestamp))
    This gives the following error:
    Copy code
    found   : org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
     required: org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema[com.leidos.mosiac.tmdataaggregations.scanInput]
          .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))
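    The mismatch is that JSONKeyValueDeserializationSchema produces ObjectNode (and implements the old KafkaDeserializationSchema), while the builder is typed to scanInput. One option is a small DeserializationSchema that maps the record value straight onto the POJO with Jackson, plugged in via setValueOnlyDeserializer. Java sketch below; ScanInput stands in for the scanInput case class:
    Copy code
    import java.io.IOException;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    public class ScanInputDeserializationSchema extends AbstractDeserializationSchema<ScanInput> {

        private transient ObjectMapper mapper;

        @Override
        public void open(InitializationContext context) {
            // Build the mapper on the task manager; register modules / features here.
            mapper = new ObjectMapper();
        }

        @Override
        public ScanInput deserialize(byte[] message) throws IOException {
            return mapper.readValue(message, ScanInput.class);
        }
    }

    // Wiring it into the new source:
    // KafkaSource<ScanInput> source = KafkaSource.<ScanInput>builder()
    //         .setProperties(properties)
    //         .setTopics(topic)
    //         .setValueOnlyDeserializer(new ScanInputDeserializationSchema())
    //         .build();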
  • Leon Xu

    11/30/2022, 4:40 PM
    Hi all, after we moved a Flink job from session mode to application mode, we started to see some weird exceptions. Not sure if this is related to how application mode loads the Flink classes:
    Copy code
    java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
    I am wondering if anyone has seen this before? We are on Flink 1.12.
  • Deryl Rodrigues

    11/30/2022, 6:16 PM
    Hi All, I am facing the following issue when trying to join a Kafka table with a Hive table. Has anyone encountered this issue? For the Kafka table, the data is in protobuf format.
    Copy code
    INSERT INTO some-table
            SELECT
              *
            FROM (
              SELECT * 
              FROM (
                SELECT fields-from-khakfa-table
                  window_start,
                  window_end,
                  ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY received_timestamp DESC) AS rownum
                FROM TABLE(
                  TUMBLE(TABLE khafkfa-table, DESCRIPTOR(some-column), INTERVAL '10' MINUTES)
                )
              ) WHERE rownum <= 1
            ) AS some-table
            LEFT JOIN some-hive-table fc ON some-table.column1 = some-hive-table.column1
    
    
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'some-table' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(column1 = column10)], select=[all-columns, column1, column10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
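    The regular (non-windowed) LEFT JOIN at the end produces update/retract rows, so the sink table has to be able to consume updates. If the sink is Kafka-backed, one option (sketch only; topic, broker and column names are illustrative) is to declare it with the upsert-kafka connector and a primary key; alternatively, keep the pipeline append-only, e.g. by turning the Hive join into a lookup join.
    Copy code
    // An updating sink the join result can be written to (illustrative definition).
    tEnv.executeSql(
            "CREATE TABLE joined_output ("
                    + "  column1 STRING,"
                    + "  window_start TIMESTAMP(3),"
                    + "  window_end TIMESTAMP(3),"
                    + "  PRIMARY KEY (column1, window_start, window_end) NOT ENFORCED"
                    + ") WITH ("
                    + "  'connector' = 'upsert-kafka',"
                    + "  'topic' = 'joined-output',"
                    + "  'properties.bootstrap.servers' = 'broker:9092',"
                    + "  'key.format' = 'json',"
                    + "  'value.format' = 'json'"
                    + ")");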
  • Eric Xiao

    11/30/2022, 10:37 PM
    Hi there, is there more reading on how processing time works and what happens when a pipeline dies and needs to pick up from where it left off? I have read these two docs: https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#:~:text=Event%20time%3A%20Event%20time%20is,not%20on%20any%20wall%20clocks https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/ But they don't answer my second question. I am drawing the conclusion that you cannot have fault tolerance in a pipeline that uses processing time, and that it will not pick up from where it died but rather only process net-new data once the pipeline is back up and ready to consume data again. Is that right?
  • Vishal bharatbhai Vanpariya

    12/01/2022, 7:36 AM
    Hi All, I am trying to convert a JSON string to Avro format and write it to a Parquet file, but it is returning an error.
    Copy code
    DataStream<GenericRecord> avrodata = inputData.map(new MapFunction<String, GenericRecord>() {
                @Override
                public GenericRecord map(String s) throws Exception {
    
                    Schema schema = new Schema.Parser().parse(schemastr);
                    DecoderFactory decoderFactory = new DecoderFactory();
                    Decoder decoder = decoderFactory.jsonDecoder(schema,s);
                    DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
                    return reader.read(null,decoder);
                }
            });
    
    Schema schema = new Schema.Parser().parse(schemastr);
            avrodata.sinkTo(FileSink.forBulkFormat(
                    new Path("<s3://bUcket/data/tmp/>"),
                    ParquetAvroWriters.forGenericRecord(schema)
            ).build());
    Error: Caused by: java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
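    That error usually means the path is being handled by the Presto S3 filesystem, which does not implement recoverable writers; the Hadoop S3 plugin (flink-s3-fs-hadoop) does. A sketch of the sink side, assuming flink-s3-fs-hadoop is placed under plugins/ and an s3a:// path so the Hadoop implementation serves it (bucket name illustrative); parsing the Avro schema once instead of per record is also cheaper:
    Copy code
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

    Schema schema = new Schema.Parser().parse(schemastr);   // parse once, reuse

    FileSink<GenericRecord> sink = FileSink
            .forBulkFormat(
                    new Path("s3a://bucket/data/tmp/"),
                    ParquetAvroWriters.forGenericRecord(schema))
            .build();

    avrodata.sinkTo(sink);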
  • Raph

    12/01/2022, 8:16 AM
    Hi all, has anyone tried connecting to a Pulsar topic with PyFlink? I am using the code below and haven't been able to connect to the topic.
    Copy code
    pulsar_source = PulsarSource.builder() \
        .set_service_url(service_url) \
        .set_admin_url(admin_url) \
        .set_start_cursor(StartCursor.latest()) \
        .set_topics("<persistent://topic_name>") \
        .set_config("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken") \
        .set_config("pulsar.client.authParams", AUTH_TOKEN) \
        .set_deserialization_schema(
            PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
        .set_subscription_name('flink') \
        .set_subscription_type(SubscriptionType.Shared) \
        .build()
    
        ds = env.from_source(source=pulsar_source,
                        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
                        source_name="pulsar source")
        
        ds.print()
        env.execute()
    Caused by: org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted.
  • Amenreet Singh Sodhi

    12/01/2022, 11:10 AM
    Hi everyone! I noticed snakeyaml 1.27 is being used in Flink 1.16.0, and it has several vulnerabilities. If I build Flink 1.16.0 after manually changing to snakeyaml 1.32, would it have any effect on the build? Also, I observed that snakeyaml 1.27 has been used since Flink 1.12.0. Why hasn't it been updated yet; is there any specific reason for staying on version 1.27? Thanks!
  • Jason Politis

    12/01/2022, 12:36 PM
    Hello everyone. We have a job (SQL insert) that would normally use nextval in Oracle, essentially an auto-incremented number. We have currently settled on the idea of using a hashing algorithm to generate a "unique" number for each record (it's not a requirement for it to be incremental). We are using BLAKE3, converting each alpha character to its ASCII equivalent, taking the first 18 digits, and then converting to a long, all in a Java UDF. What are your thoughts on this process, and do you have a recommendation for an alternative?
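    The approach sounds workable; the usual caveat is just collision risk once the hash is truncated. One alternative sketch that avoids an external BLAKE3 dependency is to take the first 8 bytes of a SHA-256 digest as a long inside a scalar UDF (function and column names are illustrative; the same collision caveats apply):
    Copy code
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    import org.apache.flink.table.functions.ScalarFunction;

    public class HashIdFunction extends ScalarFunction {

        public long eval(String key) throws Exception {
            // First 8 bytes of a SHA-256 digest interpreted as a signed long.
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(key.getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(hash, 0, 8).getLong();
        }
    }

    // tEnv.createTemporarySystemFunction("hash_id", HashIdFunction.class);
    // INSERT INTO target SELECT hash_id(natural_key) AS id, ... FROM source_table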
  • Emmanuel Leroy

    12/01/2022, 1:07 PM
    I'm trying to merge related objects from change logs, like a join. I see there are window joins, but that's not really what I need, as there is no guarantee that the objects I need to merge will fall in the same window. I need to accumulate state and merge these objects, so I have been trying to translate each event into a meta event that includes all keys, and then union the streams and reduce. But Union works on DataStreams and produces a DataStream; a union of keyed streams produces a DataStream (it's not keyed anymore). Is there an option to union keyed streams into a KeyedStream and preserve the key? The key in the meta event is found in different places; I'd need to extract the key to a common place when creating the meta event if I need to re-key the stream after a union. Is there a trick to merge keyed streams with union?
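    There is no keyed union; the usual pattern is exactly the one described: map each stream to a common envelope that exposes the key in one place, union the plain DataStreams, then keyBy again. A minimal sketch with placeholder types (MetaEvent and the getIdField/getOtherIdField accessors are hypothetical):
    Copy code
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // Every source-specific event is wrapped so the key lives in one known place.
    DataStream<MetaEvent> fromA = streamA.map(a -> MetaEvent.of(a.getIdField(), a));
    DataStream<MetaEvent> fromB = streamB.map(b -> MetaEvent.of(b.getOtherIdField(), b));

    KeyedStream<MetaEvent, String> merged =
            fromA.union(fromB)
                 .keyBy(MetaEvent::getKey);   // re-key after the union

    // merged.reduce(...) or merged.process(...) can now accumulate per-key state.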
  • René

    12/01/2022, 4:25 PM
    Hi all, I'm trying to run the following connector:
    Copy code
    CREATE TABLE CVL (EPOCH_ID BIGINT, DESCRIPTION VARCHAR)
    WITH (
        'connector' = 'kafka', 
        'topic' = 'FCM1.SYST034.CVL', 
        'scan.startup.mode' = 'earliest-offset', 
        'properties.bootstrap.servers' = '<http://swwkafe01.suvanet.ch:9092|swwkafe01.suvanet.ch:9092>', 
        'format' = 'avro',
        'properties.security.protocol' = 'SASL_SSL',
        'properties.sasl.mechanism' = 'SCRAM-SHA-512',
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";'
    );
    That's ok, but when I do a simple select * from CVL, then I get this error:
    ... Caused by: java.lang.IllegalArgumentException: Value not specified for key 'username' in JAAS config at
    <http://org.apache.flink.kafka.shaded.org|org.apache.flink.kafka.shaded.org>.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:116) ...
    How do I need to configure Apache Flink (Ververica Platform) to be able to connect to Kafka? I followed the instructions under https://ververica.zendesk.com/hc/en-us/articles/4416135903634-How-to-Secure-User-Creden[…]ica-Platform-when-Connecting-to-SASL-Secured-Kafka-Cluster, but that didn't solve the problem. Thanks for any help!
  • Momir Beljic

    12/01/2022, 4:57 PM
    Hi, could you help me with this code? I am trying to run a window over a subset of row data, do some grouping, and then call a
    process_udaf
    function to perform some operations and filter the data, and based on that return specific Rows. However, no matter what I change I get the exception
    org.apache.flink.table.api.ValidationException: Cannot resolve field [MEASUREMENT], input field list: [TERM_ID, F_LOCATION, ADAPTER].
    Please find the code below. Thank you!
    Copy code
    ds = env.from_collection(
            collection=[
                (Instant.of_epoch_milli(1000), '2022-12-01T17:10:18.191732', '123457', '123456-09', '22.2-2', '12345678', '123456', 'M1', 7, 20, -20, 0, '2', 0, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(2000), '2022-12-01T17:10:20.231437', '123458', '123456-07', '22.2-1', '12345679', '123456', 'M1', 10, 25, -15, 2, '1', 120, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(3000), '2022-12-01T17:10:20.315141', '123459', '123456-09', '22.2-1', '12345679', '123456', 'M1', 20, 29, -3, 3, '2', 100, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(4000), '2022-12-01T17:10:20.389638', '123455', '123456-08', '22.2-1', '12345679', '123456', 'M1', 25, 35, 1, 4, '10', 10, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(5000), '2022-12-01T17:10:20.585687', '123458', '123456-07', '22.2-1', '12345679', '123456', 'M1', 30, 40, -2, 5, '2', 120, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(6000), '2022-12-01T17:10:20.649107', '123457', '123456-06', '22.2-2', '12345678', '123456', 'M1', 4, 45, 4, 6, '10', 0, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(7000), '2022-12-01T17:10:21.040214', '123455', '123456-09', '22.2-1', '12345678', '123456', 'M1', 22, 49, 5, 7, '2', 100, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98')
            ],
        type_info=Types.ROW([Types.INSTANT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.FLOAT(), Types.FLOAT(),Types.FLOAT(), Types.FLOAT(),Types.STRING(), Types.STRING(), Types.INT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]))
    
    table = t_env.from_data_stream(
            ds,
            Schema.new_builder()
                  .column_by_expression("proctime", "proctime()")
                  .column("f1", DataTypes.STRING())
                  .column("f2", DataTypes.STRING())
                  .column("f3", DataTypes.STRING())
                  .column("f4", DataTypes.STRING())
                  .column("f5", DataTypes.STRING())
                  .column("f6", DataTypes.STRING())
                  .column("f7", DataTypes.STRING())
                  .column("f8", DataTypes.FLOAT())
                  .column("f9", DataTypes.FLOAT())
                  .column("f10", DataTypes.FLOAT())
                  .column("f11", DataTypes.FLOAT())
                  .column("f12", DataTypes.STRING())
                  .column("f13", <http://DataTypes.INT|DataTypes.INT>())
                  .column("f14", DataTypes.STRING())
                  .column("f15", DataTypes.STRING())
                  .column("f16", DataTypes.STRING())
                  .column("f17", DataTypes.STRING())
                  .column("f18", DataTypes.STRING())
                  .column("f19", DataTypes.STRING())
                  .column("f20", DataTypes.STRING())
                  .column("f21", DataTypes.STRING())
                  .column("f22", DataTypes.STRING())
                  .column("f23", DataTypes.STRING())
                  .column("f24", DataTypes.STRING())
                  .column("f25", DataTypes.STRING())
                  #.watermark("proctime", "proctime - INTERVAL '3' SECOND")
                  .build()
        ).alias("proctime", 'MESSAGE_DATE_TIME', 'ASSET_ID', 'TERM_ID', 'ADAPTER', 'PRODUCT_ID', 'SERIAL', 'F_LOCATION', 'MEASUREMENT', 'USL', 'LSL', 'NOM', 'TESTSTATUS', 'METHOD_COMPUTE', 'CONFIGVALUE_1', 'CONFIGVALUE_2', 'CONFIGVALUE_3', 'CONFIGVALUE_4', 'CONFIGVALUE_5', 'CONFIGVALUE_6', 'CONFIGVALUE_7', 'CONFIGVALUE_8', 'CONFIGVALUE_9', 'CONFIGVALUE_10', 'METHOD_YIELD', 'CONFIGVALUE_1001')
    
     t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                   .column("TERM_ID", DataTypes.STRING())
                                   .column("F_LOCATION", DataTypes.STRING())
                                   .column("ADAPTER", DataTypes.STRING())
                                   .column("MEASUREMENT", DataTypes.FLOAT())
                                   .column("USL", DataTypes.FLOAT())
                                   .column("LSL", DataTypes.FLOAT())
                                   .column("NOM", DataTypes.FLOAT())
                                   .build())
                           .build())
    
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("TERM_ID", DataTypes.STRING()), DataTypes.FIELD("F_LOCATION", DataTypes.STRING()), DataTypes.FIELD("ADAPTER", DataTypes.STRING()), DataTypes.FIELD("MEASUREMENT", DataTypes.FLOAT()), DataTypes.FIELD("USL", DataTypes.FLOAT()), DataTypes.FIELD("LSL", DataTypes.FLOAT()), DataTypes.FIELD("NOM", DataTypes.FLOAT())]), func_type="pandas")
        def process_udaf(term_id, f_location, adapter, measurement, usl, lsl, nom, teststatus, method_compute):
            if teststatus == 1 and (method_compute > 0 or method_compute < 101):
                zscore_log = (measurement - measurement.mean()) / (measurement.std())
                dmn_log = measurement.mean() - nom
                cp_log = (usl - lsl) / (6 * measurement.std())
    
                return Row(term_id, f_location, adapter, zscore_log, dmn_log, cp_log)
    
        t_env.create_temporary_function("process_udaf", process_udaf)
    
    table = table.window(
            #here just use 100 as row offset and slide of 100 and perform calculation of counting the rows and then call correct processing udf call based on the row number
            Slide.over("4.rows").every("4.rows").on(col("proctime")).alias("w")) \
            .group_by(col("w"), col("TERM_ID"), col("F_LOCATION"), col("ADAPTER")) \
            .select(process_udaf(col("TERM_ID"), col("F_LOCATION"), col("ADAPTER"), col("MEASUREMENT"), col("USL"), col("LSL"), col("NOM"), col("TESTSTATUS"), col("METHOD_COMPUTE")))
    
    table.execute_insert('sink') \
             .wait()
  • raghav tandon

    12/01/2022, 8:28 PM
    I have a basic question on the ordering of events: if the job is Source -> KeyBy + process + Sink (Kafka), and operator chaining happens from process to Sink, can I assume the sink will always receive records in keyBy order? I don't have to do another keyBy before the Sink?
  • Ans Fida

    12/01/2022, 10:26 PM
    Does anybody know if you can create a proctime or rowtime time attribute on a Table? I have seen some examples of this in DDL or in stream-to-table conversion, but I wanted to do it on a Table. I have the following snippet:
    Copy code
    Table table = tEnv.sqlQuery(query.getQuery());
    // create a new column 'EventTime' of type Timestamp from 'EventTimetamp' which is a string
    table = table.addColumns($("EventTimestamp").toTimestamp().as("EventTime"));
    WindowGroupedTable windowedTable = table.window(Tumble.over("10.minutes").on($("EventTime").proctime())
        .as("w"))
        .groupBy($("w"), $("GroupingColumn"));
    table = windowedTable.select($("*"));
    but it doesn’t seem to work and results in this exception
    Expected LocalReferenceExpression. Got: EventTime
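    A time attribute can only be declared at the boundary (in DDL, or when converting a DataStream to a Table); it cannot be added to an existing Table with addColumns plus .proctime()/.rowtime(). One sketch (assuming tEnv is a StreamTableEnvironment; the 5-second watermark delay and column names are illustrative) is to round-trip through a DataStream and declare the event-time attribute in the Schema, then window on it:
    Copy code
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.types.Row;

    DataStream<Row> rows = tEnv.toDataStream(table);

    Table withRowtime = tEnv.fromDataStream(
            rows,
            Schema.newBuilder()
                    .columnByExpression("EventTime", "TO_TIMESTAMP(EventTimestamp)")
                    .watermark("EventTime", "EventTime - INTERVAL '5' SECOND")
                    .build());

    Table aggregated = withRowtime
            .window(Tumble.over(lit(10).minutes()).on($("EventTime")).as("w"))
            .groupBy($("w"), $("GroupingColumn"))
            .select($("GroupingColumn"), $("w").start().as("window_start"));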
  • Tim

    12/01/2022, 10:37 PM
    Hello team, I have a question similar to this "How to convert a Table to a DataStream containing array types" post on Stack Overflow. In a 1.13 UDF we try to give a proper type hint for the output of the UDF but get exceptions like:
    [java] Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for unregistered table do not match.
    [java] Cause: Incompatible types for sink column 'f0' at position 0.
    [java]
    [java] Query schema: [EXPR$0: ARRAY<ROW<> NOT NULL>]
    [java] Sink schema:  [f0: RAW('java.util.List', ?)]
    The type information for the output type is
    Row(f0: List<Row(f0: String)>)
    and the closest we can get is to define
    output = @DataTypeHint(value = "RAW", bridgedTo = List.class)
    and the exception becomes
    [java] Cause: Incompatible types for sink column 'f0' at position 0.
    [java] Query schema: [EXPR$0: RAW('java.util.List', '...')]
    [java] Sink schema:  [f0: RAW('java.util.List', ?)]
    Any suggestion to resolve this? Thanks!
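    One option (sketch; field name f0 taken from the error, function name hypothetical) is to avoid the RAW type entirely: declare a structured ARRAY<ROW<...>> output and bridge it to Row[], which the planner can line up with the sink column:
    Copy code
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;

    public class WrapFunction extends ScalarFunction {

        // Declare the element structure explicitly instead of RAW('java.util.List').
        public @DataTypeHint("ARRAY<ROW<f0 STRING>>") Row[] eval(String value) {
            return new Row[] { Row.of(value) };
        }
    }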
  • Steven Zhang

    12/02/2022, 12:17 AM
    are there any known bugs with the
    org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    HA class? I'm trying to upgrade a session cluster using the Flink operator, and it seems like the configmaps for the job I had running on it get cleaned up when I update the FlinkDeployment CRD image field and the JobManager/TaskManager get torn down. From the docs, it seems like this shouldn't be the case and the configmaps should be left behind (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/), but that's not what I'm seeing. It doesn't seem to be the operator, since it looks like the deleteHaData param is set to false in the reconciler.
    Copy code
    private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
            flinkService.deleteClusterDeployment(
                    deployment.getMetadata(), deployment.getStatus(), false);
    ...
    I'm running a session Flink cluster on 1.15.2 deployed in standalone mode
  • Jay Yang

    12/02/2022, 4:27 AM
    Hello team, we have a use case where we want to use Flink to generate dynamic streams from a topic with different event types, and to automatically discover new types. We can use Side Outputs to handle the different types when we already know them, but we haven't figured out a way to do it dynamically. Does anyone know the best way to do this?
  • Sumit Nekar

    12/02/2022, 11:35 AM
    Hello Team, I am deploying a job using FlinkDeployment (upgrade mode: savepoint). The job uses Kafka topics as source and sink and it is a stateful job. I wanted to start the job reading from the latest offset because the consumer lag of the running job was very high, so I tried to change the group.id of the Flink consumer and redeployed. I was expecting the consumer to read from the latest offset with minimal lag, but the job seems to have restarted with the offsets already stored in the previous savepoint taken during the upgrade process. This behaviour is not aligned with a Flink native / standalone cluster. Is there any way to redeploy the job so that it restarts from the latest offset? Note: one way I found was to delete the previous checkpoints available in the checkpoint directory.
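    The offsets are restored from the savepoint's operator state, so changing group.id has no effect. To force the source back to the latest offsets you either deploy without restoring state (e.g. upgradeMode: stateless, or deleting the savepoint/checkpoint as you found), or drop just the source's state by giving it a new uid and allowing non-restored state. Sketch of the source side (broker, topic and group names illustrative):
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("input-topic")
            .setGroupId("my-consumer-group")
            // Only applies when the source starts WITHOUT restored state:
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    // env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
    //    .uid("kafka-source-v2");   // a new uid drops the old source state on restore
    //                               // (the restore then needs allowNonRestoredState)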
  • Prasanth Kothuri

    12/02/2022, 1:31 PM
    Hello All, please can someone help me with setting up JsonDeserializationSchema with a ScalaObjectMapper? I have the following:
    Copy code
    val mapper = new ObjectMapper() with ScalaObjectMapper
        mapper.registerModule(DefaultScalaModule)
        mapper.registerModule(new JavaTimeModule)
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
    
        val scanSource = KafkaSource.builder[scanInput]
          .setProperties(properties)
          .setTopicPattern(Pattern.compile(config.getInputScanTopic))
          .setValueOnlyDeserializer( new JsonDeserializationSchema(classOf[scanInput],mapper))
          .build()
    which gives the following error:
    Copy code
    Type JsonDeserializationSchema takes type parameters
    I think the problem is that the type of the mapper I am passing to the constructor is not what that method expects (SerializableSupplier<ObjectMapper>). How do I fix this? Thanks.
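    Based on the constructor you mention, the second argument is a factory (SerializableSupplier<ObjectMapper>) rather than an already-built mapper, and the generic parameter needs to be given explicitly. A Java sketch of the idea (ScanInput stands in for scanInput; in Scala the supplier would be a function literal, and the Scala module would be added the same way):
    Copy code
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.MapperFeature;
    import com.fasterxml.jackson.databind.json.JsonMapper;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import org.apache.flink.formats.json.JsonDeserializationSchema;

    // The schema receives a factory, so the mapper is built lazily on the task managers
    // instead of being serialized together with the job graph.
    JsonDeserializationSchema<ScanInput> schema =
            new JsonDeserializationSchema<>(
                    ScanInput.class,
                    () -> JsonMapper.builder()
                            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                            .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
                            .addModule(new JavaTimeModule())
                            .build());

    // .setValueOnlyDeserializer(schema) on the KafkaSource builder as before.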
  • raghav tandon

    12/02/2022, 1:32 PM
    I am seeing worse performance when operator chaining (process -> Sink (Kafka)) happens. Performance dropped from 15K/sec to 8K/sec. I always thought chaining would increase performance, but now the entire job is backpressured because of this chaining. Even though chaining avoids any serde, why is this happening? • We read messages from Kafka -> maintain state -> generate protobuf -> sink to Kafka
  • Matt Weiss

    12/02/2022, 1:56 PM
    Case: We are using keyBy to partition a stream based on a unique vehicle id. We are then using a tumbling event-time window of 24 hours along with an aggregate function to create a daily aggregate for each truck. Think on the order of 5,000 - 50,000 trucks. At the end of each day, we use a JDBC sink to save each aggregated result to a MySQL database. Question: How does Flink handle this? At midnight every day, we will be inserting thousands of rows at once into the database. Is it handled gracefully within Flink, or is it something to be concerned about?
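    Flink itself will simply fire all of those window results when the watermark passes midnight and push them through the sink, so the main thing to tune is the sink's batching (and the database), not the window. A sketch of a JDBC sink with explicit batching (table, columns and connection details are illustrative; DailyAggregate is a placeholder for the aggregate type):
    Copy code
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;

    dailyAggregates.addSink(
            JdbcSink.sink(
                    "INSERT INTO daily_truck_stats (truck_id, stat_date, total) VALUES (?, ?, ?)",
                    (statement, agg) -> {
                        statement.setString(1, agg.truckId);
                        statement.setDate(2, java.sql.Date.valueOf(agg.date));
                        statement.setDouble(3, agg.total);
                    },
                    JdbcExecutionOptions.builder()
                            .withBatchSize(1000)          // rows per batched INSERT
                            .withBatchIntervalMs(2000)    // flush at least every 2 s
                            .withMaxRetries(3)
                            .build(),
                    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                            .withUrl("jdbc:mysql://db-host:3306/metrics")
                            .withDriverName("com.mysql.cj.jdbc.Driver")
                            .withUsername("user")
                            .withPassword("password")
                            .build()));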
  • Felix Angell

    12/02/2022, 4:21 PM
    Has anyone run into an error like this:
    Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @4f9e5d6a
    ? I've started getting this when submitting jobs after bumping my version of Flink from 1.13 to 1.15. The full stack trace is inside this thread.
  • Lily Liu

    12/02/2022, 5:52 PM
    Hi all. I am writing a batch processing pipeline to write files to GCS. However, Flink seems to create a lot of small files in the
    .inprogress
    folder on GCS. Where should I start looking in order to optimise this? Thanks.
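    With the FileSink, each sink subtask writes its own in-progress part files per bucket (and in streaming they additionally roll on every checkpoint for bulk formats), so the small-file count is roughly parallelism x buckets. Lowering the sink parallelism and, for row formats, loosening the rolling policy usually helps. A sketch for a row format (bucket path and sizes are illustrative):
    Copy code
    import java.time.Duration;

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    FileSink<String> sink = FileSink
            .forRowFormat(new Path("gs://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(
                    DefaultRollingPolicy.builder()
                            .withMaxPartSize(MemorySize.ofMebiBytes(128))   // roll at ~128 MiB
                            .withRolloverInterval(Duration.ofMinutes(15))
                            .withInactivityInterval(Duration.ofMinutes(5))
                            .build())
            .build();

    // results.sinkTo(sink).setParallelism(4);   // fewer writers -> fewer part files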
  • Ans Fida

    12/02/2022, 7:42 PM
    Anyone have thoughts about https://apache-flink.slack.com/archives/C03G7LJTS2G/p1669933592232289 ? I’d appreciate any pointers
  • Marco Villalobos

    12/02/2022, 10:27 PM
    Good afternoon, everybody. Across all of our environments, slowly but surely, we have to keep increasing the akka frame size. We're not even making code changes, but Flink keeps failing with an error that the akka frame size is too small. At what point is it too big? Why can't it size itself dynamically?