# Troubleshooting
m
Hi All! I am trying to use the Flink Pulsar connector for receiving streaming data from Pulsar, using Java. Has anyone tried this? I used the sample code given in their documentation. It does not throw any error, but it is not reading any data from the Pulsar bus. I am also trying to create a sink using the connector, which is not working either, with no errors. Examples given online are mostly for batch data. Your help is appreciated.
g
This is an old repo I have with pulsar/flink examples. I haven't done pulsar in a long time so I'm not sure how outdated it is compared to later versions, but it might help https://github.com/polyzos/pulsar-flink-stateful-streams
m
I will check it out. Thank you!
Thank you for sharing the repo. My problem is a little different. I am trying to make use of the Flink-Pulsar connectors as described here: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/pulsar/ Also, my data is not CSV files; it's live streaming data coming from a Pulsar topic. So I am looking for an example of how to deal with streaming data.
g
the csv data is to be ingested into kafka.. it consumes from kafka, not csv.. and the code is the same. if you are not seeing data, probably:
1. check the cursor, i.e. earliest vs latest
2. check the subscription hasn't already acked the data
if you are just consuming the source to verify it works and you don't perform more operations, then you should see data. if you are performing more operations, like time windows for example, you need to set a watermark and also make sure the watermark is propagated properly
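For the time-window case mentioned above, a minimal sketch of a watermark strategy as an alternative to `WatermarkStrategy.noWatermarks()`. This is purely illustrative: the 5-second out-of-orderness bound and the use of the record timestamp are assumptions, not details from this thread.

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class WatermarkSketch {
    // Hypothetical sketch: use instead of WatermarkStrategy.noWatermarks()
    // when event-time windows are applied downstream. The 5-second bound and
    // the choice of timestamp are illustrative assumptions.
    static WatermarkStrategy<String> bounded() {
        return WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Here we reuse the record timestamp; a real job would likely
                // parse an event time out of the message payload instead.
                .withTimestampAssigner((msg, recordTs) -> recordTs);
    }
}
```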
m
mysource = PulsarSource.builder()
        .setServiceUrl(AppConfig.BROKER_SERVICE_URL)
        //.setAdminUrl(adminUrl)
        .setStartCursor(StartCursor.earliest())
        .setTopics(mytopic)
        .setDeserializationSchema(new SimpleStringSchema())
        .setSubscriptionName("my-subscription")
        .build();
This is what I have. Subscription is exclusive.
g
is it a newly created subscription, and does it have messages to read? and what kind of processing are you doing with the source?
m
aisDataStream = env.fromSource(mysource, WatermarkStrategy.noWatermarks(), "My Source")
g
can you share the whole code?
m
PulsarSource<String> mysource = PulsarSource.builder()
        .setServiceUrl(AppConfig.BROKER_SERVICE_URL)
        .setStartCursor(StartCursor.earliest())
        .setTopics(mytopic)
        .setDeserializationSchema(new SimpleStringSchema())
        .setSubscriptionName("my-subscription")
        .build();

DataStream<String> myDataStream = env.fromSource(mysource, WatermarkStrategy.noWatermarks(), "my Source");

PulsarSink<String> mySink = PulsarSink.builder()
        .setServiceUrl(AppConfig.BROKER_SERVICE_URL)
        .setTopics(my_output_topic)
        .setSerializationSchema(new SimpleStringSchema())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

myDataStream.sinkTo(mySink);
The subscription got created, but I didn't see any data being consumed. Also, it didn't create the output topic, and the sink didn't work.
g
i don't remember if the sink topic gets created automatically.. you can try creating it first.. and also check the subscription has messages to read (i.e. it hasn't read and acknowledged any messages)
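If the sink topic does need to exist up front, it can be created with pulsar-admin before running the job. The tenant, namespace, and topic name below are placeholders, not values from this thread:

```shell
# Create the output topic manually before the Flink job starts
# (adjust tenant/namespace/topic to your setup).
./pulsar-admin topics create persistent://public/default/my-output-topic
```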
m
<properties>
    <flink.version>1.16.0</flink.version>
    <pulsar.version>3.3.1</pulsar.version>
    <target.java.version>1.8</target.java.version>

    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Pulsar client dependency -->
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client-all</artifactId>
        <version>${pulsar.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>javax.activation</artifactId>
                <groupId>com.sun.activation</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jakarta.activation-api</artifactId>
                <groupId>jakarta.activation</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jakarta.ws.rs-api</artifactId>
                <groupId>jakarta.ws.rs</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jakarta.xml.bind-api</artifactId>
                <groupId>jakarta.xml.bind</groupId>
            </exclusion>
            <exclusion>
                <artifactId>validation-api</artifactId>
                <groupId>javax.validation</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jaxb-api</artifactId>
                <groupId>javax.xml.bind</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jcip-annotations</artifactId>
                <groupId>net.jcip</groupId>
            </exclusion>
            <exclusion>
                <artifactId>pulsar-package-core</artifactId>
                <groupId>org.apache.pulsar</groupId>
            </exclusion>
            <exclusion>
                <artifactId>jcommander</artifactId>
                <groupId>com.beust</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Pulsar connector dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-pulsar</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.10.1</version>
            <configuration>
                <source>${target.java.version}</source>
                <target>${target.java.version}</target>
            </configuration>
        </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.3.0</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
                <manifest>
                    <mainClass>Main</mainClass>
                </manifest>
            </archive>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.4.1</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <artifactSet>
                        <excludes>
                            <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                            <exclude>com.google.code.findbugs:jsr305</exclude>
                            <exclude>org.slf4j:*</exclude>
                            <exclude>org.apache.logging.log4j:*</exclude>
                        </excludes>
                    </artifactSet>
                    <filters>
                        <filter>
                            <!-- Do not copy the signatures in the META-INF folder.
                            Otherwise, this might cause SecurityExceptions when using the JAR. -->
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                     <transformers>
                         <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                         <mainClass>Main</mainClass>
                         </transformer>
                     </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>
</build>
The subscription has messages. I tried to manually create the topic for the sink.
g
you'd better ask on the Pulsar Slack channel.. there is a channel for the connector there
m
Okay. Thank you.
d
Yes, use the pulsar admin to check for existence of topic/subscription
m
you mean pulsar client?
d
Connector might not have created the output topic in which case you can manually create it before running the job
yes …
Copy code
./pulsar-admin topics list <tenant>/<namespace>
if you installed locally, or you can use the REST API
Copy code
./pulsar-admin topics list public/default
make sure your Flink job is actually deployed as well. You can check this on the Flink admin side
Make sure AT_LEAST_ONCE aligns with your expectations and that SimpleStringSchema matches the data you expect to send.
Enable debug logging as well, on both sides, Pulsar and Flink
Noticed you are not using watermarks. That's OK if you don't require event-time processing; otherwise you need to implement watermarks
and yes, for Pulsar-specific stuff they may also have insights into this from their side.
m
I am using Kube Forwarder, where I can see the topic and data flowing inwards. The Flink job is being deployed on the cluster, so I can see it running. I have added a logger to see messages/exceptions etc.
The data is message strings, so SimpleStringSchema should work, right? Tried creating the output topic manually; that didn't work.
At this time I am just trying to see if the pipeline works, so I didn't add watermarks.
d
ok
make sure, by viewing the data, that it really is just plain strings
the main focus should be on the logs of Flink and Pulsar, to look for schema mismatches, connection attempts, or any errors from the pulsar connector
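One quick way to follow the suggestion of eyeballing the payloads: a tiny, self-contained helper (purely illustrative, not part of any connector API) that checks whether a consumed string at least looks like JSON before assuming SimpleStringSchema fits:

```java
public class PayloadCheck {
    // Cheap heuristic: does the payload look like a JSON object or array?
    // This only inspects the outer delimiters; it does not validate the JSON.
    static boolean looksLikeJson(String payload) {
        String t = payload.trim();
        return (t.startsWith("{") && t.endsWith("}"))
                || (t.startsWith("[") && t.endsWith("]"));
    }

    public static void main(String[] args) {
        System.out.println(looksLikeJson("{\"mmsi\":123456789}")); // true
        System.out.println(looksLikeJson("plain text message"));   // false
    }
}
```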
m
It's JSON, but I am treating it as a string; will that make a difference?
in my producer (I am using a Pulsar producer that gets data from a stream API), I set the schema as String
d
Probably not. I think the error probably lies elsewhere, provided you can see it really is a JSON string
I think this is probably not the source of the issue. Best to see the logs
m
It's not throwing any errors; I am testing in IntelliJ
d
how about failures to establish connections?
those are not necessarily reported as errors
maybe you can share logs that mention this connector?
m
how do I track those in IntelliJ, because I am trying to connect to a Pulsar that is set up in a cluster
d
how are Flink and Pulsar installed?
also, what versions of these are in use?
m
I tried with Flink 1.19 and Java 11, Pulsar 3.3.1. Now I am planning to downgrade, as I read in their documentation that the connectors are not stable on Java 11. This is what I am about to try, but our cluster is down so I'm waiting on it LOL
<flink.version>1.16.0</flink.version>
<pulsar.version>3.3.1</pulsar.version>
<target.java.version>1.8</target.java.version>
d
ok, are you running it on kubernetes?
k8s?
or docker?
m
k8s
d
if k8s do you have kubectl installed?
m
yup
d
ok so you can get logs using kubectl from the pods
kubectl logs <pod-name>
and you just need to list the pod names out
-f will stream it, I think
m
I will surely try this when our cluster comes back up. But I was debugging my code in IntelliJ so I won't have to deploy my image to the cluster each time I make a change. I was connecting to the cluster by forwarding the URL to localhost using Kube Forwarder.
org.apache.pulsar.client.api.PulsarClientException$TimeoutException: Lookup request timeout {mytopic} Failed to get partitioned topic metadata. Any clue why this may be happening?
I had a question about the Flink-Pulsar connector's PulsarSource. I understand that it wraps the Pulsar client and consumer. I don't see an option to acknowledge messages. I was wondering what happens to those messages. Are they acknowledged when received by the PulsarSource, or is there a way to acknowledge messages by setting some property? At this time it's flooding the pipeline with all the unacknowledged messages.
d
Concerning message acknowledgments it’s automated and tied to the checkpointing mechanism of Flink to ensure exactly-once processing semantics. When you use Flink with Pulsar, you typically don’t manually acknowledge messages as you would when using the Pulsar client directly. Messages are acknowledged automatically as part of the checkpointing process. Once a checkpoint is complete, the connector acknowledges (commits) the offsets for the processed messages to Pulsar. This means that the messages are considered successfully processed and will not be redelivered unless the checkpoint fails and needs to be retried.
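Since acknowledgments ride on checkpoints, checkpointing has to actually be enabled in the job, or nothing ever gets acknowledged back to the broker. A minimal sketch; the 30-second interval is an arbitrary illustrative value, not a recommendation from this thread:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without enableCheckpointing(...) no checkpoints are taken, so the
        // Pulsar connector never acknowledges consumed messages.
        env.enableCheckpointing(30_000); // checkpoint every 30 seconds (illustrative)
        // ... build the PulsarSource/PulsarSink as in the snippets above,
        // then call env.execute("my-job");
    }
}
```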
Concerning the PulsarClientException$TimeoutException with the message "Lookup request timeout {mytopic} Failed to get partitioned topic metadata": this error indicates your Flink application is unable to retrieve the metadata for the specified topic within the configured timeout period. It happens while trying to connect. It could be for a few different reasons:
1. There could be network connectivity problems between your Flink application and the Pulsar broker. Check that the network path is clear and that the broker is reachable from the machine running the Flink job.
2. The Pulsar broker might be temporarily unavailable, overloaded, or not properly started. Check the Pulsar broker's logs and health status to confirm it's running without issues.
3. The error could occur if the topic does not exist in Pulsar or hasn't been created by the time your Flink job starts. Pulsar topics are lazily created when using certain APIs or configurations, but if your environment requires topics to be explicitly created beforehand, ensure the topic exists.
4. Verify that the service URL (AppConfig.BROKER_SERVICE_URL) you're using in your Flink job points to the correct Pulsar cluster and that there are no typos in the topic name.
5. The timeout could be too low for your network conditions or broker response times. Check the client configuration settings for timeouts (lookupTimeout, operationTimeout, etc.) and adjust them if necessary. These can be set when building the PulsarClient instance, although with Flink's Pulsar connector they are often managed internally and might not be directly configurable unless you're customizing the connector setup yourself.
6. In a multi-broker setup, if the brokers are load balancing, it might take time for metadata about recently created topics or partitions to propagate across all brokers. Ensure that the topic metadata has propagated across all relevant brokers.
For troubleshooting be sure to enable debug logging in both your Flink application and the Pulsar broker
m
Thank you so much @D. Draco O'Brien. Appreciate it.
I have tried the above file with parallelism 1, memory 2048
🙌 1
So my Flink-Pulsar connectors are finally working. I can see data in my sink's topic. The only issue I am having is that it created multiple subscriptions for the source topic along the way, like more than 2000 subscriptions. Has anyone had this issue, and does anyone know what may be causing it?
d
Could be how Flink manages its parallelism and checkpointing, especially in combination with the Pulsar Flink connector config. Each parallel instance of your Flink job might create its own subscription to the Pulsar topic if not configured correctly. By default, Flink might scale tasks based on available resources, potentially leading to lots of subscriptions. Check that your Flink job's parallelism matches your intended setup and that you're not accidentally allowing uncontrolled scaling.
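Along the lines of the advice above, a minimal sketch of pinning the job's parallelism explicitly so the number of source subtasks stays predictable. The value 1 is just for debugging, not a production recommendation:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Pin parallelism while debugging so the number of parallel source
        // instances (and hence Pulsar consumers) is fixed and easy to reason about.
        env.setParallelism(1);
        // ... build the pipeline, then env.execute("my-job");
    }
}
```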
1
m
Thank you.