# getting-started
e
Hi team, does anyone know if one real-time table can ingest data from two Kafka topics?
f
Nope 🙂 This question has been asked many times here 😉
e
ahh… so if I really want to consume data from two topics, do I have to merge the two streams before ingestion, e.g. by using Flink to combine them first?
My use case is:
• I have one main event stream `A`, which has the primary key `c`, a boolean field `d` (default is `false`), and some other fields.
• I have another lower-volume stream `B`, which also has the same primary key `c` and the boolean field `d`.
• I want a real-time table that uses stream `A` as the source and uses stream `B` to update the boolean field `d` based on the primary key.
Any suggestions on how to achieve this?
f
Ingest into separate tables, then join with Trino 🙂 Or even now with the multi-stage query directly in Pinot.
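For illustration, a query-time join in Pinot's multi-stage (V2) engine could look roughly like the sketch below. The table and column names (`events_a`, `events_b`, `c`, `d`, `other_field`) are placeholders based on the use case described above, not an actual schema.

```sql
-- Hypothetical tables: events_a ingests stream A, events_b ingests stream B.
-- Join on the shared primary key c; prefer B's value of the boolean flag d
-- when an update exists, otherwise fall back to A's own value.
SELECT a.c,
       COALESCE(b.d, a.d) AS d,
       a.other_field
FROM events_a AS a
LEFT JOIN events_b AS b
  ON a.c = b.c
```

A query of the same shape should also work from Trino against the two Pinot tables, modulo catalog and schema prefixes.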
n
@Eric Liu currently, a single Pinot table cannot ingest from more than one input topic. IIUC, you want to achieve a stream-stream join during ingestion. This can be done via stream processing frameworks prior to ingestion, or, as @francoisa suggested, by ingesting into separate tables and joining at query time. For the data in your lookup stream (low-volume `B`), is the data mostly static, or are there going to be multiple updates for the same primary key? I am trying to understand whether this is an actual stream-stream join or more of a data enrichment use case.
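If the pre-ingestion route is taken instead, a Flink SQL job along these lines could merge the two topics into a single topic that the Pinot real-time table then consumes. This is a rough, untested sketch: topic names, fields, broker addresses, and connector options are all assumptions, and the state-size implications of a regular stream-stream join would need separate consideration.

```sql
-- Source: main event stream A (hypothetical topic and schema).
CREATE TABLE stream_a (
  c STRING,
  d BOOLEAN,
  other_field STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-a',
  'properties.bootstrap.servers' = 'broker:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Source: low-volume update stream B (hypothetical topic and schema).
CREATE TABLE stream_b (
  c STRING,
  d BOOLEAN
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-b',
  'properties.bootstrap.servers' = 'broker:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Sink: merged topic keyed by c, which the Pinot upsert table would ingest.
CREATE TABLE merged (
  c STRING,
  d BOOLEAN,
  other_field STRING,
  PRIMARY KEY (c) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'topic-merged',
  'properties.bootstrap.servers' = 'broker:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Enrich A with the latest flag from B; fall back to A's own value of d.
INSERT INTO merged
SELECT a.c,
       COALESCE(b.d, a.d) AS d,
       a.other_field
FROM stream_a AS a
LEFT JOIN stream_b AS b
  ON a.c = b.c;
```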
e
Thanks Francoisa and Navina for the answers. The low-volume lookup stream `B` is also an `id`-based event stream, not static. It can have multiple updates (at most 2 for the same primary key, e.g. first an event from `B` comes in with `d` as `false`, and later another event comes in as `true`; the possible combinations are true + false, true + true, or just true). The high-volume stream `A` can be hundreds of GBs or TBs, and the low-volume `B` might be tens to hundreds of GBs, but could potentially grow larger. Some questions:
• What does the performance look like in general for joins at query time at this scale, using either Trino or the V2 query engine?
• How far is the V2 query engine from being production-ready?
n
@Rong R can you help answer the above questions from Eric? ☝️
r
• If both tables are partitioned by the primary join key, it should be fairly fast. If you join on something other than the partition key, it will be much slower and will probably perform similarly to Trino.
• We are targeting the end of July for our production-ready "beta". The co-partition optimizer is still under development, but you can try it out now and see.
e
Thanks all! I will try out the V2 query engine first and see.
r
Thank you! Please share your feedback so we can improve upon the use case :-)
e
Will do.
> If both tables are partitioned by the primary join key, it should be fairly fast.
Rong, could you clarify a bit about the primary join key? In my case, the primary key of the high-volume table is a composite key, let’s say `column1` + `column2`. For the low-volume table, I can make the primary key `column1` + `column2` as well, or just `column1` by rolling it up. Do you mean I need to partition both tables by
• `column1` and `column2`, or
• just `column1`, since `column1` is the primary join key?
One more question:
> The intermediate stages of the multi-stage engine are running purely on heap memory, thus executing a large table join will cause potential out-of-memory errors.
> • Because of this, the table scan phase for join queries is limited to 10 million rows.
Is this just a current limitation, or a long-term thing? Does the scan phase limit mean that the number of rows being scanned is capped, or that the rows fetched into the next stage after scanning are capped? (Still trying to get familiar with Pinot’s terms…) If it’s the former, then the table in my case is much larger than that scale. Would scaling out servers help resolve/mitigate the issue?
r
Good question. Our goal is to tackle queries that can push work down to the leaf stage to filter out data and send only the necessary rows over the wire. If we were to ship a giant amount of data across the wire, it would not be efficient anyway.
In this case, pre-partitioning by the join key will help a lot.
👍 1
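To illustrate the push-down point with the same hypothetical names used earlier (the time column and filter value are made up for the example): a selective filter on the large table is evaluated in the leaf stage, so only the surviving rows are shuffled into the join, and joining on the partition key keeps most of the work local to each server.

```sql
-- Hypothetical schema: filter the large table (events_a) in the leaf stage
-- before the join so only matching rows cross the wire; the join key c is
-- also the partition key, so co-located segments can be joined efficiently.
SELECT a.c,
       COALESCE(b.d, a.d) AS d
FROM events_a AS a
LEFT JOIN events_b AS b
  ON a.c = b.c
WHERE a.event_time_ms > 1690000000000
```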