# random
  • g

    George Leonard

    05/17/2025, 12:22 PM
    Got it working. Created the following file (nano ~/.jupyter/jupyter_notebook_config.py):
    c.NotebookApp.ip = '0.0.0.0'  # Listens on all network interfaces
    c.NotebookApp.open_browser = False
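    Side note (an assumption about the setup, not part of the thread): newer Jupyter versions (Notebook 7+ / jupyter_server) deprecate the NotebookApp options and read ~/.jupyter/jupyter_server_config.py instead; the equivalent settings there would be:

```python
# Equivalent settings for newer Jupyter (Notebook 7+ / jupyter_server),
# placed in ~/.jupyter/jupyter_server_config.py instead:
c.ServerApp.ip = '0.0.0.0'        # listen on all network interfaces
c.ServerApp.open_browser = False  # don't try to launch a browser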
  • g

    George Leonard

    05/17/2025, 1:02 PM
    I'm able to paste Python commands into the notebook, up to the block below.
    t_env.execute_sql("""
            CREATE CATALOG fluss_catalog WITH (
                'type'              = 'fluss',
                'bootstrap.servers' = 'coordinator-server:9123'
            )
        """)
    This fails badly; the error stack pretty much says it doesn't know about type = 'fluss'.
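    (Likely cause, stated as an assumption: Flink discovers catalog factories such as 'type' = 'fluss' via SPI from jars on the classpath, so the Fluss/Flink connector jar for your Flink version has to sit in $FLINK_HOME/lib before the statement can work; the jar name below is illustrative.)

```sql
-- Sketch: with the Fluss connector jar in $FLINK_HOME/lib
-- (e.g. something like fluss-flink-1.20-<version>.jar; name/version illustrative)
-- and the cluster / notebook kernel restarted, the same statement should resolve:
CREATE CATALOG fluss_catalog WITH (
    'type'              = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);
```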
  • u

    umar farooq

    05/19/2025, 2:37 PM
    Hi everyone, is there a way we can deploy batch jobs using the Flink Kubernetes operator?
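    (For what it's worth, the operator does run batch jobs: they are ordinary application-mode deployments with the runtime mode set to BATCH. A minimal sketch; image, paths, and versions are illustrative.)

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: batch-example
spec:
  image: flink:1.19            # illustrative
  flinkVersion: v1_19
  flinkConfiguration:
    execution.runtime-mode: BATCH   # run the job in batch mode
  serviceAccount: flink
  jobManager:
    resource: {memory: "2048m", cpu: 1}
  taskManager:
    resource: {memory: "2048m", cpu: 1}
  job:
    jarURI: local:///opt/flink/examples/batch/WordCount.jar  # illustrative path
    parallelism: 2
    upgradeMode: stateless   # batch jobs generally don't need savepoint upgrades
```

    The job runs to FINISHED; re-running it typically needs a spec change (e.g. bumping restartNonce).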
  • s

    Sandeep Devarapalli

    05/22/2025, 5:15 AM
    Hi all, I want to understand this: does Flink CDC use Debezium internally?
  • a

    André Casimiro

    06/03/2025, 2:51 PM
    Hi, is there any initiative aiming at running Flink with Java 21 virtual threads? My particular use case is running Flink in a single JVM, not a distributed cluster, and it seems thread waiting/switching is the next bottleneck for me.
  • m

    Masha A

    06/03/2025, 5:37 PM
    Your story belongs on stage. The Call for Papers for Current 2025 – New Orleans is officially open! If you've wrangled data in production, shipped something cool under pressure, or helped a team unlock real-time Flink magic, this stage is yours. ✅ Engineers, architects, OSS contributors.
    📅 CFP Deadline: June 15, 2025
    🧠 Office Hours: June 13, 8–9 AM PDT — come get feedback or bounce ideas at #speakers-office-hours
    No fluff. Just honest talks from people doing the work.
    👉 Submit your story through Sessionize
  • c

    Chiara

    06/06/2025, 2:56 PM
    Hi all, got a cool open-source analytics project? Come speak at OSA Con 2025! We’re looking for devs & data folks building with open source tools. Last year we had 2,000+ devs register for the conference, this year we are hoping for more. Join us :) 👉 Submit your talk: https://sessionize.com/osacon-2025/
  • p

    Pedro Mázala

    06/12/2025, 3:27 PM
    Hey there, folks! 👋 I'm on a mission to learn how cool kids weave metadata + policy enforcement into their data stacks. If you've ever built or run stuff like:
    • A metadata catalog that actually talks to a policy engine
    • Data-lineage views that spell out where a rule was applied (or ignored 🥸)
    • Any clever tricks that make auditors smile instead of sigh
    …I'd love to swap stories. Drop a 👍 below or ping me in DMs if you're up for a quick chat.
  • g

    George Leonard

    06/15/2025, 1:20 PM
    Guys, is there a specific channel for connectors?
  • g

    George Leonard

    06/25/2025, 5:31 AM
    Curious: has anyone ever done an API endpoint "connector" in Flink? I mean an endpoint running as one or more jobs on a Flink cluster, each on its own port, to which data is pushed.
  • g

    Giannis Polyzos

    07/02/2025, 7:44 AM
    Anyone here that has used Apache Fory (prev. Fury) with Apache Flink for serdes? And what was the experience? https://fory.apache.org/
  • v

    Vikas Patil

    07/02/2025, 7:10 PM
    Which version of ZooKeeper do I need to use with Flink 1.19? I'm not able to find proper documentation for this.
  • z

    Zhong Chen

    07/03/2025, 5:34 PM
    I am trying to enable
    securityContext:
                readOnlyRootFilesystem: true
    for my flink jobs running in the k8s cluster. Has anyone done this before?
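    (A sketch of one common approach, stated as an assumption: with a read-only root filesystem, Flink still needs writable scratch space for its io/blob tmp dirs, so emptyDir mounts are usually added via the operator's pod template. The exact set of writable dirs depends on the job.)

```yaml
# Sketch for a FlinkDeployment pod template; 'flink-main-container' is the
# container name the operator expects. Writable dirs needed may vary by job.
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          securityContext:
            readOnlyRootFilesystem: true
          volumeMounts:
            - name: tmp
              mountPath: /tmp      # Flink's default io/blob tmp dirs live here
      volumes:
        - name: tmp
          emptyDir: {}
```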
  • g

    George Leonard

    07/08/2025, 3:32 PM
    Quick question: Flink 1.20.1 on Java 17. I have a Redis database and purely want to expose the data to Flink. I have the table create below, but it's not working; error below. I have the following jars in my lib folder.
    CREATE TABLE hive.snmp.snmp_device_info_redis (
         device_id                  VARCHAR(255) PRIMARY KEY NOT ENFORCED          -- Unique identifier for the device
        ,ip_address                 VARCHAR(45) NOT NULL                           -- IP address of the device (IPv4 or IPv6)
        ,hostname                   VARCHAR(255)                                   -- Hostname of the device
        ,device_location            VARCHAR(1000)                                  -- Physical location of the device
        ,device_type                VARCHAR(100)                                   -- Type of device (e.g., "router", "switch", "server")
        ,vendor                     VARCHAR(100)                                   -- Device vendor (e.g., "Cisco", "Juniper", "HP")
        ,devmodel                   VARCHAR(100)                                   -- Device model
        ,firmware_version           VARCHAR(100)                                   -- Firmware or OS version
        ,last_updated_ts            TIMESTAMP(3)                                   -- Timestamp of the last update to this device's info
    ) WITH (
        'connector'             = 'redis',                                      -- This is a placeholder; connector name might vary
        'hostname'              = 'redissnmp',                                  -- Replace with your Redis host
        'port'                  = '6379',                                       -- Replace with your Redis port
        'database'              = '0',                                          -- Redis database number
        'data-type'             = 'JSON',                                       -- Or 'STRING' if you're storing full JSON strings
        'lookup.cache.max-rows' = '5000',                                       -- Cache lookup results for performance
        'lookup.cache.ttl'      = '10min',                                      -- How long cached entries are valid
        'lookup.max-retries'    = '3',                                          -- Max retries for failed lookups
        'key.prefix'            = 'device:',                                    -- Prefix for your Redis keys (e.g., 'device:router-core-01')
        'value.format'          = 'json'                                        -- Specify that the value is JSON
        -- Add any other Redis specific configurations like password, SSL, connection pool settings
        -- 'password' = 'your_redis_password'
    );
    flink-connector-redis_2.11-1.0.jar
    flink-connector-redis-1.4.3.jar
    Flink SQL> select * from hive.snmp.snmp_device_info_redis;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.ValidationException: One or more required options are missing.
    
    Missing required options are:
    
    command
    
    Flink SQL>
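    (For what it's worth: the ValidationException names `command` as the missing required option, which matches the third-party flink-connector-redis (jeff-zou) table connector; the option names below come from that project and are assumptions to verify against the jar actually installed. Also worth checking: having two different Redis connector jars — the `_2.11-1.0` one and the `1.4.3` one — in lib at the same time can make factory discovery resolve the wrong one.)

```sql
-- Sketch only: option names assume the jeff-zou flink-connector-redis;
-- verify against the connector version actually in lib/.
CREATE TABLE device_info_redis (
    device_key  VARCHAR,
    device_json VARCHAR
) WITH (
    'connector' = 'redis',
    'host'      = 'redissnmp',
    'port'      = '6379',
    'command'   = 'get'      -- the required option the error is asking for
);
```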
  • c

    Chiara

    07/14/2025, 6:55 PM
    Will you be in the San Jose area on July 31st? Come join the Open Source Analytics Community at the Open Source Analytics Festival!
    📍 IBM Silicon Valley Lab, San Jose
    🗓️ Thursday, July 31, 2025 | 5:00 – 9:00 PM PT
    💬 In-person • Interactive Sessions • Networking
    💵 Free Admission
    Live talks on ClickHouse®, StarRocks, Presto, and Apache Gluten. Get fresh insights, bold opinions, and chilled drinks. Hope to see you there! Register here: https://lu.ma/wnsi90or
  • j

    jq l

    07/18/2025, 6:15 AM
    I have a question that has been bothering me for 3 days, and I still haven't managed to solve it.😭😭😭 Flink version: 1.2, flinkcdc: 3.2.1, Oracle: 19c (CDB+PDB mode). When I use the following statement
    Flink SQL> CREATE TABLE oracle_prod_testQWE (
      COLUMN1 BIGINT,
      COLUMN2 STRING,
      PRIMARY KEY (COLUMN1) NOT ENFORCED
    ) WITH (
      'connector' = 'oracle-cdc',
      'hostname' = '10.xxx',
      'port' = '1521',
      'username' = 'c##xxx',
      'password' = 'xxx',
      'database-name' = 'orclCdb', 
      'schema-name' = 'APP_USER',
      'table-name' = 'TABLEZXC',
      'debezium.database.pdb.name' = 'db1cdb',
      'debezium.log.mining.strategy' = 'online_catalog',
      'debezium.snapshot.mode' = 'initial'
       );
       
    Flink SQL> select * from oracle_prod_testQWE;
    My Flink job always reports this error:
    
    Caused by: java.io.IOException: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
        at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        ... 6 more
    Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
        at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)
        at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275)
        at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118)
        at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84)
        at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261)
        at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153)
        at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
    Sure, I'd be happy to help! Could you please provide more details about the error message? What specific error are you encountering, and what steps have you taken so far to troubleshoot the issue? The more information you can give me, the better I can assist you.😭😭😭
  • j

    jq l

    07/18/2025, 7:47 AM
    Question: When my Oracle version is 19c and I have enabled the CDB+PDB mode, can I still use FlinkCDC to synchronize data?
  • a

    Abdulaziz Alqahtani

    07/23/2025, 10:26 AM
    Hi everyone, I'm new to Flink and currently exploring it for a fraud detection use case. I had a couple of questions I'd appreciate guidance on:
    1. Large time windows: we're looking to compute aggregates over windows as large as 6–12 months, with an ingest rate of around 10K events/sec. How feasible is this in Flink in terms of state size and performance? Any tips on managing that scale?
    2. Prewarming aggregates: is there a clean way to bootstrap state from historical data, so new jobs or aggregates can start with warm state instead of empty?
    Any resources or real-world examples would be super helpful. Thanks in advance!
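    (On question 1, a back-of-envelope sketch; every number below is an assumption for illustration, not a measurement. With incremental aggregation — an AggregateFunction rather than buffering raw events — keyed state scales with keys × open windows, not with event volume, which is what makes 6–12 month windows plausible. For question 2, the State Processor API is the usual route for writing a warm-state savepoint from historical data.)

```python
# Back-of-envelope state sizing for long fraud-detection windows.
# Every number here is an assumption for illustration.
events_per_sec = 10_000
window_days = 182                    # ~6 months
seconds = 86_400 * window_days

# Buffering raw events for the whole window (a plain window function
# without incremental aggregation) would retain every event:
raw_events = events_per_sec * seconds

# With incremental aggregation, Flink keeps one small accumulator per key
# per open window instead:
keys = 5_000_000                     # e.g. active accounts (assumption)
bytes_per_accumulator = 100          # a few counters/sums (assumption)
agg_state_bytes = keys * bytes_per_accumulator

print(f"raw events retained over the window: {raw_events:,}")
print(f"aggregated state: ~{agg_state_bytes / 1e9:.1f} GB")
```

    Half a gigabyte of keyed state is comfortable for the RocksDB backend; ~10^11 buffered events is not.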
  • v

    Vinod Sherikar

    08/05/2025, 6:18 AM
    Hi all, I want to consume messages from RabbitMQ (super streams) using Flink connectors. I explored the RabbitMQ connector for Flink at https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/connectors/datastream/rabbitmq/ but that page only covers consuming messages from a RabbitMQ queue, not from RabbitMQ super streams. More details on RabbitMQ streams can be found here: https://www.rabbitmq.com/docs/streams I'm looking for an out-of-the-box Flink connector that provides fault tolerance and scalability for connecting to RabbitMQ streams and consuming messages with Flink. Does one exist?
  • j

    Jaime López Gutiérrez

    08/15/2025, 10:13 AM
    👋 Hey Flink community members! I'm Jaime López and I run Marketing at Ververica. We are the organizers of Flink Forward 2025, we’ll be gathering in beautiful Barcelona from October 13–16, and I’d love to see as many of you there as possible—whether you’re building with Flink, contributing to the OS project, or just want to learn more about it. 🎟️ As a little thank-you to this awesome community, we’re offering a 10% discount code for the first 30 people who register for a conference-only pass:
    ZNXQR9KOXR18
    👉 https://flink-forward.org/barcelona-2025 Come hang out with fellow stream processing folks, hear what others are building, and swap ideas over some tapas and sunshine ☀️ Hope to see you there!
  • r

    Richard Moorhead

    08/16/2025, 2:25 AM
    documentation links from nightlies seem to be down: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/docs/try-flink-kubernetes-operator/quick-start/
  • g

    George Leonard

    08/26/2025, 5:14 PM
    Hi all. Any chance anyone has built a Docker Compose setup using Polaris as the catalog with Paimon (paimon-flink-1.20-1.2.0) as the open table format, and can advise on the Flink SQL to create the catalog and which jars they used? I'm working with Flink 1.20.1.
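    (Not a Polaris answer, but as a baseline to compare against: Paimon's own Flink catalog works without a REST catalog at all. The filesystem form below follows Paimon's standard syntax; the warehouse path is illustrative.)

```sql
-- Baseline Paimon catalog in Flink SQL (no Polaris involved);
-- warehouse path is illustrative.
CREATE CATALOG paimon_catalog WITH (
    'type'      = 'paimon',
    'warehouse' = 'file:///tmp/paimon'
);
USE CATALOG paimon_catalog;
```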
  • g

    George Leonard

    08/26/2025, 5:18 PM
    Hmm, Polaris is supposed to be a REST-based catalog, but it looks like it's only available with Iceberg as the open table format?
  • g

    George Leonard

    08/28/2025, 9:08 AM
    Guys, urgently looking for someone who knows the Flink CDC stack well; I'm sitting with a problem on inbound data from PostgreSQL. See the flink-cdc channel. Feels like a bug...
  • g

    George Leonard

    08/28/2025, 9:12 AM
    I'm finding this in the taskmanager logs:
    taskmanager-2    | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.postgresql.PostgresSchema              [] - Relation '16399' resolved to table 'public.children'
    taskmanager-2    | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.base.ChangeEventQueue                  [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265891760, lsn_commit=265823328, lsn=265891760, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=4589582A}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37455,nationalid=4589582A,data={"_id": "be9ba377-0f0e-4bc3-8ddc-b21c4e6c63d5", "dob": "19/08/21", "name": "Saul", "gender": "Male", "address": {"town": "Dublin City", "county": "Dublin", "country": "Ireland", "province": "Leinster", "street_1": "70 Brislane Street Street", "street_2": "", "parcel_id": "D04-95530", "postal_code": "D04", "country_code": "IE", "neighbourhood": "Sandymount"}, "surname": "O'Sweeney", "family_id": "686dc1ab-b322-4500-9b20-819d9dc1b954", "nationalId": "4589582A", "father_nationalId": "1776049U", "mother_nationalId": "4415915B"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265891760"],schema=public,table=children,txId=1419,lsn=265891760},op=c,ts_ms=1756358814379}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
    taskmanager-2    | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.postgresql.PostgresSchema              [] - Relation '16399' resolved to table 'public.children'
    taskmanager-2    | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.base.ChangeEventQueue                  [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265892680, lsn_commit=265823328, lsn=265892680, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=6390341K}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37456,nationalid=6390341K,data={"_id": "6f3cfc10-d21d-4770-b784-349cafdb6460", "dob": "21/01/03", "name": "Robyn", "gender": "Female", "address": {"town": "Galway City", "county": "Galway", "country": "Ireland", "province": "Connacht", "street_1": "48 Loughnane Street Street", "street_2": "", "parcel_id": "H91 F2X4-68453", "postal_code": "H91 F2X4", "country_code": "IE", "neighbourhood": "The Claddagh"}, "surname": "Bridson", "family_id": "1e8f3dea-349b-4a75-a9df-44d8b266908b", "nationalId": "6390341K", "father_nationalId": "7859141U", "mother_nationalId": "2642305V"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265892680"],schema=public,table=children,txId=1419,lsn=265892680},op=c,ts_ms=1756358814379}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
    taskmanager-2    | 2025-08-28 05:26:54,379 DEBUG io.debezium.connector.postgresql.PostgresSchema              [] - Relation '16399' resolved to table 'public.children'
    taskmanager-2    | 2025-08-28 05:26:54,380 DEBUG io.debezium.connector.base.ChangeEventQueue                  [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265893512, lsn_commit=265823328, lsn=265893512, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=0282428R}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37457,nationalid=0282428R,data={"_id": "43065c0b-8388-4fbb-8f3f-e20f4aa88093", "dob": "20/10/13", "name": "Colleen", "gender": "Female", "address": {"town": "Galway City", "county": "Galway", "country": "Ireland", "province": "Connacht", "street_1": "48 Loughnane Street Street", "street_2": "", "parcel_id": "H91 F2X4-68453", "postal_code": "H91 F2X4", "country_code": "IE", "neighbourhood": "The Claddagh"}, "surname": "Bridson", "family_id": "1e8f3dea-349b-4a75-a9df-44d8b266908b", "nationalId": "0282428R", "father_nationalId": "7859141U", "mother_nationalId": "2642305V"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265893512"],schema=public,table=children,txId=1419,lsn=265893512},op=c,ts_ms=1756358814380}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
    taskmanager-2    | 2025-08-28 05:26:54,380 DEBUG io.debezium.connector.postgresql.PostgresSchema              [] - Relation '16399' resolved to table 'public.children'
    taskmanager-2    | 2025-08-28 05:26:54,380 DEBUG io.debezium.connector.base.ChangeEventQueue                  [] - Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=postgres_cdc_source}, sourceOffset={transaction_id=null, lsn_proc=265894344, lsn_commit=265823328, lsn=265894344, txId=1419, ts_usec=1756358814149129}} ConnectRecord{topic='postgres_cdc_source.public.children', kafkaPartition=null, key=Struct{nationalid=5306865M}, keySchema=Schema{postgres_cdc_source.public.children.Key:STRUCT}, value=Struct{after=Struct{id=37458,nationalid=5306865M,data={"_id": "0f5b8d85-21b5-4707-9424-5c2fa6f72e1f", "dob": "20/09/28", "name": "Hamish", "gender": "Male", "address": {"town": "Roscommon", "county": "Roscommon", "country": "Ireland", "province": "Connacht", "street_1": "19 Black Street Street", "street_2": "", "parcel_id": "F42 N4T1-58527", "postal_code": "F42 N4T1", "country_code": "IE", "neighbourhood": "Athlone Road"}, "surname": "Drummy", "family_id": "2de4bbf7-3745-4e1d-89f2-868945ffc655", "nationalId": "5306865M", "father_nationalId": "0038926T", "mother_nationalId": "2789043O"},created_at=2025-08-28T05:26:54.144503Z},source=Struct{version=1.9.8.Final,connector=postgresql,name=postgres_cdc_source,ts_ms=1756358814149,db=demog,sequence=["265823328","265894344"],schema=public,table=children,txId=1419,lsn=265894344},op=c,ts_ms=1756358814380}, valueSchema=Schema{postgres_cdc_source.public.children.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
  • g

    George Leonard

    08/28/2025, 9:22 AM
    CREATE OR REPLACE TABLE postgres_catalog.inbound.children (
         id             BIGINT
        ,nationalId     VARCHAR(14) -- NOT NULL
        ,data           STRING
        ,created_at     TIMESTAMP_LTZ(3)
        ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
    ) WITH (
         'connector'                            = 'postgres-cdc'
        ,'hostname'                             = 'postgrescdc'
        ,'port'                                 = '5432'            -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
        ,'username'                             = 'dbadmin'
        ,'password'                             = 'dbpassword'
        ,'database-name'                        = 'demog'
        ,'schema-name'                          = 'public'
        ,'table-name'                           = 'children'
        ,'slot.name'                            = 'test1'
        ,'scan.incremental.snapshot.enabled'    = 'false'           -- experimental feature: incremental snapshot (default of
        ,'scan.startup.mode'                    = 'latest-offset'         -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
        ,'decoding.plugin.name'                 = 'pgoutput'
    );
  • g

    George Leonard

    08/28/2025, 3:52 PM
    anyone strong with flink sql. pls. https://apache-flink.slack.com/archives/C03G7LJTS2G/p1756387568558039
  • g

    George Leonard

    08/29/2025, 5:42 AM
    anyone..
  • g

    George Leonard

    08/29/2025, 5:42 AM
    I'd like to unpack my original PostgreSQL structure into a Flink record and then into a Paimon structured record. Please help.
  • g

    George Leonard

    08/29/2025, 9:45 AM
    Guys, in the Flink UI, when we click on a job's logs it shows all logs. Is it not possible to have it filter/display only the log messages for that particular job/thread?