# troubleshooting
  • j

    Jaya Ananthram

    07/27/2022, 9:30 AM
    Hello, is there any technical difference between running a Kafka consumer (Table API) on a standalone Flink cluster versus a Kubernetes deployment (EKS via the Flink Kubernetes operator)? The standalone cluster consumes messages from Kafka (MSK) on my local machine, but in AWS (EKS) the Flink consumer reads only a few initial messages (around 10) and then stops, even though there are plenty of messages left in the topic. The MSK version is 3.2.0 and the Kafka client version is 2.4.1 (which comes with flink-connector-kafka_2.12 - 1.14.5). According to the Kafka docs this combination should work, and it does work on my local standalone Flink cluster connecting to MSK. The pod itself can even read messages with the Kafka console consumer, so I can't find the reason 😬. Any idea?
    h
    m
    • 3
    • 71
  • p

    Parthiban PR

    07/27/2022, 10:41 AM
    Hello! Quick question on Kafka: my Kafka consumer is not consuming all of my produced messages. I produce messages via automated code, but my consumer misses some of them. I found this by adding a line-number count to each message. Any help is much appreciated!
    m
    • 2
    • 2
  • n

    Nipuna Shantha

    07/27/2022, 2:24 PM
    I have run into an issue when dynamically configuring values via CLI commands. I have tried several methods but still cannot figure out how to change the following values dynamically: 1. jobmanager.rpc.port 2. jobmanager.memory.process.size 3. taskmanager.memory.process.size Passing -D when starting the daemons and submitting jobs does not work for these variables. Is there a way to configure these values dynamically? Below are some example commands I have tried without success.
    jobmanager.sh start-foreground <rpc.address> <web-port> -Djobmanager.rpc.port=11100 -Djobmanager.memory.process.size=1g
    taskmanager.sh start-foreground -Dtaskmanager.memory.process.size=40g
    ✅ 1
    h
    m
    c
    • 4
    • 26
  • g

    George Chen

    07/27/2022, 4:01 PM
    Hi, I have a probably dumb question: if one uses a singleton in the open method of a Rich function, how is it handled by parallel task instances? e.g.
    public void open(final Configuration parameters) {
      obj = SingletonClass.getInstance();
    }
    I only found a clue in https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/dataset/overview/#passing-parameters-to-functions where it says:
    "The parameters are serialized as part of the function object and shipped to all parallel task instances."
    But my instance is non-serializable, so I hope to better understand how Flink handles it.
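    (Illustrative sketch only, assuming the SingletonClass from the snippet above and a placeholder process() method: mark the field transient so it is never serialized, and let each parallel subtask re-create it in open(). open() runs once per parallel subtask after the function object has been deserialized; subtasks in the same TaskManager JVM typically share the JVM-wide singleton, while subtasks on other TaskManagers get their own instance.)
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class SingletonUsingMapper extends RichMapFunction<String, String> {

        // transient: not shipped with the serialized function object
        private transient SingletonClass obj;

        @Override
        public void open(final Configuration parameters) {
            // Called once per parallel subtask, in that subtask's JVM, after deserialization.
            obj = SingletonClass.getInstance();
        }

        @Override
        public String map(final String value) {
            return obj.process(value); // process() is a placeholder
        }
    }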
    ✅ 1
    c
    • 2
    • 2
  • g

    George Chen

    07/27/2022, 5:00 PM
    Another question on state and parallelism. My use case is batching on keyed state, where batching is based on count and timeout. My approach is to implement KeyedProcessFunction. I was wondering which choice of state would yield the maximum parallelism. Currently I am using ValueState like the following:
    public class EMFMetricBatchKeyedProcessFunction extends KeyedProcessFunction<String, EMFMetric, EMFMetricBatch> {
        private final int maxBatchSize;
        private final long timeoutInMillis;
        private final ValueStateDescriptor<EMFMetricBatch.EMFMetricBatchBuilder> valueStateDescriptor;
        private ValueState<EMFMetricBatch.EMFMetricBatchBuilder> emfMetricBatchBuilderValueState;
    
        public EMFMetricBatchKeyedProcessFunction(final int maxBatchSize,
                                                  final long timeoutInMillis,
                                                  final String valueStateName,
                                                  final FlinkValueStateDescriptorFactory flinkValueStateDescriptorFactory) {
            this.maxBatchSize = maxBatchSize;
            this.timeoutInMillis = timeoutInMillis;
            this.valueStateDescriptor = flinkValueStateDescriptorFactory.createValueStateDescriptor(
                    valueStateName, EMFMetricBatch.EMFMetricBatchBuilder.class);
        }
    
        @Override
        public void open(final Configuration parameters) {
            emfMetricBatchBuilderValueState = getRuntimeContext().getState(valueStateDescriptor);
        }
    
        @Override
        public void processElement(final EMFMetric emfMetric,
                                   final KeyedProcessFunction<String, EMFMetric, EMFMetricBatch>.Context ctx,
                                   final Collector<EMFMetricBatch> out) throws Exception {
            EMFMetricBatch.EMFMetricBatchBuilder currentBatchBuilder = emfMetricBatchBuilderValueState.value();
            final TimerService timerService = ctx.timerService();
            final String currentKey = ctx.getCurrentKey();
            if (currentBatchBuilder == null) {
                final long createdAt = timerService.currentProcessingTime();
                currentBatchBuilder = createNewEMFMetricBatchBuilder(createdAt, currentKey);
                timerService.registerProcessingTimeTimer(createdAt + timeoutInMillis);
            }
            addNewMetricAndUpdateState(emfMetric, currentBatchBuilder);
    
            if (currentBatchBuilder.getBatchSize() >= maxBatchSize) {
            log.info("Flush EMFMetricBatch for key:{} due to metrics count reaching maximum batch size", currentKey);
                collectAndClearState(currentBatchBuilder, out);
                timerService.deleteProcessingTimeTimer(currentBatchBuilder.getCreatedAtTimestamp() + timeoutInMillis);
            }
        }
    
        @Override
        public void onTimer(final long timestamp,
                            final KeyedProcessFunction<String, EMFMetric, EMFMetricBatch>.OnTimerContext ctx,
                            final Collector<EMFMetricBatch> out) throws Exception {
            final EMFMetricBatch.EMFMetricBatchBuilder currentBatchBuilder = emfMetricBatchBuilderValueState.value();
            if (currentBatchBuilder != null && timestamp >= currentBatchBuilder.getCreatedAtTimestamp() + timeoutInMillis) {
            log.info("Flush EMFMetricBatch for key:{} due to timeout", ctx.getCurrentKey());
                collectAndClearState(currentBatchBuilder, out);
            }
        }
    
        private EMFMetricBatch.EMFMetricBatchBuilder createNewEMFMetricBatchBuilder(
                final long createdAt, final String accountId) {
            return EMFMetricBatch.builder().createdAtTimestamp(createdAt).accountId(accountId);
        }
    
        private void addNewMetricAndUpdateState(final EMFMetric emfMetric,
                                                final EMFMetricBatch.EMFMetricBatchBuilder emfMetricBatchBuilder)
                throws IOException {
            emfMetricBatchBuilder.emfMetric(emfMetric);
            emfMetricBatchBuilderValueState.update(emfMetricBatchBuilder);
        }
    
        private void collectAndClearState(final EMFMetricBatch.EMFMetricBatchBuilder batchBuilder,
                                     final Collector<EMFMetricBatch> out) {
            out.collect(batchBuilder.build());
            emfMetricBatchBuilderValueState.clear();
        }
    }
    I wonder if there is a better implementation for increasing parallelism, since batching is not like aggregation in the sense that it does not depend on all tasks.
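    (For comparison, a rough sketch of the same batching with ListState and a generic element type; it is illustrative only, not a drop-in replacement for the EMFMetricBatch code above. With RocksDB, appending to ListState avoids deserializing the whole batch on every element. The achievable parallelism itself is bounded by the operator parallelism and the number of distinct keys, not by the choice of state type.)
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class ListStateBatcher<T> extends KeyedProcessFunction<String, T, List<T>> {

        private final int maxBatchSize;
        private final long timeoutInMillis;
        private final Class<T> elementClass;

        private transient ListState<T> batch;          // the buffered elements
        private transient ValueState<Long> batchCount; // cheap size counter
        private transient ValueState<Long> timerTs;    // registered timer timestamp

        public ListStateBatcher(int maxBatchSize, long timeoutInMillis, Class<T> elementClass) {
            this.maxBatchSize = maxBatchSize;
            this.timeoutInMillis = timeoutInMillis;
            this.elementClass = elementClass;
        }

        @Override
        public void open(Configuration parameters) {
            batch = getRuntimeContext().getListState(new ListStateDescriptor<>("batch", elementClass));
            batchCount = getRuntimeContext().getState(new ValueStateDescriptor<>("batchCount", Long.class));
            timerTs = getRuntimeContext().getState(new ValueStateDescriptor<>("timerTs", Long.class));
        }

        @Override
        public void processElement(T element, Context ctx, Collector<List<T>> out) throws Exception {
            if (timerTs.value() == null) {
                long deadline = ctx.timerService().currentProcessingTime() + timeoutInMillis;
                ctx.timerService().registerProcessingTimeTimer(deadline);
                timerTs.update(deadline);
            }
            batch.add(element);
            long count = (batchCount.value() == null ? 0L : batchCount.value()) + 1;
            batchCount.update(count);

            if (count >= maxBatchSize) {
                ctx.timerService().deleteProcessingTimeTimer(timerTs.value());
                flush(out);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<T>> out) throws Exception {
            flush(out); // timeout reached for this key
        }

        private void flush(Collector<List<T>> out) throws Exception {
            List<T> elements = new ArrayList<>();
            for (T element : batch.get()) {
                elements.add(element);
            }
            if (!elements.isEmpty()) {
                out.collect(elements);
            }
            batch.clear();
            batchCount.clear();
            timerTs.clear();
        }
    }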
    h
    • 2
    • 9
  • s

    Stephan Weinwurm

    07/27/2022, 9:59 PM
    Hi all, I need some help understanding Flink Stateful Functions' behaviour under load. When processing a large backlog, we see frequent timeouts to the stateful functions. No matter the settings (we currently use parallelism.default: 1, taskmanager.numberOfTaskSlots: 10, statefun.async.max-per-task: 1024), Flink works through the backlog relatively slowly. We run the master in Kubernetes HA mode, with two jobmanager pods, three taskmanager pods, and one StateFun pod. We have since turned off adaptive scheduling as well as k8s autoscaling to figure out what the optimal settings are. Overall the setup looks healthy in terms of CPU / memory / network, yet Flink works through the backlog very slowly and we very frequently see this error in the worker logs:
    org.apache.flink.statefun.flink.core.httpfn.RetryingCallback [] - Retriable exception caught while trying to deliver a message: ToFunctionRequestSummary(address=Address(com.x.dummy, dummy, eecf7c1b-fe6a-466f-9072-e563fc88554e), batchSize=1, totalSizeInBytes=971, numberOfStates=0)
    java.net.ConnectException: Failed to connect to snooron-worker-dummy.snooron-functions/192.168.18.71:9090
    	at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:265) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:183) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:224) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:172) [statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [statefun-flink-distribution.jar:3.2.0]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    	at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.net.ConnectException: Cannot assign requested address (connect failed)
    	at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?]
    	at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source) ~[?:?]
    	at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source) ~[?:?]
    	at java.net.AbstractPlainSocketImpl.connect(Unknown Source) ~[?:?]
    	at java.net.SocksSocketImpl.connect(Unknown Source) ~[?:?]
    	at java.net.Socket.connect(Unknown Source) ~[?:?]
    	at okhttp3.internal.platform.Platform.connectSocket(Platform.java:130) ~[statefun-flink-distribution.jar:3.2.0]
    	at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:263) ~[statefun-flink-distribution.jar:3.2.0]
    	... 22 more
    This is the endpoint configuration:
    spec:
        endpoints:
          - endpoint:
              meta:
                kind: io.statefun.endpoints.v1/http
              spec:
                functions: com.x.dummy/dummy
                urlPathTemplate: http://snooron-worker-dummy.snooron-functions:9090/statefun
                timeouts:
                  call: 2 min
                  read: 2 min
                  write: 2 min
                maxNumBatchRequests: 100
    We have also tried the v2 endpoint config, where netty is used instead of okhttp, but with similar results. If the load increases further, we see lots of other exceptions related to the calls to the HTTP stateful function, such as `SocketTimeout`s, and eventually the jobmanager restarts continuously. My guess is that thread pools are saturated and communication between the jobmanager and taskmanagers is impeded, leading to timeouts. Any help with this would be greatly appreciated! As a side question, is there documentation on how parallelism.default, taskmanager.numberOfTaskSlots and statefun.async.max-per-task relate to the different Kafka topics and their partitions? Does one task, executing in a task slot, consume from one topic and send all its messages to the stateful functions via HTTP? How do the other properties factor in?
    d
    • 2
    • 5
  • d

    Duc Anh Khu

    07/27/2022, 10:39 PM
    Hi all, I'm using PyFlink and trying to deserialize Kafka messages (protobuf) to JSON. The examples in the Flink docs point to ROW a lot. AFAIK, ROW is a way to map a schema to Flink types. Is it possible in PyFlink to not use ROW and instead do something similar to the Java SDK, i.e. just get a JSON object as the return type? Here is my Java DeserializationSchema:
    import com.google.protobuf.Message;
    import com.google.protobuf.Parser;
    import com.google.protobuf.util.JsonFormat;

    // Imports below added for completeness; plain Jackson is assumed here (the original
    // may use Flink's shaded Jackson instead).
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.api.common.serialization.DeserializationSchema;

    import java.io.IOException;
    
    public class ProtoJsonDeserializationSchema<T extends Message>
        implements DeserializationSchema<ObjectNode> {
    
      private static final long serialVersionUID = -4257275849353885017L;
      private final Parser<T> parser;
    
      private final ObjectMapper mapper = new ObjectMapper();
    
      public ProtoJsonDeserializationSchema(Parser<T> parser) {
        this.parser = parser;
      }
    
      @Override
      public ObjectNode deserialize(byte[] message) throws IOException {
        var jsonFormat = JsonFormat.printer();
        var t = parser.parseFrom(message);
        var jsonString = jsonFormat.print(t);
    
        return (ObjectNode) mapper.readTree(jsonString);
      }
    ...
    }
  • j

    Jeff Levesque

    07/28/2022, 12:06 AM
    I have a tumbling window that INSERTs into a print connector sink table, which outputs as follows in my IDE:
    +I[AMZN, 2022-07-20T20:56, 2022-07-20T20:57, 82.64, 34.95, 0.05, 99.81]
    +I[TSLA, 2022-07-20T20:56, 2022-07-20T20:57, 54.89, 93.62, 0.11, 99.91]
    +I[MSFT, 2022-07-20T20:56, 2022-07-20T20:57, 43.12, 76.65, 0.69, 99.79]
    +I[AAPL, 2022-07-20T20:56, 2022-07-20T20:57, 65.29, 93.06, 0.0, 99.71]
    +I[AAPL, 2022-07-20T20:57, 2022-07-20T20:58, 9.86, 10.97, 0.25, 99.94]
    +I[MSFT, 2022-07-20T20:57, 2022-07-20T20:58, 80.06, 64.48, 0.01, 99.86]
    +I[AMZN, 2022-07-20T20:57, 2022-07-20T20:58, 30.36, 37.71, 0.62, 99.97]
    +I[TSLA, 2022-07-20T20:57, 2022-07-20T20:58, 84.05, 38.65, 0.02, 100.0]
    +I[MSFT, 2022-07-20T20:58, 2022-07-20T20:59, 48.8, 39.57, 0.2, 99.89]
    +I[TSLA, 2022-07-20T20:58, 2022-07-20T20:59, 25.3, 82.68, 0.15, 99.93]
    +I[AAPL, 2022-07-20T20:58, 2022-07-20T20:59, 15.78, 86.46, 0.12, 99.98]
    +I[AMZN, 2022-07-20T20:58, 2022-07-20T20:59, 15.77, 10.04, 0.22, 99.96]
    +I[AMZN, 2022-07-20T20:59, 2022-07-20T21:00, 12.63, 46.2, 0.06, 99.75]
    +I[MSFT, 2022-07-20T20:59, 2022-07-20T21:00, 36.7, 8.63, 0.1, 99.95]
    +I[AAPL, 2022-07-20T20:59, 2022-07-20T21:00, 91.41, 44.91, 0.01, 99.55]
    +I[TSLA, 2022-07-20T20:59, 2022-07-20T21:00, 94.23, 6.09, 0.36, 99.81]
    Now I want to perform an aggregated sliding window over the above results with a UDF. Since each record already has a window_start and window_end field from the previous tumbling window, I'm partly wondering if I should slide on window_start and use a similar HOP construct for the successive sliding window. To start, I create a very simple PyFlink query without any UDF:
    return tbl_env.sql_query('''
            SELECT
                {4}
            FROM {0}
            GROUP BY
                HOP(TABLE {0}, DESCRIPTOR({1}), INTERVAL {2}, INTERVAL {3}),
                ticker
        '''.format(
            input_table_name,
            window_start,
            '1.minute',
            '8.hours',
            field_ticker
        ))
    When I try to run the above SQL, I get the following output trace:
    py4j.protocol.Py4JJavaError: An error occurred while calling o4.sqlQuery.
    : org.apache.flink.table.api.ValidationException: SQL validation failed. At line 0, column 0: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(VARCHAR(6) TICKER, TIMESTAMP(3) *ROWTIME* WINDOW_START, TIMESTAMP(3) *ROWTIME* WINDOW_END, DOUBLE FIRST_PRICE, DOUBLE LAST_PRICE, DOUBLE MIN_PRICE, DOUBLE MAX_PRICE)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.calcite.runtime.CalciteContextException: At line 0, column 0: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(VARCHAR(6) TICKER, TIMESTAMP(3) *ROWTIME* WINDOW_START, TIMESTAMP(3) *ROWTIME* WINDOW_END, DOUBLE FIRST_PRICE, DOUBLE LAST_PRICE, DOUBLE MIN_PRICE, DOUBLE MAX_PRICE)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
    	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
    	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4860)
    	at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
    	at org.apache.calcite.sql.type.OperandTypes$6.checkSingleOperandType(OperandTypes.java:672)
    	at org.apache.calcite.sql.type.OperandTypes$6.checkOperandTypes(OperandTypes.java:680)
    	at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
    	at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
    	at org.apache.calcite.sql.SqlInternalOperator.deriveType(SqlInternalOperator.java:83)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
    	at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
    	at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:274)
    	at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:419)
    	at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:207)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5397)
    	at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116)
    	at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:273)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4111)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupByExpr(SqlValidatorImpl.java:3864)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupByItem(SqlValidatorImpl.java:3849)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3921)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
    	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974)
    	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
    	... 15 more
    Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(VARCHAR(6) TICKER, TIMESTAMP(3) *ROWTIME* WINDOW_START, TIMESTAMP(3) *ROWTIME* WINDOW_END, DOUBLE FIRST_PRICE, DOUBLE LAST_PRICE, DOUBLE MIN_PRICE, DOUBLE MAX_PRICE)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
    	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
    	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
    	... 48 more
    
    
    Process finished with exit code 1
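    (A hedged sketch of the windowing-TVF form, written with the Java Table API just to keep the SQL in one place; the SQL string is the same in PyFlink. HOP(...) goes in the FROM clause wrapped in TABLE(...), the DESCRIPTOR column must be a time attribute, the intervals are SQL interval literals, and the GROUP BY uses the window_start/window_end produced by the TVF. Table and column names below are placeholders, not the exact schema above.)
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class HopSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Assumes a table `prices` with a time attribute column `ts` has been registered.
            Table result = tEnv.sqlQuery(
                "SELECT ticker, window_start, window_end, MAX(price) AS max_price "
                    + "FROM TABLE("
                    + "  HOP(TABLE prices, DESCRIPTOR(ts), INTERVAL '1' MINUTE, INTERVAL '8' HOUR)) "
                    + "GROUP BY ticker, window_start, window_end");

            result.execute().print();
        }
    }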
    d
    • 2
    • 1
  • s

    Sylvia Lin

    07/28/2022, 6:07 AM
    Hi team, I'm working on Flink operator <> FluxCD integration; can someone take a look at my job submission failure? My Flink job fails to be submitted, with this FluxCD error message:
    FlinkDeployment/dataeng-admin/basic-example dry-run failed, reason: InternalError, error: Internal error occurred: failed calling webhook "flinkoperator.flink.apache.org": Post "https://flink-operator-webhook-service.operators.svc:443/validate?timeout=10s": context deadline exceeded
    I installed cert-manager (v1.8.1) and the Flink operator (v1.1.0), both via Helm release; the manifests and the job manifest are attached. And I see these pods are all running fine:
    sylvia$ kg pods -n cert-manager
    NAME                                                   READY   STATUS    RESTARTS   AGE
    cert-manager-cert-manager-5d555fc65d-8drd9             1/1     Running   0          8h
    cert-manager-cert-manager-cainjector-d549d56bc-lnpwd   1/1     Running   0          8h
    cert-manager-cert-manager-webhook-669889588b-cs4v5     1/1     Running   0          8h
    sylvia$ kg pods -n operators
    NAME                                         READY   STATUS    RESTARTS   AGE
    flink-kubernetes-operator-57bd9f8884-v7td2   2/2     Running   0          8h
    cert-manager.yaml, flink-operator.yaml, flink-v2.yaml
    g
    • 2
    • 22
  • m

    Mustafa Akur

    07/28/2022, 6:28 AM
    Hi all, I want to use a Python UDF in a Flink application, but I don't want to use the Table API & SQL (explained in this link: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/). Is there any way to accomplish this in the DataStream API? Kind regards
    d
    • 2
    • 5
  • f

    Felix Angell

    07/28/2022, 10:14 AM
    What's the best way to go about testing a PyFlink app locally, e.g. mocking out bits of PyFlink's APIs? I'm looking to write something for deserialisation of Kafka messages but I'm not sure what the best options are for local testing.
    • 1
    • 1
  • a

    Adesh Dsilva

    07/28/2022, 11:37 AM
    Hello, I have 4 sources (S1, S2, S3, S4) consuming from 4 Kafka topics. I want to perform the same processing for all sources, but a few sources need some additional processing. So there are two ways I can write the program. Option 1: S1, S2, S3, S4 -> process-1 -> process-2 -> filter (S2, S3) or side output -> process-3. Option 2: S2, S3 -> process-1 -> process-2 -> process-3, and S1, S4 -> process-1 -> process-2. Will option 1 have a performance impact? Which would be better? Also, there are no sinks in this program; it's just API calls.
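    (A rough sketch of option 1 with a side output, for reference; the sources and process steps below are trivial stand-ins, not the real pipeline. The extra cost of option 1 is mainly that S1/S4 records also pass through the routing check, which is usually negligible.)
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SideOutputSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-ins for the four Kafka sources; records are tagged with their origin.
            DataStream<String> s1 = env.fromElements("S1:a");
            DataStream<String> s2 = env.fromElements("S2:b");
            DataStream<String> s3 = env.fromElements("S3:c");
            DataStream<String> s4 = env.fromElements("S4:d");

            final OutputTag<String> needsExtra = new OutputTag<String>("needs-process-3") {};

            // process-1/process-2 run once over the union; records from S2/S3 are also
            // routed to a side output that feeds process-3.
            SingleOutputStreamOperator<String> main = s1.union(s2, s3, s4)
                    .process(new ProcessFunction<String, String>() {
                        @Override
                        public void processElement(String value, Context ctx, Collector<String> out) {
                            String processed = value.toUpperCase();   // stand-in for process-1/2
                            out.collect(processed);
                            if (processed.startsWith("S2") || processed.startsWith("S3")) {
                                ctx.output(needsExtra, processed);
                            }
                        }
                    });

            main.getSideOutput(needsExtra).map(v -> v + ":process-3").print(); // stand-in for process-3
            env.execute("side-output sketch");
        }
    }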
    h
    • 2
    • 8
  • a

    Aviv Dozorets

    07/28/2022, 12:30 PM
    Hi, question: we're using Flink 1.14 with Kafka 2.6+ and want to enable KIP-392 (rack awareness, e.g. reading from the closest replica in the same AZ). Looking over the KafkaSource documentation and source code, I can't seem to find whether it uses the same configuration as a general Kafka client, e.g. the client.rack setting. So what's the preferred way?
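    (A hedged sketch: KafkaSource forwards arbitrary consumer properties to the underlying Kafka consumer via setProperty/setProperties, so client.rack can be passed like any other client setting; KIP-392 additionally requires broker.rack and a replica selector to be configured on the brokers. Topic, bootstrap servers and the rack value below are placeholders.)
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    public class RackAwareKafkaSource {
        public static KafkaSource<String> build() {
            return KafkaSource.<String>builder()
                    .setBootstrapServers("broker-1:9092")    // placeholder
                    .setTopics("my-topic")                   // placeholder
                    .setGroupId("my-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // forwarded as-is to the underlying KafkaConsumer (KIP-392 fetch-from-follower)
                    .setProperty("client.rack", "us-east-1a")
                    .build();
        }
    }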
    m
    • 2
    • 10
  • p

    Pedro Cunha

    07/28/2022, 2:22 PM
    Hello everyone. I'm having a bit of an issue here that I can't get around. We're using Flink 1.14 with Scala. I had a state where all classes are POJOs (using the @BeanProperty annotation in Scala to achieve this) and everything was working fine. Last month I added a few more Strings to the schema, which I didn't think would be an issue, but since deploying it I keep getting this error: "A serializer has already been registered for the state; re-registration is not allowed." I'm not registering any of my state classes by hand, so it was all done by Flink itself. I can't seem to understand what's causing this problem. Can anyone help? Stack trace below:
    Caused by: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.
    	at org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)
    	at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:683)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
    	at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
    	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
    	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
    	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    🧵 1
    m
    • 2
    • 2
  • p

    Pedro Cunha

    07/28/2022, 4:08 PM
    There is also a very weird thing happening at the same time, which I'm not sure is related or not.
    Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB state.
    	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:194)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:787)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:702)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
    	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
    	at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
    	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
    	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
    	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    	... 27 more
    Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: ohn Oliver
    Serialization trace:
    channelId (com.my.own.app.pojo.User)
    	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    	at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
    	at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
    	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
    	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:409)
    	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:409)
    	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:191)
    	... 37 more
    My POJO is called User and it has two fields: one is channelId: EntityId (EntityId is just another POJO with a string field) and the other is name: String. When trying to restore state, Kryo is trying to load a class that doesn't exist but is, in fact, the value of the name field.
  • p

    Pedro Cunha

    07/28/2022, 4:09 PM
    Could it be this setting? setClosureCleanerLevel() defaults to RECURSIVE, and I'm not sure how it interacts with Scala.
  • a

    Adrian Chang

    07/28/2022, 7:28 PM
    Hello, I am using Python and I want to convert a Table to a DataStream. One of my columns is defined as
    tsMs TIMESTAMP(3) NOT NULL
    Then when I execute
    t_env.to_data_stream(table)
    I get
    Caused by: java.lang.UnsupportedOperationException: Could not find type serializer for current type [LocalDateTime].
    However, if I define the column as
    tsMs BIGINT NOT NULL
    everything works fine. I need to define tsMs as TIMESTAMP(3) because I want to define the watermark and do some windowing over that column.
    • 1
    • 1
  • j

    Jin Yi

    07/28/2022, 7:45 PM
    I'm trying to build a specific flink-formats Maven package using mvn clean package from within the subdirectory containing the pom.xml file, but I'm hitting this exception:
    Caused by: java.lang.IllegalAccessError: class com.google.googlejavaformat.java.RemoveUnusedImports (in unnamed module @0x3fecb076) cannot access class com.sun.tools.javac.util.Context (in module jdk.compiler) because module jdk.compiler does not export com.sun.tools.javac.util to unnamed module @0x3fecb076
            at com.google.googlejavaformat.java.RemoveUnusedImports.removeUnusedImports(RemoveUnusedImports.java:187)
            at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
            ... 34 more
    • 1
    • 2
  • j

    Jeff Levesque

    07/29/2022, 2:12 AM
    Yesterday I was trying to feed my print connector sink into what I had hoped would be a sliding window using HOP. Now I have tried something different: I tried to pass in my sink table (where data is INSERTed from the tumbling window via SELECT). However, when I try to SELECT from my tumbling-window `print`/sink table (i.e. output_table_tumbling_window):
    SELECT ticker, window_start
    FROM
        output_table_tumbling_window
    GROUP BY
        ticker,
        window_start
    I get the following error:
    Table options are:
    
    'connector'='print'
    	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
    	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
    	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.
    	at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:480)
    	at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
    	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
    	... 29 more
    
    
    Process finished with exit code 1
    I think if I sink my tumbling-window result into a Kinesis stream, I can read it back into a table and then perform the subsequent sliding window on it. However, this seems like an unnecessary step. Someone here a few days ago suggested performing the sliding window directly on the print table, which is what I'm trying to do now.
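    (Since print is sink-only, one way to chain the windows without an extra Kinesis hop is to register the tumbling-window result as a temporary view and run the sliding window over that view, keeping the print table purely as a sink. A hedged sketch with placeholder names; the time column used for the second window must still be a time attribute, e.g. the window_time propagated by the first window.)
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class ViewInsteadOfPrintSourceSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Placeholder for the existing tumbling-window query; in the real job this is the
            // SELECT that currently feeds the print sink.
            Table tumbling = tEnv.sqlQuery("SELECT * FROM input_table");

            // Register the intermediate result as a view so it can be queried again ...
            tEnv.createTemporaryView("tumbling_results", tumbling);

            // ... e.g. by the sliding-window query, while the print table stays write-only.
            Table sliding = tEnv.sqlQuery(
                "SELECT ticker, COUNT(*) AS cnt FROM tumbling_results GROUP BY ticker");

            sliding.executeInsert("output_table_print_sink"); // placeholder sink name
        }
    }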
    j
    a
    • 3
    • 6
  • k

    Kyle Meow

    07/29/2022, 9:54 AM
    Hi devs, I found that the official documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/) says the TIME type in SQL only supports a precision of 0 (e.g. 150133) in the List of Data Types table, and I could confirm this in org.apache.flink.formats.json.JsonToRowDataConverters#convertToTime. However, in the latter half of that document it also says the precision of TIME can be 0~9, which is confusing. When I dug deeper, it seems that org.apache.flink.table.data.conversion.TimeLocalTimeConverter can actually support a precision up to 3. Additionally, if we change the code in org.apache.flink.formats.json.JsonToRowDataConverters#convertToTime from localTime.toSecondOfDay() * 1000 to localTime.toNanoOfDay() / 1000000, the JSON format could support a precision up to 3 immediately. May I conclude that the precision of the TIME type actually depends on the format chosen by the user, and that the maximum value is 3?
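    (A tiny illustration of the difference described above, independent of Flink:)
    import java.time.LocalTime;

    public class TimePrecisionDemo {
        public static void main(String[] args) {
            LocalTime t = LocalTime.parse("15:01:33.123");
            System.out.println(t.toSecondOfDay() * 1000L);    // 54093000  -> millisecond part lost
            System.out.println(t.toNanoOfDay() / 1_000_000);  // 54093123  -> millisecond precision kept
        }
    }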
  • m

    Mustafa Akur

    07/29/2022, 11:01 AM
    In the Java API of Flink I can use the .setParallelism() function to set the parallelism of an operation. When I use the Table & SQL API, is there a way to set the parallelism explicitly in code? For instance, for the query below
    SELECT SUM(quantity) FROM Orders 
    GROUP BY product;
    I want to set parallelism 2, but for the query below
    SELECT MAX(value) FROM Transactions
    GROUP BY currency;
    I want to set parallelism 1. How can I accomplish that? When I change flink-conf.yaml, Flink manages the parallelism on its own and I do not have control over the different parts of the pipeline. Kind regards
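    (A hedged sketch: the Table/SQL planner does not expose a per-operator setParallelism(), but table.exec.resource.default-parallelism can be set on the TableEnvironment's config and is picked up when a statement is translated. Changing it between two executeSql calls, as below, is only an illustration; whether each statement really keeps its own value in your deployment mode is worth verifying. Table and sink names are placeholders.)
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PerQueryParallelismSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            tEnv.getConfig().getConfiguration()
                    .setInteger("table.exec.resource.default-parallelism", 2);
            tEnv.executeSql(
                    "INSERT INTO order_sums SELECT product, SUM(quantity) FROM Orders GROUP BY product");

            tEnv.getConfig().getConfiguration()
                    .setInteger("table.exec.resource.default-parallelism", 1);
            tEnv.executeSql(
                    "INSERT INTO currency_max SELECT currency, MAX(`value`) FROM Transactions GROUP BY currency");
        }
    }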
    d
    • 2
    • 2
  • j

    Jeesmon Jacob

    07/29/2022, 8:15 PM
    Hi team, in the Kubernetes operator, is it possible to override the logging configuration per FlinkDeployment? For debugging purposes there may be a case where we need to set the logging level to DEBUG for just one FlinkDeployment without affecting others. If we set the override in spec.flinkConfiguration, will it apply per deployment?
    g
    • 2
    • 19
  • p

    Parthiban PR

    07/31/2022, 5:56 AM
    Hello! QQ: I'm running a local Flink application that reads data from Kafka and processes it via the Table API, and I'm analysing the memory utilisation. When I look at the Flink dashboard, I see that only heap memory is being used and not managed memory, although managed memory seems to have more memory allocated. Is there a reason for this, and how could I make use of managed memory? Any help is much appreciated. Thanks! PS: My job fails with OOM when heap memory is completely used.
    m
    d
    l
    • 4
    • 8
  • s

    Samin Ahbab

    07/31/2022, 5:54 PM
    Question: What is the state of Flink Gelly? I am looking at the confluence that the maintainers are using, and the tickets seem quite stale to me. It looks like there is no active development on Flink Gelly. Is this true?
    m
    • 2
    • 5
  • h

    Hunter

    08/01/2022, 1:45 AM
    Hello guys, how can I query the data of a Kafka connector table that was created by Flink through the Hive catalog?
  • e

    Emily Li

    08/01/2022, 4:05 AM
    Hello team, I have a Flink app deployed in Kubernetes, sending metrics to Datadog via the DatadogHttpReporterFactory, and I noticed that for my custom metrics, each metric group I add becomes part of the metric name in Datadog, and the metrics are sent separately, like the image below. For example, in flink.operator.testmetricsdatadog.topic.ekata_lookups.subtaskIndex.0.eventCount it takes my custom groups topic and subtaskIndex as part of the metric name. The code:
    private def buildEventCounter(topic: Topic): Counter =
        getRuntimeContext.getMetricGroup
          .addGroup("testmetricsdatadog")
          .addGroup("topic", topic.value)
          .addGroup("subtaskIndex", getRuntimeContext.getIndexOfThisSubtask.toString)
          .counter("eventCount")
    Ideally, I would want all of them to be counted as one metric, so the metric name I want is flink.operator.testmetricsdatadog.eventCount (a more generic metric name), with topic and subtaskIndex added only as tags, so I can view the metrics in a dashboard and filter by topic name, etc. But looking at Flink's DatadogHttpReporter implementation, that seems to be exactly what it does: take all groups as part of the metric name. Because the metrics are separated in Datadog, I couldn't find an easy way to combine them into one there. Just wondering if I did anything wrong, or is there a better way to do this?
    ✅ 1
    c
    • 2
    • 10
  • a

    Anoop Khandelwal

    08/01/2022, 5:31 AM
    Hello team, can someone please have a look at this issue: https://github.com/streamnative/pulsar-flink/issues/610 In brief: the Flink-Pulsar connector is not doing cumulative acknowledgement, and there are no error logs even in DEBUG mode.
    m
    • 2
    • 6
  • l

    laxmi narayan

    08/01/2022, 8:19 AM
    Is it possible to access a savepoint created by another Flink cluster and run Flink SQL queries on the data (the Flink state-store cache)? I am exploring an option where we could re-use the output of a Flink checkpoint or savepoint.
    m
    • 2
    • 8
  • s

    shao

    08/01/2022, 8:45 AM
    Hello everyone! Is it suitable to run Flink as a data warehouse using the queryable state feature (i.e. running an API on top of the queryable state client)?
    s
    • 2
    • 1
  • d

    Darin Lee

    08/01/2022, 10:17 AM
    NoClassDefFoundError, but I can find the classes in the shadow package. I want to know what happened. Thanks! Here are the logs:
    2022-07-29 21:15:50,800 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 2.0.1
    2022-07-29 21:15:50,800 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : fa14705e51bd2ce5
    2022-07-29 21:15:51,350 INFO  org.apache.kafka.clients.Metadata                             - Cluster ID: KUr7pbGDRkGISnTpRo3wPg
    2022-07-29 21:15:52,182 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: expose (4/4) (46dff208ab4431650caafb547038f668).
    2022-07-29 21:15:52,183 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: expose (4/4) (46dff208ab4431650caafb547038f668) switched from RUNNING to CANCELING.
    2022-07-29 21:15:52,183 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: expose (4/4) (46dff208ab4431650caafb547038f668).
    2022-07-29 21:15:52,190 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: expose (4/4) (46dff208ab4431650caafb547038f668) switched from CANCELING to CANCELED.
    2022-07-29 21:15:52,190 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: expose (4/4) (46dff208ab4431650caafb547038f668).
    2022-07-29 21:15:52,191 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: expose (4/4) (46dff208ab4431650caafb547038f668) [CANCELED]
    2022-07-29 21:15:52,191 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering task and sending final execution state CANCELED to JobManager for task Source: expose 46dff208ab4431650caafb547038f668.
    2022-07-29 21:15:52,437 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-2, groupId=RecSysRtMsgConsumerUat] Discovered group coordinator 99.47.134.56:9092 (id: 2147483646 rack: null)
    2022-07-29 21:15:52,447 ERROR org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=consumer-2, groupId=RecSysRtMsgConsumerUat] Failed to close coordinator
    java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetCommitRequest$PartitionData
    	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:795)
    m
    • 2
    • 16