# general
x
it's in your ingestion job
y
that leads to a heavier migration. in the short term, is it possible to do the input type conversion at query time?
x
Then it’s still fasthll
I remember the ingestion job did hyperloglog object computation
Then convert it to string
Then dump to Kafka to be consumed by Pinot
y
i see. so something like
distinctcounthll(toByte(column))
is equivalent to
fasthll(column)
distinctcounthll
just expects the serialization done early in the ingestion phase?
x
Yes
y
what happened is our user followed the guide, and simply changed
fasthll
to
distinctcounthll
. the query runs but they saw drastically different results
x
I will read source code and confirm
If this toBytes is just a simple Java string-to-bytes conversion
y
i think some checker in the
distinctcounthll
to validate the expected format would be helpful, and reduce such user confusion
either failing the query or falling back to
fasthll
would be good
x
there are certain conventions to convert a string to bytes for the hll object
private static HyperLogLog convertStringToHLL(String value) {
  char[] chars = value.toCharArray();
  int length = chars.length;
  byte[] bytes = new byte[length];
  for (int i = 0; i < length; i++) {
    bytes[i] = (byte) (chars[i] - BYTE_TO_CHAR_OFFSET);
  }
  return ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytes);
}
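The char-offset convention above can be sketched end to end in plain Java. A minimal sketch, assuming an offset value of 128 (BYTE_TO_CHAR_OFFSET is defined elsewhere in Pinot) and hypothetical helper names; plain byte arrays stand in for the serialized HyperLogLog:

```java
import java.util.Arrays;

public class HllStringConvention {
  // Assumed value for illustration; Pinot defines BYTE_TO_CHAR_OFFSET elsewhere.
  private static final int BYTE_TO_CHAR_OFFSET = 128;

  // Ingestion side (hypothetical helper): shift each serialized byte into
  // char range so the sketch can travel through Kafka as a string.
  static String bytesToString(byte[] bytes) {
    char[] chars = new char[bytes.length];
    for (int i = 0; i < bytes.length; i++) {
      chars[i] = (char) (bytes[i] + BYTE_TO_CHAR_OFFSET);
    }
    return new String(chars);
  }

  // Query side: undo the shift, mirroring convertStringToHLL above.
  static byte[] stringToBytes(String value) {
    char[] chars = value.toCharArray();
    byte[] bytes = new byte[chars.length];
    for (int i = 0; i < chars.length; i++) {
      bytes[i] = (byte) (chars[i] - BYTE_TO_CHAR_OFFSET);
    }
    return bytes;
  }

  public static void main(String[] args) {
    byte[] serialized = {0, 5, -1, 127, -128};  // stand-in for HLL bytes
    byte[] roundTrip = stringToBytes(bytesToString(serialized));
    System.out.println(Arrays.equals(serialized, roundTrip));  // prints true
  }
}
```

With an offset of 128, every byte value -128..127 maps to a distinct char 0..255, so the round trip is lossless and the query side can recover the exact serialized sketch from the string column.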
there is no simple way to do it heuristically and fall back, as people may want to directly apply distinctcounthll on a string column like city_name
y
this is used in
FastHLLAggregationFunction
but not
distinctcounthll
?
x
yes, FastHLLAggregationFunction
y
my point is that if user uses
distinctcounthll
on string value directly
x
right
say I may store a uuid as a string
and wanna do distinctcounthll
y
hmm, you can at least do type checking for bytes instead of string?
x
@Override
  public void aggregate(int length, AggregationResultHolder aggregationResultHolder, BlockValSet... blockValSets) {
    DataType valueType = blockValSets[0].getValueType();
    if (valueType != DataType.BYTES) {
      HyperLogLog hyperLogLog = getDefaultHyperLogLog(aggregationResultHolder);
      switch (valueType) {
        case INT:
          int[] intValues = blockValSets[0].getIntValuesSV();
          for (int i = 0; i < length; i++) {
            hyperLogLog.offer(intValues[i]);
          }
          break;
        case LONG:
          long[] longValues = blockValSets[0].getLongValuesSV();
          for (int i = 0; i < length; i++) {
            hyperLogLog.offer(longValues[i]);
          }
          break;
        case FLOAT:
          float[] floatValues = blockValSets[0].getFloatValuesSV();
          for (int i = 0; i < length; i++) {
            hyperLogLog.offer(floatValues[i]);
          }
          break;
        case DOUBLE:
          double[] doubleValues = blockValSets[0].getDoubleValuesSV();
          for (int i = 0; i < length; i++) {
            hyperLogLog.offer(doubleValues[i]);
          }
          break;
        case STRING:
          String[] stringValues = blockValSets[0].getStringValuesSV();
          for (int i = 0; i < length; i++) {
            hyperLogLog.offer(stringValues[i]);
          }
          break;
        default:
          throw new IllegalStateException(
              "Illegal data type for DISTINCT_COUNT_HLL aggregation function: " + valueType);
      }
    } else {
      // Serialized HyperLogLog
      byte[][] bytesValues = blockValSets[0].getBytesValuesSV();
      try {
        HyperLogLog hyperLogLog = aggregationResultHolder.getResult();
        if (hyperLogLog != null) {
          for (int i = 0; i < length; i++) {
            hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
          }
        } else {
          hyperLogLog = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[0]);
          aggregationResultHolder.setValue(hyperLogLog);
          for (int i = 1; i < length; i++) {
            hyperLogLog.addAll(ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize(bytesValues[i]));
          }
        }
      } catch (Exception e) {
        throw new RuntimeException("Caught exception while merging HyperLogLogs", e);
      }
    }
  }
in
DistinctCountHLLAggregationFunction
, type check is on bytes
if it’s bytes, then convert it to hll object directly
otherwise, it will just treat it as a normal column, which requires entry-by-entry aggregation
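The dispatch above explains the behavior difference on this column: on a STRING type, distinctcounthll offers each serialized blob as an opaque value and estimates how many distinct blobs exist, while fasthll deserializes and merges the sketches. A minimal sketch of the two paths, using exact Java sets in place of HyperLogLog for clarity (all names here are illustrative):

```java
import java.util.*;

public class HllPathSketch {
  // fasthll path: deserialize and merge (union) every per-row sketch.
  static int mergedCardinality(List<Set<String>> sketches) {
    Set<String> merged = new HashSet<>();
    sketches.forEach(merged::addAll);
    return merged.size();
  }

  // distinctcounthll on a STRING column: each serialized blob is offered as
  // one opaque value, so it estimates the number of distinct blobs instead.
  static int distinctBlobs(List<String> serialized) {
    return new HashSet<>(serialized).size();
  }

  public static void main(String[] args) {
    // Two pre-aggregated sketches covering users {u1,u2,u3} and {u2,u3,u4}.
    List<Set<String>> sketches = List.of(
        new HashSet<>(List.of("u1", "u2", "u3")),
        new HashSet<>(List.of("u2", "u3", "u4")));
    // Stand-ins for the serialized string form of each sketch.
    List<String> serialized = List.of("blobA", "blobB");

    System.out.println(mergedCardinality(sketches));  // prints 4 (distinct users)
    System.out.println(distinctBlobs(serialized));    // prints 2 (distinct blobs)
  }
}
```

The two counts answer different questions, which is why the user saw drastically different results after swapping the function name without changing the column encoding.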
y
hmm, then this doesn't quite explain why we saw different results from
distinctCountHLL
and
fastHLL
x
DistinctCountHLL will treat each row as a string and do cardinality estimation
One thing you can try is to limit filtering to select only one record
And see the results
y
cool. thanks. @Ting Chen ^
x
We can set up a video call if you wanna schedule one
I can explain how that works
also how the ingestion works