# feat-pravega-connector
  • k

    Kishore G

    05/18/2020, 7:40 PM
    it might be better to schedule 2 separate zoom calls to just go over Pinot and Pravega.
    ➕ 1
  • k

    Kishore G

    05/18/2020, 7:41 PM
    In one call, you can talk about how Pravega consumers work
  • k

    Kishore G

    05/18/2020, 7:41 PM
    and we can talk about existing Kafka connectors and how our design was influenced by Kafka
  • k

    Kishore G

    05/18/2020, 7:42 PM
    give us a week or two to read about pravega
  • k

    Kishore G

    05/18/2020, 7:42 PM
    and we can schedule something end of may or early june
  • s

    Subbu Subramaniam

    05/18/2020, 8:11 PM
    Nice e-meeting you @User, looking forward to working on this
  • f

    Flavio Junqueira

    06/10/2020, 12:30 PM
    @channel sounds like we lost the history of the channel, but I think we didn't have anything super important there. In the interest of progress, let me recap my conversation with @User and where we left off. When we spoke, one of the main challenges was to map Pinot splits to a Pravega stream. I don't know whether splits in Pinot are always expected to be statically defined (a bounded data source) or whether they can be dynamic (an unbounded data source, a stream). In case they are expected to be bounded, we can use `SegmentIterator`. The idea is the following:
    • We obtain all iterators via `BatchClientFactory.getSegments(...)`
    • `getSegments` returns a `StreamSegmentsIterator`, which iterates over `SegmentRange` instances, one for each stream segment
    • `SegmentRange` contains the information we need to define a split
    • A split can be read using `BatchClientFactory.readSegment(...)`, which takes a `SegmentRange` instance that we can build from the split information
    This should be simple to implement as we can distribute the split load by sending the segment range data to workers. A couple of relevant references:
    https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/BatchClientFactory.java
    https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/batch/SegmentRange.java
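A minimal sketch of the "distribute the split load" step described above, not taken from the thread. It assumes a hypothetical `Split` class that stands in for the data a `SegmentRange` carries (scope, stream, segment id, start and end offsets) and a hypothetical `SplitAssigner` that hands splits to workers round-robin. A real connector would build each split from the `SegmentRange` instances returned by `getSegments` and read it with `readSegment`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the information a SegmentRange carries.
// Not a Pravega class; the field names are assumptions for illustration.
final class Split {
    final String scope;
    final String stream;
    final long segmentId;
    final long startOffset;
    final long endOffset;

    Split(String scope, String stream, long segmentId, long startOffset, long endOffset) {
        this.scope = scope;
        this.stream = stream;
        this.segmentId = segmentId;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }
}

// Hypothetical helper: assigns splits to workers round-robin.
final class SplitAssigner {
    // Split i goes to worker (i % workerCount); returns one list per worker.
    static List<List<Split>> assign(List<Split> splits, int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        List<List<Split>> perWorker = new ArrayList<>();
        for (int w = 0; w < workerCount; w++) {
            perWorker.add(new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            perWorker.get(i % workerCount).add(splits.get(i));
        }
        return perWorker;
    }
}
```

Each worker would then rebuild a segment range from its splits and read it independently. Round-robin is only one possible policy; balancing by byte range (`endOffset - startOffset`) may work better when segments differ a lot in size.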
  • k

    Kishore G

    06/10/2020, 3:32 PM
    @User let's start a doc and save this
    👍 1
  • s

    Subbu Subramaniam

    06/10/2020, 5:38 PM
    Done, shared with Kishore. @User please share it with Flavio as well
  • s

    Subbu Subramaniam

    06/10/2020, 5:39 PM
    https://docs.google.com/document/d/1cYW6H0cKgG3NNPEFqsQM0Mc2xfKbzK64knsgUClUrgI/edit?usp=sharing
  • f

    Flavio Junqueira

    06/11/2020, 3:20 PM
    cool, thanks @User
  • f

    Flavio Junqueira

    12/17/2020, 10:17 AM
    @User @User On the Kinesis integration document, I might be able to produce a Pravega connector with the batch API of Pravega rather than the event stream API. With the batch API, I'm able to get a list of segments within the bounds of stream cuts (a stream cut is a reference to a position in the stream across segments): https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/BatchClientFactory.java#L70
    I have one question about `PartitionGroupMetadata` in the document. I thought the description was too long for a comment in the document, but if you prefer that, I can raise it there directly. `PartitionGroupMetadata` has a `groupID` attribute, and I don't understand how it maps to a group of shards/segments. For instance, say that I'm able to list segments via the iterator that `getSegments` returns:
    StreamSegmentsIterator getSegments(Stream stream, StreamCut fromStreamCut, StreamCut toStreamCut);
    https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/BatchClientFactory.java#L70
    How does Pinot map those segments to a partition group? Also, such a list of segments is flat, meaning that it does not preserve the predecessor-successor order of segments. When you have scaling, there is a natural order of segments that needs to be followed to avoid breaking per-key order. The event stream API in Pravega makes such a guarantee, but the batch API doesn't. How do you do it for Kinesis?
  • k

    Kishore G

    12/17/2020, 8:14 PM
    it's up to the implementation how to map segments to a partition group. by default it's a 1-1 mapping, which is the only thing we support today, but with this redesign we are trying to support grouping multiple stream shards into one segment in Pinot
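A rough illustration of the two mappings mentioned here, under assumptions that are not from the thread: shards are identified by plain integers, and `PartitionGrouping` with its modulo rule is a hypothetical policy, not part of the Pinot SPI.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper showing two ways to map stream shards to partition groups.
final class PartitionGrouping {
    // 1-1 mapping: every shard forms its own group, and the group id equals the shard id.
    static Map<Integer, List<Integer>> oneToOne(List<Integer> shardIds) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int shardId : shardIds) {
            groups.computeIfAbsent(shardId, k -> new ArrayList<>()).add(shardId);
        }
        return groups;
    }

    // Grouped mapping (assumed policy): shard s goes to group (s mod groupCount).
    static Map<Integer, List<Integer>> byModulo(List<Integer> shardIds, int groupCount) {
        if (groupCount <= 0) {
            throw new IllegalArgumentException("groupCount must be positive");
        }
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int shardId : shardIds) {
            int groupId = Math.floorMod(shardId, groupCount);
            groups.computeIfAbsent(groupId, k -> new ArrayList<>()).add(shardId);
        }
        return groups;
    }
}
```

A static modulo rule ignores scaling: when shards split or merge, their successors may land in a different group, which is part of the ordering concern raised in this thread.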
  • f

    Flavio Junqueira

    12/17/2020, 8:18 PM
    > it's up to the implementation how to map segments to a partition group
    where would you implement this logic? in the implementation of the `PartitionGroupMetadata` interface? also, could you please comment on this:
    > such a list of segments is flat, meaning that it is not preserving a predecessor-successor order of segments. When you have scaling, there is a natural order of segments that needs to be followed to avoid breaking per-key order. The event stream API in Pravega makes such a guarantee, but the batch API doesn't. How do you do it for Kinesis?
  • k

    Kishore G

    12/17/2020, 8:19 PM
    > where would you implement this logic? in the implementation of the `PartitionGroupMetadata` interface?
    yes
  • k

    Kishore G

    12/17/2020, 8:21 PM
    > such a list of segments is flat, meaning that it is not preserving a predecessor-successor order of segments. When you have scaling, there is a natural order of segments that needs to be followed to avoid breaking per-key order. The event stream API in Pravega makes such a guarantee, but the batch API doesn't. How do you do it for Kinesis?
    we provide the current segments to the interface, so the implementation should not return the new segments until the old segments have reached EOF
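The rule stated here (do not return new segments until the old ones have reached EOF) can be sketched as follows. This is only an illustration under assumptions: `SegmentGate` is a hypothetical helper, segments are plain `long` ids, and the predecessor map is assumed to be available from the streaming system.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: decides which segments may be handed out for reading.
final class SegmentGate {
    // predecessors maps each segment id to the ids that must reach EOF before it is read.
    // completed holds the ids of segments that have already reached EOF.
    // Returns the sorted ids of segments that are not completed and have no pending predecessor.
    static List<Long> readable(Map<Long, Set<Long>> predecessors, Set<Long> completed) {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Set<Long>> entry : predecessors.entrySet()) {
            Long segment = entry.getKey();
            if (completed.contains(segment)) {
                continue; // already fully consumed
            }
            if (completed.containsAll(entry.getValue())) {
                result.add(segment); // every predecessor has reached EOF
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

For example, if segment 0 scales up into segments 2 and 3, then 2 and 3 stay hidden until 0 is in `completed`. A merge works the same way: a segment with predecessors 1 and 2 becomes readable only after both have reached EOF.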
  • f

    Flavio Junqueira

    12/17/2020, 8:23 PM
    interesting, that's essentially the logic that the event stream reader implements, which I can't use because it requires direct access to the segments. it feels like pravega is being punished for doing the right thing because others don't do it...
  • f

    Flavio Junqueira

    12/17/2020, 8:24 PM
    I have to reimplement that logic as part of the pinot spi implementation to be able to preserve the predecessor-successor order, that's what I'm hearing, is that right?
  • k

    Kishore G

    12/17/2020, 8:27 PM
    > interesting, that's essentially the logic that the event stream reader implements, which I can't use because it requires direct access to the segments. it feels like pravega is being punished for doing the right thing because others don't do it...
    😞 I think there are challenges with both models. It basically comes down to exposing the right API
  • k

    Kishore G

    12/17/2020, 8:28 PM
    in some cases, a high-level abstraction is good, but for systems like Pinot, ksqlDB, etc., a low-level API is better
  • f

    Flavio Junqueira

    12/17/2020, 8:47 PM
    direct access to segments/partitions gives an impression of simplicity, but managing it isn't simple. given a list of segments, the implementation needs to figure out the order of segments, and if pinot asks for a replay of a single segment, the interface implementation then needs to figure out the history of successors to be able to serve the right data. and, as the stream scales up or down, there needs to be some coordination across groups, otherwise groups might end up with an unbalanced load. implementing this logic is non-trivial and, from this discussion, it is necessary, so I'm not convinced that it is a good idea to expect this logic to be implemented as part of the pinot spi rather than relying on the streaming system to do it. the upstream streaming system should have this information and can do a better job at the coordination. I'm unclear right now as to how it affects the other requirements you had for which you decided to pursue this low-level approach.
  • s

    Subbu Subramaniam

    12/17/2020, 9:02 PM
    A dumb question here: Is a pravega `segment` the same as a kinesis `shard` (or, kafka `partition`)? Just so I get the terms right
  • f

    Flavio Junqueira

    12/17/2020, 9:11 PM
    A Pravega segment is closer to a Kinesis shard in that it can be sealed (closed) and rescaled. A segment in Pravega stores a sequence of bytes, so it is possible to write and read bytes without events or messages. A Pravega segment is a sequence of events when using the event stream API.
  • f

    Flavio Junqueira

    12/17/2020, 10:58 PM
    For the sake of progress, and given that conversations are ephemeral on free Slack, I'm wondering what the best way to proceed is at this point. I feel that the current API is making it difficult to use Pravega features, and if we follow the direction we are discussing here, the one of using the batch API, then Pravega will be underutilized and Pinot won't be able to benefit as much from it. I'm particularly concerned about the direction of dealing with segments/shards and their order at the spi level. In general, Pravega is open to suggestions of API changes and I'd love to see Pravega fully utilized with Pinot. Let me know how you want to proceed.
  • k

    Kishore G

    12/17/2020, 10:59 PM
    i have been thinking about this a lot, can't come up with a solution that will work without making a lot of changes in Pinot
  • k

    Kishore G

    12/17/2020, 10:59 PM
    I was going back to the other suggestion you had made
  • k

    Kishore G

    12/17/2020, 10:59 PM
    one consumer consumes from Pravega and then writes to Pinot
  • k

    Kishore G

    12/17/2020, 11:00 PM
    we plan to add a write api soon
  • k

    Kishore G

    12/17/2020, 11:00 PM
    maybe we should integrate with Pravega once we have the write API?
  • f

    Flavio Junqueira

    12/18/2020, 12:57 PM
    > one consumer consumes from Pravega and then writes to Pinot
    I'm not sure which suggestion that was, can you elaborate?
    > we plan to add a write api soon
    > maybe we should integrate with Pravega once we have the write API?
    How will the write API change the picture? To restate my concerns:
    • Dealing with scaling segments/shards/partitions is difficult, and it is not a difficulty introduced by Pravega specifically; it is inherent to a stream changing its parallelism dynamically. The current API changes proposed seem to expect the plugin implementation both to expose segments individually, so that they can be arranged in groups at table creation time, and to deal transparently with the order of segments/shards/partitions.
    • It might add to the end-to-end latency if we process events in batches by requesting data between a start and an end. I wonder if that will affect the latency perceived by real-time queries.
    At least for Pravega, we would be able to address those concerns if we use the event stream API. This API requires the use of either coarse-grained checkpointing or opaque position objects, and the assignment of segments isn't deterministic in the case of rollbacks. I believe these are the main contention points. One concern on the Pinot side is that you need Pinot segments to be deterministically written. I'm wondering whether adding a hint to the event saying which segment it comes from would help to achieve this goal in the absence of a deterministic schedule. A segment is owned by a single reader at a time and that reader is responsible for adding to that segment. If the segment changes ownership, then the change happens at checkpoint time. At a checkpoint, all readers in the group receive a special event indicating the checkpoint. Would something like this work?
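To make the "segment hint" idea concrete, here is a small sketch. It rests on assumptions that are not established in the thread: `TaggedEvent` and `SegmentBuckets` are hypothetical names, the hint is a plain segment id attached to each event, and events of one source segment are assumed to arrive in order. It illustrates the proposal only and is neither a Pravega nor a Pinot API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event carrying a hint that names its source segment.
final class TaggedEvent {
    final long segmentId;
    final String payload;

    TaggedEvent(long segmentId, String payload) {
        this.segmentId = segmentId;
        this.payload = payload;
    }
}

// Hypothetical helper: buckets events by source segment.
final class SegmentBuckets {
    // Groups payloads by segment id, keeping arrival order within each segment.
    // The result does not depend on how events of different segments interleave.
    static Map<Long, List<String>> bucket(List<TaggedEvent> events) {
        Map<Long, List<String>> buckets = new TreeMap<>();
        for (TaggedEvent event : events) {
            buckets.computeIfAbsent(event.segmentId, k -> new ArrayList<>()).add(event.payload);
        }
        return buckets;
    }
}
```

Under these assumptions, two runs that interleave segments differently (for instance after a rollback changes which reader owns which segment) produce the same per-segment buckets, which is the kind of determinism the hint is meant to recover.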