
Machhindra

05/26/2021, 3:41 AM
@Neha Pawar I need help with Kafka streaming using an Avro schema. Here is my Avro schema. The main object is Metric, which contains one nested MetricSource object. I can stream the Metric.avsc fields like product/productVersion/metricPath, but I don't know how to map MetricSource. I would like to map the fields as follows: MetricSource_time, MetricSource_metric, MetricSource_metricValue, MetricSource_category, MetricSource_subCategory, Metric_product, Metric_productVersion, Metric_metricPath
Metric.avsc
===========
{
 "namespace": "com.blah",
 "name": "Metric",
 "type": "record",
 "fields": [{
    "name": "product",
    "type": ["string", "null"]
    },{
    "name": "productVersion",
    "type": ["string", "null"]
    },{
    "name": "MetricSource",
    "type": ["com.blah.MetricSource", "null"]
    },{
    "name": "metricPath",
    "type":{
       "type": "array",
       "items": ["string", "null"]
    }
   }]
}

MetricSource.avsc
===========
{
 "namespace": "com.blah",
 "name": "MetricSource",
 "type": "record",
 "fields": [{
     "name": "metric",
     "type": ["string", "null"]
     },{
     "name": "metricValue",
     "type": ["string", "null"]
     },{
     "name": "time",
     "type": "long"
     },{
     "name": "timeOffset",
     "type": "double"
     },{
     "name": "category",
     "order": "ignore",
     "type": ["null", "string"],
     "default": null
     },{
     "name": "subCategory",
     "order": "ignore",
     "type": ["null", "string"],
     "default": null
     }
    ]
 }

Neha Pawar

05/26/2021, 3:54 AM
Use:
transformFunction: jsonPathString(MetricSource, '$.time', <default value>)
This is an example of extracting "time".

Machhindra

05/26/2021, 4:12 AM
Do I need to extract at query time? I was hoping to map the fields at ingestion time.

RK

05/26/2021, 5:06 AM
@Machhindra you need to do this at ingestion time, in your table config JSON. Like this.

Machhindra

05/26/2021, 6:02 PM
@RK thanks for the suggestion. I added the transformFunction to the table schema as follows, and it worked.
{
  "name": "metric",
  "dataType": "STRING",
  "transformFunction": "jsonPathString(MetricSource, '$.metricClass', 'null')"
}

Neha Pawar

05/26/2021, 6:05 PM
Put this in the table config, not the schema.
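For reference, a sketch of where such a transform would live in the table config; the ingestionConfig/transformConfigs layout below follows the current Pinot docs, but field names may differ across versions, and the jsonPathLong call for the time column is my assumption:

```json
{
  "tableName": "metricTable_REALTIME",
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "metric",
        "transformFunction": "jsonPathString(MetricSource, '$.metric', 'null')"
      },
      {
        "columnName": "metricTime",
        "transformFunction": "jsonPathLong(MetricSource, '$.time', 0)"
      }
    ]
  }
}
```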

Machhindra

05/26/2021, 6:06 PM
Okay. It is strange that it worked even though I put it in the table schema. I will move it to the table config.
Yes, it used to be in the schema, but we have deprecated that, so it is better to move it to the table config.

Machhindra

05/26/2021, 6:07 PM
Ah..
@Neha Pawar I managed to get data from our Kafka topic into Pinot, so good progress so far. Here is how my table looks as of today:
metric  metricValue       entityMap                                     metricTime
======================================================================================
CPUTIME	  2210	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622076960000
WAITTIME	12	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622076960000
MQBPUSE%	50	{"MqBPool":"2","MqQmgr":"MQA3"}	                        1622076960000
MQBPUSE%   110	{"MqBPool":"3","MqQmgr":"MQA3"}	                        1622076960000
WLMPI	   0.5	{"ZosWmSysSum":"jobs"}	                                1622076960000
MQQFULL%	10	{"MqQueue":"myqueue","MqQmgr":"CSQ4"}	                1622076960000
....
....
CPUTIME	  2220	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622083650000
WAITTIME	12	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622083650000
MQBPUSE%	50	{"MqBPool":"2","MqQmgr":"MQA3"}	                        1622083650000
MQBPUSE%   110	{"MqBPool":"3","MqQmgr":"MQA3"}	                        1622083650000
WLMPI	   0.5	{"ZosWmSysSum":"jobs"}	                                1622083650000
MQQFULL%	10	{"MqQueue":"myqueue","MqQmgr":"CSQ4"}	                1622083650000
....
This is operational metric data, just like Prometheus: about 300K rows/minute. There are different metrics, collected at regular intervals. Each metric is measured for the entity given in the entityMap; e.g., in the first row the CPUTIME metric is collected for a particular plan, package, and Db2 server. Here are some of the queries I wish to run on this table:
# Get me timeseries for the given metric, plan, package and server
select * from metricTable where metric='CPUTIME' and Plan='myplan' and Pkg='mypkg' and Server='myDb2' and metricTime < 1622076960000

# Get me two timeseries for the given two metric, plan, package and server
select * from metricTable where metric IN ('CPUTIME', 'WAITTIME') and Plan='myplan' and Pkg='mypkg' and Server='myDb2' and metricTime < 1622076960000

# Get me all timeseries for the given plan irrespective of package or server
select * from metricTable where metric='CPUTIME' and Plan='myplan' and metricTime < 1622076960000

# Get me top 10 highest CPUTIME package for a given Server
# Get me average CPUTIME for packages for a given Server with time window of 10 minutes
# Get me list of Plans for a given Server
# Get total CPUTIME used for a given Server
How should I flatten the "entityMap" column so that I can satisfy these queries? I am not sure whether a multi-valued column would help.

RK

05/27/2021, 5:38 AM
@Machhindra while selecting, try giving the column name using json_format(column_name).

Neha Pawar

05/27/2021, 6:01 AM
@Kishore G does this look like a good fit for using json index?

Kishore G

05/27/2021, 6:06 AM
yes, this is perfect case for json index
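For anyone following along, enabling it is a table-config change, roughly like this (a sketch assuming the tableIndexConfig section; check the docs for your Pinot version):

```json
{
  "tableIndexConfig": {
    "jsonIndexColumns": ["entityMap"]
  }
}
```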

Machhindra

05/27/2021, 4:21 PM
Thanks Neha. I am reading through the documentation. Once I apply the JSON index, I will have to use queries like
SELECT ... FROM metricTable WHERE metric='CPUTIME' and JSON_MATCH(entityMap, '"$.Plan"=''myplan''')
Instead of
select * from metricTable where metric='CPUTIME' and Plan='myplan'

Neha Pawar

05/27/2021, 5:40 PM
Correct. Tagging the author @Jackie for more help if you need it.

Machhindra

05/27/2021, 5:41 PM
I was hoping to run queries like: select * from metricTable where metric='CPUTIME' and Plan='myplan'
This is more intuitive and works seamlessly with Superset.
I am trying to learn how I can use the query "SELECT ... FROM metricTable WHERE metric='CPUTIME' and JSON_MATCH(entityMap, '"$.Plan"=''myplan''')" from Superset.

Jackie

05/27/2021, 6:19 PM
We are working on an effort to add the JSON data type and simplify the query from
SELECT ... FROM metricTable WHERE metric='CPUTIME' and JSON_MATCH(entityMap, '"$.Plan"=''myplan''')
to
SELECT ... FROM metricTable WHERE metric='CPUTIME' and entityMap.Plan='myplan'
Currently it is not supported yet.
@Xiang Fu Can we do the JSON_MATCH within Superset?

Xiang Fu

05/27/2021, 6:33 PM
I think you can. Just check the query that Superset generates.

Machhindra

06/03/2021, 2:33 PM
@Neha Pawar @Xiang Fu @Jackie @RK Added JSON index as suggested.
"jsonIndexColumns": [
        "entityMap"
      ],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [
        "metric"
      ],
Here is how my table looks:
select * from metric_v6 where entityMap!='null' limit 10

category        entityMap                            groupId                               metric
Type74Subtype1  {"Volser":"HFSMND"}                  1aeadda1-86e1-4ee4-aa4e-d067a381db0a  DCTAVG_VOLSER
Type74Subtype1  {"Volser":"NMD008"}                  632afe50-a42c-4c15-b0cd-9c6a4e0e08b1  DRTAVG_VOLSER
Type74Subtype1  {"DeviceNo":"0x6E06"}                eadee40b-3c02-49f8-a295-0b059606a3f5  DQTAVG_DEVNUM
Db2Subsys       {"Db2Bufp":"BP42","Db2Ssid":"DH3G"}  f2a9ccc7-6aa6-4713-8382-f7b96f62c124  IDBBREAD
Notice that the entityMap JSON object differs from row to row. The following query fails:
SELECT * FROM metric_v6 WHERE JSON_MATCH(entityMap, '"$.Volser"=''HFSMND''')

[
  {
    "errorCode": 200,
    "message": "QueryExecutionError:\njava.lang.RuntimeException: Caught exception while running CombinePlanNode.\n\tat org.apache.pinot.core.plan.CombinePlanNode.run(CombinePlanNode.java:157)\n\tat org.apache.pinot.core.plan.InstanceResponsePlanNode.run(InstanceResponsePlanNode.java:33)\n\tat org.apache.pinot.core.plan.GlobalPlanImplV0.execute(GlobalPlanImplV0.java:45)\n\tat org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl.processQuery(ServerQueryExecutorV1Impl.java:294)\n\tat org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl.processQuery(ServerQueryExecutorV1Impl.java:215)\n\tat org.apache.pinot.core.query.executor.QueryExecutor.processQuery(QueryExecutor.java:60)\n\tat org.apache.pinot.core.query.scheduler.QueryScheduler.processQueryAndSerialize(QueryScheduler.java:157)\n\tat org.apache.pinot.core.query.scheduler.QueryScheduler.lambda$createQueryFutureTask$0(QueryScheduler.java:141)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat shaded.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)\n\tat shaded.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)\n\tat shaded.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)"
  }
]
After facing these challenges, I decided to create a new REALTIME table with the indexes enabled from the very beginning. It has 2.8M rows vs 93M in the previous table. Now I don't receive the errors, but no results are shown either.
For the community: the following query
SELECT * FROM metric_v7 WHERE JSON_MATCH(entityMap, 'Db2Bufp=''BP20''')
works now. Team Pinot recently changed the syntax of JSON_MATCH, and the new syntax is not included in 0.7.1.
It looks like I hit another bump: I can't run a simple query in the console today. This may be due to the sorted column or the JSON index.
select * from metric_v7 limit 10

[
  {
    "errorCode": 200,
    "message": "QueryExecutionError:\njava.lang.IndexOutOfBoundsException\n\tat java.nio.Buffer.checkIndex(Buffer.java:551)\n\tat java.nio.DirectByteBuffer.getInt(DirectByteBuffer.java:684)\n\tat org.apache.pinot.core.segment.memory.PinotByteBuffer.getInt(PinotByteBuffer.java:144)\n\tat org.apache.pinot.core.io.util.FixedByteValueReaderWriter.getInt(FixedByteValueReaderWriter.java:35)\n\tat org.apache.pinot.core.segment.index.readers.sorted.SortedIndexReaderImpl.getDictId(SortedIndexReaderImpl.java:83)\n\tat org.apache.pinot.core.segment.index.readers.sorted.SortedIndexReaderImpl.readDictIds(SortedIndexReaderImpl.java:108)\n\tat org.apache.pinot.core.segment.index.readers.sorted.SortedIndexReaderImpl.readDictIds(SortedIndexReaderImpl.java:33)\n\tat org.apache.pinot.core.common.DataFetcher$ColumnValueReader.readStringValues(DataFetcher.java:439)\n\tat org.apache.pinot.core.common.DataFetcher.fetchStringValues(DataFetcher.java:146)\n\tat org.apache.pinot.core.common.DataBlockCache.getStringValuesForSVColumn(DataBlockCache.java:194)\n\tat org.apache.pinot.core.operator.docvalsets.ProjectionBlockValSet.getStringValuesSV(ProjectionBlockValSet.java:94)\n\tat org.apache.pinot.core.common.RowBasedBlockValueFetcher.createFetcher(RowBasedBlockValueFetcher.java:64)\n\tat org.apache.pinot.core.common.RowBasedBlockValueFetcher.<init>(RowBasedBlockValueFetcher.java:32)\n\tat org.apache.pinot.core.operator.query.SelectionOnlyOperator.getNextBlock(SelectionOnlyOperator.java:82)"
  }
]

Jackie

06/04/2021, 5:23 PM
This is unexpected. I think it is caused by adding a sorted column during consumption.
Can you please file an issue for this with the details?

Machhindra

06/04/2021, 5:25 PM
I will try. Zookeeper exited, so I restarted it and the server as well. Let's see. Since I am running in a Docker container, I might have lost the data too.
@Jackie I restarted the Pinot cluster with a clean start. This time I did not add the sorted index on the "metric" column. There are 52M records and I don't face the problem noted above. I suspect the sorted index is the culprit.

Jackie

06/08/2021, 4:20 PM
@Machhindra Have you tried starting a new table with a sorted index on metric? We don't usually put a sorted index on metric columns, but it should work.

Machhindra

06/08/2021, 4:53 PM
I will try once I finish my benchmarking.
We don't usually put sorted index on metrics
Can you please explain why? Here is my table. There are about 1500 unique metrics, collected at 1-minute+ intervals. Most queries will include metric in the predicate.
metric  metricValue       entityMap                                     metricTime
======================================================================================
CPUTIME	  2210	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622076960000
WAITTIME	12	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622076960000
MQBPUSE%	50	{"MqBPool":"2","MqQmgr":"MQA3"}	                        1622076960000
MQBPUSE%   110	{"MqBPool":"3","MqQmgr":"MQA3"}	                        1622076960000
WLMPI	   0.5	{"ZosWmSysSum":"jobs"}	                                1622076960000
MQQFULL%	10	{"MqQueue":"myqueue","MqQmgr":"CSQ4"}	                1622076960000
....
....
CPUTIME	  2220	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622083650000
WAITTIME	12	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622083650000
MQBPUSE%	50	{"MqBPool":"2","MqQmgr":"MQA3"}	                        1622083650000
MQBPUSE%   110	{"MqBPool":"3","MqQmgr":"MQA3"}	                        1622083650000
WLMPI	   0.5	{"ZosWmSysSum":"jobs"}	                                1622083650000
MQQFULL%	10	{"MqQueue":"myqueue","MqQmgr":"CSQ4"}	                1622083650000
....

Jackie

06/08/2021, 5:11 PM
Oh, so metric is actually a dimension column.
Sorted index should work. Please let us know if you run into the same problem after adding the sorted index

Machhindra

06/08/2021, 5:17 PM
Yes. I am reading from my Kafka topic. I need to flatten the schema. I wish I could create a table as follows:
CPUTIME  WAITTIME   MQBPUSE% ...'1400 metrics..'  Plan     Pkg    Server  MQBPool MqQmgr  ZosWmSysSum .. metricTime
======================================================================================================================== 
2210                                              myplan   mypkg   myDb2                                 1622076960000
         12                                       myplan   mypkg   myDb2                                 1622076960000
                     50                                                     2     MQA3                   1622076960000
This table has METRICS, DIMENSIONS, and a TIMESTAMP. The data is similar to Prometheus metrics.

Jackie

06/08/2021, 5:50 PM
Any reason why you want to flatten the metrics? Flattening all 1500 metrics would make the table too wide.

Neha Pawar

06/08/2021, 6:04 PM
Have you tried using jsonPathString from the initial suggestions to get the fields out of entityMap? This is a finite set, right: Plan, Pkg, Server, MQBPool, MqQmgr, ZosWmSysSum?

Machhindra

06/08/2021, 6:06 PM
Ultimately, I want to enable dashboards on this data using Superset or another BI tool. Queries are going to be:
get me average of CPUTIME for PLAN Blah and PKG Blah for a time range
get me top 10 PLANS which has highest CPUTIME usage
get me list of PKGS where PLAN Blah
I need to expose metrics vs dimensions to user, so that with Superset they can easily create charts.
With Short Table and JSON_PATH and JSON Index

metric  metricValue       entityMap                                     metricTime
======================================================================================
CPUTIME	  2210	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622076960000
WAITTIME	12	{"Plan":"myplan","Pkg":"mypkg", "Server":"myDb2"}	    1622076960000

Query is going to be -
SELECT AVG(metricValue) from metricTable where JSON_PATH(entityMap, 'Plan=''myplan''') and metric='CPUTIME';
With Wide Table. No JSON

CPUTIME  WAITTIME   MQBPUSE% ...'1400 metrics..'  Plan     Pkg    Server  MQBPool MqQmgr  ZosWmSysSum .. metricTime
======================================================================================================================== 
2210                                              myplan   mypkg   myDb2                                 1622076960000
         12                                       myplan   mypkg   myDb2                                 1622076960000

Query is going to be -
SELECT AVG(CPUTIME) from metricTable where Plan='myplan';
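As an illustration of the wide-table idea (not Pinot code; pivot_rows is a hypothetical helper of mine), a minimal Python sketch that pivots short-table rows into wide records, grouping by (entityMap, metricTime):

```python
from collections import defaultdict

def pivot_rows(rows):
    """Pivot (metric, value, entityMap, metricTime) rows into wide records.

    Rows sharing the same entityMap and timestamp merge into one record;
    each distinct metric name becomes its own column.
    """
    wide = defaultdict(dict)
    for metric, value, entity_map, metric_time in rows:
        # Group key: the dimension set plus the timestamp
        key = (tuple(sorted(entity_map.items())), metric_time)
        wide[key].update(entity_map)      # dimension columns (Plan, Pkg, ...)
        wide[key][metric] = value         # one column per metric name
        wide[key]["metricTime"] = metric_time
    return list(wide.values())

rows = [
    ("CPUTIME", 2210, {"Plan": "myplan", "Pkg": "mypkg", "Server": "myDb2"}, 1622076960000),
    ("WAITTIME", 12, {"Plan": "myplan", "Pkg": "mypkg", "Server": "myDb2"}, 1622076960000),
    ("MQBPUSE%", 50, {"MqBPool": "2", "MqQmgr": "MQA3"}, 1622076960000),
]
records = pivot_rows(rows)
```

Note how most metric columns stay absent in any given record; with 1500 metrics the wide table would be very sparse, which is the trade-off being discussed here.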

Neha Pawar

06/08/2021, 6:10 PM
How many distinct "metric" values do you expect? And how many distinct fields inside "entityMap"?
You can keep metric and metricValue as they are, but still flatten the entityMap.

Jackie

06/08/2021, 6:10 PM
The query for short table should be:
SELECT AVG(metricValue) FROM metricTable WHERE JSON_MATCH(entityMap, 'Plan=''myplan''') and metric='CPUTIME'
with JSON index

Machhindra

06/08/2021, 6:12 PM
@Neha Pawar - JSON_PATH works, BUT look at the way users need to query. The 'wide table' query looks much more natural. Today there are 80 such entityMap (dimension) columns. Note that not all dimensions will be filled for each incoming metric: the CPUTIME metric is for a Db2 Plan and Package, whereas MQBPUSE% is meant for buffers.
how many distinct "metric" do you expect? and how many distinct fields inside "entityMap"?
Distinct metrics = 1500; distinct fields inside "entityMap" = 80.

Neha Pawar

06/08/2021, 6:21 PM
If that is more natural, then you can extract them all, or maybe just the important ones.
Are there fields in entityMap that will be queried more frequently than others?
m

Machhindra

06/08/2021, 6:24 PM
The query for short table should be: 
SELECT AVG(metricValue) FROM metricTable WHERE JSON_MATCH(entityMap, 'Plan=''myplan''') and metric='CPUTIME'
 with JSON index
Yes, that's what I am using now. In Superset, this is how a user uses it -

Xiang Fu

06/08/2021, 6:30 PM
In Superset, you can create columns with Pinot expressions, e.g. a column called Plan whose expression is JSON_MATCH(entityMap, 'Plan=''myplan''').

Machhindra

06/09/2021, 4:47 AM
@Neha Pawar @Xiang Fu
if that is more natural, then you can extract them all, or maybe the important ones?
The following query works. It allows me to filter on the dimensions and metric and get the average metric value:
SELECT AVG(metricValue) FROM metricTable WHERE JSON_MATCH(entityMap, 'Plan=''myplan''') and metric='CPUTIME'
However, the above method does not help if I want to create a new metric (CPUTIME * WAITTIME/10). What would the query be? The wide table enables that possibility, e.g.:
SELECT CPUTIME * WAITTIME/1 AS newMetric FROM metricTable WHERE Plan='myplan';

OR 

SELECT  CPUTIME * WAITTIME/1 AS newMetric FROM metricTable WHERE JSON_MATCH(entityMap, 'Plan=''myplan''')
If we keep the dimensions in JSON format, then how would I list all the available Plans?
SELECT  JSON_MATCH(entityMap, 'Plan=''myplan''') AS Plans FROM metricTable

Xiang Fu

06/09/2021, 5:10 AM
From the Superset side, you can create a new column, set the expression, and then reuse it; you can replace the column with the expression.
For CPUTIME * WAITTIME: since they are not in the same record, you need to align the docs with a join to get them onto the same row, then multiply. You can use Presto to achieve this.
For extracting unique values you can do
SELECT  distinct(JSON_MATCH(entityMap, 'Plan=''myplan''')) AS Plans FROM metricTable
or
SELECT JSON_MATCH(entityMap, 'Plan=''myplan''') AS Plans, count(*) AS cnt FROM metricTable GROUP BY JSON_MATCH(entityMap, 'Plan=''myplan''')

Machhindra

06/10/2021, 9:19 PM
@Xiang Fu - The above queries do not fetch any results.
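One thing worth checking (my assumption, not something confirmed in this thread): JSON_MATCH is a filter predicate that returns true/false, so selecting it will not yield the Plan values. Extracting a value typically uses Pinot's jsonExtractScalar function instead, roughly:

```sql
-- sketch; the (column, jsonPath, resultsType, defaultValue) signature
-- is from the Pinot docs, but verify it against your Pinot version
SELECT DISTINCT jsonExtractScalar(entityMap, '$.Plan', 'STRING', 'null') AS Plan
FROM metricTable
```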

Xiang Fu

06/10/2021, 9:37 PM
Hmm, ok. Can you try playing around with the single quotes?