# troubleshooting
  • a

    Alex Brekken

    04/18/2023, 7:11 PM
    Hi all, I’m trying to troubleshoot an integration test for my Flink pipeline, which is using the
    MiniClusterWithClientResource
    . At a high level, my pipeline does a map operation, then a keyBy, and then a process function immediately after the keyBy. The process function contains an aggregator that puts records into a state store. In my integration test, I’m feeding in 2 records using
    env.fromElements()
    . With parallelism set to 1, the test passes every time. With parallelism set to 2 (or really anything greater than 1), the test will intermittently fail. After debugging, what I’m seeing is that occasionally my process function (which extends KeyedProcessFunction) will get the 2nd test record first, in other words - the input records occasionally arrive out-of-order. (I have logic in the process function that expects these records to arrive in the correct order, which is why the test fails) After arriving at the keyBy function, both test records get re-keyed to the same key. My assumption was that they should get processed in timestamp order by the subsequent process function, even if the record with the higher timestamp arrives first. In my test, I’m using a watermark strategy of boundedOutOfOrderness with a duration of 20 seconds. I’ve checked the elements in the process function by debugging, and they do have timestamps set on each record, and the timestamps are correct. I’m not sure if I’m doing something wrong, or misunderstanding how ordering works. When I deploy this same application “for real” in a Flink cluster (where the source data is actually Kafka), I don’t see this issue at all. I only see the problem in the integration test. Thanks for any ideas or advice!
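    A likely explanation: Flink only preserves record order within a single partition of the stream, so once the two elements are fanned out across parallel subtasks ahead of the keyBy they can reach the keyed operator in either order, and a KeyedProcessFunction does not reorder them by timestamp on its own; the 20-second bounded-out-of-orderness watermark only bounds lateness, it does not sort. One common pattern is to buffer elements in keyed state and emit them from an event-time timer once the watermark has passed, as in this sketch (MyEvent and the state names are illustrative, not from the original pipeline):
    Copy code
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class OrderedProcessFunction extends KeyedProcessFunction<String, MyEvent, MyEvent> {

        // Buffer of not-yet-processed elements, grouped by their event timestamp.
        private transient MapState<Long, List<MyEvent>> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                    "buffer", Types.LONG, Types.LIST(TypeInformation.of(MyEvent.class))));
        }

        @Override
        public void processElement(MyEvent value, Context ctx, Collector<MyEvent> out) throws Exception {
            long ts = ctx.timestamp(); // requires timestamps assigned by the WatermarkStrategy upstream
            List<MyEvent> sameTs = buffer.get(ts);
            if (sameTs == null) {
                sameTs = new ArrayList<>();
            }
            sameTs.add(value);
            buffer.put(ts, sameTs);
            // Fire once the watermark reaches this timestamp; timers fire in timestamp order,
            // so buffered elements are handled in event-time order regardless of arrival order.
            ctx.timerService().registerEventTimeTimer(ts);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<MyEvent> out) throws Exception {
            for (MyEvent e : buffer.get(timestamp)) {
                out.collect(e); // the real ordering-sensitive logic would run here
            }
            buffer.remove(timestamp);
        }
    }

    // Placeholder for the pipeline's actual event type.
    class MyEvent {
        public String key;
        public long id;
    }
    In the real deployment the records of one key presumably come from a single Kafka partition and therefore a single source subtask, which would explain why the reordering only shows up in the MiniCluster test.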
  • z

    Z Mario

    04/19/2023, 3:30 AM
    Hi guys, I have been trying to submit a sample PCollection job to a running FlinkDeployment job manager, but it keeps failing. I shared this issue yesterday but I guess it wasn’t well understood. On digging further I found out that the application requires a Beam worker pool sidecar, without which it fails. The basic FlinkDeployment session cluster YAML from the official documentation is below.
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      namespace: default
      name: basic-example
    spec:
      image: flink:1.15
      flinkVersion: v1_15
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    How do I add the beam worker pool in this yaml file, or how should I create it? Would greatly appreciate your support!
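    For the Beam portable runner, the usual pattern is to run the Python SDK harness as a worker-pool sidecar next to each task manager using the operator's podTemplate support. The following is only a sketch built on that generic mechanism, not a verified manifest; the SDK image tag and the port are assumptions and must match the Beam version of your job:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      namespace: default
      name: basic-example
    spec:
      image: flink:1.15
      flinkVersion: v1_15
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
            name: task-manager-pod-template
          spec:
            containers:
              # The container named flink-main-container is the task manager itself;
              # the operator merges this entry with its generated container.
              - name: flink-main-container
              # Beam Python SDK worker pool sidecar (image tag must match the job's Beam version).
              - name: beam-worker-pool
                image: apache/beam_python3.8_sdk:2.46.0
                args: ["--worker_pool"]
                ports:
                  - containerPort: 50000
    The Beam pipeline would then be submitted with --environment_type=EXTERNAL --environment_config=localhost:50000 so that the task managers hand work to the sidecar.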
  • s

    Soumya Ghosh

    04/19/2023, 6:54 AM
    Hello folks, I am using Flink Elasticsearch (
    elasticsearch-7
    ) table connector and exploring the configurations below, mentioned in the documentation:
    • connection.request-timeout
    • connection.timeout
    • socket.timeout
    When I use the above connector options in the Flink table DDL, I get an error that these options are not supported. Is there a specific way to utilize these configs? Error:
    Copy code
    Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'elasticsearch-7'.
    
    Unsupported options:
    
    connection.request-timeout
    connection.timeout
    socket.timeout
    
    Supported options:
    
    connection.path-prefix
    connector
    document-id.key-delimiter
    failure-handler
    format
    hosts
    index
    json.encode.decimal-as-plain-number
    json.fail-on-missing-field
    json.ignore-parse-errors
    json.map-null-key.literal
    json.map-null-key.mode
    json.timestamp-format.standard
    password
    property-version
    sink.bulk-flush.backoff.delay
    sink.bulk-flush.backoff.max-retries
    sink.bulk-flush.backoff.strategy
    sink.bulk-flush.interval
    sink.bulk-flush.max-actions
    sink.bulk-flush.max-size
    sink.flush-on-checkpoint
    username
    	at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:631)
    	at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:923)
    	at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.createDynamicTableSink(Elasticsearch7DynamicSinkFactory.java:93)
    	at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:266)
    	... 18 more
  • a

    Alokh P

    04/19/2023, 8:47 AM
    Hi All, I am trying to read Avro data from Kafka which uses schema registry and write it to s3 in Parquet format. I am facing this issue currently
    Copy code
    Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
    	at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329)
    	at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:420)
    	at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:456)
    As far as I have checked, we can avoid this by using AvroParquetWriter and setting the config
    parquet.avro.write-old-list-structure
    to false. Can anybody here help with how this can be achieved?
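    One possible way to get that flag in (a sketch, untested against this setup: the class names are real Flink/Parquet APIs, but the wiring is an assumption): build the AvroParquetWriter yourself through a custom ParquetBuilder, pass it a Hadoop Configuration with parquet.avro.write-old-list-structure set to false, and hand the builder to a ParquetWriterFactory for the FileSink.
    Copy code
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.ParquetBuilder;
    import org.apache.flink.formats.parquet.ParquetWriterFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.avro.AvroParquetWriter;

    public class ParquetSinkFactory {

        /** Writer factory whose underlying AvroParquetWriter disables the old list structure. */
        public static ParquetWriterFactory<GenericRecord> writerFactory(Schema schema) {
            // Ship the schema as a string so the lambda below stays serializable.
            String schemaString = schema.toString();
            ParquetBuilder<GenericRecord> builder = out -> {
                Configuration conf = new Configuration();
                conf.setBoolean("parquet.avro.write-old-list-structure", false);
                return AvroParquetWriter.<GenericRecord>builder(out)
                        .withSchema(new Schema.Parser().parse(schemaString))
                        .withConf(conf)
                        .build();
            };
            return new ParquetWriterFactory<>(builder);
        }

        /** Bulk-format FileSink writing GenericRecords to e.g. an s3a:// path. */
        public static FileSink<GenericRecord> sink(String basePath, Schema schema) {
            return FileSink.forBulkFormat(new Path(basePath), writerFactory(schema)).build();
        }
    }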
  • f

    Felix Angell

    04/19/2023, 10:33 AM
    does anyone know what the flink git tags mean. how does
    release-1.15.2-rc2
    differ from
    release-1.15
    ? i've noticed that
    rc2
    does not contain the FlinkKinesisConsumer pyflink bindings whereas
    release-1.15
    does.
  • t

    Thijs van de Poll

    04/19/2023, 10:57 AM
    Hi all, I have come across a problem with the Postgres JDBC connector. I am not sure, but I don’t think this is expected behavior. I have the following context:
    • Pipeline source: Postgres CDC
    • Pipeline sink: Postgres JDBC
    If I make a change in the source table and print the changelog stream:
    Copy code
    -D[1,83819211,545900928]
    +I[-83819211,83819211,545900928]
    This is expected, it should remove the record having primary key 1, and insert the record having primary key -83819211. If I check the database after this change, I see the following:
    Copy code
    -----------+----------------+--------------------
     -83819211 |       83819211 |          545900928
             1 |       83819211 |          545900928
    It suggests that the delete statement has not been executed. Anyone that can help?
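    One thing worth double-checking (a guess, not a confirmed diagnosis): the JDBC sink only switches into upsert mode, and hence only issues DELETE/UPDATE statements, when a primary key is declared in the sink DDL; without one it appends, and -D rows never reach the database. A hypothetical PyFlink sketch, with table and column names invented for illustration:
    Copy code
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Declaring PRIMARY KEY ... NOT ENFORCED is what puts the JDBC connector into
    # upsert mode, so that -D changelog rows become DELETE statements on Postgres.
    t_env.execute_sql("""
        CREATE TABLE sink_table (
            parties_id BIGINT,
            other_id BIGINT,
            another_id BIGINT,
            PRIMARY KEY (parties_id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://localhost:5432/mydb',
            'table-name' = 'sink_table',
            'username' = '...',
            'password' = '...'
        )
    """)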
  • p

    Peter Esselius

    04/19/2023, 1:31 PM
    Hi all, I've tried to start using Flink for work multiple times in the last 2 years, but I keep getting stuck on stuff like getting a
    DataStream[RowData]
    from a connector in Scala and having no idea how to use it. 😕 I enjoyed the rockthejvm course a lot, so I'm very motivated to figure this out! Previously I've dabbled in Debezium & Kafka as input, but my current input is a Delta Lake table. The connector doesn't support the Table API, so I guess I either need to turn the stream into a
    DataStream[Row]
    to enable using
    tEnv.fromChangelogStream()
    or figure out how to specify schema with
    tEnv.fromDataStream()
    , but how? Can I avoid specifying schema when the delta lake connector supposedly already knows what the schema is? I also wanted to turn
    DataStream[RowData]
    into
    DataStream[MyCaseClass]
    somehow, hopefully without manually pointing out and casting field-by-field in a
    ds.map()
    What I've tried is using
    CaseClassConverter
    and
    RowRowConverter
    , but I couldn't really understand how to use either of them 😖 Am I missing something? Am I failing to progress because I'm trying too hard to avoid retyping the input schema multiple times? The
    DataStream API Integration
    docs mention
    RowData
    ->
    Row
    conversion, but I haven't been able to find any examples 😕 Is using Scala + Case Classes making it worse? I can switch to using the third party scala api implementation if that helps, or switch to java or kotlin
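    On the RowData to Row step specifically, here is a rough sketch of the RowRowConverter route (in Java, since that is where the converter lives; the DataType would come from the Delta connector's schema and is an assumption here):
    Copy code
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.conversion.RowRowConverter;
    import org.apache.flink.table.runtime.typeutils.ExternalTypeInfo;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;

    public class RowDataToRow extends RichMapFunction<RowData, Row> {

        // e.g. DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()), ...), taken from the source schema.
        private final DataType rowType;
        private transient RowRowConverter converter;

        public RowDataToRow(DataType rowType) {
            this.rowType = rowType;
        }

        @Override
        public void open(Configuration parameters) {
            converter = RowRowConverter.create(rowType);
            converter.open(getRuntimeContext().getUserCodeClassLoader());
        }

        @Override
        public Row map(RowData value) {
            return converter.toExternal(value);
        }

        /** Usage: wrap the connector's DataStream<RowData> and hand the result to the Table API. */
        public static DataStream<Row> convert(DataStream<RowData> source, DataType rowType) {
            return source.map(new RowDataToRow(rowType), ExternalTypeInfo.of(rowType));
        }
    }
    From there tEnv.fromDataStream(rows) or tEnv.fromChangelogStream(rows) should be able to derive the schema from the Row type information without retyping it by hand.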
  • f

    Felix Angell

    04/19/2023, 2:56 PM
    i am a bit confused with what KDA gives out of the box in PyFlink land. trying to continue upgrading an app we have to 1.15.2 and now I get this problem:
    Copy code
    2023-04-19 15:49:21
    java.lang.NoSuchMethodError: 'org.apache.flink.metrics.MetricGroup org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()'
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:765)
    we have a pyflink app that bundles a fat jar with some extra utilities where we do stuff in java-land, e.g. deserialisation, and so on.
    flink-streaming-java
    is the only dependency in the POM.xml file that is marked as provided. our dependencies are: flink-streaming-java, flink-java, flink-core, flink-connector-kafka, kafka-clients (2.8.1). everything is set to be version 1.15.2 (other than the kafka-clients). any ideas what is up here? thanks!
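    This particular NoSuchMethodError is the classic symptom of mixing pre-1.14 and post-1.14 Flink artifacts on the same classpath (the return type of RuntimeContext.getMetricGroup() changed in Flink 1.14), for example an older Kafka connector pulled in transitively or an older flink-core ending up inside the fat jar. That is a guess, not a confirmed diagnosis; checking the shaded output with mvn dependency:tree and keeping all core Flink artifacts provided, not just flink-streaming-java, usually surfaces it. A sketch of the dependency section under that assumption:
    Copy code
    <!-- Provided by the Flink runtime; do not bundle these in the fat jar. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.15.2</version>
      <scope>provided</scope>
    </dependency>
    <!-- Bundled: the connector and the Kafka client it needs. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>1.15.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.1</version>
    </dependency>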
  • t

    Thijs van de Poll

    04/19/2023, 3:40 PM
    Hi! I came across a very weird bug in PyFlink, see the following snippet:
    Copy code
    # This works
    table.select(col("parties_id"), col("publication_id"), col("nested_obj"))
    
    # This works
    table.select(col("nested_obj").get("value_from_nested"))
    
    # This does NOT work
    table.select(col("parties_id"), col("publication_id"), col("nested_obj").get("value_from_nested"))
    The last one raises:
    Copy code
    py4j.protocol.Py4JJavaError: An error occurred while calling o8.toChangelogStream.
    : java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
            at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
            at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
            at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
            at java.base/java.util.Objects.checkIndex(Unknown Source)
            at java.base/java.util.ArrayList.get(Unknown Source)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
            at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
            at org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
            at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
            at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
            at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
            at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
            at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
            at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
            at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
            at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
            at org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
            at org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
            at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
            at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
            at org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
            at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
            at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
            at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
            at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
            at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
            at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
            at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
            at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
            at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
            at scala.collection.Iterator.foreach(Iterator.scala:937)
            at scala.collection.Iterator.foreach$(Iterator.scala:937)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
            at scala.collection.IterableLike.foreach(IterableLike.scala:70)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
            at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
            at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
            at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
            at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
            at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
            at scala.collection.immutable.Range.foreach(Range.scala:155)
            at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
            at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
            at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
            at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
            at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
            at scala.collection.Iterator.foreach(Iterator.scala:937)
            at scala.collection.Iterator.foreach$(Iterator.scala:937)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
            at scala.collection.IterableLike.foreach(IterableLike.scala:70)
            at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
            at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
            at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
            at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
            at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
            at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
            at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
            at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
            at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
            at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
            at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
            at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
            at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
            at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
            at java.base/java.lang.reflect.Method.invoke(Unknown Source)
            at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
            at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
            at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
            at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
            at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.base/java.lang.Thread.run(Unknown Source)
    Can anyone help?
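    Until the underlying planner issue is tracked down, one workaround that sometimes sidesteps this kind of error is to materialise the nested access in its own projection first and only then select it alongside the plain columns. This is a sketch, not a confirmed fix; `table` and the column names are taken from the snippet above.
    Copy code
    from pyflink.table.expressions import col

    # Step 1: pull the nested field out under a top-level alias in a dedicated projection.
    flattened = table.add_columns(
        col("nested_obj").get("value_from_nested").alias("value_from_nested")
    )

    # Step 2: a field-reference-only select, which avoids mixing plain references and
    # nested accesses inside a single projection.
    result = flattened.select(
        col("parties_id"), col("publication_id"), col("value_from_nested")
    )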
  • y

    Yaroslav Bezruchenko

    04/19/2023, 4:42 PM
    Hey, I'm trying PyFlink right now. I'm a newbie in Python, and I'm trying to write a Flink ProcessFunction
    Copy code
    class AttributePretraining(ProcessFunction):
    
        def process_element(self, value, ctx: 'ProcessFunction.Context'):
            id = value['id']
    
            training_dto = TrainingDTO(id)
    
            return training_dto
    But I got TypeError: AttributePretraining.open() missing 1 required positional argument: 'runtime_context'. Any idea why this is happening?
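    That error usually means open() was overridden without the runtime_context parameter: PyFlink invokes open(self, runtime_context) on every function that defines it. Also note that process_element is expected to yield its output. A minimal sketch of the expected shape (TrainingDTO is from your own code, not PyFlink):
    Copy code
    from pyflink.datastream import ProcessFunction
    from pyflink.datastream.functions import RuntimeContext


    class AttributePretraining(ProcessFunction):

        def open(self, runtime_context: RuntimeContext):
            # Flink passes the RuntimeContext here; declaring open(self) without this
            # parameter is what raises "missing 1 required positional argument".
            pass

        def process_element(self, value, ctx: 'ProcessFunction.Context'):
            record_id = value['id']
            # Yield results instead of returning them: process_element is treated as a generator.
            yield TrainingDTO(record_id)  # TrainingDTO comes from your own module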
  • k

    Kevin L

    04/19/2023, 6:17 PM
    Hello! I am trying to create a row using the Table API with PyFlink. Just as a test I am doing something simple:
    Copy code
    t_row = t.select(row("key", "value").alias('my_row'))
    I expect this schema to be (when I call
    t_row.get_schema()
    )
    Copy code
    root
     |-- my_row : ROW<`key` VARCHAR NOT NULL> NOT NULL
    but what I get is (see screenshot). How do I get the expected schema?
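    Hard to say without the screenshot, but note that row("key", "value") most likely builds a row of the two string literals rather than of the columns; using col() references and casting to an explicit ROW type should pin the schema down. A sketch, with both fields assumed to be STRING and `t` being the table from the snippet above:
    Copy code
    from pyflink.table import DataTypes
    from pyflink.table.expressions import col, row

    # Build the row from column references and cast it to an explicit ROW type so the
    # derived schema matches the declaration instead of whatever the planner infers.
    row_type = DataTypes.ROW([
        DataTypes.FIELD("key", DataTypes.STRING().not_null()),
        DataTypes.FIELD("value", DataTypes.STRING()),
    ])

    t_row = t.select(row(col("key"), col("value")).cast(row_type).alias("my_row"))
    print(t_row.get_schema())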
  • n

    Nathanael England

    04/19/2023, 6:27 PM
    In pyflink, if I wanted to just get back the raw bytes from a data source, what serialization schema should I use? Examples show
    SimpleStringSchema
    but I don't want my bytes stringified.
  • e

    Eric Xiao

    04/19/2023, 9:05 PM
    Hi, we are using Flink reactive mode and we're switching all
    setParallelism
    calls to
    setMaxParallelism
    . I noticed that for sinks (
    DataStreamSink
    ), there is no
    setMaxParallelism
    function. We do not want to use a global parallelism setting to control the sink's parallelism as other operators will have a higher parallelism count. Is there another way we can set a max parallelism on a sink in our pipeline?
  • e

    Elizaveta Batanina

    04/20/2023, 11:36 AM
    Hi! I am using the Flink Kubernetes operator. I have deployed a streaming job and I want to check what my table looks like and run some queries using the SQL client. But when I run the SQL client inside my FlinkDeployment:
    Copy code
    ./bin/sql-client.sh
    I cannot see the tables created by the job (the job is successful) at all. Is there a better way to do this?
  • m

    Max Dubinin

    04/20/2023, 3:57 PM
    Hey guys, I’m probably lacking understanding regarding some fundamental Flink functionalities. I’m reading from a Kinesis source (from a TRIM_HORIZON initial position). Every time I cancel and start the same job, I see an X amount of records being pulled at the first checkpoint, while there are no records in the Kinesis shard I’m consuming from. Can someone please help me understand where those records might be coming from? I’m not restoring from a savepoint or anything
  • վ

    Վահե Քարամյան

    04/20/2023, 4:04 PM
    [already fixed]
  • m

    Max Dubinin

    04/20/2023, 4:53 PM
    Hey guys, I noticed that Flink doesn’t acknowledge processed records from Kinesis. Does anyone know what could be missing? I’m simply taking those events and writing them to local parquet files. Every time I cancel and start the same job, I see those same records again.
  • a

    Ali Zia

    04/20/2023, 5:12 PM
    Hey folks. I have a Flink setup on a single server where I keep losing the task manager and having to restart it. It always happens at the end of executing a batch job. I have multiple streaming jobs and only one batch job. I have been trying to debug to no avail for hours. Have tried setting the following parameters but none seem to help:
    Copy code
    akka.ask.timeout
    akka.lookup.timeout
    akka.tcp.timeout
    heartbeat.interval
    heartbeat.rpc-failure-threshold
    heartbeat.timeout
  • z

    Zachary Schmerber

    04/20/2023, 5:38 PM
    Is there a way to .set this default "org.apache.flink.formats.json.JsonDeserializationSchema" to ignore unknown field values within this code, or do I have to make a custom deserializer to do so? As of now, if my "mega source topic" (50+ data sources in one topic) has a field that is not recognized, the Flink job errors out.
    Copy code
    static void runJob() throws Exception {
            KafkaSource<Message> source =
                    KafkaSource.<Message>builder()
                            .setBootstrapServers(BROKER)
                            .setTopics(TOPIC)
                            .setGroupId("KafkaExactlyOnceRecipeApplication")
                            .setStartingOffsets(
                                    OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                            .setValueOnlyDeserializer(new JsonDeserializationSchema<>(Message.class))
                            .build();
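    JsonDeserializationSchema uses a Jackson ObjectMapper under the hood, so one low-effort option (assuming you own the Message class; the field names below are placeholders) is to let Jackson skip unknown properties at the class level instead of writing a custom deserializer:
    Copy code
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

    // Unknown JSON fields from the mixed topic are silently ignored instead of failing
    // deserialization; fields that are missing in the JSON are left at their defaults.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public class Message {
        public String source;
        public String payload;
    }
    Alternatively, a small custom DeserializationSchema that builds its own ObjectMapper with FAIL_ON_UNKNOWN_PROPERTIES disabled achieves the same thing without touching the POJO.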
  • r

    Rion Williams

    04/20/2023, 5:42 PM
    While scrolling through the Apache Flink JIRA, I came across this issue that mentions that Hadoop 2.8.3 is no longer supported as of Flink 1.15. I’m in the process of updating the job, which relies on handling this within the Dockerfile through the use of the shaded uber-jars, something like this:
    Copy code
    ENV GCS_CONNECTOR_VERSION="hadoop2-2.1.1"
    ENV FLINK_HADOOP_VERSION="2.8.3-10.0"
    ENV GCS_CONNECTOR_NAME="gcs-connector-${GCS_CONNECTOR_VERSION}.jar"
    ENV GCS_CONNECTOR_URI="$artifactStorage/${GCS_CONNECTOR_NAME}"
    ENV FLINK_HADOOP_JAR_NAME="flink-shaded-hadoop-2-uber-${FLINK_HADOOP_VERSION}.jar"
    ENV FLINK_HADOOP_JAR_URI="$artifactStorage/flink-shaded-hadoop-2-uber/${FLINK_HADOOP_VERSION}/${FLINK_HADOOP_JAR_NAME}"
    I was hoping to upgrade this to a more recent version but I’m struggling to find the latest shaded plugin. I see these Flink Shaded Hadoop 3 Uber ones within Maven Central, but I’m not entirely sure what version of the connector or Hadoop those would correspond to. Any advice? Just trying to upgrade this to the latest supported version as of 1.15 while I’m already touching the job and related things about the deployment.
  • z

    Zachary Piazza

    04/20/2023, 8:46 PM
    Howdy again. I am trying to configure the Prometheus integration for Flink. Using the operator, I have defined the following properties in my config. I have also ensured the Prometheus JAR was included in the base image:
    flinkConfiguration:
      collect-sink.batch-size.max: "8388608"
      taskmanager.numberOfTaskSlots: "2"
      metrics.reporters: prom
      metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporter
      metrics.reporter.prom.port: "9999"
    Do I need to use the podTemplate attribute of the FlinkDeployment spec to add the additional port mapping? EDIT: I figured it out. The config properties mentioned in the docs were incorrect, and I added an additional container port using the podTemplate param in the spec.
  • a

    Amenreet Singh Sodhi

    04/21/2023, 5:38 AM
    Hey Team! If I have a custom appender built on top of logback, how can I use it in Flink to store logs? What configuration changes would I need to make? Any wiki/link for the same? Thanks
  • u

    苏超腾

    04/21/2023, 7:36 AM
    Hey Team! I have trouble understanding Flink HA in the Flink Operator: with application-mode high availability, when the JobManager leader goes down, will the job be restarted and switched over to the standby node?
  • u

    张奇文

    04/21/2023, 7:44 AM
    Hello everyone! I have a Flink SQL task with data skew, and the upstream is a Kafka data source. For some reason, the upstream occasionally has too much data in a certain partition. Is there any way to solve or improve this skew in Flink SQL?
  • m

    Maryam

    04/21/2023, 3:41 PM
    Does the interval join create a window for every key in the predicate or does it create windows and then find key matches? I am using FlinkSQL if it matters.
  • u

    Urs Schoenenberger

    04/21/2023, 3:57 PM
    Hi - We came across some (to us) unexpected behaviour in the Python
    CountTumblingWindowAssigner
    recently - it is backed by a non-purging
    CountTrigger
    and therefore produces a memory leak unless one replaces the trigger by a purging one using
    .trigger()
    Are we missing something? Is there a use case where one can use
    CountTumblingWindowAssigner
    with its current default trigger? I feel the default should be FIRE_AND_PURGE here, but wanted to ask before I file a JIRA.
  • d

    Daniel Craig

    04/21/2023, 4:05 PM
    I see
    Kryo serializer scala extensions are not available
    in my flink application's logs, should I be concerned about this?
  • f

    Felix Angell

    04/21/2023, 4:39 PM
    hey 👋 i have a few questions. watermark alignment is only available on FLIP-27 sources... does this include FlinkKafkaConsumer and FlinkKinesisConsumer?
    It does not work for legacy or if applied after the source via DataStream#assignTimestampsAndWatermarks.
    is another thing noted. does this mean that if i specify a watermarkstrategy via
    assign_timestamps_and_watermarks
    it will not let me use watermark alignment? does this mean i need a watermark strategy per source? since we are technically re-using the strategy and then passing it in e.g.
    assign_timestamps_and_watermarks(watermark_strategy)
    last question. is there any rule of thumb for how many seconds i should set my max_drift + watermark_interval? the docs specify 20 seconds and 1 second respectively. we have events that can be as late as 10-30 minutes. would this mean we are throttling sources for that long? thanks!
  • z

    Zachary Piazza

    04/22/2023, 3:46 PM
    Is there a way to define variables that can be referenced in Flink SQL? Specifically in the table properties, so that usernames/passwords don't need to be specified in plaintext?
  • r

    RICHARD JOY

    04/23/2023, 2:38 AM
    Hi Team 👋, I’ve got a few questions around HA while using flink operator 1.4.0. On my FlinkDeployment, upgradeMode is stateless. This is because I don’t have a distributed storage option available yet for checkpoints/savepoints, and for the same reason I don’t have jobmanager HA configured. From the doc, it’s a requirement to provide high-availability.storageDir for Jobmanager HA. Can HA work without distributed storage? I’ve deployed the FlinkDeployment streaming job with a single jobmanager and two taskmanagers. The job sources from a kafka topic, does processing in flink, and sinks to another kafka topic. To test the reliability of the taskmanagers, I killed one of them, and the whole set of 1 jobmanager + 2 taskmanagers was shut down after a new pod tried to spin up, resulting in Application Status: FAILED with “_Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy”_. Any pointers on what might be the cause? Ideally I’m looking for the JM and TM to be highly available without distributed storage while the deployed jobs are stateless. Thanks for your time!