# troubleshooting

    Adrian Chang

    11/11/2022, 4:47 PM
    Hi. I would like to write some unit tests for a stateful FlatMapFunction in Python. Are there any examples available? I didn't find much information in the docs. Thanks

    Rommel

    11/11/2022, 6:23 PM
    I want to use the Flink Kubernetes operator to make a deployment, so I wrote a FlinkDeployment YAML. When I try to deploy it, I get this error:
    Error: unable to build kubernetes objects from release manifest: error validating "": error validating data: ValidationError(FlinkDeployment.spec): unknown field "mode" in org.apache.flink.v1beta1.FlinkDeployment.spec
    but mode is clearly listed here as the last row: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/reference/#flinkdeploymentspec Can anyone let me know why I ran into unknown field "mode"? I am using flink-kubernetes-operator version 1.2.

    Matt Fysh

    11/12/2022, 2:09 AM
    Any pointers on how to debug
    Key group 91 is not in KeyGroupRange{startKeyGroup=48, endKeyGroup=63}.
    I'm using a KeyedProcessFunction; the stream is keyed by a POJO (2 strings and a JsonNode).
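    This error often points at a key whose hashCode() is not deterministic across JVMs or restores (for example a POJO that falls back to Object.hashCode(), or a key object that is mutated after keyBy). A hedged sketch of the usual workaround, keying by a stable string derived from the fields (Event and its getters are hypothetical names):
    ```java
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // Derive a deterministic string key instead of keying by the POJO itself,
    // so the key hashes to the same key group on every run.
    DataStream<Event> events = ...; // hypothetical source
    KeyedStream<Event, String> keyed = events.keyBy(
            e -> e.getTenantId() + "|" + e.getUserId() + "|" + e.getPayload().toString());
    ```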

    Jason Politis

    11/12/2022, 4:32 AM
    Hey everyone. I'm testing a few things in Flink on my local machine in a Docker container. I have a folder mounted where I drop a file full of CREATE SQL statements. If I run sql-client with the -i option using Flink 1.14.x, it seems to work just fine: the CLI runs, I'm in Flink SQL, I can query data, show tables/jars, and all that good stuff. But if I try to use 1.15.1 or 1.15.2 to run sql-client -i init.sql, it fails. The log states that there's an ArithmeticException: can't divide by 0.
    ```
    org.apache.flink.table.client.SqlClientException: Could not read from command line.
    at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:321) [flink-sql-client-1.15.1.jar:1.15.1]
    at org.apache.flink.table.client.cli.CliClient.executeFile(CliClient.java:354) [flink-sql-client-1.15.1.jar:1.15.1]
    at org.apache.flink.table.client.cli.CliClient.executeInitialization(CliClient.java:248) [flink-sql-client-1.15.1.jar:1.15.1]
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:135) [flink-sql-client-1.15.1.jar:1.15.1]
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client-1.15.1.jar:1.15.1]
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client-1.15.1.jar:1.15.1]
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client-1.15.1.jar:1.15.1]
    Caused by: java.lang.ArithmeticException: / by zero
    at org.jline.reader.impl.LineReaderImpl.toColumns(LineReaderImpl.java:4749) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.jline.reader.impl.LineReaderImpl.toColumns(LineReaderImpl.java:4724) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.jline.reader.impl.LineReaderImpl.computePost(LineReaderImpl.java:4689) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.jline.reader.impl.LineReaderImpl.computePost(LineReaderImpl.java:4642) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.jline.reader.impl.LineReaderImpl.doList(LineReaderImpl.java:4531) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.jline.reader.impl.LineReaderImpl.doComplete(LineReaderImpl.java:4149) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.jline.reader.impl.LineReaderImpl.expandOrComplete(LineReaderImpl.java:3875) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:585) ~[flink-sql-client-1.15.1.jar:1.15.1]
    at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:296) ~[flink-sql-client-1.15.1.jar:1.15.1]
    ... 6 more
    ```
    Has anyone else encountered this? I can't seem to find anything about it online. I thought that maybe it's the code or the size of the file, so I emptied the file and left a single CREATE TABLE, but it still fails with the arithmetic exception. I tried a completely empty file, and that worked. I then tried a bogus query:
    ```sql
    CREATE VIEW TEST AS SELECT * FROM system;
    ```
    And that failed as expected with "object system not found". Thank you.

    Matt Fysh

    11/12/2022, 5:57 AM
    ```java
    env.setStateBackend(new HashMapStateBackend());
    env.enableCheckpointing(3000);
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/cp");
    ```
    When I restart my application, all my keyed operators lose their state. I can see files being written to disk, so I'm just wondering why they are not restored upon restart? My key is a string.
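    For context: checkpoints restore state automatically only on in-flight failover; a manually restarted job starts fresh unless you resume from a savepoint or a retained checkpoint. A minimal sketch, assuming Flink 1.15+ (older versions use the deprecated enableExternalizedCheckpoints):
    ```java
    import org.apache.flink.streaming.api.environment.CheckpointConfig;

    // Keep completed checkpoints on disk even after cancellation ...
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // ... then resume explicitly on the next run, e.g.:
    //   bin/flink run -s file:///tmp/cp/<job-id>/chk-<n> ...
    ```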

    Tawfik Yasser

    11/12/2022, 6:49 AM
    Hello, are there any resources that could help me understand how a watermark is emitted internally, starting from the strategy down to the operator responsible for assigning timestamps and emitting watermarks via the WatermarkEmitter? Thanks in advance.
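    As a starting point: the strategy itself is only a factory. At runtime the operator created by assignTimestampsAndWatermarks() (TimestampsAndWatermarksOperator in recent versions) asks the strategy for a TimestampAssigner and a WatermarkGenerator, calls the generator per event, and triggers periodic emission. A small sketch of the pieces involved (Event and getEventTime() are hypothetical):
    ```java
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // The runtime operator calls createTimestampAssigner() and
    // createWatermarkGenerator() on this strategy and then drives both.
    WatermarkStrategy<Event> strategy = WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime());
    ```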

    Victor Costa

    11/12/2022, 1:35 PM
    Any Python examples of how to transform a consumed Avro message into a new one to be written to a destination Pulsar topic? Details in the 🧵

    raghav tandon

    11/13/2022, 7:30 AM
    Hi everyone, I am stuck in a situation here:
    1. Took a savepoint, let's say X.
    2. Changed the setting transaction.timeout.ms to some large value.
    3. Got an exception in the JM:
    org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms)
    Now I want to change this config and restart the job with the same savepoint (X). Since I have a stateful job, I cannot restart without a savepoint or changing the ZK id. Please help me figure out what the strategy should be here. I was hoping that if no checkpoint is triggered after the savepoint, then at least once I fix the property the job should start normally, but that is not happening: it goes into a restart loop and I am unable to take a savepoint now.
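    For reference, with the newer KafkaSink the producer override is passed via properties; a hedged sketch (broker address, topic, prefix, and the 15-minute value are placeholders, and the timeout must stay at or below the broker's transaction.max.timeout.ms):
    ```java
    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    Properties props = new Properties();
    // must be <= transaction.max.timeout.ms configured on the broker (default 15 min)
    props.setProperty("transaction.timeout.ms", "900000");

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")
            .setKafkaProducerConfig(props)
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("my-app")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("output-topic")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .build();
    ```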

    Amir Halatzi

    11/13/2022, 4:19 PM
    Hey all! We've been running Flink on K8s for the past few weeks and have noticed an increase in network-related issues lately, like TaskManagers timing out while registering. Has anyone else encountered such issues? Are there any tweaks I should consider when running on K8s? Thanks!

    shmily

    11/14/2022, 2:06 AM
    Hi all, when I use DBeaver to connect to Flink SQL through the SQL Gateway, the log file reports "Missing version in readMessageBegin, old client?". What should I do?

    Rishabh Kedia

    11/14/2022, 2:54 AM
    I'm trying to run Apache Beam code with the Flink runner, but it keeps failing with the error below. The same code works fine on Google Dataflow. Any pointers?
    ```
    Caused by: java.lang.ClassCastException: class org.apache.beam.sdk.transforms.windowing.GlobalWindow cannot be cast to class org.apache.beam.sdk.transforms.windowing.IntervalWindow (org.apache.beam.sdk.transforms.windowing.GlobalWindow and org.apache.beam.sdk.transforms.windowing.IntervalWindow are in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @4364b27c)
    ```
    The SQL uses very basic windowing, like:
    ```sql
    SELECT customer_id, sum(bytes_sent) AS bytes_sent, cast(TUMBLE_START(str_to_date(log_time, 'yyyy-MM-dd HH:mm:ss.SSSSSS'),  INTERVAL '2' MINUTE) AS varchar) window_start_time FROM myTable GROUP BY customer_id, TUMBLE(str_to_date(log_time, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), INTERVAL '2' MINUTE)
    ```

    Aqib Mehmood

    11/14/2022, 6:28 AM
    Hi all, our Flink streaming job ran for 75 days, but then it gave this error:
    ```
    Caused by: org.apache.flink.util.SerializedThrowable: Header size exceeded max allowed size (10240)
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.connectionError(Http2Exception.java:103) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil.headerListSizeExceeded(Http2CodecUtil.java:245) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.headerSizeExceeded(DefaultHttp2FrameReader.java:694) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$HeadersBlockBuilder.addFragment(DefaultHttp2FrameReader.java:710) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$2.processFragment(DefaultHttp2FrameReader.java:481) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:491) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1486) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1282) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:498) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:437) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[?:?]
    	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[?:?]
    	at java.lang.Thread.run(Unknown Source) ~[?:?]
    ```
    Does anyone know the reason for this? TIA

    czchen

    11/14/2022, 8:19 AM
    We have the following error when upgrading from Flink 1.15.2 to 1.16.0 (with flink-operator 1.2.0). According to https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/metrics/#system-resources, we need to add com.github.oshi:oshi-core:6.1.5 to the lib directory. We wonder: is this behavior intentional? It is not mentioned in https://flink.apache.org/news/2022/10/28/1.16-announcement.html.
    ```
    2022-11-14 07:05:29,205 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Could not start cluster entrypoint KubernetesApplicationClusterEntrypoint.
            org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
                    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:255) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) [flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86) [flink-dist-1.16.0.jar:1.16.0]
            Caused by: java.lang.NoSuchMethodError: 'java.util.List oshi.hardware.HardwareAbstractionLayer.getNetworkIFs()'
                    at org.apache.flink.runtime.metrics.util.SystemResourcesCounter.<init>(SystemResourcesCounter.java:92) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics(SystemResourcesMetricsInitializer.java:40) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.metrics.util.MetricUtils.lambda$instantiateProcessMetricGroup$0(MetricUtils.java:98) ~[flink-dist-1.16.0.jar:1.16.0]
                    at java.util.Optional.ifPresent(Unknown Source) ~[?:?]
                    at org.apache.flink.runtime.metrics.util.MetricUtils.instantiateProcessMetricGroup(MetricUtils.java:97) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:410) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist-1.16.0.jar:1.16.0]
                    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) ~[flink-dist-1.16.0.jar:1.16.0]
                ... 2 more
    ```

    Nithin Kumar Vokkarla

    11/14/2022, 11:42 AM
    👋 Hello, team! We upgraded the k8s version to 1.22, which does not support the Ingress apiVersion extensions/v1beta1. When trying to deploy a FlinkApplication on that k8s version, the job manager is not getting initialized. Describing the FlinkApplication gives this warning in the events:
    ```
    Events:
      Type     Reason               Age                 From              Message
      ----     ------               ----                ----              -------
      Warning  CreateClusterFailed  51m (x23 over 62m)  flinkK8sOperator  Failed to create job managers for deploy 28c7ad99: no matches for kind "Ingress" in version "extensions/v1beta1"
    ```
    The flink-operator used is https://github.com/lyft/flinkk8soperator/blob/b5eb19657df91782e51028019e89a7c1badd30d1/docs/quick-start-guide.md. Are there any newer Helm charts for flink-operator/FlinkApplication that support "networking.k8s.io/v1"?

    Echo Lee

    11/14/2022, 11:56 AM
    Hi all, I encountered a very troublesome problem. While using the temporal join operator, I found that when operator chaining is enabled the two streams are joined and the result is output, but after I disable operator chaining the output contains only the left row, with a null right row. My understanding is that operator chaining should have no effect on the output, but that doesn't seem to be the case. Can anyone explain it to me? Thanks.

    Robert Quinlivan

    11/14/2022, 3:48 PM
    I'm trying to link the flink-sql-connector-kafka jar on my Flink installation. The system errors on startup with what looks like a version mismatch. However, I'm running 1.16.0-java8 and the jar I've imported is flink-sql-connector-kafka-1.16.0.jar, obtained from the docs (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/). Any tips on what I may be missing? Do I need an additional jar? The message I see is:
    ```
    jobmanager_1   | Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/runtime/util/config/memory/ProcessMemorySpec
    jobmanager_1   | 	at java.lang.Class.getDeclaredMethods0(Native Method)
    jobmanager_1   | 	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    jobmanager_1   | 	at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
    jobmanager_1   | 	at java.lang.Class.getMethod0(Class.java:3018)
    jobmanager_1   | 	at java.lang.Class.getMethod(Class.java:1784)
    jobmanager_1   | 	at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:650)
    jobmanager_1   | 	at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:632)
    jobmanager_1   | Caused by: java.lang.ClassNotFoundException: org.apache.flink.runtime.util.config.memory.ProcessMemorySpec
    jobmanager_1   | 	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    jobmanager_1   | 	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    jobmanager_1   | 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    jobmanager_1   | 	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    jobmanager_1   | 	... 7 more
    ```

    André Casimiro

    11/14/2022, 3:51 PM
    Hi all, I'm building a solution with Pyflink, Pulsar and Avro. I'm experiencing the following exception:
    ```
    py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
    org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.formats.avro.AvroRowDeserializationSchema([class java.util.HashMap]) does not exist
            at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
            at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
            at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
            at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
            at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
            at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.lang.Thread.run(Thread.java:750)
    ```
    What caught my attention was the None in "An error occurred while calling None.org.apache.flink..." on the first line of the exception. Does anyone know anything about this? Can it be related to Apple M1? This is the relevant code from where the exception is triggered:
    ```python
    from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema
    
    env.add_jars(
            "file:///home/flink-jars/flink-connector-pulsar-1.16.0.jar",
            "file:///home/flink-jars/flink-sql-connector-pulsar-1.15.1.1.jar",
            # "file:///home/flink-jars/flink-avro-1.16.0.jar",
            "file:///home/flink-jars/flink-sql-avro-1.16.0.jar",
        )
    
    deserialization_schema = PulsarDeserializationSchema.flink_schema(
        AvroRowDeserializationSchema(avro_schema_string=Frame_v1_0.schema()))
    ```
    Can anyone help me, please?

    Da Huo

    11/14/2022, 5:17 PM
    Hi all! I have a quick question regarding the RuntimeContext in Flink process functions. Is it safe to keep a reference to the RuntimeContext as opposed to repeatedly calling getRuntimeContext()? In other words, call getRuntimeContext() once in open() and keep the reference, as opposed to calling getRuntimeContext() on every element in processElement(). From the Flink GitHub repo, it looks like the runtime context is set through the setRuntimeContext() function call (https://github.com/apache/flink/blob/2454dfa3aa27006198cd969840b06fe7313b56d0/flin[…]in/java/org/apache/flink/api/common/functions/RichFunction.java). Does anyone know if setRuntimeContext() is called only once, or is it constantly updated?
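    For what it's worth, setRuntimeContext() is called once when the task is instantiated, before open(), and getRuntimeContext() just returns that stored reference, so resolving things once in open() is the usual pattern; a minimal sketch:
    ```java
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class CountingFunction extends KeyedProcessFunction<String, String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            // Resolve the state handle once; getRuntimeContext() is a plain field
            // read, so calling it per element is also safe, just unnecessary.
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
            Long current = count.value();
            count.update(current == null ? 1L : current + 1);
            out.collect(count.value());
        }
    }
    ```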

    Eric Xiao

    11/14/2022, 8:36 PM
    Hi, we've been exploring the SQL APIs for processing CDC data and found out that interval joins are only supported for ModifyKindSetTrait.INSERT_ONLY. Scrolling through some Jira threads and issues, we're not sure if this is a bug, something that could easily be turned on, or whether there is additional work needed to get this working.
    ```
    -- Example error 1
    StreamPhysicalIntervalJoin doesn't support consuming update and delete changes
    -- Example error 2
    Table sink '*anonymous_datastream_sink$4*' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[*anonymous_datastream_source$1*, watermark=[SOURCE_WATERMARK()]]], fields=[id, orderId, vendor, createdAt, updatedAt, _op, rowtime])
    ```
    1. This thread seems to suggest that as long as you do some pre-filtering, an interval join should work: https://lists.apache.org/thread/vowm3h89gyqdhbzg7g5vyoby0n06w589. It is not clear whether the pre-filtering is done in SQL or earlier, while the CDC data is still in DataStream land; see the sketch below.
    2. https://issues.apache.org/jira/browse/FLINK-20487?focusedCommentId=17573619&page=com.[…]lassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel
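    One way to read the pre-filtering suggestion, sketched here under the assumption that it happens in DataStream land before the table is handed to the interval join (tEnv and cdcTable are assumed to exist; this is not necessarily what the thread's authors intended):
    ```java
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    // Drop update/delete rows so the resulting table is insert-only and
    // therefore acceptable to StreamPhysicalIntervalJoin.
    DataStream<Row> changelog = tEnv.toChangelogStream(cdcTable);
    DataStream<Row> insertsOnly = changelog.filter(row -> row.getKind() == RowKind.INSERT);
    Table joinable = tEnv.fromDataStream(insertsOnly);
    ```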

    RICHARD JOY

    11/15/2022, 1:55 AM
    Good day everyone! A couple of questions around volumes mounted with the Kubernetes operator's FlinkDeployment pod template. 1) My organization restricts the use of s3/Google bucket object storage options; we only have the RWO (read write once) option for PVs. Is there a way to set up HA, savepoints, and checkpoints using an RWO PV/PVC? 2) When defining volume mounts on the flink-main-container via the pod template, can a volume be mounted on only the JM and not the TMs? Since I use RWO volumes in the main container for downloads, the volume tries to mount on both JMs and TMs. Any help appreciated! Thx.

    Gaurav Miglani

    11/15/2022, 6:01 AM
    Is there any API endpoint where we can check the current restart count of a Flink job? I'm not able to get it from the /jobs API 🤔

    raghav tandon

    11/15/2022, 6:21 AM
    https://apache-flink.slack.com/archives/C03G7LJTS2G/p1668324625942159 Can someone please help here?

    Tawfik Yasser

    11/15/2022, 6:42 AM
    Hello, are watermarks actually emitted from the TimestampsAndWatermarksOperator or from BoundedOutOfOrdernessWatermarks?
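    My understanding, hedged: BoundedOutOfOrdernessWatermarks only computes the watermark and calls emitWatermark() on the WatermarkOutput handed to it, while the TimestampsAndWatermarksOperator owns that output, schedules onPeriodicEmit(), and actually forwards the watermark downstream. A custom generator makes the split visible (Event is a hypothetical type):
    ```java
    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;

    // The generator decides what to emit; the surrounding operator decides
    // when onPeriodicEmit runs and pushes the watermark into the stream.
    public class MaxTimestampGenerator implements WatermarkGenerator<Event> {
        private long maxTs = Long.MIN_VALUE;

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
            maxTs = Math.max(maxTs, eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTs - 1));
        }
    }
    ```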

    Sumit Nekar

    11/15/2022, 8:47 AM
    Hi team, I have added a pod template for a FlinkDeployment. I tried to add a command section inside the pod/container template to run another background process that collects logs from the JM and TM pods. The command does not seem to get picked up by the FlinkDeployment. The command itself has no issues and runs fine when executed inside a running container. Is anything wrong with this approach?
    ```yaml
    containers:
            - name: flink-main-container
              command: ["/usr/share/filebeat/bin/filebeat"]
          args: ["-c","/opt/flink/filebeat/conf/filebeat.yml","-path.home /opt/flink/filebeat","-path.config /etc/filebeat", "-path.data /opt/flink/filebeat/data" ,"-path.logs /opt/flink/filebeat/log","&"]
    ```

    Haim Ari

    11/15/2022, 3:38 PM
    Hello, I'm facing this issue when deploying a session cluster:
    Internal error occurred: failed calling webhook "flinkoperator.flink.apache.org": Post "https://flink-operator-webhook-service.flink.svc:443/validate?timeout=10s": x509: certificate signed by unknown authority

    Haim Ari

    11/15/2022, 3:38 PM
    This session-cluster was removed (Flux Helm release) and then redeployed

    Haim Ari

    11/15/2022, 3:39 PM
    Can someone advise on this?

    Nithin Kumar Vokkarla

    11/15/2022, 5:57 PM
    Hi, I'm facing this issue while trying to deploy a FlinkDeployment:
    ```
    TaskManager memory configuration failed: The configured Total Process Memory size (1.758gb (1887436800 bytes)) is less than the sum of the derived Total Flink Memory size (4.000gb (4294967296 bytes)) and the configured or default JVM Metaspace size (256.000mb (268435456 bytes)).
    ```
    but the flinkConfiguration I specified is this:
    ```yaml
    spec:
      flinkConfiguration:
        jobmanager.memory.jvm-overhead.max: 550mb
        jobmanager.memory.process.size: 1800mb
        rest.server.max-content-length: '209715200'
        taskmanager.memory.flink.size: 4gb
        taskmanager.memory.managed.fraction: '0.01'
        taskmanager.memory.process.size: 3500mb
        taskmanager.memory.task.heap.size: 2800mb
        taskmanager.numberOfTaskSlots: '1'
        web.upload.dir: /opt/flink
    ```
    What am I doing wrong here?

    Connie Yang

    11/16/2022, 1:21 AM
    Hello! I'm new to Flink cluster creation using the Flink operator via CRD. My goal is to create a Flink cluster in session mode. Once the cluster is up and running, is there anything on the Flink Dashboard/console where I can confirm that the cluster was created in session mode?

    Donatien Schmitz

    11/16/2022, 9:51 AM
    Hi, what is the allocation strategy for fine-grained slots across multiple TMs? Is there some sort of bin-packing, or are slots spread evenly?