Jacek Wiślicki
01/13/2023, 9:19 PMSongv
02/13/2023, 11:43 AMSimi Ily
03/08/2023, 2:28 AMJacek Wiślicki
04/25/2023, 1:27 PMYu Wei Sung
05/01/2023, 7:48 PMbhasvij
07/10/2023, 2:13 PMbhasvij
07/10/2023, 2:14 PMbhasvij
07/10/2023, 2:16 PMbhasvij
07/20/2023, 4:17 PMbhasvij
07/20/2023, 4:17 PMbhasvij
07/20/2023, 4:22 PMsijieg
07/20/2023, 4:29 PMNeng
07/21/2023, 6:13 PMPaul Moore
08/16/2023, 6:27 PM023-08-15 13:14:20,533 ERROR org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase [] - Error in polling message from pulsar consumer.
java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: {"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://***][***] Transaction:(13,7569) try to ack message:336:5002 in pending ack status.","reqId":2226901394758118694, "remote":"***/***:6650", "local":"/***:59728"}
at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:97) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:110) [blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:56) [blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) [flink-connector-files-1.16.1.jar:1.16.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) [flink-connector-files-1.16.1.jar:1.16.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.1.jar:1.16.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: {"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [***][***] Transaction:(13,7569) try to ack message:336:5002 in pending ack status.","reqId":2226901394758118694, "remote":"***/***:6650", "local":"/***:59728"}
at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1143) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.client.impl.ClientCnx.handleAckResponse(ClientCnx.java:430) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[blob_p-3d5b8b2aa78a0dba35d4947498382a701aad6098-9ee449b3d9b05a02251c357af86bc7dc:?]
... 1 more
Rafał Trójczak
11/27/2023, 9:18 AMSébastien
12/05/2023, 8:48 AMSchema<CustomTypeClass> schema =
AvroSchema.of(
SchemaDefinition.<CustomTypeClass>builder()
.withPojo(CustomTypeClass.class)
.withJSR310ConversionEnabled(true)
.build());
PulsarSink.<KeyValue<String, CustomTypeClass>>builder()
.setServiceUrl(pulsarServiceURL)
.setAdminUrl(pulsarAdminURL)
.setProducerName("flink-producer")
.setTopics(topicName)
.setSerializationSchema(Schema.KeyValue(Schema.STRING, schema, KeyValueEncodingType.SEPARATED))
But on the consumer side, I see all messages are consumed by all consumers. I'm wondering if I might have a problem with the schema declaration.
PulsarWriter seems to add a key based on the key message (https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java#L198),
but I have the feeling that it doesn't because the schema context is incorrect (https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java#L143C83-L143C83).
So I would like to know If KeyValueSchemaFactory or KeyValueSchemaImpl.of ( ... ) is the correct way or if I'm missing something...
Thanks !Ruturaj
02/06/2024, 12:09 PMRami Youssef
02/11/2024, 7:34 PM