Slackbot
12/02/2021, 9:28 AM
vanchhay
12/06/2021, 4:56 AM
@Override
public String process(byte[] bytes, Context context) {
    return "output-: " + new String(bytes);
}
Then I had two other, separate programs:
• producer program: produces messages to the 'x' topic
• consumer program: consumes messages from the 'y' topic
I am using FunctionLocalRunner (2.8.0).
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setName("testing");
functionConfig.setInputs(Collections.singletonList("x"));
functionConfig.setClassName(TestEffectivelyOnce.class.getName());
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
functionConfig.setOutput("y");
LocalRunner localRunner = LocalRunner.builder()
        .functionConfig(functionConfig)
        .brokerServiceUrl("pulsar://localhost:6650")
        .build();
localRunner.start(true);
While the producer program is producing messages, I restart the FunctionLocalRunner. Result: judging from the consumer program's log, there are duplicated messages.
• duplicated messages:
msg-1224
msg-1225
msg-1226
msg-1227
....
Did I do something wrong, or do Pulsar Functions not support this?
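To confirm duplicates like the ones listed above without reading the consumer log by eye, the received payloads can be counted. This is only a minimal sketch: `DuplicateFinder` is a hypothetical helper, not part of Pulsar, and it assumes the consumer program collects each received payload as a string. The sample payloads in `main` are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Pulsar): reports payloads received more than once. */
class DuplicateFinder {

    /** Returns every payload seen more than once with its count, in first-seen order. */
    static Map<String, Integer> findDuplicates(List<String> payloads) {
        // Count how often each payload occurs, preserving arrival order.
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String payload : payloads) {
            counts.merge(payload, 1, Integer::sum);
        }
        // Keep only the payloads that occurred at least twice.
        Map<String, Integer> duplicates = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicates.put(entry.getKey(), entry.getValue());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Made-up sample: two payloads arrive again after a simulated restart.
        List<String> received = Arrays.asList(
                "msg-1223", "msg-1224", "msg-1225", "msg-1224", "msg-1225", "msg-1226");
        System.out.println(findDuplicates(received)); // prints {msg-1224=2, msg-1225=2}
    }
}
```

Running this against the payloads captured before and after restarting the LocalRunner shows exactly which messages were delivered to the 'y' topic more than once.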
jerry cheng
12/09/2021, 7:29 PM
Saurav Deb
12/13/2021, 10:43 AM
Saurav Deb
12/13/2021, 10:45 AM
Rohith Gundala
01/01/2022, 10:33 AM
helm install pulsar apache/pulsar --version 2.7.2 --set initialize=true --set tls.enabled=true -n pulsar
I'm trying to use the same process again to install Pulsar in an AKS cluster, but I'm getting errors.
kubectl get pods -n pulsar
NAME                                     READY   STATUS             RESTARTS   AGE
pulsar-bookie-0                          1/1     Running            0          4d21h
pulsar-bookie-1                          1/1     Running            0          4d21h
pulsar-bookie-2                          1/1     Running            0          4d21h
pulsar-bookie-3                          1/1     Running            0          4d21h
pulsar-bookie-init-nn4zl                 0/1     Completed          0          4d21h
pulsar-broker-0                          0/1     CrashLoopBackOff   1221       4d9h
pulsar-broker-1                          0/1     CrashLoopBackOff   1158       4d9h
pulsar-broker-2                          0/1     CrashLoopBackOff   1201       4d9h
pulsar-grafana-99b4976f7-25jss           1/1     Running            0          4d21h
pulsar-prometheus-5f5fb9978b-l7rw2       1/1     Running            0          4d21h
pulsar-proxy-0                           1/1     Running            0          4d21h
pulsar-proxy-1                           1/1     Running            0          4d21h
pulsar-proxy-2                           1/1     Running            0          4d21h
pulsar-pulsar-init-mqqv9                 0/1     Completed          0          4d21h
pulsar-pulsar-manager-76c5cfb97f-8xzcg   1/1     Running            0          4d21h
pulsar-recovery-0                        1/1     Running            2          4d21h
pulsar-toolset-0                         1/1     Running            0          4d21h
pulsar-zookeeper-0                       1/1     Running            0          4d21h
pulsar-zookeeper-1                       1/1     Running            0          4d21h
pulsar-zookeeper-2                       1/1     Running            0          4d21h
Error:
10:28:34.474 [pulsar-ordered-OrderedExecutor-1-0] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=pulsar-zookeeper:2181 sessionTimeout=30000 watcher=org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase@198ff7fe
10:28:34.479 [pulsar-ordered-OrderedExecutor-1-0] INFO org.apache.zookeeper.common.X509Util - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
10:28:34.485 [pulsar-ordered-OrderedExecutor-1-0] INFO org.apache.zookeeper.ClientCnxnSocket - jute.maxbuffer value is 10485760 Bytes
10:28:34.542 [pulsar-ordered-OrderedExecutor-1-0] INFO org.apache.zookeeper.ClientCnxn - zookeeper.request.timeout value is 0. feature enabled=
10:28:54.559 [pulsar-ordered-OrderedExecutor-1-0-SendThread()] ERROR org.apache.zookeeper.client.StaticHostProvider - Unable to resolve address: pulsar-zookeeper:2181
java.net.UnknownHostException: pulsar-zookeeper: Temporary failure in name resolution
at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method) ~[?:1.8.0_282]
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) ~[?:1.8.0_282]
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) ~[?:1.8.0_282]
at java.net.InetAddress.getAllByName0(InetAddress.java:1277) ~[?:1.8.0_282]
at java.net.InetAddress.getAllByName(InetAddress.java:1193) ~[?:1.8.0_282]
at java.net.InetAddress.getAllByName(InetAddress.java:1127) ~[?:1.8.0_282]
at org.apache.zookeeper.client.StaticHostProvider$1.getAllByName(StaticHostProvider.java:92) ~[org.apache.pulsar-pulsar-zookeeper-2.7.2.jar:2.7.2]
at org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:147) [org.apache.pulsar-pulsar-zookeeper-2.7.2.jar:2.7.2]
at org.apache.zookeeper.client.StaticHostProvider.next(StaticHostProvider.java:375) [org.apache.pulsar-pulsar-zookeeper-2.7.2.jar:2.7.2]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1137) [org.apache.pulsar-pulsar-zookeeper-2.7.2.jar:2.7.2]
10:28:54.570 [pulsar-ordered-OrderedExecutor-1-0-SendThread(pulsar-zookeeper:2181)] WARN org.apache.zookeeper.ClientCnxn - Session 0x0 for server pulsar-zookeeper:2181, unexpected error, closing socket connection and attempting reconnect
java.lang.IllegalArgumentException: Unable to canonicalize address pulsar-zookeeper:2181 because it's not resolvable
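The trace above fails inside `InetAddress.getAllByName`, so the same lookup can be reproduced in isolation to tell a DNS problem apart from a ZooKeeper problem. This is a minimal sketch, assuming you can compile and run it somewhere on the same network as the broker (for example inside a pod in the `pulsar` namespace). `ResolveCheck` is a hypothetical name, and the default host is taken from the log.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical diagnostic (not part of Pulsar or ZooKeeper): checks whether a host name resolves. */
class ResolveCheck {

    /** Returns true if the host name resolves to at least one address. */
    static boolean resolves(String host) {
        try {
            // Same JDK call that appears in the stack trace.
            return InetAddress.getAllByName(host).length > 0;
        } catch (UnknownHostException e) {
            // Resolution failed, as in the broker log.
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the service name from the log; pass another name as the first argument.
        String host = args.length > 0 ? args[0] : "pulsar-zookeeper";
        System.out.println(host + " resolvable: " + resolves(host));
    }
}
```

If this reports the name as not resolvable from inside the cluster, the issue is in name resolution rather than in the broker itself.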
Does anyone know about this error? Please let me know how to fix it.
Slackbot
01/03/2022, 2:48 PM
Ankita Chaudhari
01/13/2022, 10:30 AM
Andrew Orlosky
01/19/2022, 3:29 PM
Korn Pisey
01/31/2022, 12:26 PM
@Override
public void becameActive(Consumer<?> consumer, int i) {
The problem is that I could not receive all messages from that partition through the consumer passed to the becameActive callback.
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic(TOPIC)
        .subscriptionName("subscription-1")
        .consumerEventListener(eventListener)
        .subscriptionType(SubscriptionType.Failover)
        .subscribe();
Does anyone have any ideas on this matter?
I'll leave my test program here. You can try it with Java Testcontainers.
Dependencies:
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.16.2</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>pulsar</artifactId>
    <version>1.16.2</version>
    <scope>test</scope>
</dependency>