Slackbot
11/30/2022, 5:07 AM

Rashmin Patel
11/30/2022, 11:07 AM
org.apache.flink.streaming.api.operators.TimestampedCollector
in KeyedCoProcessFunction. How can I achieve this?
My use case is that I am joining two streams L and R, and I always want the joined record to be emitted with the timestamp of the left record. Is there an alternate way to achieve this?
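A note on this one: output emitted from processElement1 of a KeyedCoProcessFunction already inherits the timestamp of the left element being processed; the awkward cases are records emitted while processing the right element or from timers. One workaround is to carry the left record's timestamp as a field of the joined record and re-assign it downstream. A minimal sketch, in PyFlink for brevity (joined_stream, the row layout, and LeftTimestampAssigner are placeholder names; the same pattern applies in the Java/Scala API):

from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner

# Assumption: the join function emits rows shaped (key, payload, left_ts), where left_ts
# was captured via ctx.timestamp() on the left side (or read from the buffered left
# record when the right element triggers the emit).
class LeftTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value[2]  # re-use the carried left timestamp as the event timestamp

# pick the watermark strategy that matches your ordering guarantees
restamped = joined_stream.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(LeftTimestampAssigner()))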
Ian Raievskyi
11/30/2022, 2:47 PM
2022-11-29 15:33:57,428 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Unhandled exception.
java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /tmp/flink-web-78e41ccf-c3c0-4b5e-b812-818ebab6fde9/flink-web-upload/050ac65e-2aa1-43b5-8ccc-18b7756d7a08_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar does not exist
Then it makes some retries with no result, and finally it downloads the JAR again and starts fine, but without restoring from the savepoint (it starts to reprocess all events).
We have two Job Managers deployed, and from those we found that:
• Manager A has this list of files in folder `/flink-web-78e41ccf-c3c0-4b5e-b812-818ebab6fde9`:
-rw------- 1 flink flink 123914717 Nov 29 15:33 2b0e0bc2-c602-4e0c-b6e9-8b0c477d885f_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
-rw------- 1 flink flink 123914717 Nov 29 15:34 5966eb7a-47f4-4ea2-bcd5-87c32f9f9da4_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
-rw------- 1 flink flink 123914717 Nov 29 15:43 adcfa6f7-c47a-452e-a163-1358568f83b6_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
• Manager B has this list of files in folder `/flink-web-613318f8-ee25-4b67-890d-6f95db4dee18`:
-rw------- 1 flink flink 123914717 Nov 29 15:33 050ac65e-2aa1-43b5-8ccc-18b7756d7a08_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar
It seems like it tries to load the JAR file using Job Manager B's path from Job Manager A and fails. After that it downloads the JAR again (5966eb7a-47f4-4ea2-bcd5-87c32f9f9da4_kpis-engine_2.12-61a60e3f-SNAPSHOT-assembly.jar) and starts normally, but without restoring from the savepoint.
After that we just redeployed the job again and it restored from the savepoint on the first run without issues.
Thank you in advance for your help!

Krish Narukulla
11/30/2022, 3:11 PM
Exception in thread "main" org.apache.flink.table.api.TableException: Unsupported query: UPDATE `catalog.test1` set `some_int` = 6
at org.apache.flink.table.planner.delegation.ParserImpl.lambda$parse$0(ParserImpl.java:107)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:107)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
Bhupendra Yadav
11/30/2022, 3:30 PM
SELECT t1.c1, JSON_ARRAYAGG(t2.c2) FROM t1 LEFT OUTER JOIN t2 GROUP BY t1.c1;
Tables t1 & t2 are read using the filesystem connector from S3 files in CSV format. Column t1.c1 is a primary key and has a one-to-many relationship with table t2.
Issue: one of the tables (t2) is large (more than 4 GB) and can't fit into Task Manager memory. When we try to join, we get a heap out-of-memory error. If we give the Task Manager enough memory (say 8 GB) the join works, but increasing Task Manager memory as our tables grow is not feasible for us.
OOM Stack trace for reference:
java.lang.OutOfMemoryError: Java heap space
Dumping heap to /opt/tmp/heapdump.bin ...
2022-11-30 11:34:30,200 [HashJoin[5] -> Sort[6] -> Calc[7] -> SortAggregate[8] -> ConstraintEnforcer[9] -> Sink: Filesystem (1/1)#0] WARN [o.a.f.r.t.Task] {} - HashJoin[5] -> Sort[6] -> Calc[7] -> SortAggregate[8] -> ConstraintEnforcer[9] -> Sink: Filesystem (1/1)#0 (7a97e08f5d024c5c720da026005697e2_4bf7c1955ffe56e2106d666433eaf137_0_0) switched from RUNNING to FAILED with failure cause: java.lang.OutOfMemoryError: Java heap space
at java.base/java.lang.AbstractStringBuilder.<init>(Unknown Source)
at java.base/java.lang.StringBuilder.<init>(Unknown Source)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:455)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:85)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3748)
at org.apache.flink.table.runtime.functions.SqlJsonUtils.serializeJson(SqlJsonUtils.java:104)
at org.apache.flink.table.runtime.functions.aggregate.JsonArrayAggFunction.getValue(JsonArrayAggFunction.java:137)
at SortAggregateWithKeys$58.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
So I was wondering if there's a way to join the tables without loading everything into memory upfront? Is there a way I can optimize my query? Would appreciate any suggestions or docs/reference articles. Thank you!
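A note on this one: not a definitive fix, but one rewrite that sometimes helps is to pre-aggregate t2 per join key before the join, so the join and the downstream sort/aggregate see one already-aggregated row per key instead of the full table. The OOM above is thrown while serializing the aggregated JSON string, so a single very hot key can still exhaust the heap regardless. A sketch (it assumes the join key on the t2 side is also called c1 and a TableEnvironment named t_env; adjust to the real join condition):

# hypothetical rewrite of the original query; equivalent when t1.c1 is unique,
# except that unmatched keys get NULL instead of an aggregate over an empty group
t_env.execute_sql("""
    SELECT t1.c1, agg.c2_arr
    FROM t1
    LEFT JOIN (
        SELECT c1, JSON_ARRAYAGG(c2) AS c2_arr
        FROM t2
        GROUP BY c1
    ) AS agg ON t1.c1 = agg.c1
""").print()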
Momir Beljic
11/30/2022, 4:10 PM
ds = env.from_collection(
collection=[
(Instant.of_epoch_milli(1000), 'Alice', 110.1),
(Instant.of_epoch_milli(4000), 'Bob', 30.2),
(Instant.of_epoch_milli(3000), 'Alice', 20.0),
(Instant.of_epoch_milli(2000), 'Bob', 53.1),
(Instant.of_epoch_milli(5000), 'Alice', 13.1),
(Instant.of_epoch_milli(3000), 'Bob', 3.1),
(Instant.of_epoch_milli(7000), 'Bob', 16.1),
(Instant.of_epoch_milli(10000), 'Alice', 20.1)
],
type_info=Types.ROW([Types.INSTANT(), Types.STRING(), Types.FLOAT()]))
table = t_env.from_data_stream(
ds,
Schema.new_builder()
.column_by_expression("ts", "CAST(f0 AS TIMESTAMP(3))")
.column("f1", DataTypes.STRING())
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts - INTERVAL '3' SECOND")
.build()
).alias("ts", "name", "price")
# define the sink
t_env.create_temporary_table(
'sink',
TableDescriptor.for_connector('print')
.schema(Schema.new_builder()
.column('name', DataTypes.STRING())
.column('price', DataTypes.FLOAT())
.build())
.build())
table = table.window(
Slide.over("3.rows").every("3.rows").on(col("name")).alias("w")) \
.group_by(col("w")) \
.select(col('name'), col('price').avg)
# submit for execution
table.execute_insert('sink') \
.wait()
But this returns this exception:
table = table.window(
py4j.protocol.Py4JJavaError: An error occurred while calling o121.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:327)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:307)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:300)
at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:265)
at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:257)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:823)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Would you please help me find the solution? Thank you!
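A note on the exception: the window is defined with .on(col("name")), which is not a time attribute. Group windows must be defined on the event-time attribute (here ts, which already has a watermark), or on a processing-time attribute if row-count windows are really wanted. A sketch of a time-based sliding window over the code above (interval sizes are only illustrative), which also adds name to the group_by so it can be selected:

from pyflink.table.expressions import col, lit
from pyflink.table.window import Slide

table = table.window(
        Slide.over(lit(10).seconds).every(lit(5).seconds).on(col("ts")).alias("w")) \
    .group_by(col("w"), col("name")) \
    .select(col("name"), col("price").avg)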
Prasanth Kothuri
11/30/2022, 4:11 PM
val scanSource = KafkaSource.builder[scanInput]
.setProperties(properties)
.setTopics(config.getInputScanTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))
.build()
val scanStream = env
.fromSource(
scanSource,
WatermarkStrategy
.forBoundedOutOfOrderness[scanInput](Duration.ofSeconds(60))
.withIdleness(Duration.ofSeconds(300))
.withTimestampAssigner(new SerializableTimestampAssigner[scanInput] {
override def extractTimestamp(element: scanInput, recordTimestamp: Long): Long = fixDate.makeInstant(element.timestamp).toEpochMilli
}), "ReadScanEvents")
.filter(_!= null )
.filter(x => x != null && x.ooiInstanceIdentifier != null && x.fullQualifiedName != null && x.timestamp != null && fixDate.isInstant(x.timestamp))
which gives the following errors:
found : org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema
required: org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema[com.leidos.mosiac.tmdataaggregations.scanInput]
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)))
Leon Xu
11/30/2022, 4:40 PM
java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')
I am wondering if anyone has seen this before? We are on Flink 1.12.

Deryl Rodrigues
11/30/2022, 6:16 PM
INSERT INTO some-table
SELECT
*
FROM (
SELECT *
FROM (
SELECT fields-from-kafka-table,
window_start,
window_end,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY received_timestamp DESC) AS rownum
FROM TABLE(
TUMBLE(TABLE kafka-table, DESCRIPTOR(some-column), INTERVAL '10' MINUTES)
)
) WHERE rownum <= 1
) AS some-table
LEFT JOIN some-hive-table fc ON some-table.column1 = some-hive-table.column1
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'some-table' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(column1 = column10)], select=[all-columns, column1, column10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
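A note on the error: the deduplication subquery over the tumbling window is insert-only, but a regular streaming LEFT JOIN produces retractions (the null-padded row is retracted and replaced when a match arrives), so the planner emits update/delete changes that a plain append-only sink rejects. The usual options are to turn it into a processing-time lookup join so the pipeline stays append-only, or to use a sink that accepts updates. A sketch of the latter with an upsert-kafka sink (all names and properties are placeholders, shown executed through a TableEnvironment):

t_env.execute_sql("""
    CREATE TABLE some_table_upsert (
        column1 STRING,
        -- remaining columns ...
        PRIMARY KEY (column1) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'some-topic',
        'properties.bootstrap.servers' = 'broker:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")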
Eric Xiao
11/30/2022, 10:37 PM

Vishal bharatbhai Vanpariya
12/01/2022, 7:36 AM
DataStream<GenericRecord> avrodata = inputData.map(new MapFunction<String, GenericRecord>() {
@Override
public GenericRecord map(String s) throws Exception {
Schema schema = new Schema.Parser().parse(schemastr);
DecoderFactory decoderFactory = new DecoderFactory();
Decoder decoder = decoderFactory.jsonDecoder(schema,s);
DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
return reader.read(null,decoder);
}
});
Schema schema = new Schema.Parser().parse(schemastr);
avrodata.sinkTo(FileSink.forBulkFormat(
new Path("<s3://bUcket/data/tmp/>"),
ParquetAvroWriters.forGenericRecord(schema)
).build());
Error: Caused by: java.lang.UnsupportedOperationException: This s3 file system implementation does not support recoverable writers.
Raph
12/01/2022, 8:16 AM
pulsar_source = PulsarSource.builder() \
.set_service_url(service_url) \
.set_admin_url(admin_url) \
.set_start_cursor(StartCursor.latest()) \
.set_topics("<persistent://topic_name>") \
.set_config("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationToken") \
.set_config("pulsar.client.authParams", AUTH_TOKEN) \
.set_deserialization_schema(
PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
.set_subscription_name('flink') \
.set_subscription_type(SubscriptionType.Shared) \
.build()
ds = env.from_source(source=pulsar_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name="pulsar source")
ds.print()
env.execute()
Caused by: org.apache.pulsar.client.admin.PulsarAdminException: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted.
Amenreet Singh Sodhi
12/01/2022, 11:10 AM

Jason Politis
12/01/2022, 12:36 PM

Emmanuel Leroy
12/01/2022, 1:07 PM

René
12/01/2022, 4:25 PM
CREATE TABLE CVL (EPOCH_ID BIGINT, DESCRIPTION VARCHAR)
WITH (
'connector' = 'kafka',
'topic' = 'FCM1.SYST034.CVL',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'swwkafe01.suvanet.ch:9092',
'format' = 'avro',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"password\";'
);
That's OK, but when I do a simple SELECT * FROM CVL, I get this error:
... Caused by: java.lang.IllegalArgumentException: Value not specified for key 'username' in JAAS config at
org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:116) ...
How do I need to configure Apache Flink (Ververica Platform) to be able to connect to Kafka? I followed the instructions under https://ververica.zendesk.com/hc/en-us/articles/4416135903634-How-to-Secure-User-Creden[…]ica-Platform-when-Connecting-to-SASL-Secured-Kafka-Cluster, but that didn't solve the problem. Thanks for any help!
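A note on the error: one thing that can produce exactly this JAAS parser message is the backslash-escaped quotes in the DDL. Flink SQL string literals do not treat the backslash as an escape character, so the JAAS line reaches the Kafka client with literal \" sequences, and the parser then finds no value for username. Plain double quotes inside the single-quoted property value are usually enough; a hedged sketch with the same DDL (credentials still inline here; the secret placeholders from the Ververica article can be substituted), executed through a TableEnvironment:

t_env.execute_sql("""
    CREATE TABLE CVL (EPOCH_ID BIGINT, DESCRIPTION VARCHAR)
    WITH (
        'connector' = 'kafka',
        'topic' = 'FCM1.SYST034.CVL',
        'scan.startup.mode' = 'earliest-offset',
        'properties.bootstrap.servers' = 'swwkafe01.suvanet.ch:9092',
        'format' = 'avro',
        'properties.security.protocol' = 'SASL_SSL',
        'properties.sasl.mechanism' = 'SCRAM-SHA-512',
        'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="password";'
    )
""")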
Momir Beljic
12/01/2022, 4:57 PM
process_udaf
function to perform some operations, filter the data, and based on that return specific Rows. However, no matter what I change, I get the exception org.apache.flink.table.api.ValidationException: Cannot resolve field [MEASUREMENT], input field list:[TERM_ID, F_LOCATION, ADAPTER]. Please find the code below. Thank you!
ds = env.from_collection(
collection=[
(Instant.of_epoch_milli(1000), '2022-12-01T17:10:18.191732', '123457', '123456-09', '22.2-2', '12345678', '123456', 'M1', 7, 20, -20, 0, '2', 0, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
(Instant.of_epoch_milli(2000), '2022-12-01T17:10:20.231437', '123458', '123456-07', '22.2-1', '12345679', '123456', 'M1', 10, 25, -15, 2, '1', 120, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
(Instant.of_epoch_milli(3000), '2022-12-01T17:10:20.315141', '123459', '123456-09', '22.2-1', '12345679', '123456', 'M1', 20, 29, -3, 3, '2', 100, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
(Instant.of_epoch_milli(4000), '2022-12-01T17:10:20.389638', '123455', '123456-08', '22.2-1', '12345679', '123456', 'M1', 25, 35, 1, 4, '10', 10, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
(Instant.of_epoch_milli(5000), '2022-12-01T17:10:20.585687', '123458', '123456-07', '22.2-1', '12345679', '123456', 'M1', 30, 40, -2, 5, '2', 120, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
(Instant.of_epoch_milli(6000), '2022-12-01T17:10:20.649107', '123457', '123456-06', '22.2-2', '12345678', '123456', 'M1', 4, 45, 4, 6, '10', 0, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98'),
(Instant.of_epoch_milli(7000), '2022-12-01T17:10:21.040214', '123455', '123456-09', '22.2-1', '12345678', '123456', 'M1', 22, 49, 5, 7, '2', 100, '17', '-17', '0', '15', '-15', '0', '1.67', '3', '-3', '0', '1001', '98')
],
type_info=Types.ROW([Types.INSTANT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.FLOAT(), Types.FLOAT(), Types.FLOAT(), Types.FLOAT(), Types.STRING(), Types.STRING(), Types.INT(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()]))
table = t_env.from_data_stream(
ds,
Schema.new_builder()
.column_by_expression("proctime", "proctime()")
.column("f1", DataTypes.STRING())
.column("f2", DataTypes.STRING())
.column("f3", DataTypes.STRING())
.column("f4", DataTypes.STRING())
.column("f5", DataTypes.STRING())
.column("f6", DataTypes.STRING())
.column("f7", DataTypes.STRING())
.column("f8", DataTypes.FLOAT())
.column("f9", DataTypes.FLOAT())
.column("f10", DataTypes.FLOAT())
.column("f11", DataTypes.FLOAT())
.column("f12", DataTypes.STRING())
.column("f13", <http://DataTypes.INT|DataTypes.INT>())
.column("f14", DataTypes.STRING())
.column("f15", DataTypes.STRING())
.column("f16", DataTypes.STRING())
.column("f17", DataTypes.STRING())
.column("f18", DataTypes.STRING())
.column("f19", DataTypes.STRING())
.column("f20", DataTypes.STRING())
.column("f21", DataTypes.STRING())
.column("f22", DataTypes.STRING())
.column("f23", DataTypes.STRING())
.column("f24", DataTypes.STRING())
.column("f25", DataTypes.STRING())
#.watermark("proctime", "proctime - INTERVAL '3' SECOND")
.build()
).alias("proctime", 'MESSAGE_DATE_TIME', 'ASSET_ID', 'TERM_ID', 'ADAPTER', 'PRODUCT_ID', 'SERIAL', 'F_LOCATION', 'MEASUREMENT', 'USL', 'LSL', 'NOM', 'TESTSTATUS', 'METHOD_COMPUTE', 'CONFIGVALUE_1', 'CONFIGVALUE_2', 'CONFIGVALUE_3', 'CONFIGVALUE_4', 'CONFIGVALUE_5', 'CONFIGVALUE_6', 'CONFIGVALUE_7', 'CONFIGVALUE_8', 'CONFIGVALUE_9', 'CONFIGVALUE_10', 'METHOD_YIELD', 'CONFIGVALUE_1001')
t_env.create_temporary_table(
'sink',
TableDescriptor.for_connector('print')
.schema(Schema.new_builder()
.column("TERM_ID", DataTypes.STRING())
.column("F_LOCATION", DataTypes.STRING())
.column("ADAPTER", DataTypes.STRING())
.column("MEASUREMENT", DataTypes.FLOAT())
.column("USL", DataTypes.FLOAT())
.column("LSL", DataTypes.FLOAT())
.column("NOM", DataTypes.FLOAT())
.build())
.build())
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("TERM_ID", DataTypes.STRING()), DataTypes.FIELD("F_LOCATION", DataTypes.STRING()), DataTypes.FIELD("ADAPTER", DataTypes.STRING()), DataTypes.FIELD("MEASUREMENT", DataTypes.FLOAT()), DataTypes.FIELD("USL", DataTypes.FLOAT()), DataTypes.FIELD("LSL", DataTypes.FLOAT()), DataTypes.FIELD("NOM", DataTypes.FLOAT())]), func_type="pandas")
def process_udaf(term_id, f_location, adapter, measurement, usl, lsl, nom, teststatus, method_compute):
if teststatus == 1 and (method_compute > 0 or method_compute < 101):
zscore_log = (measurement - measurement.mean()) / (measurement.std())
dmn_log = measurement.mean() - nom
cp_log = (usl - lsl) / (6 * measurement.std())
return Row(term_id, f_location, adapter, zscore_log, dmn_log, cp_log)
t_env.create_temporary_function("process_udaf", process_udaf)
table = table.window(
#here just use 100 as row offset and slide of 100 and perform calculation of counting the rows and then call correct processing udf call based on the row number
Slide.over("4.rows").every("4.rows").on(col("proctime")).alias("w")) \
.group_by(col("w"), col("TERM_ID"), col("F_LOCATION"), col("ADAPTER")) \
.select(process_udaf(col("TERM_ID"), col("F_LOCATION"), col("ADAPTER"), col("MEASUREMENT"), col("USL"), col("LSL"), col("NOM"), col("TESTSTATUS"), col("METHOD_COMPUTE")))
table.execute_insert('sink') \
.wait()
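A note on the ValidationException: after .group_by(col("w"), col("TERM_ID"), col("F_LOCATION"), col("ADAPTER")), the select can only reference the grouping keys, window properties, and aggregate expressions. process_udaf is registered with @udf, i.e. as a scalar function, so its MEASUREMENT argument is treated as a plain field reference and can no longer be resolved at that point. A built-in aggregate does go through, for example:

table = table.window(
        Slide.over("4.rows").every("4.rows").on(col("proctime")).alias("w")) \
    .group_by(col("w"), col("TERM_ID"), col("F_LOCATION"), col("ADAPTER")) \
    .select(col("TERM_ID"), col("F_LOCATION"), col("ADAPTER"), col("MEASUREMENT").avg)

For the custom z-score/Cp logic, the function would have to be registered as an aggregate function (pyflink.table.udf.udaf) rather than a scalar udf; whether a vectorized pandas aggregate can return the full Row shown above needs checking, so treat this only as a pointer, not a drop-in fix.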
raghav tandon
12/01/2022, 8:28 PM

Ans Fida
12/01/2022, 10:26 PM
Table table = tEnv.sqlQuery(query.getQuery());
// create a new column 'EventTime' of type Timestamp from 'EventTimestamp' which is a string
table = table.addColumns($("EventTimestamp").toTimestamp().as("EventTime"));
WindowGroupedTable windowedTable = table.window(Tumble.over("10.minutes").on($("EventTime").proctime())
.as("w"))
.groupBy($("w"), $("GroupingColumn"));
table = windowedTable.select($("*"));
but it doesn’t seem to work and results in this exception: Expected LocalReferenceExpression. Got: EventTime
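A note on the exception: the window's .on(...) has to reference a time attribute that already exists on the table. A column produced by toTimestamp() is a plain TIMESTAMP, and appending .proctime() inside the window definition does not create a time attribute, which is, as far as I understand, why the planner complains about the missing LocalReferenceExpression. The time attribute has to come from the source definition (a WATERMARK clause in the DDL for event time, or a PROCTIME() computed column for processing time), and the window is then defined directly on that column. A sketch in PyFlink syntax (the Java Table API is analogous; "pt" is an assumed processing-time column declared at the source):

from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

windowed = table.window(Tumble.over(lit(10).minutes).on(col("pt")).alias("w")) \
    .group_by(col("w"), col("GroupingColumn")) \
    .select(col("GroupingColumn"), col("w").end)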
Tim
12/01/2022, 10:37 PM
[java] Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for unregistered table do not match.
[java] Cause: Incompatible types for sink column 'f0' at position 0.
[java]
[java] Query schema: [EXPR$0: ARRAY<ROW<> NOT NULL>]
[java] Sink schema: [f0: RAW('java.util.List', ?)]
The typeinformation for the output type is Row(f0: List<Row(f0: String)>)
and the closest we can get is to define output = @DataTypeHint(value = "RAW", bridgedTo = List.class)
and the exception becomes
[java] Cause: Incompatible types for sink column 'f0' at position 0.
[java] Query schema: [EXPR$0: RAW('java.util.List', '...')]
[java] Sink schema: [f0: RAW('java.util.List', ?)]
Any suggestions to resolve this? Thanks!

Steven Zhang
12/02/2022, 12:17 AM
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
HA class? I'm trying to upgrade a session cluster using the Flink operator, and it seems like the ConfigMaps for the job I had running on it get cleaned up when I update the FlinkDep CRD image field and the Job/Task Managers get torn down. From the docs (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/), it seems like this shouldn't be the case and the ConfigMaps should be left behind, but that's not what I'm seeing.
It doesn't seem to be the operator, since it looks like the deleteHaData param is set to false in the reconciler:
private void deleteSessionCluster(FlinkDeployment deployment, Configuration effectiveConfig) {
flinkService.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), false);
...
I'm running a session Flink cluster on 1.15.2 deployed in standalone mode.

Jay Yang
12/02/2022, 4:27 AM

Sumit Nekar
12/02/2022, 11:35 AM

Prasanth Kothuri
12/02/2022, 1:31 PM
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.registerModule(new JavaTimeModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true)
val scanSource = KafkaSource.builder[scanInput]
.setProperties(properties)
.setTopicPattern(Pattern.compile(config.getInputScanTopic))
.setValueOnlyDeserializer( new JsonDeserializationSchema(classOf[scanInput],mapper))
.build()
which has the error
Type JsonDeserializationSchema takes type parameters
I think the problem is that the type of the mapper I am passing to the constructor is not what that method expects (SerializableSupplier<ObjectMapper>). How do I fix this? Thanks.

raghav tandon
12/02/2022, 1:32 PM

Matt Weiss
12/02/2022, 1:56 PM

Felix Angell
12/02/2022, 4:21 PM
Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @4f9e5d6a
? I've started getting this when submitting jobs after bumping my version of Flink from 1.13 to 1.15. Full stacktrace inside this thread.

Lily Liu
12/02/2022, 5:52 PM
.inprogress
folder on GCS. Where should I start to look to optimise this? Thanks.

Ans Fida
12/02/2022, 7:42 PM

Marco Villalobos
12/02/2022, 10:27 PM