Jaya Ananthram
07/27/2022, 9:30 AM
3.2.0
and Kafka client version is 2.4.1
(that comes through the Flink flink-connector-kafka_2.12 - 1.14.5). From the Kafka docs it should work, and it is working in my local Flink standalone cluster (connecting to MSK). Even the pod is able to read the message from the Kafka console consumer, so I can't find the reason 😬. Any idea?
Parthiban PR
07/27/2022, 10:41 AM
Nipuna Shantha
07/27/2022, 2:24 PM
jobmanager.sh start-foreground <rpc.address> <web-port> -Djobmanager.rpc.port=11100 -Djobmanager.memory.process.size=1g
taskmanager.sh start-foreground -Dtaskmanager.memory.process.size=40g
George Chen
07/27/2022, 4:01 PM
public void open(final Configuration parameters) {
obj = SingletonClass.getInstance();
}
I only found some clue in https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/dataset/overview/#passing-parameters-to-functions where it says "The parameters are serialized as part of the function object and shipped to all parallel task instances."
but here my instance is non-serializable, so I hope to better understand how Flink handles it.
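For reference, a minimal sketch of that pattern, not the author's code: the non-serializable instance is held in a transient field, so it never travels with the serialized function object, and it is created per parallel task instance in open(). SingletonClass is a stand-in here.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Stand-in for the non-serializable dependency from the question.
class SingletonClass {
    private static final SingletonClass INSTANCE = new SingletonClass();
    static SingletonClass getInstance() { return INSTANCE; }
    String process(String value) { return value; }
}

public class EnrichFunction extends RichMapFunction<String, String> {

    // transient: excluded from serialization, so the non-serializable instance
    // is never shipped with the function object
    private transient SingletonClass obj;

    @Override
    public void open(final Configuration parameters) {
        // called once per parallel task instance, after the function object
        // has been deserialized on the task manager
        obj = SingletonClass.getInstance();
    }

    @Override
    public String map(final String value) {
        return obj.process(value);
    }
}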
George Chen
07/27/2022, 5:00 PM
KeyedProcessFunction. I was wondering what choice of State might yield the max parallelism. Currently I am using ValueState like the following:
public class EMFMetricBatchKeyedProcessFunction extends KeyedProcessFunction<String, EMFMetric, EMFMetricBatch> {
private final int maxBatchSize;
private final long timeoutInMillis;
private final ValueStateDescriptor<EMFMetricBatch.EMFMetricBatchBuilder> valueStateDescriptor;
private ValueState<EMFMetricBatch.EMFMetricBatchBuilder> emfMetricBatchBuilderValueState;
public EMFMetricBatchKeyedProcessFunction(final int maxBatchSize,
final long timeoutInMillis,
final String valueStateName,
final FlinkValueStateDescriptorFactory flinkValueStateDescriptorFactory) {
this.maxBatchSize = maxBatchSize;
this.timeoutInMillis = timeoutInMillis;
this.valueStateDescriptor = flinkValueStateDescriptorFactory.createValueStateDescriptor(
valueStateName, EMFMetricBatch.EMFMetricBatchBuilder.class);
}
@Override
public void open(final Configuration parameters) {
emfMetricBatchBuilderValueState = getRuntimeContext().getState(valueStateDescriptor);
}
@Override
public void processElement(final EMFMetric emfMetric,
final KeyedProcessFunction<String, EMFMetric, EMFMetricBatch>.Context ctx,
final Collector<EMFMetricBatch> out) throws Exception {
EMFMetricBatch.EMFMetricBatchBuilder currentBatchBuilder = emfMetricBatchBuilderValueState.value();
final TimerService timerService = ctx.timerService();
final String currentKey = ctx.getCurrentKey();
if (currentBatchBuilder == null) {
final long createdAt = timerService.currentProcessingTime();
currentBatchBuilder = createNewEMFMetricBatchBuilder(createdAt, currentKey);
timerService.registerProcessingTimeTimer(createdAt + timeoutInMillis);
}
addNewMetricAndUpdateState(emfMetric, currentBatchBuilder);
if (currentBatchBuilder.getBatchSize() >= maxBatchSize) {
<http://log.info|log.info>("Flush EMFMetricBatch for key:{} due to metrics count reaching maximum batch size", currentKey);
collectAndClearState(currentBatchBuilder, out);
timerService.deleteProcessingTimeTimer(currentBatchBuilder.getCreatedAtTimestamp() + timeoutInMillis);
}
}
@Override
public void onTimer(final long timestamp,
final KeyedProcessFunction<String, EMFMetric, EMFMetricBatch>.OnTimerContext ctx,
final Collector<EMFMetricBatch> out) throws Exception {
final EMFMetricBatch.EMFMetricBatchBuilder currentBatchBuilder = emfMetricBatchBuilderValueState.value();
if (currentBatchBuilder != null && timestamp >= currentBatchBuilder.getCreatedAtTimestamp() + timeoutInMillis) {
<http://log.info|log.info>("Flush EMFMetricBatch for key:{} due to timeout", ctx.getCurrentKey());
collectAndClearState(currentBatchBuilder, out);
}
}
private EMFMetricBatch.EMFMetricBatchBuilder createNewEMFMetricBatchBuilder(
final long createdAt, final String accountId) {
return EMFMetricBatch.builder().createdAtTimestamp(createdAt).accountId(accountId);
}
private void addNewMetricAndUpdateState(final EMFMetric emfMetric,
final EMFMetricBatch.EMFMetricBatchBuilder emfMetricBatchBuilder)
throws IOException {
emfMetricBatchBuilder.emfMetric(emfMetric);
emfMetricBatchBuilderValueState.update(emfMetricBatchBuilder);
}
private void collectAndClearState(final EMFMetricBatch.EMFMetricBatchBuilder batchBuilder,
final Collector<EMFMetricBatch> out) {
out.collect(batchBuilder.build());
emfMetricBatchBuilderValueState.clear();
}
}
I wonder if there is any better implementation for increasing parallelism, since batching is not like aggregation in the sense that it does not depend on all tasks.
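A rough alternative sketch, not the code above: with the keyed-state APIs a batch can also be held in ListState plus a small counter, so appending an element never deserializes the batch accumulated so far (the RocksDB backend implements the append as a merge). Class and element types below are illustrative; parallelism itself is still bounded by the number of distinct keys.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: batches String events per key with ListState and a counter.
public class ListStateBatchFunction extends KeyedProcessFunction<String, String, List<String>> {

    private final int maxBatchSize;
    private transient ListState<String> batchState;
    private transient ValueState<Integer> countState;

    public ListStateBatchFunction(final int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public void open(final Configuration parameters) {
        batchState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("batch", String.class));
        countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("batch-count", Integer.class));
    }

    @Override
    public void processElement(final String value,
                               final Context ctx,
                               final Collector<List<String>> out) throws Exception {
        batchState.add(value);  // cheap append; no read-modify-write of the whole batch
        final int count = (countState.value() == null ? 0 : countState.value()) + 1;
        if (count >= maxBatchSize) {
            final List<String> batch = new ArrayList<>();
            batchState.get().forEach(batch::add);  // full read only at flush time
            out.collect(batch);
            batchState.clear();
            countState.clear();
        } else {
            countState.update(count);
        }
    }
}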
Stephan Weinwurm
07/27/2022, 9:59 PM
parallelism.default: 1, taskmanager.numberOfTaskSlots: 10, statefun.async.max-per-task: 1024) Flink only works through the backlog relatively slowly. We run the master in Kubernetes HA mode. There are two jobmanager pods, three taskmanager pods, and one Statefun pod. We have since turned off any adaptive scheduling as well as k8s autoscaling to figure out what the optimal settings are.
Overall the setup looks healthy with regard to CPU / memory / network etc. However, Flink only works through the backlog very slowly, and we very frequently see this error in the worker logs:
org.apache.flink.statefun.flink.core.httpfn.RetryingCallback [] - Retriable exception caught while trying to deliver a message: ToFunctionRequestSummary(address=Address(com.x.dummy, dummy, eecf7c1b-fe6a-466f-9072-e563fc88554e), batchSize=1, totalSizeInBytes=971, numberOfStates=0)
java.net.ConnectException: Failed to connect to snooron-worker-dummy.snooron-functions/192.168.18.71:9090
at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:265) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:183) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:224) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:172) [statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [statefun-flink-distribution.jar:3.2.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.net.ConnectException: Cannot assign requested address (connect failed)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?]
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source) ~[?:?]
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source) ~[?:?]
at java.net.AbstractPlainSocketImpl.connect(Unknown Source) ~[?:?]
at java.net.SocksSocketImpl.connect(Unknown Source) ~[?:?]
at java.net.Socket.connect(Unknown Source) ~[?:?]
at okhttp3.internal.platform.Platform.connectSocket(Platform.java:130) ~[statefun-flink-distribution.jar:3.2.0]
at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:263) ~[statefun-flink-distribution.jar:3.2.0]
... 22 more
This is the endpoint configuration:
spec:
endpoints:
- endpoint:
meta:
kind: io.statefun.endpoints.v1/http
spec:
functions: com.x.dummy/dummy
urlPathTemplate: http://snooron-worker-dummy.snooron-functions:9090/statefun
timeouts:
call: 2 min
read: 2 min
write: 2 min
maxNumBatchRequests: 100
We have also tried using the v2 endpoint config, where netty is used instead of okhttp, but with similar results.
If the load further increases, we see lots of other exceptions related to the calls to the HTTP State function such as `SocketTimeout`s and eventually the jobmanager continuously restarts. My guess is that thread pools are saturated and communication between jobmanager and taskmanager is impeded, leading to timeouts.
Any help in this would be greatly appreciated!
As a side question, is there documentation on how parallelism.default, taskmanager.numberOfTaskSlots, and statefun.async.max-per-task relate to different Kafka topics and their partitions? Does one task, executing in a task slot, consume from one topic and send all the messages to the state functions via HTTP? How do the other properties factor in?
Duc Anh Khu
07/27/2022, 10:39 PM
ROW a lot. AFAIK, ROW is a way to map a schema to Flink types. Is it possible in PyFlink to not use ROW and, similar to the Java SDK, just get a JSON object as the return type? Here is my Java DeserializationSchema:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import com.google.protobuf.util.JsonFormat;
import org.apache.flink.api.common.serialization.DeserializationSchema;

import java.io.IOException;
public class ProtoJsonDeserializationSchema<T extends Message>
implements DeserializationSchema<ObjectNode> {
private static final long serialVersionUID = -4257275849353885017L;
private final Parser<T> parser;
private final ObjectMapper mapper = new ObjectMapper();
public ProtoJsonDeserializationSchema(Parser<T> parser) {
this.parser = parser;
}
@Override
public ObjectNode deserialize(byte[] message) throws IOException {
var jsonFormat = JsonFormat.printer();
var t = parser.parseFrom(message);
var jsonString = jsonFormat.print(t);
return (ObjectNode) mapper.readTree(jsonString);
}
...
}
Jeff Levesque
07/28/2022, 12:06 AM
INSERT into a print connector sink table, which outputs as follows in my IDE:
+I[AMZN, 2022-07-20T20:56, 2022-07-20T20:57, 82.64, 34.95, 0.05, 99.81]
+I[TSLA, 2022-07-20T20:56, 2022-07-20T20:57, 54.89, 93.62, 0.11, 99.91]
+I[MSFT, 2022-07-20T20:56, 2022-07-20T20:57, 43.12, 76.65, 0.69, 99.79]
+I[AAPL, 2022-07-20T20:56, 2022-07-20T20:57, 65.29, 93.06, 0.0, 99.71]
+I[AAPL, 2022-07-20T20:57, 2022-07-20T20:58, 9.86, 10.97, 0.25, 99.94]
+I[MSFT, 2022-07-20T20:57, 2022-07-20T20:58, 80.06, 64.48, 0.01, 99.86]
+I[AMZN, 2022-07-20T20:57, 2022-07-20T20:58, 30.36, 37.71, 0.62, 99.97]
+I[TSLA, 2022-07-20T20:57, 2022-07-20T20:58, 84.05, 38.65, 0.02, 100.0]
+I[MSFT, 2022-07-20T20:58, 2022-07-20T20:59, 48.8, 39.57, 0.2, 99.89]
+I[TSLA, 2022-07-20T20:58, 2022-07-20T20:59, 25.3, 82.68, 0.15, 99.93]
+I[AAPL, 2022-07-20T20:58, 2022-07-20T20:59, 15.78, 86.46, 0.12, 99.98]
+I[AMZN, 2022-07-20T20:58, 2022-07-20T20:59, 15.77, 10.04, 0.22, 99.96]
+I[AMZN, 2022-07-20T20:59, 2022-07-20T21:00, 12.63, 46.2, 0.06, 99.75]
+I[MSFT, 2022-07-20T20:59, 2022-07-20T21:00, 36.7, 8.63, 0.1, 99.95]
+I[AAPL, 2022-07-20T20:59, 2022-07-20T21:00, 91.41, 44.91, 0.01, 99.55]
+I[TSLA, 2022-07-20T20:59, 2022-07-20T21:00, 94.23, 6.09, 0.36, 99.81]
Now, I want to perform an aggregated sliding window for the above results with a UDF. Since each record already has a window_start and window_end field from the previous tumbling window, I'm partly wondering if I should slide on the window_start and use a similar HOP construct for the successive sliding window. To start, I create a very simple PyFlink query, without any UDF:
return tbl_env.sql_query('''
SELECT
{4}
FROM {0}
GROUP BY
HOP(TABLE {0}, DESCRIPTOR({1}), INTERVAL {2}, INTERVAL {3}),
ticker
'''.format(
input_table_name,
window_start,
'1.minute',
'8.hours',
field_ticker
))
When I try to run the above SQL, I get the following output trace:
py4j.protocol.Py4JJavaError: An error occurred while calling o4.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation failed. At line 0, column 0: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(VARCHAR(6) TICKER, TIMESTAMP(3) *ROWTIME* WINDOW_START, TIMESTAMP(3) *ROWTIME* WINDOW_END, DOUBLE FIRST_PRICE, DOUBLE LAST_PRICE, DOUBLE MIN_PRICE, DOUBLE MAX_PRICE)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.calcite.runtime.CalciteContextException: At line 0, column 0: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(VARCHAR(6) TICKER, TIMESTAMP(3) *ROWTIME* WINDOW_START, TIMESTAMP(3) *ROWTIME* WINDOW_END, DOUBLE FIRST_PRICE, DOUBLE LAST_PRICE, DOUBLE MIN_PRICE, DOUBLE MAX_PRICE)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4860)
at org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
at org.apache.calcite.sql.type.OperandTypes$6.checkSingleOperandType(OperandTypes.java:672)
at org.apache.calcite.sql.type.OperandTypes$6.checkOperandTypes(OperandTypes.java:680)
at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
at org.apache.calcite.sql.SqlInternalOperator.deriveType(SqlInternalOperator.java:83)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:274)
at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:419)
at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:207)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5397)
at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116)
at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:273)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateExpr(SqlValidatorImpl.java:4111)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupByExpr(SqlValidatorImpl.java:3864)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupByItem(SqlValidatorImpl.java:3849)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3921)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:996)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:974)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:951)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:703)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
... 15 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RECORDTYPE(VARCHAR(6) TICKER, TIMESTAMP(3) *ROWTIME* WINDOW_START, TIMESTAMP(3) *ROWTIME* WINDOW_END, DOUBLE FIRST_PRICE, DOUBLE LAST_PRICE, DOUBLE MIN_PRICE, DOUBLE MAX_PRICE)>)'. Supported form(s): '$SCALAR_QUERY(<RECORDTYPE(SINGLE FIELD)>)'
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
... 48 more
Process finished with exit code 1
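For comparison only, and with assumed table/column names: Flink's windowing TVFs expect the HOP call in the FROM clause wrapped in TABLE(...), rather than inside GROUP BY, and the TVF itself emits its own window_start/window_end columns, which is worth keeping in mind when the input already has columns with those names. A sketch using the Java Table API:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class HopWindowSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumes a registered table "quotes" with a time attribute column "ts"
        // and a numeric column "price"; all names here are placeholders.
        Table hopped = tEnv.sqlQuery(
                "SELECT ticker, window_start, window_end, MAX(price) AS max_price "
                        + "FROM TABLE("
                        + "  HOP(TABLE quotes, DESCRIPTOR(ts), INTERVAL '1' MINUTE, INTERVAL '8' HOUR)) "
                        + "GROUP BY ticker, window_start, window_end");
        hopped.execute().print();
    }
}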
Sylvia Lin
07/28/2022, 6:07 AM
FlinkDeployment/dataeng-admin/basic-example dry-run failed, reason: InternalError, error: Internal error occurred: failed calling webhook "flinkoperator.flink.apache.org": Post "https://flink-operator-webhook-service.operators.svc:443/validate?timeout=10s": context deadline exceeded
I installed cert-manager (v1.8.1) and the Flink operator (v1.1.0), both via Helm release; manifests and the job manifest are attached.
And I see these pods are all running fine:
sylvia$ kg pods -n cert-manager
NAME READY STATUS RESTARTS AGE
cert-manager-cert-manager-5d555fc65d-8drd9 1/1 Running 0 8h
cert-manager-cert-manager-cainjector-d549d56bc-lnpwd 1/1 Running 0 8h
cert-manager-cert-manager-webhook-669889588b-cs4v5 1/1 Running 0 8h
sylvia$ kg pods -n operators
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-57bd9f8884-v7td2 2/2 Running 0 8h
Mustafa Akur
07/28/2022, 6:28 AM
Felix Angell
07/28/2022, 10:14 AM
Adesh Dsilva
07/28/2022, 11:37 AM
Aviv Dozorets
07/28/2022, 12:30 PM
KafkaSource documentation and source code, but can't seem to find whether it's the same configuration as for the general Kafka client, e.g. the client.rack configuration. So what's the preferred way?
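Not a definitive answer, but the KafkaSource builder does have a generic property pass-through, so arbitrary consumer settings such as client.rack can at least be forwarded to the underlying consumer; a sketch with placeholder broker/topic/rack values:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class RackAwareSourceSketch {
    public static void main(String[] args) {
        // setProperty() adds the key to the consumer properties passed to the underlying KafkaConsumer.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")               // placeholder
                .setTopics("input-topic")                         // placeholder
                .setGroupId("my-group")                           // placeholder
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("client.rack", "use1-az1")           // placeholder rack id
                .build();
    }
}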
Pedro Cunha
07/28/2022, 2:22 PM
@BeanProperty
annotation in Scala to achieve this) and everything was working fine. Last month, I added a few more Strings to the schema, which I didn't think would be an issue. But when I deployed it, I kept getting this error: A serializer has already been registered for the state; re-registration is not allowed.
I'm not registering any of my state classes by hand, so all was done by Flink itself. I can't seem to understand what's causing this problem… Can anyone help? Stacktrace below:
Caused by: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.
at org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)
at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:683)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
Pedro Cunha
07/28/2022, 4:08 PM
Caused by: org.apache.flink.util.StateMigrationException: Error while trying to migrate RocksDB state.
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:194)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.migrateStateValues(RocksDBKeyedStateBackend.java:787)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:702)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 27 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: ohn Oliver
Serialization trace:
channelId (com.my.own.app.pojo.User)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:409)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:409)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.migrateSerializedValue(AbstractRocksDBState.java:191)
... 37 more
My POJO is called User; it has two fields: one is channelId: EntityId (EntityId is just another POJO with a string field) and the other is name: String. When trying to restore state, it's trying to find a class that doesn't exist but is, in fact, the name.
Pedro Cunha
07/28/2022, 4:09 PM
setClosureCleanerLevel(): the default is RECURSIVE, and I'm not sure how this is working with Scala.
Adrian Chang
07/28/2022, 7:28 PM
tsMs TIMESTAMP(3) NOT NULL
then when I execute
t_env.to_data_stream(table)
I got
Caused by: java.lang.UnsupportedOperationException: Could not find type serializer for current type [LocalDateTime].
However if I define my column as
tsMs BIGINT NOT NULL
everything works fine.
I need to define tsMs as TIMESTAMP(3) because I want to define the watermark and do some windows over that column.
Jin Yi
07/28/2022, 7:45 PM
mvn clean package from within the subdir containing the pom.xml file, but I'm hitting this exception:
Caused by: java.lang.IllegalAccessError: class com.google.googlejavaformat.java.RemoveUnusedImports (in unnamed module @0x3fecb076) cannot access class com.sun.tools.javac.util.Context (in module jdk.compiler) because module jdk.compiler does not export com.sun.tools.javac.util to unnamed module @0x3fecb076
at com.google.googlejavaformat.java.RemoveUnusedImports.removeUnusedImports(RemoveUnusedImports.java:187)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
... 34 more
Jeff Levesque
07/29/2022, 2:12 AM
print connector sink into what I had hoped to be a sliding window using HOP. However, I tried something different now. So, I tried to pass in my sink table (where data is being INSERTed from the tumbling window via SELECT). However, when I try to SELECT from my tumbling-window print/sink table (i.e. output_table_tumbling_window):
SELECT ticker, window_start
FROM
output_table_tumbling_window
GROUP BY
ticker,
window_start
I get the following error:
Table options are:
'connector'='print'
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:480)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
... 29 more
Process finished with exit code 1
I think if I sink my tumbling window result into a Kinesis stream, I can read it into a table and then perform the subsequent sliding window on it. However, this seems like an unnecessary step. Someone here a few days ago suggested performing the sliding window directly on the print table, which is what I'm trying to do now.
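One pattern that might avoid both the print-as-source error and the extra Kinesis hop, sketched here with the Java Table API and assumed names: keep the tumbling-window result as a Table, register it as a temporary view, and run the second query over that view, leaving the print table as a pure sink.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ChainedWindowsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholder for the existing tumbling-window query over the source table.
        Table tumbling = tEnv.sqlQuery("SELECT * FROM source_table");

        // Query the intermediate result by name instead of reading the print sink back.
        tEnv.createTemporaryView("tumbling_results", tumbling);
        Table regrouped = tEnv.sqlQuery(
                "SELECT ticker, window_start FROM tumbling_results GROUP BY ticker, window_start");

        // The print table remains write-only:
        // regrouped.executeInsert("output_table_tumbling_window");
    }
}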
Kyle Meow
07/29/2022, 9:54 AM
org.apache.flink.formats.json.JsonToRowDataConverters#convertToTime
. However, in the latter half of this document it also says the precision of TIME could be 0~9, which is confusing.
And when I dig deeper, it seems that org.apache.flink.table.data.conversion.TimeLocalTimeConverter could actually support a precision up to 3. Additionally, if we change the code at org.apache.flink.formats.json.JsonToRowDataConverters#convertToTime from localTime.toSecondOfDay() * 1000 to localTime.toNanoOfDay() / 1000000, then the JSON format could support a precision up to 3 instantly.
May I conclude that the precision of the TIME type actually depends on the format chosen by the user, and that the maximum value is 3?
Mustafa Akur
07/29/2022, 11:01 AM
SELECT SUM(quantity) FROM Orders
GROUP BY product;
I want to set parallelism 2. But for the query below
SELECT MAX(value) FROM Transactions
GROUP BY currency;
I want to set parallelism 1. How can I accomplish that? When I change flink-conf.yaml, it manages parallelism on its own; I do not have control over the different parts of the pipeline. Kind regards
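One possibly relevant knob, with the caveat that it applies per submitted statement rather than per operator: table.exec.resource.default-parallelism can be changed on the TableEnvironment between submissions. A sketch, assuming the source tables are registered and using placeholder sink tables:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PerStatementParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Applies to statements submitted after it is set.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.default-parallelism", "2");
        tEnv.executeSql("INSERT INTO product_sink SELECT SUM(quantity) FROM Orders GROUP BY product");

        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.default-parallelism", "1");
        tEnv.executeSql("INSERT INTO currency_sink SELECT MAX(`value`) FROM Transactions GROUP BY currency");
    }
}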
Jeesmon Jacob
07/29/2022, 8:15 PM
FlinkDeployment? For debugging purposes there may be a case when we need to set the logging level to DEBUG for just one FlinkDeployment without affecting others. If we set the override in spec.flinkConfiguration, will it override per deployment?
Parthiban PR
07/31/2022, 5:56 AM
Samin Ahbab
07/31/2022, 5:54 PM
Hunter
08/01/2022, 1:45 AM
Emily Li
08/01/2022, 4:05 AM
DatadogHttpReporterFactory, and I noticed that for my custom metrics it takes each metrics group that I added as part of the metric name in Datadog and sends them separately, like the image below… for example flink.operator.testmetricsdatadog.topic.ekata_lookups.subtaskIndex.0.eventCount takes my custom groups topic and subtaskIndex as part of the metric name…
The code:
private def buildEventCounter(topic: Topic): Counter =
getRuntimeContext.getMetricGroup
.addGroup("testmetricsdatadog")
.addGroup("topic", topic.value)
.addGroup("subtaskIndex", getRuntimeContext.getIndexOfThisSubtask.toString)
.counter("eventCount")
Ideally, I would want all of them to be counted as one metric, so the metric name I want is flink.operator.testmetricsdatadog.eventCount (a more generic metric name), with topic and subtaskIndex added only as tags for the metric, so I can view the metrics in a dashboard, filtering by topic name etc.
But I looked at the Flink DatadogHttpReporter implementation, and it looks like that's what it will do: take all groups as part of the metric name. Because the metrics are separated in Datadog, I couldn't find a way to easily combine them into one in Datadog.
Just wondering if I did anything wrong, or is there a better way to do this?
Anoop Khandelwal
08/01/2022, 5:31 AM
laxmi narayan
08/01/2022, 8:19 AM
shao
08/01/2022, 8:45 AM
Darin Lee
08/01/2022, 10:17 AM
2022-07-29 21:15:50,800 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.1
2022-07-29 21:15:50,800 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fa14705e51bd2ce5
2022-07-29 21:15:51,350 INFO org.apache.kafka.clients.Metadata - Cluster ID: KUr7pbGDRkGISnTpRo3wPg
2022-07-29 21:15:52,182 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: expose (4/4) (46dff208ab4431650caafb547038f668).
2022-07-29 21:15:52,183 INFO org.apache.flink.runtime.taskmanager.Task - Source: expose (4/4) (46dff208ab4431650caafb547038f668) switched from RUNNING to CANCELING.
2022-07-29 21:15:52,183 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: expose (4/4) (46dff208ab4431650caafb547038f668).
2022-07-29 21:15:52,190 INFO org.apache.flink.runtime.taskmanager.Task - Source: expose (4/4) (46dff208ab4431650caafb547038f668) switched from CANCELING to CANCELED.
2022-07-29 21:15:52,190 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: expose (4/4) (46dff208ab4431650caafb547038f668).
2022-07-29 21:15:52,191 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: expose (4/4) (46dff208ab4431650caafb547038f668) [CANCELED]
2022-07-29 21:15:52,191 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state CANCELED to JobManager for task Source: expose 46dff208ab4431650caafb547038f668.
2022-07-29 21:15:52,437 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-2, groupId=RecSysRtMsgConsumerUat] Discovered group coordinator 99.47.134.56:9092 (id: 2147483646 rack: null)
2022-07-29 21:15:52,447 ERROR org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-2, groupId=RecSysRtMsgConsumerUat] Failed to close coordinator
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetCommitRequest$PartitionData
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:795)