Ravi Teja
01/13/2023, 9:39 AM
`doesn't support consuming update changes which is produced by node GroupAggregate`.
It will be great if the community can guide us on best practices for resolving the above issue.
Thanks.
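A minimal sketch of one common resolution, assuming the updating result should land in Kafka: route it to an upsert-capable sink such as `upsert-kafka`, which can consume the UPDATE changes a `GroupAggregate` node produces. The `events` source table, topic, and field names below are hypothetical.
```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // An upsert sink with a primary key accepts an updating changelog,
        // unlike a plain append-only sink.
        tEnv.executeSql(
                "CREATE TABLE agg_sink (" +
                "  user_id STRING," +
                "  cnt BIGINT," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'agg-results'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");

        // The GROUP BY aggregation emits updates; the sink materializes them
        // as upserts keyed by user_id. The 'events' table is assumed to exist.
        tEnv.executeSql(
                "INSERT INTO agg_sink " +
                "SELECT user_id, COUNT(*) AS cnt FROM events GROUP BY user_id");
    }
}
```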
Abhinav sharma
01/13/2023, 12:50 PM
Nathanael England
01/13/2023, 3:09 PM
Nathanael England
01/13/2023, 7:00 PM
If you use `keyby`, do the watermarks of those partitioned streams evolve separately from one another?
Vinay Agarwal
01/13/2023, 7:02 PM
Michael Parrott
01/13/2023, 9:25 PM
I have a `KeyedCoProcessFunction`, and based on these docs it is recommended to make the logger static. If the parallelism of the operator for the CoProcessFunction is > 1, the operator might exist on multiple nodes, so how would a static variable work in this case?
Bhaarat
01/14/2023, 8:19 PM
Sumit Nekar
01/15/2023, 6:00 AM
Lei Su
01/16/2023, 2:32 AM
Lei Su
01/16/2023, 3:38 AM
Amenreet Singh Sodhi
01/16/2023, 5:15 AM
`sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target`
Has anyone faced the same issue? How do I resolve it? Do I need to add some new certificates to cacerts? Thanks
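A hedged sketch, assuming the usual cause: the JVM's truststore does not contain the CA that signed the remote endpoint's certificate. Paths and passwords below are hypothetical placeholders.
```
public final class TrustStoreSetup {
    public static void main(String[] args) {
        // Point the JVM at a truststore that contains the CA. Equivalent to
        //   -Djavax.net.ssl.trustStore=/path/to/truststore.jks
        //   -Djavax.net.ssl.trustStorePassword=changeit
        // e.g. via env.java.opts in flink-conf.yaml.
        System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "changeit");

        // Alternatively, import the CA into the default cacerts:
        //   keytool -importcert -alias my-ca -file my-ca.crt \
        //     -keystore $JAVA_HOME/lib/security/cacerts -storepass changeit
    }
}
```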
Rishab Sachdeva
01/16/2023, 5:59 AM
Abdelhakim Bendjabeur
01/16/2023, 10:53 AM
```
SELECT
...
FROM A
JOIN B ON B.aid = A.id
  AND A.created_datetime <= B.updated_datetime
```
I can maybe use temporal joins on a 1-year period, but that risks data loss in case some entities have updates spanning more than one year.
Anyone with experience on this, or just ideas on how to properly design such a pipeline?
Nathanael England
01/16/2023, 4:46 PM
Matt Weiss
01/16/2023, 7:03 PM
Nathanael England
01/17/2023, 12:07 AM
If I just apply a `.process`, that will be run in parallel, right? I don't need to maintain any state in this, and the only discussion I'm seeing on parallelism restrictions is about windowing.
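A short sketch, for what it's worth: a stateless `.process()` runs one independent instance per parallel subtask, inheriting the job's parallelism unless set explicitly. The function body and the parallelism value are arbitrary examples.
```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ParallelProcessSketch {
    public static DataStream<String> apply(DataStream<String> input) {
        return input
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(
                            String value, Context ctx, Collector<String> out) {
                        out.collect(value.toUpperCase());
                    }
                })
                .setParallelism(4); // optional; defaults to the pipeline's parallelism
    }
}
```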
Sergii Mikhtoniuk
01/17/2023, 1:23 AM
Is there a good way to convert a `Table` or a `DataStream<Row>` into a `DataStream<RowData>`?
I'm trying this:
```
DataStream<Row> resultStream = tEnv.toChangelogStream(resultTable);
RowRowConverter converter = RowRowConverter.create(
    resultTable.getResolvedSchema().toSinkRowDataType());
DataStream<RowData> resultStreamRowData = resultStream.map(
    converter::toInternal,
    InternalTypeInfo.of(
        resultTable.getResolvedSchema().toSinkRowDataType().getLogicalType()));
```
but I think I'm assigning the `TypeInformation` incorrectly, as it crashes with type-casting errors.
Context: I want to periodically save my table's changelog stream in Parquet format. The only way I found so far is to go from `Row` to `RowData`, then to `Avro::GenericRecord`, and then to use `AvroParquetWriters`... yuck :(
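A hedged sketch of one possible fix: `RowRowConverter` is a data-structure converter that generally must be `open()`ed before use, which the bare `converter::toInternal` method reference never does. Wrapping it in a `RichMapFunction` and opening it with the user-code classloader is a common pattern; whether an unopened converter is the actual root cause of the casting errors here is an assumption.
```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

class RowToRowDataMapper extends RichMapFunction<Row, RowData> {
    private final DataType dataType;
    private transient RowRowConverter converter;

    RowToRowDataMapper(DataType dataType) {
        this.dataType = dataType; // DataType is Serializable; the converter is not shipped
    }

    @Override
    public void open(Configuration parameters) {
        // Create and open the converter inside the task lifecycle.
        converter = RowRowConverter.create(dataType);
        converter.open(getRuntimeContext().getUserCodeClassLoader());
    }

    @Override
    public RowData map(Row row) {
        return converter.toInternal(row);
    }
}
```
Usage would be `resultStream.map(new RowToRowDataMapper(dataType), InternalTypeInfo.of(dataType.getLogicalType()))` with the same sink-row `DataType` as in the snippet above.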
kingsathurthi
01/17/2023, 5:54 AM
P S
01/17/2023, 8:04 AM
Mehul Batra
01/17/2023, 1:46 PM
Abdelhakim Bendjabeur
01/17/2023, 1:53 PM
```
) WITH (
    'connector' = 'kafka',
    'property-version' = 'universal',
    'properties.bootstrap.servers' = 'host.docker.internal:29092',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://host.docker.internal:8765/apis/ccompat/v6', -- apicurio
    'topic' = 'my-topic',
    'properties.group.id' = 'mmy-consumer-group'
);
```
Here is the error:
```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.avro.registry.confluent.shaded.io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: No content with id/hash 'contentId-0' was found.; error code: 40403
```
The endpoint runs fine:
```
curl http://localhost:8765/apis/ccompat/v6/schemas/ids/1
{"schema":"{\"type\":\"record\",\
...
}
```
But it seems that Flink isn't able to read the bytes containing the schema ID: it keeps reading `0` instead of 1, 2, ...
Has anyone ever faced this issue, and is there a config to tell Flink to look at the first 8 bytes for the artefact ID?
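For context, a sketch of the Confluent wire format that `avro-confluent` expects. If the producer (for example, an Apicurio serde in its default ID mode) writes an 8-byte global ID where Flink expects a 4-byte one, the first 4 bytes of a small ID are all zero, which would match the `contentId-0` error; that diagnosis is an assumption to verify against the actual producer settings.
```
import java.nio.ByteBuffer;

// Confluent wire format: 1 magic byte (0x0), a 4-byte big-endian schema ID,
// then the Avro-encoded payload. This only illustrates how the ID is read.
public final class ConfluentWireFormat {
    public static int readSchemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Unexpected magic byte: " + magic);
        }
        // If the producer wrote an 8-byte ID here, these 4 bytes are the
        // high half of that long, i.e. 0 for any small ID.
        return buf.getInt();
    }
}
```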
Reme Ajayi
01/17/2023, 3:25 PM
```
src_ddl = """
    CREATE TABLE source_kafka (
        viewtime INT,
        registertime BIGINT,
        pageid STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'pageviews',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
        'properties.group.id' = 'test-001',
        'properties.bootstrap.servers' = 'localhost:9092'
    );"""
```
But I keep running into this error:
```
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = pageviews, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1673563915562, serialized key size = 2, serialized value size = 21, headers = RecordHeaders(headers = [RecordHeader(key = task.generation, value = [48]), RecordHeader(key = task.id, value = [48]), RecordHeader(key = current.iteration, value = [53])], isReadOnly = false), key = [B@4ee8961e, value = [B@1a9f96a).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
... 15 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
```
Any insight on how to fix this would be really helpful.
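A hedged sketch of one likely cause: an `ArrayIndexOutOfBoundsException` inside the Avro decoder is commonly a reader/writer schema mismatch. If this topic comes from the stock `pageviews` datagen, its value schema is roughly `(viewtime: long, userid: string, pageid: string)`, so declaring `viewtime INT, registertime BIGINT` would misalign the decoder; that schema is an assumption, so confirm it first with `curl http://localhost:8081/subjects/pageviews-value/versions/latest`. The DDL string below (shown via the Java Table API) drops unchanged into the PyFlink `src_ddl`.
```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PageviewsSchemaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Declared columns must match the writer schema registered in the
        // schema registry exactly; this assumes the stock pageviews schema.
        tEnv.executeSql(
                "CREATE TABLE source_kafka (" +
                "  viewtime BIGINT," +
                "  userid STRING," +
                "  pageid STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'pageviews'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'avro-confluent'," +
                "  'avro-confluent.schema-registry.url' = 'http://localhost:8081'," +
                "  'properties.group.id' = 'test-001'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'" +
                ")");
    }
}
```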
Adrian Chang
01/17/2023, 5:21 PM
I'm using the `postgres-cdc` connector, but `confirmed_flush_lsn` is not increasing even though I update data on the table and the update is received on Flink. This is causing full storage on Postgres.
This is my table on Postgres:
```
CREATE TABLE IF NOT EXISTS aerial_core_development.thing_meta
(
id integer NOT NULL DEFAULT nextval('aerial_core_development.thing_meta_id_seq'::regclass),
thing_serial character varying(64) COLLATE pg_catalog."default" NOT NULL,
floor integer,
location character varying(32) COLLATE pg_catalog."default",
location_custom text COLLATE pg_catalog."default",
name character varying(64) COLLATE pg_catalog."default",
device_type character varying(64) COLLATE pg_catalog."default",
timezone character varying(64) COLLATE pg_catalog."default",
CONSTRAINT thing_meta_pkey PRIMARY KEY (id),
CONSTRAINT thing_meta_thing_serial_fkey FOREIGN KEY (thing_serial)
REFERENCES aerial_core_development.thing (serial) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE CASCADE
)
WITH (
OIDS = FALSE
)
TABLESPACE pg_default;
ALTER TABLE IF EXISTS aerial_core_development.thing_meta
OWNER to aerial;
REVOKE ALL ON TABLE aerial_core_development.thing_meta FROM flink;
GRANT ALL ON TABLE aerial_core_development.thing_meta TO aerial;
GRANT DELETE, INSERT, UPDATE, SELECT ON TABLE aerial_core_development.thing_meta TO aerial_core;
GRANT SELECT ON TABLE aerial_core_development.thing_meta TO flink;
```
This is how I use the connector:
```
CREATE TABLE thing_meta (
thing_serial VARCHAR NOT NULL,
timezone VARCHAR
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '54321',
'username' = 'flink_bridge',
'password' = <password>,
'database-name' = 'aerial_core_development',
'schema-name' = 'aerial_core_development',
'table-name' = 'thing_meta',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'thing_meta_adrian'
)
```
Postgres engine version is 10.21. I check the replication slot with:
```
SELECT
    *,
    pg_current_wal_lsn(),
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS replicationSlotLag
FROM pg_replication_slots;
```
Do you have any idea why `confirmed_flush_lsn` is not advancing? Thanks
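A hedged sketch of one common cause, assuming checkpointing is not enabled on this job: Flink's Debezium-based CDC connectors acknowledge the consumed LSN back to the replication slot only when a checkpoint completes, so without checkpoints `confirmed_flush_lsn` never advances and WAL retention grows. The interval below is an example value.
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcCheckpointSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // The slot's confirmed_flush_lsn only moves forward on completed
        // checkpoints.
        env.enableCheckpointing(60_000L);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // ... register the thing_meta postgres-cdc table and queries here ...
    }
}
```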
Nathanael England
01/17/2023, 5:59 PM
• Emit events like `{device: <id>, time: <event time>}` from the first process function, broadcast that side output stream, and connect it to the stream that feeds the second process function.
• Generate multiple events out of the first process function, like `{rule: <id>, device: <id>, time: <event time>}`, where all that changes is the rule ID, so that this side output can be connected to the other side output that is keyed by the same namespace.
• Something else?
The former seems easier, but perhaps not the proper intent of broadcast state. The latter seems like a lot of extra traffic/noise in the system.
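A minimal sketch of the first option, with hypothetical types (`DeviceTime`, `RuleEvent`) standing in for the real events: broadcast the `{device, time}` side output and connect it to the keyed main stream with a `KeyedBroadcastProcessFunction`.
```
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastSketch {
    // Placeholder event types.
    public static class DeviceTime { public String device; public long time; }
    public static class RuleEvent { public String ruleId; public String device; }

    public static DataStream<String> wire(
            DataStream<RuleEvent> mainStream,
            DataStream<DeviceTime> deviceTimeSideOutput) {

        MapStateDescriptor<String, Long> deviceTimes =
                new MapStateDescriptor<>("device-times", Types.STRING, Types.LONG);

        BroadcastStream<DeviceTime> broadcast =
                deviceTimeSideOutput.broadcast(deviceTimes);

        return mainStream
                .keyBy(e -> e.ruleId)
                .connect(broadcast)
                .process(new KeyedBroadcastProcessFunction<String, RuleEvent, DeviceTime, String>() {
                    @Override
                    public void processElement(
                            RuleEvent value, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Read the broadcast device time for this element's device.
                        Long time = ctx.getBroadcastState(deviceTimes).get(value.device);
                        out.collect(value.ruleId + "@" + time);
                    }

                    @Override
                    public void processBroadcastElement(
                            DeviceTime value, Context ctx, Collector<String> out)
                            throws Exception {
                        // Every parallel instance receives and stores the update.
                        ctx.getBroadcastState(deviceTimes).put(value.device, value.time);
                    }
                });
    }
}
```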
01/17/2023, 6:21 PMorg.apache.flink.table.api.ValidationException: Column types of query result and sink for 'temporaryCatalog.default.enrichmentOut' do not match.
Cause: Different number of columns.
Query schema: [id: STRING NOT NULL, enrichment: STRING NOT NULL, ts: TIMESTAMP(3) *ROWTIME*]
Sink schema: [id: STRING, status: STRING, enrichment: STRING, ts: TIMESTAMP(3)]
Based on https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+time+attribute+in+SQL+DDL I think I need to set a watermark strategy, and perhaps a rowtime field, for my table job. Can I set this on the Table schema below? Is there a Flink doc that shows this, or would someone kindly provide the syntax for doing it with the Table API?
```
Schema.newBuilder()
    .column("id", DataTypes.STRING().notNull())
    .column("status", DataTypes.STRING().notNull())
    .column("enrichment", DataTypes.STRING().notNull())
    .column("ts", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
    .primaryKey("id")
    .watermark("ts", "ts")
    .build();
```
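A hedged sketch of the syntax: the second argument of `watermark()` is a SQL expression, so `"ts"` alone declares a strictly ascending watermark, while subtracting an interval gives a bounded out-of-orderness strategy (the 5-second bound below is an example). Separately, the error above says the query returns 3 columns while the sink declares 4, so `status` also has to be added to the query or dropped from the sink schema.
```
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

public class WatermarkSchemaSketch {
    public static Schema build() {
        return Schema.newBuilder()
                .column("id", DataTypes.STRING().notNull())
                .column("status", DataTypes.STRING().notNull())
                .column("enrichment", DataTypes.STRING().notNull())
                // Rowtime attribute; the java.sql.Timestamp bridging is left
                // out here to keep the watermark declaration the focus.
                .column("ts", DataTypes.TIMESTAMP(3))
                .primaryKey("id")
                // SQL expression defining the watermark for the ts column.
                .watermark("ts", "ts - INTERVAL '5' SECOND")
                .build();
    }
}
```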
Jason Politis
01/17/2023, 7:57 PM
Krish Narukulla
01/18/2023, 3:52 AM
Zeyu Qiu
01/18/2023, 4:17 AM
`DataStream.map()` in Python doesn't seem to work. I wrote a Python demo like:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.enable_checkpointing(1000)
source = KafkaSource.builder().build()
stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "team_config_source")
sink = FileSink \
.for_row_format('/opt/result/', Encoder.simple_string_encoder("UTF-8")) \
.with_output_file_config(OutputFileConfig.builder()
.with_part_prefix("team_config")
.with_part_suffix(".json")
.build()) \
.with_rolling_policy(RollingPolicy.default_rolling_policy(part_size=1024 ** 3, rollover_interval=15 * 60 * 1000,
inactivity_interval=5 * 60 * 1000)) \
.build()
def mapping(data):
return data
stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
env.execute()
```
But Flink gives me this exception:
```
2023-01-18 11:34:34 Traceback (most recent call last):
2023-01-18 11:34:34 File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
2023-01-18 11:34:34 return _run_code(code, main_globals, None,
2023-01-18 11:34:34 File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
2023-01-18 11:34:34 exec(code, run_globals)
2023-01-18 11:34:34 File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 223, in <module>
2023-01-18 11:34:34 process2()
2023-01-18 11:34:34 File "/tmp/pyflink/00606d52-b6c1-4e13-b7cb-73ee8e196db6/42be79fb-c8bb-4de1-b0fb-c89a7702cddc/flink_driver.py", line 218, in process2
2023-01-18 11:34:34 stream.map(mapping, BasicTypeInfo.STRING_TYPE_INFO()).sink_to(sink)
2023-01-18 11:34:34 File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 312, in map
2023-01-18 11:34:34 File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/data_stream.py", line 654, in process
2023-01-18 11:34:34 File "<frozen importlib._bootstrap>", line 991, in _find_and_load
2023-01-18 11:34:34 File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
2023-01-18 11:34:34 File "<frozen importlib._bootstrap>", line 655, in _load_unlocked
2023-01-18 11:34:34 File "<frozen importlib._bootstrap>", line 618, in _load_backward_compatible
2023-01-18 11:34:34 File "<frozen zipimport>", line 259, in load_module
2023-01-18 11:34:34 File "/opt/flink/opt/python/pyflink.zip/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
2023-01-18 11:34:34 AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
```
According to the exception, I looked into `flink_fn_execution_pb2.py`, and found this at the beginning of the file:
```
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile('xxx')
_INPUT = DESCRIPTOR.message_types_by_name['Input']
```
But it seems `DescriptorPool.AddSerializedFile()` doesn't have a return statement, so `DESCRIPTOR` is always `None`. Am I using `DataStream.map()` in a wrong way, or is something else wrong?
My environment is:
• Flink 1.16.0
• Python package: apache-flink==1.16.0
Lei Su
01/18/2023, 4:31 AM
Sumit Nekar
01/18/2023, 5:11 AM
`kubernetes.operator.job.upgrade.last-state-fallback.enabled` is set to `false`). If the last checkpoint is not available, the job upgrade will fail.
So is it always better to use savepoint upgrade mode, since it falls back to last-state mode when the job is unhealthy? With savepoint upgrades, a savepoint is always taken before the upgrade/restart; can that increase job restart time? Which mode is preferred for a stateful stream-processing Flink job?