Sanyog Singh
04/04/2024, 7:32 AM
Nir Bar On
04/05/2024, 11:52 AM
{
  "type": "longSum",
  "name": "packets",
  "fieldName": "packets"
}
Question: what is the difference between “name” and “fieldName”, and what is the purpose of each?
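For reference, a hedged sketch of the distinction as it surfaces in Druid SQL (the datasource name here is hypothetical): "fieldName" is the input column read from each row, while "name" is the label given to the aggregated output, analogous to the column inside the aggregate function versus its AS alias:
SELECT
  SUM("packets") AS "total_packets" -- "packets" plays the role of fieldName; "total_packets" plays the role of name
FROM "network_flows"                -- hypothetical datasource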
Bhavok
04/05/2024, 12:42 PM
Anshu Makkar
04/06/2024, 8:06 AM
Chris
04/07/2024, 3:26 PM
Parth Agrawal
04/08/2024, 2:45 PM
28 Mar 2024 @ 15:58:05.260 UTC Got shutdown request for task[index_kafka_<datasource>_cd0cec90105b458_<D2>]. Asking worker[<indexer-URL>] to kill it.
28 Mar 2024 @ 15:58:05.260 UTC Shutdown [index_kafka_<datasource>_cd0cec90105b458_<D2>] because: [An exception occurred while waiting for task [index_kafka_<datasource>_cd0cec90105b458_<D2>] to pause: [org.apache.druid.java.util.common.ISE: Task [index_kafka_<datasource>_cd0cec90105b458_<D2>] failed to change its status from [READING] to [PAUSED], aborting]]
28 Mar 2024 @ 15:58:05.261 UTC Shutdown [index_kafka_<datasource>_cd0cec90105b458_<D2>] because: [shut down request via HTTP endpoint]
From the code, we see that it goes into this part:
"// Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active
// now after all, for two possible reasons:
//
// 1) A replica may have beat us to publishing these segments. In this case we want to delete the
// segments we pushed (if they had unique paths) to avoid wasting space on deep storage.
// 2) We may have actually succeeded, but not realized it due to missing the confirmation response
// from the overlord. In this case we do not want to delete the segments we pushed, since they are
// now live!"
and we see the log line:
28 Mar 2024 @ 16:03:05.262 UTC. Encountered exception in run() before persisting.
28 Mar 2024 @ 16:03:05.263 UTC Error while publishing segments for sequenceNumber[SequenceMetadata{sequenceId=0, sequenceName='index_kafka_<datasource>_cd0cec90105b458_0', assignments=[], startOffsets={226=1034753252873, 46=1680429116193}, exclusiveStartPartitions=[], endOffsets={226=1034767190850, 46=1680445277001}, sentinel=false, checkpointed=true}]
28 Mar 2024 @ 16:03:05.266 UTC. Failed publish, not removing segments: [<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_104, <datasource>_2024-03-28T15:40:00.000Z_2024-03-28T15:50:00.000Z_2024-03-28T15:40:00.047Z_282]
28 Mar 2024 @ 16:08:39.422 UTC. Found existing pending segment [<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_104] for sequence[index_kafka_<datasource>_cd0cec90105b458_0] (previous = [null]) in DB
Now, the successor tasks created for these tasks, say E1 and E2 (replicas of each other), directly try to publish the segments corresponding to sequence 5. They fail because the offset in the metadata store is still at the startOffset of sequence_0. So we are now in a state where, after every intermediateHandoffPeriod, whenever the task tries to publish segments, it fails because the offsets do not match the value stored in the metadata store.
However, the tasks are able to get the sequence-segment mapping and are committing metadata corresponding to it. We also see SegmentAllocateAction run for all the sequences in the E1 task.
Sequence_0 corresponds to the segment telemetry_metrics_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_104. This info becomes available to the successor tasks, as can be seen here:
28 Mar 2024 @ 16:35:42.453 UTC. Committing metadata[AppenderatorDriverMetadata{segments={index_kafka_<datasource>_cd0cec90105b458_0=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:40:00.000Z_2024-03-28T15:50:00.000Z_2024-03-28T15:40:00.047Z_282, state=APPENDING}, SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_104, state=APPENDING}], index_kafka_<datasource>_cd0cec90105b458_1=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_135, state=APPENDING}], index_kafka_<datasource>_cd0cec90105b458_2=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_227, state=APPENDING}], index_kafka_<datasource>_cd0cec90105b458_3=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_231, state=APPENDING}], index_kafka_<datasource>_cd0cec90105b458_4=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_233, state=APPENDING}], index_kafka_<datasource>_cd0cec90105b458_5=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_254, state=APPENDING}], index_kafka_<datasource>_cd0cec90105b458_6=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_325, state=APPENDING}], index_kafka_<datasource>_cd0cec90105b458_7=[SegmentWithState{segmentIdentifier=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_329, state=APPENDING}]}, lastSegmentIds={index_kafka_<datasource>_cd0cec90105b458_0=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_104, index_kafka_<datasource>_cd0cec90105b458_1=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_135, index_kafka_<datasource>_cd0cec90105b458_2=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_227, index_kafka_<datasource>_cd0cec90105b458_3=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_231, index_kafka_<datasource>_cd0cec90105b458_4=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_233, index_kafka_<datasource>_cd0cec90105b458_5=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_254, index_kafka_<datasource>_cd0cec90105b458_6=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_325, index_kafka_<datasource>_cd0cec90105b458_7=<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_329}, callerMetadata={nextPartitions=SeekableStreamEndSequenceNumbers{stream='<streamName>', partitionSequenceNumberMap={226=1034768328624, 46=1680444294138}}}}] for sinks[<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_227:1, <datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_104:64, <datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_325:1, <datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_135:8, <datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_254:2, 
<datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_233:1, <datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_231:1, <datasource>_2024-03-28T15:40:00.000Z_2024-03-28T15:50:00.000Z_2024-03-28T15:40:00.047Z_282:58, <datasource>_2024-03-28T15:50:00.000Z_2024-03-28T16:00:00.000Z_2024-03-28T15:50:00.039Z_329:1].
Thus, the sequenceMap had been updated by the D2 task while the offsetMap had not. We tried restarting the Overlord, but that did not help us mitigate the issue. We ultimately got out of this state by setting the replicas to 1 and waiting until the taskGroup (denoted by its prefix cd0cec90105b458) changed.
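For anyone hitting the same state, a hedged sketch for inspecting the offsets committed to the metadata store (this assumes the default druid table prefix; commit_metadata_payload is a JSON blob containing the partition-to-offset map that the publish transaction compares against):
-- run against the metadata database (e.g. MySQL/PostgreSQL), not Druid itself
SELECT dataSource, created_date, commit_metadata_payload
FROM druid_dataSource
WHERE dataSource = '<datasource>';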
The final state we arrive at is the same as the one described in this issue: https://github.com/apache/druid/issues/8139#issuecomment-801706838
the system gets into a bad state where the supervisor is firing up new tasks based on the offsets that the previous tasks should've worked their way up to, but the previous tasks are not around to complete their job.
We have these questions:
1. How is this sequenceMap passed across tasks, and how does the E1 task become aware that it should start publishing directly from sequence 5 instead of sequence 0? My expectation was that the follow-up tasks would try to publish all the sequences starting from sequence_0, since its offset matches the value stored in the metadata store.
2. Why is D1 directly publishing for sequence 4? It should also have published for sequences 0, 1, 2, and 3.
3. What caused the sequenceMap and the offsetMap to go out of sync? Is the process of publishing segments and updating the metadata store with sequences and offsets not atomic?
4. Why are so many sequences created for such a small window? Even the offset difference is small for the later sequences. What creates a new sequence, and when is it triggered?
5. If we kill a task manually from the UI, is the atomicity of the transaction respected?
cc: @Xavier, @PANKAJ KUMAR
Krishna
04/09/2024, 3:18 AM
index_hadoop job
2024-04-09T03:06:09,556 WARN [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobResourceUploader - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2024-04-09T03:06:09,562 WARN [task-runner-0-priority-0] org.apache.hadoop.mapreduce.JobResourceUploader - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2024-04-09T03:06:10,052 WARN [DataStreamer for file /tmp/hadoop-yarn/staging/d_cidruid/.staging/job_1712631488122_0002/job.split] org.apache.hadoop.hdfs.DataStreamer - DataStreamer Exception
java.lang.NoSuchMethodError: 'sun.misc.Cleaner sun.nio.ch.DirectBuffer.cleaner()'
at org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40) ~[hadoop-common-2.8.5.jar:?]
at org.apache.hadoop.crypto.CryptoOutputStream.freeBuffers(CryptoOutputStream.java:291) ~[hadoop-common-2.8.5.jar:?]
at org.apache.hadoop.crypto.CryptoOutputStream.close(CryptoOutputStream.java:225) ~[hadoop-common-2.8.5.jar:?]
at java.io.FilterOutputStream.close(FilterOutputStream.java:188) ~[?:?]
at java.io.FilterOutputStream.close(FilterOutputStream.java:188) ~[?:?]
at org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:987) ~[hadoop-hdfs-client-2.8.5.jar:?]
at org.apache.hadoop.hdfs.DataStreamer.endBlock(DataStreamer.java:625) ~[hadoop-hdfs-client-2.8.5.jar:?]
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:801) ~[hadoop-hdfs-client-2.8.5.jar:?]
Exception in thread "DataStreamer for file /tmp/hadoop-yarn/staging/d_cidruid/.staging/job_1712631488122_0002/job.split" java.lang.NoSuchMethodError: 'sun.misc.Cleaner sun.nio.ch.DirectBuffer.cleaner()'
at org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
at org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:683)
at org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:317)
at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
at org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:996)
at org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:839)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:834)
2024-04-09T03:06:10,073 WARN [DataStreamer for file /tmp/hadoop-yarn/staging/d_cidruid/.staging/job_1712631488122_0002/job.splitmetainfo] org.apache.hadoop.hdfs.DataStreamer - DataStreamer Exception
java.lang.NoSuchMethodError: 'sun.misc.Cleaner sun.nio.ch.DirectBuffer.cleaner()'
at org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40) ~[hadoop-common-2.8.5.jar:?]
at org.apache.hadoop.crypto.CryptoOutputStream.freeBuffers(CryptoOutputStream.java:291) ~[hadoop-common-2.8.5.jar:?]
at org.apache.hadoop.crypto.CryptoOutputStream.close(CryptoOutputStream.java:225) ~[hadoop-common-2.8.5.jar:?]
Krishna
04/09/2024, 7:02 AM
{
"ingestionStatsAndErrors": {
"type": "ingestionStatsAndErrors",
"taskId": "index_hadoop_datsource_dadkjpdk_2024-04-09T05:48:36.827Z",
"payload": {
"ingestionState": "BUILD_SEGMENTS",
"unparseableEvents": null,
"rowStats": {
"determinePartitions": {
"rowsProcessedWithErrors": 0,
"rowsProcessed": 2554582,
"rowsUnparseable": 0,
"rowsThrownAway": 0
},
"buildSegments": {
"rowsProcessedWithErrors": 0,
"rowsProcessed": 2554582,
"rowsUnparseable": 0,
"rowsThrownAway": 0
}
},
"errorMsg": "java.lang.RuntimeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException\n\tat org.apache.druid.indexing.common.task.HadoopIndexTask.runInternal(HadoopIndexTask.java:507)\n\tat org.apache.druid.indexing.common.task.HadoopIndexTask.runTask(HadoopIndexTask.java:297)\n\tat org.apache.druid.indexing.common.task.AbstractTask.run(AbstractTask.java:169)\n\tat org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:477)\n\tat org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:449)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException\n\tat org.apache.druid.indexing.common.task.HadoopIndexTask.renameSegmentIndexFilesJob(HadoopIndexTask.java:594)\n\tat org.apache.druid.indexing.common.task.HadoopIndexTask.runInternal(HadoopIndexTask.java:472)\n\t... 8 more\nCaused by: java.lang.reflect.InvocationTargetException\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.druid.indexing.common.task.HadoopIndexTask.renameSegmentIndexFilesJob(HadoopIndexTask.java:588)\n\t... 9 more\nCaused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found\n\tat org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2369)\n\tat org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)\n\tat org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2810)\n\tat org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)\n\tat org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)\n\tat org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)\n\tat org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)\n\tat org.apache.druid.indexer.JobHelper.renameIndexFilesForSegments(JobHelper.java:649)\n\tat org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner.runTask(HadoopIndexTask.java:958)\n\t... 14 more\nCaused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found\n\tat org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2273)\n\tat org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)\n\t... 22 more\n",
"segmentAvailabilityConfirmed": false,
"segmentAvailabilityWaitTimeMs": 0
}
}
}
Krishna
04/09/2024, 7:03 AM
Peter Marshall
04/09/2024, 8:27 AM
Jianshu Chi
04/09/2024, 6:32 PM
When running a SegmentMetadata query, I found that minmax is not supported for numeric columns (links here). Any reason we are not supporting minmax for numeric columns?
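In the meantime, a possible workaround (a hedged sketch; the datasource and column names are hypothetical) is a plain aggregation query, which returns the same min/max information for a numeric column:
SELECT MIN("packets") AS "min_packets", MAX("packets") AS "max_packets"
FROM "my_datasource";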
Rahul Sharma
04/10/2024, 5:53 AM
Max ZorN
04/10/2024, 9:00 AM
Sadananda Aithal
04/10/2024, 9:14 AM
Uday Singh Matta
04/11/2024, 4:35 AM
{
"queryType": "groupBy",
"dataSource": "EnergyMeterST501",
"intervals": [
"2023-12-23T00:00+03:00/2024-03-23T23:59:59+03:00"
],
"granularity": {
"type": "period",
"period": "P1D",
"timeZone": "Asia/Riyadh"
},
"dimensions": [
"deviceID"
],
"metric": "count",
"aggregations": [
{
"type": "count",
"name": "count",
"fieldName": "deviceID"
}
],
"having": {
"type": "filter",
"filter": {
"type": "in",
"dimension": "deviceID",
"values": [
"2020070100"
]
}
}
}
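Two things worth noting about this spec: "metric" is a topN parameter rather than part of the groupBy spec, and the native "count" aggregator counts rows, so the "fieldName" there has no effect. Since the having clause only filters on a dimension, it could also be a plain filter. A hedged Druid SQL rendering of what appears to be the intended query:
SELECT
  TIME_FLOOR(__time, 'P1D', NULL, 'Asia/Riyadh') AS "day",
  "deviceID",
  COUNT(*) AS "count"
FROM "EnergyMeterST501"
WHERE "deviceID" = '2020070100'
  AND __time >= TIME_PARSE('2023-12-23T00:00:00+03:00')
  AND __time <= TIME_PARSE('2024-03-23T23:59:59+03:00')
GROUP BY 1, 2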
Swapnil Jokhakar
04/11/2024, 2:26 PM
SELECT
dim_field_1,
LOOKUP(CONCAT(dim_field_1, ''), 'lookup_1') AS "lookup_1_val",
LOOKUP(CONCAT(dim_field_2, ''), 'lookup_2') AS "lookup_2_val",
LOOKUP(CONCAT(dim_field_2, ''), 'lookup_3') AS "lookup_3_val",
dim_field_2,
LOOKUP(CONCAT(dim_field_2, ''), 'lookup_4') AS "lookup_4_val",
(SUM(cost)) AS "cost"
FROM test_table
WHERE
dim_field_2 IN (2040)
AND __time >= '2024-03-01 00:00:00' AND __time <= '2024-03-31 23:59:59'
AND dim_field_1 IN ('1608826050', '1608826054', '1608826048', '1608818855', '1608818825', '1609091475', '1608818895', '1608818852', '1608818865', '1608818753', '1608818775', '1608818724', '1609027835', '1608273535', '1608916347', '1608268938', '1608818799', '1608818766', '1608818852', '1609569765', '1608273487', '1609347997', '1609477098', '1608826096', '1609559940', '1608916420')
GROUP BY
LOOKUP(CONCAT(dim_field_2, ''), 'lookup_3'),
dim_field_1,
LOOKUP(CONCAT(dim_field_1, ''), 'lookup_1'),
LOOKUP(CONCAT(dim_field_2, ''), 'lookup_3'),
LOOKUP(CONCAT(dim_field_2, ''), 'lookup_4'),
dim_field_2
In the above query, the GROUP BY clause contains all the fields from the SELECT clause except the aggregated field, but in a different order than they appear in the SELECT clause. In this case the query returns records with a 1:M mapping between the dim_field_2 field (key) and the lookup_2 value. If I reorder the GROUP BY to match the order in which the fields appear in the SELECT clause, the query returns records without the 1:M mapping. This seems like odd behavior. Could anyone please help me understand why the above query returns records with a 1:M mapping between the dim_field_2 field (key) and the lookup_2 value?
I have also tried querying lookup_2 directly, filtering in the WHERE clause on the value field with the list of values that appear as 1:M in the above query result; that query returns a 1:1 mapping only. This is expected behavior, since Druid lookups have a 1:1 mapping between key and value.
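For comparison, a hedged sketch where the GROUP BY lists exactly the non-aggregated SELECT expressions; note that the original GROUP BY lists the lookup_3 expression twice and never groups by the lookup_2 expression, which is the value showing the 1:M mapping:
SELECT
  dim_field_1,
  LOOKUP(CONCAT(dim_field_1, ''), 'lookup_1') AS "lookup_1_val",
  LOOKUP(CONCAT(dim_field_2, ''), 'lookup_2') AS "lookup_2_val",
  LOOKUP(CONCAT(dim_field_2, ''), 'lookup_3') AS "lookup_3_val",
  dim_field_2,
  LOOKUP(CONCAT(dim_field_2, ''), 'lookup_4') AS "lookup_4_val",
  SUM(cost) AS "cost"
FROM test_table
WHERE dim_field_2 IN (2040)
  AND __time >= '2024-03-01 00:00:00' AND __time <= '2024-03-31 23:59:59'
  AND dim_field_1 IN ('1608826050', '1608826054') -- abbreviated; use the full ID list from above
GROUP BY
  dim_field_1,
  LOOKUP(CONCAT(dim_field_1, ''), 'lookup_1'),
  LOOKUP(CONCAT(dim_field_2, ''), 'lookup_2'),
  LOOKUP(CONCAT(dim_field_2, ''), 'lookup_3'),
  dim_field_2,
  LOOKUP(CONCAT(dim_field_2, ''), 'lookup_4')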
Ashish Kumar
04/12/2024, 7:10 AM
We have ingested the same data using index-parallel and msqe into two different tables.
The total data size of the table ingested through index-parallel is ~190 MB, while the total data size of the table ingested using msqe is ~166 MB.
Does anyone know why the data size differs between the two tables, despite the row counts being the same?
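One way to compare the two tables (a hedged sketch via Druid's sys tables; column availability can vary by Druid version) is to pull per-datasource segment sizes and row counts from sys.segments:
SELECT "datasource",
       SUM("size") AS total_size_bytes,
       SUM("num_rows") AS total_rows
FROM sys.segments
WHERE is_active = 1
GROUP BY 1;
Identical row counts with different byte sizes usually come down to segment layout: partitioning and row ordering can differ between ingestion methods, and that changes how well the columns compress.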
Bharat
04/12/2024, 8:56 AM
Amit Jain
04/12/2024, 10:30 AM
Pio Salvatore MORRONE (KEBULA)
04/12/2024, 1:25 PM
PINNURI SAIKRISHNA PRASAD
04/13/2024, 5:11 AM
Igor Berman
04/15/2024, 10:22 AM
Max ZorN
04/15/2024, 11:39 AM
sriramdas sivasai
04/16/2024, 6:31 AM
Nimrod Lahav
04/16/2024, 9:42 AM
(29.0.1)
• https://hub.docker.com/r/apache/druid
• docker file: https://github.com/apache/druid/blob/master/distribution/docker/Dockerfile
Security scans are showing 35 Critical and 84 High findings. Wanted to ask how you managed to deploy this in an enterprise/prod environment that won't allow critical findings, or is there any other image we can use?
AR
04/16/2024, 3:15 PM
Kai Sun
04/17/2024, 12:03 AM
Regarding druid.processing.buffer.sizeBytes: my understanding of this config is that it sets the size of the intermediate buffer / merge buffer. The question is: if we set a small value, say 10 MB, will it cause groupBy or topN query failures?
The underlying reason I ask is that each thread in the processing pool uses one of these buffers when processing the hydrants/segments. Thus a large thread count, say 100, with a buffer size of 500 MB would use 50 GB. That is pretty huge, meaning one MiddleManager server may hold only one or two peons.
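For reference, this matches the documented direct-memory sizing guidance for a Druid process (a sketch of the relationship, not exact accounting):
direct memory needed ≈ druid.processing.buffer.sizeBytes * (druid.processing.numThreads + druid.processing.numMergeBuffers + 1)
At 100 threads and 500 MB buffers the processing buffers dominate the footprint, which is why large thread counts usually pair with smaller buffer sizes.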
AR
04/17/2024, 5:23 AM
AR
04/17/2024, 5:32 AM
JRob
04/17/2024, 3:35 PM