B S Mohammed Ashfaq
08/20/2025, 1:30 AM
Brice Loustau
08/20/2025, 9:22 PM
Ian Stewart
08/22/2025, 6:43 PM
Elad
08/24/2025, 8:15 PM
Clemens Valiente
08/25/2025, 2:27 AM
2025-08-22 06:51:30,525 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class com.google.protobuf.ByteString does not contain a getter for field hash
2025-08-22 06:51:30,525 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class com.google.protobuf.ByteString does not contain a setter for field hash
2025-08-22 06:51:30,526 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class com.google.protobuf.ByteString cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
the hash field definitely shouldn't be set in the first place, and I couldn't figure out how to write/define a custom TypeExtractor? 🤔
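A minimal sketch, assuming the goal is to control how ByteString is serialized at runtime rather than to write a custom TypeExtractor: register a default Kryo serializer for it. The ByteStringSerializer below is illustrative (not shipped with Flink or protobuf); the TypeExtractor INFO lines will still appear, since ByteString remains a GenericType, but Kryo then uses this serializer instead of reflecting over fields like hash.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.ByteString;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ByteStringKryoRegistration {

    // Illustrative Kryo serializer: a ByteString is written as a length-prefixed byte array.
    public static class ByteStringSerializer extends Serializer<ByteString> {
        @Override
        public void write(Kryo kryo, Output output, ByteString value) {
            byte[] bytes = value.toByteArray();
            output.writeInt(bytes.length, true);
            output.writeBytes(bytes);
        }

        @Override
        public ByteString read(Kryo kryo, Input input, Class<ByteString> type) {
            int length = input.readInt(true);
            return ByteString.copyFrom(input.readBytes(length));
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // addDefaultKryoSerializer applies to ByteString and its subclasses
        // (ByteString itself is abstract), so every concrete variant is covered.
        env.getConfig().addDefaultKryoSerializer(ByteString.class, ByteStringSerializer.class);
    }
}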
Clemens Valiente
08/25/2025, 2:37 AM
Clemens Valiente
08/25/2025, 2:37 AM
Clemens Valiente
08/25/2025, 4:40 AM
kryoRegistrations
"streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource" -> {KryoRegistration@26135}
key = "streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
value = {KryoRegistration@26135}
registeredClass = {Class@7694} "class streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
serializerClass = {Class@21986} "class com.grab.grabdefence.acorn.proto.ScalaPbEnumSerializer"
serializableSerializerInstance = null
serializerDefinitionType = {KryoRegistration$SerializerDefinitionType@26334} "CLASS"
but for some reason a different serializer is used
type = {Class@7694} "class streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
kryo = {KryoBase@21592} com.twitter.chill.KryoBase@7a852
objSer = {ObjectSerializer@26028} com.twitter.chill.ObjectSerializer@11d3192e
with a debug checkpoint on copy
here:
try {
    checkKryoInitialized();
    try {
        return kryo.copy(from);
    } catch (KryoException ke) {
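One hedged reading of the dump above: registerTypeWithKryoSerializer maps to Kryo's exact-class register(), while addDefaultKryoSerializer maps to addDefaultSerializer(), which also matches subclasses. If the ScalaPB enum values are generated case objects that subclass RequestSource, an exact-class registration on RequestSource never matches their runtime classes, and chill falls back to its ObjectSerializer, which would line up with the objSer seen at the copy() breakpoint. A sketch of the two registration styles, using the class names from the dump:

import com.grab.grabdefence.acorn.proto.ScalaPbEnumSerializer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource;

public class RequestSourceKryoRegistration {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ExecutionConfig config = env.getConfig();

        // Exact-class registration: only instances whose runtime class is exactly
        // RequestSource are routed to ScalaPbEnumSerializer.
        config.registerTypeWithKryoSerializer(RequestSource.class, ScalaPbEnumSerializer.class);

        // Default-serializer registration: RequestSource and its subclasses
        // (e.g. the generated case objects) are routed to ScalaPbEnumSerializer.
        config.addDefaultKryoSerializer(RequestSource.class, ScalaPbEnumSerializer.class);
    }
}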
Jashwanth S J
08/25/2025, 8:59 AM
Fabricio Lemos
08/25/2025, 8:16 PM
Dheeraj Panangat
08/26/2025, 11:41 AM
Brad Murry
08/26/2025, 3:33 PM
L P V
08/27/2025, 8:57 AM
Dennis Sosnoski
08/27/2025, 9:05 AM
Yoshi Nagasaki
08/27/2025, 3:41 PM
final var watermarkStrategy = WatermarkStrategy.<CDCEvent>forMonotonousTimestamps()
        .withIdleness(Duration.ofSeconds(1))
        .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
final var lateTag = new OutputTag<Metric>("late") {};
final var workflow = env.fromSource(dynamoDbStreamsSource, watermarkStrategy, "DynamoDB Streams source")
        .returns(TypeInformation.of(CDCEvent.class))
        .uid("custom-uid")
        .filter(...)
        .map(...compute deltas based on changes...)
        .filter(...)
        .flatMap(...map deltas to 1 or more "Metrics"...)
        .keyBy(...key by the metric key...)
        .window(TumblingEventTimeWindows.of(Duration.ofSeconds(10)))
        .sideOutputLateData(lateTag)
        .aggregate(new MetricAggregator()); // this is an AggregateFunction<> that merges the delta values across metric instances in the window
final var lateMetrics = workflow.getSideOutput(lateTag)
        .sinkTo(new PrintSink<>(true));
workflow.sinkTo(...dynamo sink...);
As you can see, nothing fancy, pretty straightforward...
My problem is: when I set the InitialPosition of the stream to TRIM_HORIZON, I see in the debug output that it's receiving, processing, and sinking all the historical events from the last 24 hours... but once it reaches the end, it is completely stuck. I make requests to the DDB table that result in new CDC events, I see in the output that it receives the event immediately, does all the stateless processing (all the filters and maps before the windowing), and then it just disappears and nothing happens. I have logging in the MetricAggregator and my custom sink, and neither log gets triggered for these new events. When I set the InitialPosition to LATEST and feed new events, it flows through fine! (I still have to submit a new change before the previous change gets fully processed, despite the idleness setting in the watermark strategy, but at least I can get it to process.)
My custom sink is modeled closely after the existing DDB connector, and has these properties:
maxBatchSize = 10
maxInFlightRequests = 20
maxBufferedRequests = 1000
maxTimeInBufferMS = 5000
Just adding this in case it's relevant, but I don't think it should matter.
Based on what I've read on watermarking, I assume this has to do with the strategy and specifically the idleness, which I set arbitrarily low, but it doesn't seem to matter/do anything (with either TRIM_HORIZON or LATEST). I've tried both built-in watermarking strategies and have the same problem.
Any ideas? Thanks.
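One thing that might be worth trying, as a sketch under the assumption that the CDC event timestamps roughly track wall-clock time: a custom WatermarkGenerator whose periodic emit keeps advancing the watermark from the system clock even when no new events arrive, so an open 10-second window can still fire. withIdleness only stops an idle split from holding the combined watermark back; it never pushes the watermark past the last event, which would match the "previous change only completes when the next one arrives" behaviour. The class below is illustrative, not a diagnosis of the TRIM_HORIZON hang; CDCEvent and getTimestamp() are the types from the pipeline above, and the 10-second lag is arbitrary.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WallClockAdvancingWatermarks implements WatermarkGenerator<CDCEvent> {

    private static final long MAX_LAG_MS = Duration.ofSeconds(10).toMillis();

    private long maxSeenTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(CDCEvent event, long eventTimestamp, WatermarkOutput output) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Watermark = max(last event timestamp, wall clock minus lag), emitted every
        // auto-watermark interval, so windows can close without waiting for new events.
        long wallClockBound = System.currentTimeMillis() - MAX_LAG_MS;
        output.emitWatermark(new Watermark(Math.max(maxSeenTimestamp, wallClockBound)));
    }

    // How the pipeline above could plug this in, replacing forMonotonousTimestamps().
    public static WatermarkStrategy<CDCEvent> strategy() {
        return WatermarkStrategy
                .<CDCEvent>forGenerator(ctx -> new WallClockAdvancingWatermarks())
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
    }
}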
raphaelauv
08/27/2025, 4:34 PM
SafeConfluentRegistryAvroSerializationSchema is my custom serialization class
KafkaSink.<Car>builder()
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setValueSerializationSchema(
                new SafeConfluentRegistryAvroSerializationSchema(
                    Car.class,
                    "flink-output-value",
                    "http://confluent-schema-registry-local:8081",
                    sr_settings
                ))
thanks all
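For anyone reading along, a sketch of how that builder chain is typically closed out; the bootstrap servers, topic, and delivery guarantee are placeholders, and Car, sr_settings, and SafeConfluentRegistryAvroSerializationSchema are the identifiers from the snippet above.

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<Car> sink = KafkaSink.<Car>builder()
    .setBootstrapServers("kafka:9092")              // placeholder brokers
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("flink-output")               // placeholder topic
            .setValueSerializationSchema(
                new SafeConfluentRegistryAvroSerializationSchema(
                    Car.class,
                    "flink-output-value",
                    "http://confluent-schema-registry-local:8081",
                    sr_settings))
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();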
George Leonard
08/28/2025, 6:28 AM
Rushikesh Gulve
08/28/2025, 7:08 AM
Urs Schoenenberger
08/28/2025, 8:36 AM
George Leonard
08/28/2025, 10:07 AM
George Leonard
08/28/2025, 10:07 AM
George Leonard
08/28/2025, 11:03 AM
INSERT INTO c_paimon.outbound.children
SELECT
JSON_VALUE(data, '$._id') as _id,
JSON_VALUE(data, '$.name') as name,
JSON_VALUE(data, '$.surname') as surname,
JSON_VALUE(data, '$.gender') as gender,
JSON_VALUE(data, '$.dob') as dob,
JSON_VALUE(data, '$.nationalid') as nationalid,
JSON_VALUE(data, '$.family_id') as family_id,
JSON_VALUE(data, '$.father_nationalid') as father_nationalid,
JSON_VALUE(data, '$.mother_nationalid') as mother_nationalid,
ROW(
JSON_VALUE(data, '$.address.street_1'),
JSON_VALUE(data, '$.address.street_2'),
JSON_VALUE(data, '$.address.neighbourhood'),
JSON_VALUE(data, '$.address.town'),
JSON_VALUE(data, '$.address.county'),
JSON_VALUE(data, '$.address.province'),
JSON_VALUE(data, '$.address.country'),
JSON_VALUE(data, '$.address.country_code'),
JSON_VALUE(data, '$.address.postal_code'),
JSON_VALUE(data, '$.address.parcel_id')
) as address,
created_at
FROM postgres_catalog.inbound.children;
George Leonard
08/28/2025, 1:26 PM
George Leonard
08/29/2025, 5:42 AM
Jashwanth S J
08/29/2025, 8:57 AM
George Leonard
08/29/2025, 12:06 PM
Vikas Patil
08/30/2025, 10:26 PM
Or Keren
09/01/2025, 6:48 AM
Zeyu Qiu
09/01/2025, 8:02 AM
parallelism = 1
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(parallelism) # I don't know what's that, try 2
env.enable_checkpointing(10 * 60 * 1000) # milliseconds
# checkpoints have to complete within ten minutes, or are discarded
env.get_checkpoint_config().set_checkpoint_timeout(int(10 * 60 * 1000))
env.get_checkpoint_config().set_checkpointing_mode(
CheckpointingMode.EXACTLY_ONCE
)
env.disable_operator_chaining() # If this line is dropped, creating multiple pipelines in one Python job raises an error in the Flink GUI
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
ss = t_env.create_statement_set()
source_sql = f"""
CREATE TABLE mysql_source (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
id BIGINT,
content STRING,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '{db_secret["host"]}',
'port' = '{db_secret["port"]}',
'username' = '{db_secret["username"]}',
'password' = '{db_secret["password"]}',
'database-name' = 'my_db',
'table-name' = 'my_table',
'server-id' = '{server_id}',
'debezium.snapshot.mode' = 'schema_only',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '{binlog_start_time}',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.key-column' = 'id'
);
"""
t_env.execute_sql(source_sql)
sink_sql = f"""
CREATE TABLE hudi_sink (
id BIGINT,
content STRING,
update_time TIMESTAMP(3),
hudi_ts double,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 's3a://xxxx',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'hudi_ts',
'write.operation' = 'upsert',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field' = '',
'write.tasks' = '1',
'compaction.tasks' = '1',
'clean.retain_commits' = '6',
'hoodie.keep.min.commits' = '7',
'hoodie.keep.max.commits' = '8',
'compaction.async.enabled' = 'true',
'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
'index.partition.regex' = 'false',
'index.bootstrap.enabled' = 'false',
'write.rate.limit' = '2000'
);
"""
t_env.execute_sql(sink_sql)
insert_sql = f"""
INSERT INTO hudi_sink
SELECT id,content,update_time,
UNIX_TIMESTAMP() as hudi_ts FROM mysql_source where db_name = 'my_db'
and table_name = 'my_table';
"""
ss.add_insert_sql(insert_sql)
ss.attach_as_datastream()
env.execute(f'mysql_cdc_to_hudi')
After I submit this job to Flink, I expect the Hudi table in S3 to have the same real-time data as the MySQL table. But in fact, when I execute an update statement in MySQL, two duplicate rows appear in the Hudi table. For example, here's the data in MySQL:
id |content |update_time
1 |a |2025-08-01 00:00:00
And I execute:
update my_table set content = 'b' where id = 1
Then the data in the Hudi table looks like this:
id |content |update_time
1 |a |2025-08-01 00:00:00
1 |b |2025-09-01 00:00:00
Do you guys have any idea why that's happening? As I've set 'hoodie.datasource.write.recordkey.field' = 'id', I suppose there shouldn't be duplicate rows in the Hudi table. And this error cannot be reproduced stably: sometimes the result is normal, and sometimes there is an error. It is especially likely to occur when there are multiple Hudi sinks in a job.
I'm using Flink 1.16.2, Hudi 0.13.0, FlinkCDC 3.1.1.
In addition, I found some logs in Flink like:
2025-09-01 01:10:20,612 INFO org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.
Not sure if this is related to the problem I met.
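On that last log line: table.exec.state.ttl defaults to 0 (state never expires), so the warning suggests a TTL is being set somewhere in this job's configuration, and the message itself says the expired SinkUpsertMaterializer state leads to incorrect results, which could plausibly surface as the old row not being retracted before the new one is written. A minimal sketch of pinning the option explicitly in PyFlink; the "0 ms" value (never expire, at the cost of state growth) is shown only to illustrate where the knob lives, since the log only recommends increasing the TTL, not a specific value.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# SinkUpsertMaterializer keeps per-key changelog state; the warning above says that
# state was cleared by the configured TTL and flags the result as incorrect.
# "0 ms" means the state never expires; otherwise pick a TTL that comfortably covers
# the longest expected gap between updates to the same key.
t_env.get_config().get_configuration().set_string("table.exec.state.ttl", "0 ms")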
George Leonard
09/01/2025, 12:24 PM