# general
  • k

    Kishore G

    11/18/2019, 6:46 PM
    That does not matter.
  • k

    Kishore G

    11/18/2019, 6:47 PM
    The requirement is just about logical clock (offset). Event time can be out of order, we don’t rely on that and user don’t have to worry that
  • k

    Kishore G

    11/18/2019, 6:48 PM
    I see how that line can be interpreted differently
  • k

    Kishore G

    11/18/2019, 6:48 PM
    We should fix that doc
  • a

    Alex

    11/18/2019, 6:50 PM
    got it, thank you!
  • a

    Alex

    11/18/2019, 11:17 PM
    @User so, looking again at our configs. The reason we went with highlevel is because we couldn’t find any docs on low level. Examples are either using simple (0.9) or high level. Does this config look sane?:
    "streamType": "kafka",
    "stream.kafka.consumer.type": "LowLevel",
    "stream.kafka.topic.name": "topic_name",
    "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
    "stream.kafka.zk.broker.url": "kafka_zk_ip:2181/",
    "stream.kafka.broker.list": "kafka_ip:9092",
    "realtime.segment.flush.threshold.time": "1h",
    "realtime.segment.flush.threshold.size": "0",
    "realtime.segment.flush.desired.size": "150M",
  • s

    Subbu Subramaniam

    11/18/2019, 11:19 PM
    In Pinot, LowLevel == Simple. It used to be that we accepted only "Simple" or "HighLevel". We have now changed to accept "LowLevel" also (replacing simple). I will clarify in the documentation.
  • s

    Subbu Subramaniam

    11/18/2019, 11:24 PM
    And yes, the config looks fine to me. You don’t need the broker list for lowlevel
  • s

    Subbu Subramaniam

    11/18/2019, 11:24 PM
    Ah no, I take it back, you do.
  • s

    Subbu Subramaniam

    11/18/2019, 11:24 PM
    So it looks fine, yes.
  • s

    Subbu Subramaniam

    11/18/2019, 11:28 PM
    Given that you are using automatic segment size determination, I would change the flush threshold to a much larger value than 1h (depending on your Kafka retention time). Say, 24h
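    Concretely, Subbu’s suggestion would look roughly like this in the streamConfigs (a sketch based on the config Alex posted above; 24h is his example value, to be tuned against your Kafka retention):

    ```json
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.threshold.size": "0",
    "realtime.segment.flush.desired.size": "150M"
    ```

    With the size threshold set to 0, Pinot picks segment sizes automatically toward the desired size, so the time threshold mainly acts as an upper bound on how long a consuming segment stays open.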
  • a

    Alex

    11/18/2019, 11:31 PM
    @User got it. So, no broker list, only zookeeper list?
  • s

    Subbu Subramaniam

    11/18/2019, 11:31 PM
    both are needed, I believe. It does not hurt to have extra configs
  • s

    Subbu Subramaniam

    11/18/2019, 11:32 PM
    checking the code now.
  • a

    Alex

    11/18/2019, 11:33 PM
    don’t see it in the KafkaPartitionLevelStreamConfig (which I guess is low level), but will try
  • a

    Alex

    11/18/2019, 11:35 PM
    and got:
    {
      "code": 500,
      "error": "Failed to fetch the offset for topic: flattened-orders-json-seconds, partition: 0 with criteria: OffsetCriteria{_offsetType=CUSTOM, _offsetString='earliest'}"
    }
  • a

    Alex

    11/18/2019, 11:36 PM
    when submitted realtime table config
  • s

    Subbu Subramaniam

    11/18/2019, 11:36 PM
    Can you post the stack trace?
  • a

    Alex

    11/18/2019, 11:38 PM
    java.lang.IllegalStateException: Failed to fetch the offset for topic: flattened-orders-json-seconds, partition: 0 with criteria: OffsetCriteria{_offsetType=CUSTOM, _offsetString='earliest'}
    	at org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager.getPartitionOffset(PinotLLCRealtimeSegmentManager.java:585) ~[pinot-controller-0.2.0-SNAPSHOT.jar:0.2.0-SNAPSHOT-eb45b438c5053f5caaf289614f386706a472947e]
    	at org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager.setupNewPartition(PinotLLCRealtimeSegmentManager.java:983) ~[pinot-controller-0.2.0-SNAPSHOT.jar:0.2.0-SNAPSHOT-eb45b438c5053f5caaf289614f386706a472947e]
    	at org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager.setUpNewTable(PinotLLCRealtimeSegmentManager.java:222) ~[pinot-controller-0.2.0-SNAPSHOT.jar:0.2.0-SNAPSHOT-eb45b438c5053f5caaf289614f386706a472947e]
    	at org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(PinotTableIdealStateBuilder.java:115) ~[pinot-controller-0.2.0-SNAPSHOT.jar:0.2.0-SNAPSHOT-eb45b438c5053f5caaf289614f386706a472947e]
    	at org.apache.pinot.controller.helix.core.PinotHelixResourceManager.ensureRealtimeClusterIsSetUp(PinotHelixResourceManager.java:1193) ~[pinot-controller-0.2.0-SNAPSHOT.jar:0.2.0-SNAPSHOT-eb45b438c5053f5caaf289614f386706a472947e]
    	at org.apache.pinot.controller.helix.core.PinotHelixResourceManager.addTable(PinotHelixResourceManager.java:1079) ~[pinot-controller-0.2.0-SNAPSHOT.jar:0.2.0-SNAPSHOT-eb45b438c5053f5caaf289614f386706a472947e]
    	at org.apache.pinot.controller.api.resources.PinotTableRestletResource.addTable(PinotTableRestletResource.java:122) ~[pinot-controller-0.2.0-SNAPSHOT.jar:0.2.0-SNAPSHOT-eb45b438c5053f5caaf289614f386706a472947e]
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_232]
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_232]
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_232]
    	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_232]
    	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248) ~[jersey-common-2.28.jar:?]
    	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244) ~[jersey-common-2.28.jar:?]
    	at org.glassfish.jersey.internal.Errors.process(Errors.java:292) ~[jersey-common-2.28.jar:?]
    	at org.glassfish.jersey.internal.Errors.process(Errors.java:274) ~[jersey-common-2.28.jar:?]
    	at org.glassfish.jersey.internal.Errors.process(Errors.java:244) ~[jersey-common-2.28.jar:?]
    	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265) ~[jersey-common-2.28.jar:?]
    	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679) ~[jersey-server-2.28.jar:?]
    	at org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpContainer.service(GrizzlyHttpContainer.java:353) ~[jersey-container-grizzly2-http-2.28.jar:?]
    	at org.glassfish.grizzly.http.server.HttpHandler$1.run(HttpHandler.java:200) ~[grizzly-http-server-2.4.4.jar:2.4.4]
    	at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:569) ~[grizzly-framework-2.4.4.jar:2.4.4]
    	at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:549) ~[grizzly-framework-2.4.4.jar:2.4.4]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
  • a

    Alex

    11/18/2019, 11:39 PM
    from the controller
  • s

    Subbu Subramaniam

    11/18/2019, 11:39 PM
    Can you share the value of stream.kafka.consumer.prop.auyo.offset.reset? I suggest you set it to "largest"
  • a

    Alex

    11/18/2019, 11:39 PM
    earliest
  • s

    Subbu Subramaniam

    11/18/2019, 11:39 PM
    auto (not auyo) 🙂
  • s

    Subbu Subramaniam

    11/18/2019, 11:39 PM
    Can you change it to "smallest"?
  • s

    Subbu Subramaniam

    11/18/2019, 11:40 PM
    if you want to consume from earliest offset?
  • a

    Alex

    11/18/2019, 11:40 PM
    that worked 🙂
  • s

    Subbu Subramaniam

    11/18/2019, 11:41 PM
    OK, I am not sure where the doc says "earliest", or if it says anything at all, but I will fix it.
  • s

    Subbu Subramaniam

    11/18/2019, 11:42 PM
    When creating a new table in production, it is good to use "latest"; otherwise (depending on Kafka retention) you could be consuming in a frenzy, using a lot of CPU and leaving no room to serve queries. I think Kafka 2.0 also supports setting it to (say) "2d" (to get an offset that is roughly 2 days ago). The Kafka documentation will hopefully explain it better.
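    Putting the fix together, the property discussed here would be set like this (a sketch; as the thread shows, Pinot’s Kafka consumer at this version expects the older "smallest"/"largest" values rather than Kafka’s newer "earliest"/"latest"):

    ```json
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    ```

    Per Subbu’s advice, "largest" is the safer choice when creating a new table on a topic with long retention; "smallest" consumes from the earliest retained offset.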
  • a

    Alex

    11/18/2019, 11:45 PM
    it doesn’t have earliest in the docs. I think it was called smallest before.
  • s

    Subbu Subramaniam

    11/18/2019, 11:45 PM
    https://pinot.readthedocs.io/en/latest/tableconfig_schema.html#streamconfigs-section