# flink-connector
  • s

    Simi Ily

    03/08/2023, 2:28 AM
Hi, please help with the Flink-Pulsar Table API in PyFlink. I am getting the following error in my PyFlink app, which creates a Table on a Pulsar topic. I tried adding both the Flink and the StreamNative flink-sql-connector-pulsar to the classpath.

    Error:

    py4j.protocol.Py4JJavaError: An error occurred while calling o79.execute.
    : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.emp'.
    Table options are:
    'admin-url'='http://192.168.1.10:8080'
    'connector'='pulsar'
    'format'='avro'
    'service-url'='pulsar://192.168.1.10:6650'
    'subscription_name'='test'
    'topics'='persistent://cows/markets/emp'
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:166)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:191)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
        ...
    Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='pulsar'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:735)
        at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:709)
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:162)
        ... 47 more
    Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'pulsar' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    Available factory identifiers are:
    blackhole
    datagen
    filesystem
    print
    python-input-format

    Code:

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    tbl_env = StreamTableEnvironment.create(env)

    # Flink SQL Pulsar Connector
    #tbl_env.get_config().set("pipeline.classpaths", "file:///opt/flink-lib/flink-sql-connector-pulsar-1.16.1.jar")
    # StreamNative Flink Pulsar SQL Connector
    #tbl_env.get_config().set("pipeline.classpaths", "file:///opt/flink-lib/flink-sql-connector-pulsar-1.15.1.4.jar")
    tbl_env.get_config().set("pipeline.classpaths", "file:///opt/flink-lib/flink-sql-connector-pulsar-1.16.0.0.jar")

    tbl_emp_sql = """
        CREATE TABLE emp(
            emp_id INT,
            emp_name VARCHAR
        ) WITH (
            'connector' = 'pulsar',
            'admin-url' = 'http://192.168.1.10:8080',
            'service-url' = 'pulsar://192.168.1.10:6650',
            'topics' = 'persistent://public/default/emp',
            'format' = 'avro'
        )"""
    tbl_env.execute_sql(tbl_emp_sql)

    emp_source = tbl_env.from_path("emp")
    result_table = emp_source.select(col("duration_window")).execute().print()
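A configuration sketch that may help here, not a confirmed fix: `pipeline.classpaths` only puts URLs on the client-side classpath, so the connector jar may never reach the cluster, which matches the "Could not find any factory for identifier 'pulsar'" error. The jar usually also has to be shipped with the job via `pipeline.jars` (or `StreamExecutionEnvironment.add_jars` in PyFlink), or copied into Flink's `lib/` directory. Using the jar path from the snippet above:

```
pipeline.jars: file:///opt/flink-lib/flink-sql-connector-pulsar-1.16.0.0.jar
pipeline.classpaths: file:///opt/flink-lib/flink-sql-connector-pulsar-1.16.0.0.jar
```

Note also that the connector build should match the Flink minor version (e.g. a connector built for Flink 1.16 with a Flink 1.16.x runtime).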
  • j

    Jacek Wiślicki

    04/25/2023, 1:27 PM
Hi, everyone! On our PulsarSource we read org.apache.pulsar.client.api.Message<byte[]> in order to have access to the properties created on the Pulsar side. Unfortunately, it appears that we don't have information about the original schema that the message has in the topic (only org.apache.pulsar.client.impl.schema.BytesSchema). In order to properly decode the message into the format expected by our Flink functions, we need the source schema, as it can differ from the expected one (several schema versions on the topic). Also, neither Message.getData() nor Message.getValue() seems to return valid Avro bytes. As a workaround, we simply serialise the original Avro schema as another message property and use it for deserialisation in our custom deserialisation schema. Still, maybe there is a "proper" way of handling this case? Thank you for any hints!
  • b

    bhasvij

    07/10/2023, 2:13 PM
The Pulsar Flink sink is only ever sending 200 messages. Any idea what the issue is?
  • b

    bhasvij

    07/10/2023, 2:14 PM
We tried increasing the batch message size, but it made no difference. Is it a known issue?
  • b

    bhasvij

    07/10/2023, 2:16 PM
    pulsar version: 1.10.2 and flink: 1.16.2
  • b

    bhasvij

    07/20/2023, 4:17 PM
I am not sure anyone is using the Flink Pulsar sink in production. I am seeing issues.
  • b

    bhasvij

    07/20/2023, 4:17 PM
Its performance is very poor.
  • b

    bhasvij

    07/20/2023, 4:22 PM
Can anyone shed some light on this?
  • r

    Rafał Trójczak

    11/27/2023, 9:18 AM
Hello, all! We have a Flink job with a source that uses the Pulsar connector from the Flink project (version 1.15.3) to read from a Pulsar topic, and a sink that writes to another Pulsar topic. The source is configured with the following options: • PULSAR_AUTO_COMMIT_CURSOR_INTERVAL = 1000 • PULSAR_ACK_RECEIPT_ENABLED = true • PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE = true We also have checkpointing set to 1000 ms with the checkpointing mode set to EXACTLY_ONCE. The problem is that when the job restarts (for any reason) we lose some events: they are read from the input topic but never written to the output topic. Are there any configuration options we can use to prevent this from happening? Thanks in advance :)
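A plausible cause, hedged rather than a verified diagnosis: with `PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE = true`, messages are acknowledged as soon as they are received, before any checkpoint covers them, so after a restart the subscription cursor can already be past events that were never written to the sink. A sketch of the source option involved, using the key name as defined in flink-connector-pulsar 1.15.x:

```
# Let the connector commit the cursor on successful checkpoints instead of
# acknowledging every message eagerly on receipt. Interval-based cursor
# auto-commit is intended for jobs running WITHOUT checkpointing.
pulsar.source.enableAutoAcknowledgeMessage: false
```

End-to-end exactly-once also requires the sink to be built with `DeliveryGuarantee.EXACTLY_ONCE`, which in turn needs transaction support enabled on the Pulsar brokers (`transactionCoordinatorEnabled=true`).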
  • s

    Sébastien

    12/05/2023, 8:48 AM
Hello everyone! I need help building a sink producer with the Flink Pulsar connector. I would like to send messages into Pulsar with a message key, so that they can be consumed via the Key_Shared subscription type. I tried to create a schema like this:
    Schema<CustomTypeClass> schema =
        AvroSchema.of(
            SchemaDefinition.<CustomTypeClass>builder()
                .withPojo(CustomTypeClass.class)
                .withJSR310ConversionEnabled(true)
                .build());

    PulsarSink<KeyValue<String, CustomTypeClass>> sink =
        PulsarSink.<KeyValue<String, CustomTypeClass>>builder()
            .setServiceUrl(pulsarServiceURL)
            .setAdminUrl(pulsarAdminURL)
            .setProducerName("flink-producer")
            .setTopics(topicName)
            // setSerializationSchema() takes a PulsarSerializationSchema, so the
            // Pulsar KeyValue schema is wrapped via pulsarSchema():
            .setSerializationSchema(
                PulsarSerializationSchema.pulsarSchema(
                    Schema.KeyValue(Schema.STRING, schema, KeyValueEncodingType.SEPARATED),
                    String.class, CustomTypeClass.class))
            .build();
But on the consumer side, I see that all messages are consumed by all consumers. I'm wondering if I might have a problem with the schema declaration. PulsarWriter seems to add a key based on the message key (https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java#L198), but I have the feeling that it doesn't because the schema context is incorrect (https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java#L143C83-L143C83). So I would like to know if KeyValueSchemaFactory or KeyValueSchemaImpl.of(...) is the correct way, or if I'm missing something... Thanks!
  • r

    Ruturaj

    02/06/2024, 12:09 PM
    https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1707220926420399 We are currently stuck with this issue ☝️ any help would be appreciated 🥺
  • m

    monicadeshmukh99

    09/11/2024, 1:50 PM
Hi all! I am new to Flink. I am trying to use the Flink Pulsar connector to receive streaming data from Pulsar, using Java. Has anyone tried this? I used the sample code given in the documentation. It does not throw any error, but it is not reading any data from the Pulsar bus. I am also trying to create a sink using the connector, which is not working either, with no errors. The examples given online are mostly for batch data. Your help is appreciated.
  • h

    Henry

    10/11/2024, 3:07 AM
Hi all. Is anyone here using Python? I have a message ID that I want to read back, but it fails with an error. This is my code:

    from pulsar import MessageId

    message_id = (456, 3623, 0, -1)
    pulsar_message_id = MessageId(partition=0, ledger_id=456, entry_id=3623, batch_index=-1)
    message = consumer.receive(pulsar_message_id)

    ERROR: Argument timeout_millis is expected to be of type 'int' and not 'MessageId'

Looks like this is the wrong way to read by message ID, right?
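A sketch of the likely intended call sequence, assuming the `pulsar-client` Python API: `receive()` only accepts a timeout in milliseconds, so to read from a known position you seek the subscription to the `MessageId` first and then receive:

```
# Hedged sketch: seek() repositions the subscription cursor; the next
# receive() then returns messages from that position onwards.
consumer.seek(pulsar_message_id)
msg = consumer.receive()
```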
  • k

    KimB

    04/04/2025, 10:54 AM
Is anyone working on an Apache Flink Pulsar connector that's compatible with Flink 1.20 or the 2.x release?