George Leonard
05/17/2025, 12:22 PM
c.NotebookApp.ip = '0.0.0.0' # Listens on all network interfaces
c.NotebookApp.open_browser = False
George Leonard
05/17/2025, 1:02 PM
t_env.execute_sql("""
CREATE CATALOG fluss_catalog WITH (
'type' = 'fluss',
'bootstrap.servers' = 'coordinator-server:9123'
)
""")
This fails badly. The error stack pretty much says it doesn't know about 'type' = 'fluss'.
umar farooq
05/19/2025, 2:37 PM
Sandeep Devarapalli
05/22/2025, 5:15 AM
André Casimiro
06/03/2025, 2:51 PM
Masha A
06/03/2025, 5:37 PM
Chiara
06/06/2025, 2:56 PM
Pedro Mázala
06/12/2025, 3:27 PM
George Leonard
06/15/2025, 1:20 PM
George Leonard
06/25/2025, 5:31 AM
Giannis Polyzos
07/02/2025, 7:44 AM
Vikas Patil
07/02/2025, 7:10 PM
Zhong Chen
07/03/2025, 5:34 PM
securityContext:
  readOnlyRootFilesystem: true
for my Flink jobs running in the k8s cluster. Has anyone done this before?
George Leonard
07/08/2025, 3:32 PM
CREATE TABLE hive.snmp.snmp_device_info_redis (
device_id VARCHAR(255) PRIMARY KEY NOT ENFORCED -- Unique identifier for the device
,ip_address VARCHAR(45) NOT NULL -- IP address of the device (IPv4 or IPv6)
,hostname VARCHAR(255) -- Hostname of the device
,device_location VARCHAR(1000) -- Physical location of the device
,device_type VARCHAR(100) -- Type of device (e.g., "router", "switch", "server")
,vendor VARCHAR(100) -- Device vendor (e.g., "Cisco", "Juniper", "HP")
,devmodel VARCHAR(100) -- Device model
,firmware_version VARCHAR(100) -- Firmware or OS version
,last_updated_ts TIMESTAMP(3) -- Timestamp of the last update to this device's info
) WITH (
'connector' = 'redis', -- This is a placeholder; connector name might vary
'hostname' = 'redissnmp', -- Replace with your Redis host
'port' = '6379', -- Replace with your Redis port
'database' = '0', -- Redis database number
'data-type' = 'JSON', -- Or 'STRING' if you're storing full JSON strings
'lookup.cache.max-rows' = '5000', -- Cache lookup results for performance
'lookup.cache.ttl' = '10min', -- How long cached entries are valid
'lookup.max-retries' = '3', -- Max retries for failed lookups
'key.prefix' = 'device:', -- Prefix for your Redis keys (e.g., 'device:router-core-01')
'value.format' = 'json' -- Specify that the value is JSON
-- Add any other Redis specific configurations like password, SSL, connection pool settings
-- 'password' = 'your_redis_password'
);
flink-connector-redis_2.11-1.0.jar
flink-connector-redis-1.4.3.jar
Flink SQL> select * from hive.snmp.snmp_device_info_redis;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: One or more required options are missing.
Missing required options are:
command
Flink SQL>
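The validation error above names `command` as a missing required option, so a pre-flight check of the WITH clause would flag it before the query is run. A minimal Python sketch, assuming options are written as quoted 'key' = 'value' pairs and that `--` starts a comment. The helper names and the required set passed in are illustrative only, not part of any Redis connector's API; the sample DDL is a shortened, hypothetical version of the table above.

```python
import re

# Matches quoted option pairs such as 'port' = '6379'.
OPTION_RE = re.compile(r"'([^']+)'\s*=\s*'([^']*)'")


def with_options(ddl: str) -> dict:
    """Collect 'key' = 'value' pairs from a DDL string, ignoring -- comments."""
    uncommented = "\n".join(re.sub(r"--.*", "", line) for line in ddl.splitlines())
    return dict(OPTION_RE.findall(uncommented))


def missing_options(ddl: str, required: set) -> list:
    """Return the required option keys that the DDL does not set, sorted."""
    return sorted(required - set(with_options(ddl)))


# Shortened, hypothetical DDL in the same shape as the Redis table.
ddl = """
CREATE TABLE t (device_id VARCHAR(255)) WITH (
  'connector' = 'redis',   -- placeholder
  'hostname' = 'redissnmp',
  'port' = '6379'
  -- 'password' = 'your_redis_password'
);
"""

# 'command' is taken from the error message; the rest of the set is an assumption.
print(missing_options(ddl, {"connector", "command"}))  # ['command']
```

Which value `command` should take depends on the specific connector jar in use, so that has to come from that connector's own documentation.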
Chiara
07/14/2025, 6:55 PM
jq l
07/18/2025, 6:15 AM
FLINK SQL/CREATE TABLE oracle_prod_testQWE(
COLUMN1 BIGINT,
COLUMN2 STRING,
PRIMARY KEY (COLUMN1) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '10.xxx',
'port' = '1521',
'username' = 'c##xxx',
'password' = 'xxx',
'database-name' = 'orclCdb',
'schema-name' = 'APP_USER',
'table-name' = 'TABLEZXC',
'debezium.database.pdb.name' = 'db1cdb',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.snapshot.mode' = 'initial'
);
FLINK SQL/select * from oracle_prod_testQWE;
My Flink job always reports this error:
Caused by: java.io.IOException: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275)
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
Sure, I'd be happy to help! Could you please provide more details about the error message? What specific error are you encountering, and what steps have you taken so far to troubleshoot the issue? The more information you can give me, the better I can assist you. 😭😭😭
jq l
07/18/2025, 7:47 AM
Abdulaziz Alqahtani
07/23/2025, 10:26 AM
Vinod Sherikar
08/05/2025, 6:18 AM
Jaime López Gutiérrez
08/15/2025, 10:13 AM
ZNXQR9KOXR18
👉 https://flink-forward.org/barcelona-2025
Come hang out with fellow stream processing folks, hear what others are building, and swap ideas over some tapas and sunshine ☀️
Hope to see you there!
Richard Moorhead
08/16/2025, 2:25 AM
George Leonard
08/26/2025, 5:14 PM
George Leonard
08/26/2025, 5:18 PM
George Leonard
08/28/2025, 9:08 AM
George Leonard
08/28/2025, 9:12 AM
taskmanager-2 | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.postgresql.PostgresSchema [] - Relation '16399' resolved to table 'public.children'
taskmanager-2 | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.base.ChangeEventQueue [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265891760, lsn_commit=265823328, lsn=265891760, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=4589582A}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37455,nationalid=4589582A,data={"_id": "be9ba377-0f0e-4bc3-8ddc-b21c4e6c63d5", "dob": "19/08/21", "name": "Saul", "gender": "Male", "address": {"town": "Dublin City", "county": "Dublin", "country": "Ireland", "province": "Leinster", "street_1": "70 Brislane Street Street", "street_2": "", "parcel_id": "D04-95530", "postal_code": "D04", "country_code": "IE", "neighbourhood": "Sandymount"}, "surname": "O'Sweeney", "family_id": "686dc1ab-b322-4500-9b20-819d9dc1b954", "nationalId": "4589582A", "father_nationalId": "1776049U", "mother_nationalId": "4415915B"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265891760"],schema=public,table=children,txId=1419,lsn=265891760},op=c,ts_ms=1756358814379}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
taskmanager-2 | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.postgresql.PostgresSchema [] - Relation '16399' resolved to table 'public.children'
taskmanager-2 | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.base.ChangeEventQueue [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265892680, lsn_commit=265823328, lsn=265892680, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=6390341K}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37456,nationalid=6390341K,data={"_id": "6f3cfc10-d21d-4770-b784-349cafdb6460", "dob": "21/01/03", "name": "Robyn", "gender": "Female", "address": {"town": "Galway City", "county": "Galway", "country": "Ireland", "province": "Connacht", "street_1": "48 Loughnane Street Street", "street_2": "", "parcel_id": "H91 F2X4-68453", "postal_code": "H91 F2X4", "country_code": "IE", "neighbourhood": "The Claddagh"}, "surname": "Bridson", "family_id": "1e8f3dea-349b-4a75-a9df-44d8b266908b", "nationalId": "6390341K", "father_nationalId": "7859141U", "mother_nationalId": "2642305V"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265892680"],schema=public,table=children,txId=1419,lsn=265892680},op=c,ts_ms=1756358814379}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
taskmanager-2 | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.postgresql.PostgresSchema [] - Relation '16399' resolved to table 'public.children'
taskmanager-2 | 2025-08-28 05:26:54,380 DEBUG io.debezium.connector.base.ChangeEventQueue [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265893512, lsn_commit=265823328, lsn=265893512, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=0282428R}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37457,nationalid=0282428R,data={"_id": "43065c0b-8388-4fbb-8f3f-e20f4aa88093", "dob": "20/10/13", "name": "Colleen", "gender": "Female", "address": {"town": "Galway City", "county": "Galway", "country": "Ireland", "province": "Connacht", "street_1": "48 Loughnane Street Street", "street_2": "", "parcel_id": "H91 F2X4-68453", "postal_code": "H91 F2X4", "country_code": "IE", "neighbourhood": "The Claddagh"}, "surname": "Bridson", "family_id": "1e8f3dea-349b-4a75-a9df-44d8b266908b", "nationalId": "0282428R", "father_nationalId": "7859141U", "mother_nationalId": "2642305V"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265893512"],schema=public,table=children,txId=1419,lsn=265893512},op=c,ts_ms=1756358814380}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
taskmanager-2 | 2025-08-28 05:26:54,380 DEBUG io.debezium.connector.postgresql.PostgresSchema [] - Relation '16399' resolved to table 'public.children'
taskmanager-2 | 2025-08-28 05:26:54,380 DEBUG io.debezium.connector.base.ChangeEventQueue [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265894344, lsn_commit=265823328, lsn=265894344, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=5306865M}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37458,nationalid=5306865M,data={"_id": "0f5b8d85-21b5-4707-9424-5c2fa6f72e1f", "dob": "20/09/28", "name": "Hamish", "gender": "Male", "address": {"town": "Roscommon", "county": "Roscommon", "country": "Ireland", "province": "Connacht", "street_1": "19 Black Street Street", "street_2": "", "parcel_id": "F42 N4T1-58527", "postal_code": "F42 N4T1", "country_code": "IE", "neighbourhood": "Athlone Road"}, "surname": "Drummy", "family_id": "2de4bbf7-3745-4e1d-89f2-868945ffc655", "nationalId": "5306865M", "father_nationalId": "0038926T", "mother_nationalId": "2789043O"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265894344"],schema=public,table=children,txId=1419,lsn=265894344},op=c,ts_ms=1756358814380}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
^Cmake: *** [logsf] Error 255
George Leonard
08/28/2025, 9:22 AM
CREATE OR REPLACE TABLE postgres_catalog.inbound.children (
id BIGINT
,nationalId VARCHAR(14) -- NOT NULL
,data STRING
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'children'
,'slot.name' = 'test1'
,'scan.incremental.snapshot.enabled' = 'false' -- experimental feature: incremental snapshot (default of
,'scan.startup.mode' = 'latest-offset' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
,'scan.incremental.snapshot.enabled' = 'true'
);
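Note that the WITH clause above sets 'scan.incremental.snapshot.enabled' twice, once to 'false' and once to 'true'. A small check for repeated keys can catch this kind of thing before the DDL is submitted. A minimal Python sketch, assuming options are written as quoted 'key' = 'value' pairs and that `--` starts a comment. The function name is hypothetical and the sample DDL is a shortened stand-in for the table above.

```python
import re
from collections import defaultdict


def duplicate_options(ddl: str) -> dict:
    """Map each option key that appears more than once to its values, in order."""
    # Drop -- comments line by line so commented-out options are not counted.
    uncommented = "\n".join(re.sub(r"--.*", "", line) for line in ddl.splitlines())
    values = defaultdict(list)
    for key, value in re.findall(r"'([^']+)'\s*=\s*'([^']*)'", uncommented):
        values[key].append(value)
    return {key: vals for key, vals in values.items() if len(vals) > 1}


# Shortened stand-in for the postgres-cdc table definition.
ddl = """
CREATE TABLE children (id BIGINT) WITH (
   'connector' = 'postgres-cdc'
  ,'slot.name' = 'test1'
  ,'scan.incremental.snapshot.enabled' = 'false' -- first setting
  ,'scan.startup.mode' = 'latest-offset'
  ,'scan.incremental.snapshot.enabled' = 'true'
);
"""

print(duplicate_options(ddl))
# {'scan.incremental.snapshot.enabled': ['false', 'true']}
```

The sketch only reports the conflict; which of the two values is intended is for the table's author to decide.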
George Leonard
08/28/2025, 3:52 PM
George Leonard
08/29/2025, 5:42 AM
George Leonard
08/29/2025, 5:42 AM
George Leonard
08/29/2025, 9:45 AM