• v

    Vishal bharatbhai Vanpariya

    12/01/2022, 7:36 AM
    Hi all, I am trying to convert a JSON string to Avro format and write it to a Parquet file, but it returns an error.
    DataStream<GenericRecord> avrodata = MapFunction<String, GenericRecord>() {
                public GenericRecord map(String s) throws Exception {
                    Schema schema = new Schema.Parser().parse(schemastr);
                    DecoderFactory decoderFactory = new DecoderFactory();
                    Decoder decoder = decoderFactory.jsonDecoder(schema,s);
                    DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
    Schema schema = new Schema.Parser().parse(schemastr);
                    new Path("s3://bUcket/data/tmp/"),
    Error: Caused by: java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
  • r


    12/01/2022, 8:16 AM
    Hi all, has anyone tried connecting to a Pulsar topic with PyFlink? I am using the code below and haven't been able to connect to the topic.
    pulsar_source = PulsarSource.builder() \
        .set_service_url(service_url) \
        .set_admin_url(admin_url) \
        .set_start_cursor(StartCursor.latest()) \
        .set_topics("persistent://topic_name") \
        .set_config("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken") \
        .set_config("pulsar.client.authParams", AUTH_TOKEN) \
            PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
        .set_subscription_name('flink') \
        .set_subscription_type(SubscriptionType.Shared) \
        ds = env.from_source(source=pulsar_source,
                        source_name="pulsar source")
    Caused by: org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted.
  • a

    Amenreet Singh Sodhi

    12/01/2022, 11:10 AM
    Hi everyone! I noticed snakeyaml 1.27 is being used in Flink 1.16.0, and it has several vulnerabilities. If I build Flink 1.16.0 after manually changing to snakeyaml 1.32, would that have any effect on the build? Also, I observed that snakeyaml 1.27 has been in use since Flink 1.12.0. Why hasn't it been updated yet? Are there any specific reasons for using version 1.27? Thanks!
  • j

    Jason Politis

    12/01/2022, 12:36 PM
    Hello everyone. We have a job (SQL insert) that would normally use nextval in Oracle, essentially an auto-incremented number. We have currently settled on the idea of using a hashing algorithm to generate a "unique" number for each record (it's not a requirement for it to be incremental). We are using blake3, converting each alpha character to its ASCII equivalent, taking the first 18 digits, and then converting to a long, all in a Java UDF. What are your thoughts on this process, and do you have a recommendation for an alternative?
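A minimal sketch of the scheme described in this message, written in Python for brevity rather than as the Java UDF. It makes two assumptions that are not in the original: `hashlib.blake2b` stands in for blake3 (which is not in Python's standard library), and the function name `hash_to_long` is hypothetical.

```python
import hashlib


def hash_to_long(record_key: str) -> int:
    """Derive a numeric id (at most 18 decimal digits) from a record key."""
    # Assumption: blake2b stands in for blake3, which is not in the stdlib.
    hex_digest = hashlib.blake2b(record_key.encode("utf-8")).hexdigest()
    # Replace each alphabetic hex character (a-f) with its ASCII code and
    # keep numeric characters unchanged.
    digits = "".join(str(ord(ch)) if ch.isalpha() else ch for ch in hex_digest)
    # The first 18 decimal digits always fit in a signed 64-bit long.
    return int(digits[:18])


if __name__ == "__main__":
    print(hash_to_long("order-42"))
```

Note that the letters a-f always expand to 97 through 102, so the resulting digits are not uniformly distributed and the effective id space is smaller than 10^18. Uniqueness is therefore probabilistic rather than guaranteed.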
  • e

    Emmanuel Leroy

    12/01/2022, 1:07 PM
    I’m trying to merge related objects from change logs, like a join. I see there are window joins, but that’s not really what I need, as there are no guarantees that the objects I need to merge will be in the same window. I need to accumulate state and merge these objects, so I have been trying to translate each event into a meta event that includes all keys, union the streams, and then reduce. But I see union works on DataStreams and produces a DataStream. A union of keyed streams produces a DataStream (it’s not keyed anymore). Is there an option to union keyed streams into a KeyedStream and preserve the key? The key in the meta event is found in different places; I’d need to extract the key to a common place when creating the meta event if I need to re-key the stream after a union. Is there a trick to merge keyed streams with union?
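The approach described in this message (wrap each event in a meta event with the key in a common place, union, re-key, reduce) can be sketched in plain Python. This is only an illustration of the data flow, not Flink code. The names `to_meta_event`, `merge`, and `union_and_merge`, and the sample fields, are hypothetical. The grouping step plays the role of applying a fresh keyBy on the common field after the union.

```python
from collections import defaultdict
from functools import reduce
from itertools import chain


def to_meta_event(event: dict, key_field: str) -> dict:
    """Copy the join key to a common 'key' field so every stream is keyed alike."""
    return {"key": event[key_field], "fields": dict(event)}


def merge(left: dict, right: dict) -> dict:
    """Combine two meta events with the same key; later fields overwrite earlier ones."""
    return {"key": left["key"], "fields": {**left["fields"], **right["fields"]}}


def union_and_merge(*streams):
    """Union the streams, re-key on the common field, then reduce per key."""
    grouped = defaultdict(list)
    for meta in chain(*streams):
        grouped[meta["key"]].append(meta)
    return {key: reduce(merge, metas) for key, metas in grouped.items()}


# Hypothetical sample data: the key lives in a different field in each stream.
orders = [to_meta_event({"order_id": 1, "amount": 10}, "order_id")]
shipments = [to_meta_event({"ref": 1, "carrier": "X"}, "ref")]
merged = union_and_merge(orders, shipments)
```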
  • René


    12/01/2022, 4:25 PM
    Hi all, I'm trying to run the following connector:
    WITH (
        'connector' = 'kafka', 
        'topic' = 'FCM1.SYST034.CVL', 
        'scan.startup.mode' = 'earliest-offset', 
        'properties.bootstrap.servers' = '<|>', 
        'format' = 'avro',
        '' = 'SASL_SSL',
        'properties.sasl.mechanism' = 'SCRAM-SHA-512',
        'properties.sasl.jaas.config' = ' required username=\"user\" password=\"password\";'
    That's ok, but when I do a simple select * from CVL, then I get this error:
    ... Caused by: java.lang.IllegalArgumentException: Value not specified for key 'username' in JAAS config at
    <|> ...
    How do I need to configure Apache Flink (Ververica Platform) to be able to connect to Kafka? I followed the instructions under[…]ica-Platform-when-Connecting-to-SASL-Secured-Kafka-Cluster, but that didn't solve the problem. Thanks for any help!
  • m

    Momir Beljic

    12/01/2022, 4:57 PM
    Hi, could you help me with this code? I am trying to run a window over a subset of row data, do some grouping, and then call a
    function that performs some operations, filters the data, and returns specific Rows based on that. However, no matter what I change, I get the exception
    org.apache.flink.table.api.ValidationException: Cannot resolve field [MEASUREMENT], input field list:[TERM_ID, F_LOCATION, ADAPTER].
    Please find the code below. Thank you!
    ds = env.from_collection(
                (Instant.of_epoch_milli(1000), '2022-12-01T17:10:18.191732', '123457', '123456-09', '22.2-2', '12345678', '123456', 'M1', 7, 20, -20, 0, '2', 0, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(2000), '2022-12-01T17:10:20.231437', '123458', '123456-07', '22.2-1', '12345679', '123456', 'M1', 10, 25, -15, 2, '1', 120, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(3000), '2022-12-01T17:10:20.315141', '123459', '123456-09', '22.2-1', '12345679', '123456', 'M1', 20, 29, -3, 3, '2', 100, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(4000), '2022-12-01T17:10:20.389638', '123455', '123456-08', '22.2-1', '12345679', '123456', 'M1', 25, 35, 1, 4, '10', 10, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(5000), '2022-12-01T17:10:20.585687', '123458', '123456-07', '22.2-1', '12345679', '123456', 'M1', 30, 40, -2, 5, '2', 120, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(6000), '2022-12-01T17:10:20.649107', '123457', '123456-06', '22.2-2', '12345678', '123456', 'M1', 4, 45, 4, 6, '10', 0, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
                (Instant.of_epoch_milli(7000), '2022-12-01T17:10:21.040214', '123455', '123456-09', '22.2-1', '12345678', '123456', 'M1', 22, 49, 5, 7, '2', 100, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98')
            type_info=Types.ROW([Types.INSTANT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.FLOAT(), Types.FLOAT(),Types.FLOAT(), Types.FLOAT(),Types.STRING(), Types.STRING(), Types.INT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]))
    table = t_env.from_data_stream(
                  .column_by_expression("proctime", "proctime()")
                  .column("f1", DataTypes.STRING())
                  .column("f2", DataTypes.STRING())
                  .column("f3", DataTypes.STRING())
                  .column("f4", DataTypes.STRING())
                  .column("f5", DataTypes.STRING())
                  .column("f6", DataTypes.STRING())
                  .column("f7", DataTypes.STRING())
                  .column("f8", DataTypes.FLOAT())
                  .column("f9", DataTypes.FLOAT())
                  .column("f10", DataTypes.FLOAT())
                  .column("f11", DataTypes.FLOAT())
                  .column("f12", DataTypes.STRING())
                  .column("f13", DataTypes.INT())
                  .column("f14", DataTypes.STRING())
                  .column("f15", DataTypes.STRING())
                  .column("f16", DataTypes.STRING())
                  .column("f17", DataTypes.STRING())
                  .column("f18", DataTypes.STRING())
                  .column("f19", DataTypes.STRING())
                  .column("f20", DataTypes.STRING())
                  .column("f21", DataTypes.STRING())
                  .column("f22", DataTypes.STRING())
                  .column("f23", DataTypes.STRING())
                  .column("f24", DataTypes.STRING())
                  .column("f25", DataTypes.STRING())
                  #.watermark("proctime", "proctime - INTERVAL '3' SECOND")
                                   .column("TERM_ID", DataTypes.STRING())
                                   .column("F_LOCATION", DataTypes.STRING())
                                   .column("ADAPTER", DataTypes.STRING())
                                   .column("MEASUREMENT", DataTypes.FLOAT())
                                   .column("USL", DataTypes.FLOAT())
                                   .column("LSL", DataTypes.FLOAT())
                                   .column("NOM", DataTypes.FLOAT())
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("TERM_ID", DataTypes.STRING()), DataTypes.FIELD("F_LOCATION", DataTypes.STRING()), DataTypes.FIELD("ADAPTER", DataTypes.STRING()), DataTypes.FIELD("MEASUREMENT", DataTypes.FLOAT()), DataTypes.FIELD("USL", DataTypes.FLOAT()), DataTypes.FIELD("LSL", DataTypes.FLOAT()), DataTypes.FIELD("NOM", DataTypes.FLOAT())]), func_type="pandas")
        def process_udaf(term_id, f_location, adapter, measurement, usl, lsl, nom, teststatus, method_compute):
            if teststatus == 1 and (method_compute > 0 or method_compute < 101):
                zscore_log = (measurement - measurement.mean()) / (measurement.std())
                dmn_log = measurement.mean() - nom
                cp_log = (usl - lsl) / (6 * measurement.std())
                return Row(term_id, f_location, adapter, zscore_log, dmn_log, cp_log)
        t_env.create_temporary_function("process_udaf", process_udaf)
    table = table.window(
            #here just use 100 as row offset and slide of 100 and perform calculation of counting the rows and then call correct processing udf call based on the row number
            Slide.over("4.rows").every("4.rows").on(col("proctime")).alias("w")) \
            .group_by(col("w"), col("TERM_ID"), col("F_LOCATION"), col("ADAPTER")) \
            .select(process_udaf(col("TERM_ID"), col("F_LOCATION"), col("ADAPTER"), col("MEASUREMENT"), col("USL"), col("LSL"), col("NOM"), col("TESTSTATUS"), col("METHOD_COMPUTE")))
    table.execute_insert('sink') \
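For reference, the statistics computed inside `process_udaf` can be checked in isolation with plain Python. This is a sketch under stated assumptions, not a fix for the ValidationException: the function names `process_capability` and `should_process` and the sample values are hypothetical, and `statistics.stdev` is used because it is the sample standard deviation, which matches the pandas default. The range check is written as `0 < method_compute < 101` on the assumption that this is what was intended; the posted condition uses `or`, which is true for every number.

```python
from statistics import mean, stdev


def process_capability(measurements, usl, lsl, nom):
    """Return (z-scores, deviation of mean from nominal, Cp) for one group."""
    mu = mean(measurements)
    sigma = stdev(measurements)  # sample standard deviation (ddof=1)
    z_scores = [(m - mu) / sigma for m in measurements]
    dmn = mu - nom
    cp = (usl - lsl) / (6 * sigma)
    return z_scores, dmn, cp


def should_process(teststatus, method_compute):
    # Assumption: the intended range check is 0 < method_compute < 101.
    return teststatus == 1 and 0 < method_compute < 101


# Hypothetical sample values for one (TERM_ID, F_LOCATION, ADAPTER) group.
z_scores, dmn, cp = process_capability([7.0, 10.0, 20.0, 25.0], usl=20.0, lsl=-20.0, nom=0.0)
```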
  • r

    raghav tandon

    12/01/2022, 8:28 PM
    I have a basic question on ordering of events. If the pipeline is Source -> KeyBy + process -> Sink (Kafka), then operator chaining happens from process to Sink. Can I assume the output will always be in keyBy order, so that I don't have to do another keyBy before the Sink?
  • a

    Ans Fida

    12/01/2022, 10:26 PM
    Does anybody know if you can create a proctime or rowtime time attribute on a Table? I have seen some examples of this in DDL or stream-to-table conversion, but I wanted to do it on a Table. I have the following snippet
    Table table = tEnv.sqlQuery(query.getQuery());
    // create a new column 'EventTime' of type Timestamp from 'EventTimestamp' which is a string
    table = table.addColumns($("EventTimestamp").toTimestamp().as("EventTime"));
    WindowGroupedTable windowedTable = table.window(Tumble.over("10.minutes").on($("EventTime").proctime())
        .groupBy($("w"), $("GroupingColumn"));
    table =$("*"));
    but it doesn’t seem to work and results in this exception
    Expected LocalReferenceExpression. Got: EventTime
  • t


    12/01/2022, 10:37 PM
    Hello team, I have a question that is similar to the "How to convert a Table to a DataStream containing array types" post on Stack Overflow. In a 1.13 UDF we try to give a proper type hint to the output of the UDF but get exceptions like:
    [java] Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for unregistered table do not match.
    [java] Cause: Incompatible types for sink column 'f0' at position 0.
    [java] Query schema: [EXPR$0: ARRAY<ROW<> NOT NULL>]
    [java] Sink schema:  [f0: RAW('java.util.List', ?)]
    The typeinformation for the output type is
    Row(f0: List<Row(f0: String)>)
    and the closest we can get is to define
    output = @DataTypeHint(value = "RAW", bridgedTo = List.class)
    and the exception becomes
    [java] Cause: Incompatible types for sink column 'f0' at position 0.
    [java] Query schema: [EXPR$0: RAW('java.util.List', '...')]
    [java] Sink schema:  [f0: RAW('java.util.List', ?)]
    Any suggestion to resolve this? Thanks!
  • s

    Steven Zhang

    12/02/2022, 12:17 AM
    are there any known bugs with the
    HA class? I'm trying to upgrade a session cluster using the Flink operator, and it seems like the configmaps for the job I had running on it get cleaned up when I update the FlinkDep CRD image field and the Job/Task managers get torn down. From the docs, it seems like this shouldn't be the case and the configmaps should be left behind, but that's not what I'm seeing. It doesn't seem to be the operator, since it looks like the deleteHaData param is set to false in the reconciler.
    private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
                    deployment.getMetadata(), deployment.getStatus(), false);
    I'm running a session Flink cluster on 1.15.2 deployed in standalone mode
  • j

    Jay Yang

    12/02/2022, 4:27 AM
    Hello team, we have a use case where we want to use Flink to generate dynamic streams from a topic with different event types and automatically discover new types. We can use side outputs to handle different types when we already know the types, but we haven't figured out a way to do it dynamically. Does anyone know the best way to do this?
  • s

    Sumit Nekar

    12/02/2022, 11:35 AM
    Hello Team, Deploying a job using FlinkDeployment (Upgrade mode: savepoint). Job uses kafka topics as source and sink and it is a stateful job. I wanted to start the job to read from latest offset when the consumer lag of the running job was very high. So I tried to change of the flink consumer and redeployed. I was expecting consumer to read from latest offset with minimal lag but the job seems to have restarted with the offset already stored in the previous savepoint taken during upgrade process. This behaviour is not aligned with the flink native / standalone cluster. Is there any way to redeploy job so that it restarts from latest offset? Note: One way i found was to delete previous checkpoints available in the checkpoint directory.
  • p

    Prasanth Kothuri

    12/02/2022, 1:31 PM
    Hello All, Please can someone help with setting JsonDeserializationSchema with ScalaObjectMapper, I have the following
    val mapper = new ObjectMapper() with ScalaObjectMapper
        mapper.registerModule(new JavaTimeModule)
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
        val scanSource = KafkaSource.builder[scanInput]
          .setValueOnlyDeserializer( new JsonDeserializationSchema(classOf[scanInput],mapper))
    which has the error
    Type JsonDeserializationSchema takes type parameters
    I think the problem is the type of the mapper I am passing to the constructor is not what is expected (SerializableSupplier<ObjectMapper>) by that method , how to fix this, thanks
  • r

    raghav tandon

    12/02/2022, 1:32 PM
    I am seeing worse performance when operator chaining (process -> Sink (Kafka)) happens. Throughput dropped from 15K/sec to 8K/sec. I always thought chaining would increase performance, but now the entire job is back-pressured because of this chaining. With chaining I have avoided any serde, so why is this happening? • We read messages from Kafka -> maintain state -> generate protobuf -> sink to Kafka
  • m

    Matt Weiss

    12/02/2022, 1:56 PM
    Case: We are using keyBy to partition a stream based on a unique vehicle id. We are then using a tumbling event time window of 24 hours along with an aggregate function to create a daily aggregate for each truck. Think on the order of 5,000 - 50,000 trucks. At the end of each day, we are using a JDBC sink to save each aggregated result to a MySQL database. Question: How does Flink handle this? At midnight every day, we will be inserting thousands of rows at once into a DB. Is it handled gracefully within Flink, or is it something to be concerned about?
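The setup in this message (key by vehicle id, 24-hour tumbling window, one aggregate per key) can be sketched in plain Python to show why all results land at once. This is an illustration only, not Flink code. The names `window_start` and `daily_totals`, the `miles` field, and the sample events are hypothetical, and the window is assumed to have no offset.

```python
from collections import defaultdict

DAY_MS = 24 * 60 * 60 * 1000


def window_start(timestamp_ms: int, size_ms: int = DAY_MS) -> int:
    """Start of the tumbling window (no offset) that contains the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


def daily_totals(events):
    """Sum a value per (vehicle_id, day window)."""
    totals = defaultdict(float)
    for vehicle_id, timestamp_ms, miles in events:
        totals[(vehicle_id, window_start(timestamp_ms))] += miles
    return dict(totals)


# Hypothetical events: (vehicle_id, event time in ms, miles driven).
events = [
    ("t1", 1000, 5.0),
    ("t1", DAY_MS - 1, 2.0),
    ("t1", DAY_MS, 1.0),
    ("t2", 10, 3.0),
]
totals = daily_totals(events)
```

Because every key shares the same window boundaries, each vehicle's window for a given day ends at the same instant, which is what produces the burst of rows described in the question.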
  • Felix Angell

    Felix Angell

    12/02/2022, 4:21 PM
    Has anyone run into an error like this
    Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @4f9e5d6a
    ? I've started getting this when submitting jobs after bumping my version of Flink from 1.13 to 1.15. Full stacktrace inside this thread
  • l

    Lily Liu

    12/02/2022, 5:52 PM
    Hi all. I am writing a batch processing pipeline to write files to gcs. However, Flink seems to create a lot of small files in
    folder on gcs. Where should I start to look to optimise this? Thanks.
  • a

    Ans Fida

    12/02/2022, 7:42 PM
    Anyone have thoughts about ? I’d appreciate any pointers
  • m

    Marco Villalobos

    12/02/2022, 10:27 PM
    Good afternoon everybody. Across all of our environments, slowly but surely, we have to keep on increasing the akka frame size. We're not even making code changes, but Flink keeps on failing with the error of akka frame size too small. At what point is it too big? Why can't it dynamically size itself?
  • c

    Chris Ro

    12/02/2022, 11:25 PM
    what causes some subtasks to not send/process events? my incoming records are randomly partitioned and eventually go to a keyBy, but i’m seeing subtasks with 0 records sent (unused?) in the flink dashboard. is there a common reason for this?
  • e

    Emmanuel Leroy

    12/03/2022, 2:00 AM
    What happens when using a reduce function after a window, when there is only 1 value in the window? Does the single value get passed to the next task as is? I’m trying to mutate an object on reduce to merge / move fields, but sometimes my object seems to be passed as is, and it seems to be when there is only 1 item in the window. Does that make sense? If so, what would be the way to edit fields in a reduce and ensure all items get processed?
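The behaviour described here matches how a reduce generally works: the function combines two values, so with a single value there is nothing to combine and the value is returned untouched. Python's `functools.reduce` shows the same pattern. This is an analogy in plain Python, not Flink code, and `merge_fields` and the `calls` list are hypothetical names used only to record invocations.

```python
from functools import reduce

calls = []


def merge_fields(left: dict, right: dict) -> dict:
    """Combine two records and mark the result as merged."""
    calls.append((left, right))
    return {**left, **right, "merged": True}


# One element: merge_fields is never invoked, the element comes back as is.
single = reduce(merge_fields, [{"id": 1}])
calls_after_single = len(calls)

# Two elements: merge_fields is invoked exactly once.
pair = reduce(merge_fields, [{"id": 1}, {"name": "a"}])
calls_after_pair = len(calls)
```

If every item must be transformed regardless of how many arrive in a window, one option is to apply the field changes in a separate step before or after the reduce, so the change does not depend on the reduce function being called.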
  • Sandeep Kongathi

    Sandeep Kongathi

    12/03/2022, 5:24 PM
    Hi All, I am trying to experiment a few things in Flink-SQL with Source
    CREATE TABLE orders (
        order_uid  BIGINT,
        product_id BIGINT,
        price      DECIMAL(32, 2),
        order_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'datagen'
    CREATE TABLE orders_kafka (
        order_uid  BIGINT,
        product_id BIGINT,
        price      DECIMAL(32, 2),
        order_time TIMESTAMP(3),
        PRIMARY KEY (`order_uid`) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'properties.bootstrap.servers' = 'redpanda:29092',
      'topic' = 'orders',
      'sink.parallelism' = '2',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.json.ignore-parse-errors' = 'true',
      '' = 'false',
      'value.fields-include' = 'EXCEPT_KEY'
    Finally when I insert the data with
    INSERT INTO orders_kafka
    SELECT *
    FROM orders;
    I am getting below error
  • r

    raghav tandon

    12/03/2022, 8:33 PM
    I want to change the Sink parallelism so that the Kafka push rate can be higher, but there doesn't seem to be any way other than setting
    but this increases of other operators as well, and I am not able to set parallelism at the sink operator level
    Operator org.apache.flink.streaming.api.datastream.KeyedStream@76828577 cannot set the maximumparalllelism
    The pipeline writes to 3 different sinks using SideOutput. Please suggest if there is a way out.
  • Jirawech Siwawut

    Jirawech Siwawut

    12/04/2022, 1:41 AM
    Hi. I am trying to use a temporal join to enrich two data streams, perform a hop window aggregation, and eventually join these two streams. Please refer to the picture. Let's assume that I use Flink SQL in the window join step

    from agg1
    left join agg2
    ON agg1.window_start = agg2.window_start
    AND agg1.window_end = agg2.window_end
    The output is weird where there is always
    value from agg2. It only works for some windows at the beginning and starts to produce null afterwards. I already tried to separate
    , and found that they both produce output for the same window and key. Has anyone here experienced the same behavior?
  • s

    sharad mishra

    12/04/2022, 2:29 AM
    Hello 👋, I am using Flink (1.16) on YARN with Kafka (3.2.3) as the source for reading data.
    Flow of the application is Kafka (source topic) -> Flink keyed stream -> Flink Kafka sink -> Kafka (target topic)
    I noticed a huge difference in Flink operator initialization time when I change my DeliveryGuarantee from AT_LEAST_ONCE to EXACTLY_ONCE in the Flink Kafka sink operator
    Initialization time with AT_LEAST_ONCE -> 1 min
    Initialization time with EXACTLY_ONCE -> 15 mins
    Is that expected? Is there a way to reduce initialization time with EXACTLY_ONCE for the Flink Kafka sink operator? This is what my Kafka sink looks like
    val serializer = AvroSerializationSchema.forSpecific(classOf[DCNPOJORecord])
        val kafkaRecordSerializationSchema = KafkaRecordSerializationSchema.builder()
        val sink: KafkaSink[DCNPOJORecord] = KafkaSink.builder()
          .setProperty("<|>", transactionMaxTimeoutMs)
          .setProperty("<|>", transactionTimeoutMs)
  • m

    Marco Villalobos

    12/04/2022, 4:07 AM
    Is there a way, within a keyed process function, to detect idleness? Kafka is my source. I use event time processing. I have hundreds of thousands of unique keys. If an element with a given key does not arrive within a window, I need to collect its previous value. That works very well when there is traffic on Kafka. But if the source receives no data, I then use processing time to detect idleness. That works very well if the Flink job is always running. However, if the Flink job goes down for a few days, there are race conditions between the processing time timers and event time timers, creating havoc upon the sink. Is there another way to check that the watermark is not advancing, such as finding out when the watermark last changed in a global manner, shared across hundreds of thousands of keys?
  • s

    Sumit Nekar

    12/04/2022, 7:49 AM
    Hello, I am trying to tune memory configuration for my flink job deployed using FlinkOperator. Following are the memory settings I am using. I am configuring only Total memory as mentioned in this doc.
    taskmanager.memory.process.size: "8000m" "500m"
    taskmanager.memory.jvm-metaspace.size: "250m"
    When the job starts processing, the metrics show that flink_taskmanager_Status_Flink_Memory_Managed_Used is always zero, whereas flink_taskmanager_Status_Flink_Memory_Managed_Total is set to 5G. Is this configuration fine or should I need to configure either configured explicitly via