# troubleshooting
l
Hey everyone. I'm currently using the Flink connector (https://github.com/apache/pinot/blob/master/pinot-connectors/pinot-flink-connector/ ) to add segments to an offline table of mine. I want to overwrite all segments on every single run, but I see that my segments are currently being created with timestamps in the segment names (table_name_mintimevalue_maxtimevalue_sequenceId, etc.). What is the best approach to make sure I am always overwriting all segments? There's a bunch of configuration options available, but I'm not sure which ones I should be using to achieve this.
n
@Yupeng Fu @Rong R
k
I believe you can set `segment-name.type` in the connector options to be `fixed`, and `segment-name.name` to be whatever you want the fixed portion of the segment name to be - the Flink subtask id gets appended, so the resulting segment names will be `<your fixed name>-0`, `<your fixed name>-1`, and so on.
So as long as the parallelism of the sink remains the same, you should get the same set of segment names.
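A minimal sketch of those two options as a plain config map; the key names come from this thread, and how the map is actually handed to the connector is omitted and may differ in the real API:
```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: "myTable_fixed" is a hypothetical fixed name, and these key names are the
// ones mentioned in the reply above rather than taken from connector documentation.
// Per that reply, the Flink subtask id gets appended, giving myTable_fixed-0, myTable_fixed-1, ...
// (see the follow-up later in this thread about how that interacts with the upload step).
Map<String, String> segmentNameOptions = new HashMap<>();
segmentNameOptions.put("segment-name.type", "fixed");
segmentNameOptions.put("segment-name.name", "myTable_fixed");
```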
l
That's perfect. I had assumed it would be something like that, but I couldn't find anything mentioning that the subtask id gets appended when using the fixed name generator.
l
Yep found it there, that's great.
Thanks for your help @Ken Krugler
r
+1. However, you should know that the sequence ID appended at the end still starts from 0 on every run, so during the overwrite process you might see inconsistent data (a mix of old and new segments while the replacement is in flight).
k
I keep hoping https://github.com/apache/pinot/issues/7813 happens soon-ish :)
r
yeah, ultimately the backfill tool is not idempotent, as it has no knowledge of what the backfill data looks like. Thus it is the user's responsibility to ensure that the backfill data overwrites the existing segment files correctly. One way I can think of: the new segments could be uploaded as OFFLINE while the upload job runs, without overwriting the existing segments; once it finishes, the operator can switch the online/offline segments (and then subsequently delete the old segments).
l
Thanks for the info, that's very helpful. Yeah, that might be a problem. I guess I could sort my splits to try to avoid this issue, but it could still happen.
Hey again, another question: was there any specific reason why the Flink Pinot sink was created to only support building one segment at a time per partition?
It would be nice to be able to create multiple segments concurrently
With 2 billion rows and 4 partitions at the moment (15 million rows per segment), it's taking around 2 hours to finish one run. This is a bit worrisome for me, as I'm expecting to have around 207 billion rows for this table in the future, so it doesn't look like this will scale. Any ideas?
r
what do you mean by "4 partitions"?
doesn't the flink connector produce segments and upload them to an offline table? or do you mean you are trying to ingest into a realtime table?
l
I am currently partitioning using murmur, taking a field, hashing it and % 4
No, this is for an offline table
r
in that case you can increase your flink sink parallelism, would that solve your problem?
l
I can't increase it past the number of partitions I have, right?
r
oh so the % 4 is a hard requirement?
l
I also tried increasing the number of partitions, but I end up with hot partitions.
No, it isn't a hard requirement.
I can change the partitioning to whatever makes sense. However, my partitioning key (call it customerId) % partitions does not lead to an even distribution of data.
r
what's the goal you are trying to achieve by partitioning on a single field, hashing it, and then taking % 4?
oh I see.
l
Reduce the number of segments that need to be looked up when querying the table.
r
so your table config requires a partition column(s).
l
Yes
r
ahh. that makes more sense.
thanks for the explanation!
l
Can you have multiple partition columns?
r
yes you can.
l
😮
Well that would solve my problem
Can you do that for realtime as well?
l
ah beautiful
What release is this in?
r
since it was merged < 30 days ago i think it is not released yet
l
aw
Any tips on handling my current partitioning problem? I'm not entirely sure what to do to ensure I can ingest data in a timely manner, while still allowing segment pruning
r
i am not sure if there's any workaround. maybe @Yupeng Fu will have more insight^
l
Thank you 👍
I guess one potential option would be to forgo partitioning and use the bloom filter on my primary key...
but this creates a weird scenario where the number of segments that need to be looked up isn't necessarily deterministic.
y
no good ways - you'd need multiple columns, as you need more info for partitioning
l
So I had an idea.. but I'm not sure if I'm missing something..
So I have 10 partitions, partitioned on customerId. There are multiple records per customerId, and these records have another id associated with them (say recordId). Together, these form a composite key. If I keep the partitioning of my Pinot table on customerId with 10 partitions, I should be able to increase ingestion throughput through Flink by simply keying the entire operation on that composite key and increasing the number of partitions/sinks in Flink itself. So it looks something like this:
```java
.keyBy(document -> {
    int customerIdPartitionKey = document.getCustomerId() % 10;
    int recordIdPartitionKey = document.getRecordId() % 40;
    return Tuple2.of(customerIdPartitionKey, recordIdPartitionKey);
})
.addSink(sink)
.setParallelism(40);
```
Does that seem right to you?
This means that there are multiple sinks responsible for the same partition
If I understand this correctly
That definitely doesn't seem to work the way I expect.. 🙂
I guess I must be thinking about this incorrectly
What I had previously was this
```java
.partitionCustom((key, numPartitions) -> Math.floorMod(murmur2(key), numPartitions),
                 document -> document.getCustomerId())
.addSink(sink)
.setParallelism(10);
```
But this causes hot partitions
So the only other idea I have is to still use partitionCustom, but make the partitioner return a partition that corresponds to the primary key (customerId) partition plus some factor. For example, if I have 10 partitions for customerId but 40 sinks, I could map each partition to 4 sinks somehow, so 4 * 10 = 40... I'm not sure if I'm on a wild goose chase here.
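A rough sketch of that mapping, assuming the sink parallelism is 40; `Document`, `getCustomerId`, `getRecordId`, `documents`, and `sink` are placeholders following the thread, and the plain modulo stands in for whatever hash the table's partition function actually uses (murmur here), which real code would need to match for pruning to keep working:
```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch: keep the Pinot table partitioned on customerId with 10 partitions, but spread
// each of those partitions across 4 sink subtasks (10 * 4 = 40), so a hot customerId
// partition is built by several subtasks instead of one.
Partitioner<Tuple2<Long, Long>> subPartitioner = (key, numPartitions) -> {
    int customerPartition = (int) Math.floorMod(key.f0, 10); // should match the table's partition function (murmur in this thread)
    int subPartition = (int) Math.floorMod(key.f1, 4);       // fan-out factor within that partition
    return customerPartition * 4 + subPartition;             // 0..39, assuming numPartitions == 40
};

KeySelector<Document, Tuple2<Long, Long>> compositeKey =
    document -> Tuple2.of(document.getCustomerId(), document.getRecordId());

documents
    .partitionCustom(subPartitioner, compositeKey)
    .addSink(sink)
    .setParallelism(40);
```
Since each sink subtask then only ever sees rows from a single customerId partition, every segment it builds should still map cleanly to one partition of the table; the earlier caveat about sequence IDs and partial overwrites still applies, though.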
https://apache-pinot.slack.com/archives/C011C9JHN7R/p1652805610077389?thread_ts=1652803924.463889&cid=C011C9JHN7R Hey @Ken Krugler. Just wanted you to know that this didn't work exactly as expected: while the Flink job does append the subtaskId to the segment name locally, it isn't carried over during upload when using the "fixed" segment name generator. I dug into it a bit more; turns out the correct generator to use is "normalizedDate", as this generator excludes time information if the push type is set to REFRESH.
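In the same sketch style as earlier in the thread (same caveat: the key name comes from this discussion, not from connector docs), the change would just be:
```java
// Sketch only: per the message above, the normalizedDate generator leaves time
// information out of the segment name when the push type is REFRESH, so every run
// produces the same segment names and overwrites the previous ones.
segmentNameOptions.put("segment-name.type", "normalizedDate");
```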
k
@Lars-Kristian Svenøy - how are you doing the upload? We generate batch segments (using Hadoop map-reduce), and then do a metadata push to upload the segments.
l
I'm using the flink connector @Ken Krugler. Currently I'm doing a tar push.. I could do a metadata push, but the bottleneck for me is the flink connector only being able to create one segment at a time per partition
k
@Lars-Kristian Svenøy Sorry, didn’t realize there was no way currently to disable upload as part of the segment build. But having the segment name changed after it’s generated seems odd. @Rong R is this expected?
@Lars-Kristian Svenøy so if I understand correctly, you’d like to build 40 segments, where the records for any given customerId are all in the same segment?
l
Well, not 40 segments necessarily. Rather, 40 partitions mapping to X segments. The number of segments depends on the number of records.
k
What’s the cardinality of customerId?
Also, I remember now why it seemed weird to me to have the final segment name change - because we configure our job spec to use the `inputFile` segment name generator, as per:
```yaml
segmentNameGeneratorSpec:
  type: inputFile
  configs:
    file.path.pattern: '.+/(.+)\.gz'
    segment.name.template: '${filePathPattern:\1}'
```
So our segment names wind up matching our CSV input file names.
Finally, we do something similar to what you want, but we solved it (pre-Flink connector) in our Flink job by writing out CSV files via a custom operator, where we could precisely control the partitioning and number of resulting segments (and their names)
I think one notable issue with using the Flink connector is that if your job fails for any reason partway through, you’ll have trashed your table (partial segment upload that is replacing a sub-set of the existing segments).
l
Yes you're right, Kevin
The CSV approach wouldn't work for us, as we're dealing with too much data. I'll be forking the connector and making the changes I need to make it more resilient
The cardinality of customerId isn't great, which is the reason we get hot partitions
k
For what it’s worth, the approach I’d use is to still separate out the generation of segments from the pushing of segments. When our batch job fails, it’s usually due to some input data issue (malformed, fun new skew creating issues, etc). Once all segments have been built, then it’s been pretty bomb-proof to do the metadata push of the resulting segments (which we store in HDFS).
And building segments in an operator vs. a sink provides more flexibility for partitioning. We can run that operator at an appropriate parallelism given our cluster size, but inside the operator we can build however many segments we want.
In our particular use case, we’re partitioning by country/month, and for the US (our “hot” country) we sub-partition based on a high cardinality field that’s also frequently used in queries. We also have to play games in the custom partitioner so that we do a better job of spreading the load out among Flink sub-tasks.
l
Yes that's what I'm planning on doing. But even if you exclude the pushing of the segments with the current connector, the sink is still bottlenecking on segment build.
I'm going to be writing my own connector to deal with the issue
The sub partitioning approach is the way I am going now
k
@Lars-Kristian Svenøy - just to clarify, we build the segments in a custom operator, not in a sink.
l
That makes sense
r
Hi @Lars-Kristian Svenøy, I have a similar use case and want to know if you are running the Flink job in BATCH execution mode?
k
We run our Flink job in batch mode. But note that we’re generating CSV files, and doing that in an operator, so we’re not using the Flink sink.
l
Yes @Rashmin Patel. I've created my own flink sink for this. It uses the segment replacement protocol