# flink-pinot-connector
  • x

    Xiang Fu

    03/16/2021, 7:00 PM
    this is the design doc: https://docs.google.com/document/d/1GVoFHOHSDPs1MEDKEmKguKwWMqM1lwQKj2e64RAKDf8/edit#heading=h.uvocz0dwkepo
    Pinot Connector
  • c

    Chinmay Soman

    03/16/2021, 7:01 PM
    πŸ‘
  • c

    Chinmay Soman

    03/16/2021, 10:44 PM
    Looks like this doc needs a lot of changes
  • y

    Yupeng Fu

    03/16/2021, 10:45 PM
    sure. i can update the doc to reflect the latest discussions
    πŸ‘ 1
  • a

    Alice

    04/28/2022, 1:17 AM
    Hi team, is flink connector available now?
  • a

    Alice

    04/28/2022, 1:18 AM
    I mean flink-pinot-connector.
  • n

    Neha Pawar

    04/28/2022, 2:36 AM
    It should be, after this commit https://github.com/apache/pinot/pull/8233
  • n

    Neha Pawar

    04/28/2022, 2:36 AM
    @User is there a user doc?
  • a

    Alice

    04/28/2022, 2:50 AM
    Thanks. Is there a timeline?
  • y

    Yupeng Fu

    04/28/2022, 3:14 AM
    no user doc yet
  • y

    Yupeng Fu

    04/28/2022, 3:14 AM
    but you can check out the readme
  • t

    Tommaso Garuglieri

    06/19/2022, 4:09 PM
    Hi, first of all thank you for the awesome work on the Flink integration 🥳 We were looking forward to this change in order to insert data from our Iceberg tables into Pinot, so that we can improve our query latencies. For our use case the data in the Iceberg tables is partitioned by multiple fields, and we rewrite partitions periodically to correct data (also performing delete operations). Is there a way to replicate this ingestion logic into Pinot offline tables? I understand that Pinot ingestion is made for immutable data, so this is probably a corner case that is not covered 🤕 We need to completely rewrite data related to specific partitions; from my understanding we should overwrite specific segments, and ideally we should generate segment names based on our partition fields, e.g.
    seg_<partition_a>_<partition_b>...
    but this is not supported at the moment. Is there any workaround, e.g. a custom segment name generator implementation that we can leverage inside the Flink ingestion job?
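    For what it's worth, one workaround along the lines Tommaso describes would be a custom segment name generator. The sketch below is hypothetical: the `SegmentNameGenerator` interface shown here is a local stand-in rather than Pinot's actual API, and `PartitionSegmentNameGenerator` is an invented name. It only illustrates the shape such a plugin could take, deriving a stable segment name from partition values so that a rewritten partition overwrites the same segment on a REFRESH push.

    ```java
    import java.util.List;

    // Local stand-in for Pinot's pluggable segment-name-generator abstraction
    // (hypothetical interface; the real SPI lives inside Pinot itself).
    interface SegmentNameGenerator {
        String generateSegmentName(int sequenceId, Object minTimeValue, Object maxTimeValue);
    }

    // Hypothetical generator that derives the segment name from partition
    // values, e.g. seg_<partition_a>_<partition_b>, so that re-pushing a
    // rewritten partition replaces the same segment instead of adding one.
    class PartitionSegmentNameGenerator implements SegmentNameGenerator {
        private final List<String> partitionValues;

        PartitionSegmentNameGenerator(List<String> partitionValues) {
            this.partitionValues = partitionValues;
        }

        @Override
        public String generateSegmentName(int sequenceId, Object minTimeValue, Object maxTimeValue) {
            // "seg" plus one component per partition field; time bounds are ignored
            // because the segment identity is the partition, not the time range.
            return "seg_" + String.join("_", partitionValues);
        }
    }
    ```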
  • r

    reallyonthemove tous

    11/24/2022, 12:26 AM
    hi folks, i was trying to test the pinot-flink-connector at pinot/pinot-connectors/pinot-flink-connector. When submitting the jar, though, I see the exception below. Any idea what's going on?
    nkshah@docdbpogo1:~/Flinkexamples/flink-api-examples$ ./flink-1.16.0/bin/flink run -c org.apache.pinot.connector.flink.FlinkQuickStart ~/pinot/pinot-connectors/pinot-flink-connector/target/pinot-flink-connector-0.12.0-SNAPSHOT.jar
    java.lang.NoClassDefFoundError: org/apache/pinot/common/utils/http/HttpClient
        at org.apache.pinot.connector.flink.FlinkQuickStart.main(FlinkQuickStart.java:85)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:846)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1090)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1168)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1168)
    Caused by: java.lang.ClassNotFoundException: org.apache.pinot.common.utils.http.HttpClient
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
  • r

    Rong R

    11/28/2022, 5:27 PM
    i think it has to do with how you packaged the pinot-flink-connector JAR.
  • r

    Rong R

    11/28/2022, 5:27 PM
    could you share your setup and your classpath?
  • r

    reallyonthemove tous

    11/28/2022, 5:32 PM
    Thanks Rong, I am using the jar created in the target directory of pinot (pinot/pinot-connectors/pinot-flink-connector/target/pinot-flink-connector-0.12.0-SNAPSHOT.jar) and submitting it to flink using the command line below. ~/Flinkexamples/flink-api-examples$ ./flink-1.16.0/bin/flink run -c org.apache.pinot.connector.flink.FlinkQuickStart ~/pinot/pinot-connectors/pinot-flink-connector/target/pinot-flink-connector-0.12.0-SNAPSHOT.jar
  • r

    reallyonthemove tous

    11/28/2022, 5:35 PM
    i am not setting the classpath explicitly though, since I was expecting the dependencies to be bundled in the jar. any suggestions for what it should point to?
  • r

    Rong R

    11/28/2022, 6:40 PM
    so you are building the jar using maven? i don't think that's going to be a bundled jar, so you might need to download the dist jar instead of building from source
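    As a sanity check for this kind of `NoClassDefFoundError`, you can probe the classpath for the missing class directly before submitting the job. A minimal, generic sketch (`ClasspathProbe` is an invented helper; the class name is simply the one from the stack trace above):

    ```java
    // Probes whether a dependency class is visible on the current classpath.
    // A ClassNotFoundException here means the jar you submitted does not
    // bundle (or otherwise provide) that dependency.
    class ClasspathProbe {
        static boolean isOnClasspath(String className) {
            try {
                Class.forName(className);
                return true;
            } catch (ClassNotFoundException e) {
                return false;
            }
        }

        public static void main(String[] args) {
            // The class the Flink CLI failed to load in the stack trace above.
            String missing = "org.apache.pinot.common.utils.http.HttpClient";
            System.out.println(missing + " on classpath: " + isOnClasspath(missing));
        }
    }
    ```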
  • r

    reallyonthemove tous

    11/28/2022, 10:18 PM
    Hi Rong, i am not seeing the pinot-flink-connector in the distribution i downloaded from https://pinot.apache.org/download/. Is there some other location i should look to get it from?
    πŸ‘‹ 1
  • r

    Rong R

    11/30/2022, 5:46 AM
    Hmm, interesting. I'm not sure about the release procedure. You could also try directly including the module in your own maven project. Please file a ticket with the exact reproduce steps and I can take a look
  • h

    Harshit

    01/04/2023, 12:17 PM
    Hello, I am getting the following error while ingesting data via Flink:
    Could not find index for column: gKey, type: FORWARD_INDEX, segment: /tmp/data/pinotServerData/key1_OFFLINE/key1_3_
    Schema
    {
      "schemaName": "key",
      "dimensionFieldSpecs": [
        {
          "name": "rootKey",
          "dataType": "STRING"
        },
        {
          "name": "gKey",
          "dataType": "STRING"
        }
      ],
      "primaryKeyColumns": [
        "gKey"
      ]
    }
    Table config
    {
      "tableName": "key",
      "tableType": "OFFLINE",
      "isDimTable": true,
      "segmentsConfig": {
        "schemaName": "key",
        "segmentPushType": "REFRESH",
        "replication": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP"
      },
      "metadata": {
        "customConfigs": {}
      },
      "quota": {
        "storage": "200M"
      }
    }
  • r

    Rashmin Patel

    03/17/2023, 6:48 AM
    Hello all, can we ingest data into a REALTIME table using this connector? We are seeing the exception below when doing this.
    Caused by: java.lang.NullPointerException: Failed to get partition id for segment: my_flink_sink_test_1678965238000_1678965238000_0_0 (upsert-enabled table: my_flink_sink_test_REALTIME)
    	at org.apache.pinot.shaded.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:787) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
    	at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.handleUpsert(RealtimeTableDataManager.java:410) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
    After diving into Pinot's codebase, it looks like
    SegmentUtils.getRealtimeSegmentPartitionId
    is returning null because of the following check on the segment name:
    String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR); // SEPARATOR is `__`
        if (parts.length != 4) {
          return null;
        }
    The segment name generated in our case (my_flink_sink_test_1678965238000_1678965238000_0_0) uses _ (a single underscore) as the separator, hence the method returns null. Env details: pinot-flink-connector version: 0.11.0. What could be the reason for this? Any pointers/help on this will be appreciated 🙌
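    The failing check described above is easy to reproduce in isolation. The sketch below reimplements it locally, using `String.split` as a stand-in for commons-lang's `StringUtils.splitByWholeSeparator`, and assumes (as the thread does) that the real-time code path expects four `__`-separated parts in the segment name:

    ```java
    // Reproduces the shape of the partition-id parsing check from
    // SegmentUtils.getRealtimeSegmentPartitionId described above.
    class SegmentNameCheck {
        // Local stand-in for StringUtils.splitByWholeSeparator(name, "__").
        static String[] splitOnDoubleUnderscore(String segmentName) {
            return segmentName.split("__", -1);
        }

        // Returns the partition id, or null when the name does not split into
        // the expected four "__"-separated parts. Which part holds the
        // partition id is an assumption here for illustration.
        static Integer partitionIdFromSegmentName(String segmentName) {
            String[] parts = splitOnDoubleUnderscore(segmentName);
            if (parts.length != 4) {
                return null; // single-underscore names from the Flink sink end up here
            }
            return Integer.parseInt(parts[1]);
        }
    }
    ```

    A name like my_flink_sink_test_1678965238000_1678965238000_0_0 contains no double underscore at all, so it splits into a single part and the method returns null, which matches the NullPointerException in the upsert path.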
  • x

    Xiang Fu

    03/17/2023, 10:45 AM
    Real-time table push is supported from 0.12.0
  • l

    Lakshmi Rao

    03/29/2023, 5:25 AM
    πŸ‘‹ thanks for the awesome work on the Flink-Pinot connector. Out of curiosity, is there a reason the sink lives in pinot vs. flink?