Yarden Rokach
San Kumar
03/25/2025, 1:27 PM
San Kumar
03/26/2025, 12:42 PM
San Kumar
03/26/2025, 12:42 PM
KC Satish
03/27/2025, 9:28 AM
piby
03/28/2025, 9:47 AM
Mayank
charlie
03/28/2025, 7:43 PM
Yarden Rokach
telugu bharadwaj
04/04/2025, 8:30 AM
San Kumar
04/04/2025, 8:46 AM
"taskTypeConfigsMap": {
"RealtimeToOfflineSegmentsTask": {
"schedule": "*/15 * * ** ?",
"bucketTimePeriod": "6h",
"bufferTimePeriod": "1d"
}
}
When we create an offline table on the same schema, what file format will it use to store the data?
In the offline table we have this config:
"ingestionConfig": {
"continueOnError": false,
"rowTimeValueCheck": false,
"batchIngestionConfig": {
"consistentDataPush": false,
"segmentIngestionType": "APPEND",
"segmentIngestionFrequency": "HOURLY"
},
Is there anything we have to specify for the file format in the offline table when it is part of a hybrid table? How do I ensure that the offline table also uses the same file compression as the realtime table?
In summary:
Hybrid table definition: if the realtime side ingests protobuf, what will the offline table segment format be? Is it CSV/JSON/proto, and how can I confirm?
Hybrid table definition: if the realtime side ingests Avro, what will the offline table segment type be?
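For context: Pinot does not persist offline segments as CSV/JSON/protobuf/Avro. Both realtime and offline segments use Pinot's own columnar segment format, and the input format only matters while decoding incoming records, so the RealtimeToOfflineSegmentsTask output is a regular Pinot segment regardless of whether the realtime side consumes protobuf or Avro. If the goal is keeping per-column compression identical on both sides of the hybrid table, one sketch (the column name below is a placeholder) is to declare the same fieldConfigList in both the REALTIME and OFFLINE table configs:
"fieldConfigList": [
  {
    "name": "someRawColumn",
    "encodingType": "RAW",
    "compressionCodec": "ZSTANDARD"
  }
]
The format and encoding of any individual segment can then be confirmed from its segment metadata in the controller UI.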
San Kumar
04/04/2025, 8:52 AM
Shiva Sharana
04/04/2025, 9:35 AM
San Kumar
04/05/2025, 1:10 PM
San Kumar
04/06/2025, 4:50 AM
Georgi Andonov
04/06/2025, 9:34 PM
kranthi kumar
04/09/2025, 8:54 AM
kranthi kumar
04/09/2025, 9:43 AM
San Kumar
04/10/2025, 9:27 AM
event_time, source, country, event_type
epochmilli1, nokia, ind, event1
epochmilli1, nokia, ind, event2
epochmilli2, samsung, USA, event2
We uploaded this data to Pinot, resulting in the following records:
epochmilli1, nokia, ind, event2
epochmilli2, samsung, USA, event0
Subsequently, we received new data:
epochmilli1, nokia, ind, event3
epochmilli1, Apple, ind, event0
Our program processed this new data and prepared it as follows:
epochmilli1, nokia, ind, event3
epochmilli1, Apple, ind, event0
We aim to update the segment to reflect the final records, which should be:
epochmilli1, nokia, ind, event3
epochmilli2, samsung, USA, event0
epochmilli1, Apple, ind, event0
To clarify, we want to replace the record for epochmilli1 where the device type is nokia, and add any new records for device types that are not already in the database.
Could you please provide guidance on how to achieve this update in the Pinot table?
We will use an offline table only, with no Kafka topic and no upsert table.
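A hedged sketch of the usual approach, since offline tables without upserts do not support row-level updates: rebuild the data for the affected time range outside Pinot (as your program already does) and push it as a segment with the same segment name, which makes Pinot replace the old segment on upload while other segments stay untouched. In a batch ingestion job spec the name can be pinned with the fixed segment name generator; the name below is hypothetical.
segmentNameGeneratorSpec:
  type: fixed
  configs:
    # hypothetical value - reuse the name of the segment you want to replace
    segment.name: 'myTable_epochmilli1_batch'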
Lakshya Devani
04/10/2025, 9:41 AM
2025/04/10 09:34:36.948 INFO [HelixInstanceDataManager] [HelixTaskExecutor-message_handle_thread_21] Deleted segment: ruleset_evaluation_shadow_test_realtime_2__2__47__20250410T0520Z from table: ruleset_evaluation_shadow_test_realtime_2_REALTIME
2025/04/10 09:34:37.177 INFO [RealtimeSegmentDataManager_ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Recreating stream consumer for topic partition ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2, reason: Too many transient errors
2025/04/10 09:34:37.177 ERROR [KafkaConsumer] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] [Consumer clientId=ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2, groupId=msk_consumer] Failed to close coordinator
org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1026) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync(ConsumerCoordinator.java:1087) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:919) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2366) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2333) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2283) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.close(KafkaPartitionLevelConnectionHandler.java:118) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.closePartitionGroupConsumer(RealtimeSegmentDataManager.java:1256) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.recreateStreamConsumer(RealtimeSegmentDataManager.java:1860) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.handleTransientStreamErrors(RealtimeSegmentDataManager.java:436) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.consumeLoop(RealtimeSegmentDataManager.java:484) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager$PartitionConsumer.run(RealtimeSegmentDataManager.java:766) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.lang.InterruptedException
... 18 more
2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Metrics scheduler closed
2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Closing reporter org.apache.kafka.common.metrics.JmxReporter
2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Metrics reporters closed
2025/04/10 09:34:37.178 INFO [AppInfoParser] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] App info kafka.consumer for ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2 unregistered
What is happening in these logs?
kranthi kumar
04/10/2025, 11:36 AM
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to load credentials from any of the providers in the chain AwsCredentialsProviderChain(credentialsProviders=[SystemPropertyCredentialsProvider(), EnvironmentVariableCredentialsProvider(), WebIdentityTokenCredentialsProvider(), ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])), ContainerCredentialsProvider(), InstanceProfileCredentialsProvider()]) : [SystemPropertyCredentialsProvider(): Unable to load credentials from system settings. Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., EnvironmentVariableCredentialsProvider(): Unable to load credentials from system settings. Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., WebIdentityTokenCredentialsProvider(): Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set., ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])): Profile file contained no credentials for profile 'default': ProfileFile(sections=[]), ContainerCredentialsProvider(): Cannot fetch credentials from container - neither AWS_CONTAINER_CREDENTIALS_FULL_URI or AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variables are set., InstanceProfileCredentialsProvider(): Failed to load credentials from IMDS.]
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
at software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:130)
at software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
at software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:129)
at software.amazon.awssdk.auth.credentials.AwsCredentialsProvider.resolveIdentity(AwsCredentialsProvider.java:54)
at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.lambda$trySelectAuthScheme$4(S3AuthSchemeInterceptor.java:163)
at software.amazon.awssdk.core.internal.util.MetricUtils.reportDuration(MetricUtils.java:77)
at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.trySelectAuthScheme(S3AuthSchemeInterceptor.java:163)
at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.selectAuthScheme(S3AuthSchemeInterceptor.java:84)
at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.beforeExecution(S3AuthSchemeInterceptor.java:64)
at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.lambda$beforeExecution$1(ExecutionInterceptorChain.java:59)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.beforeExecution(ExecutionInterceptorChain.java:59)
at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.runInitialInterceptors(AwsExecutionContextBuilder.java:248)
at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:138)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsSyncClientHandler.java:67)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$0(BaseSyncClientHandler.java:62)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:60)
at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:52)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:60)
at software.amazon.awssdk.services.s3.DefaultS3Client.getObject(DefaultS3Client.java:5570)
at org.apache.pinot.plugin.filesystem.S3PinotFS.copyToLocalFile(S3PinotFS.java:569)
at org.apache.pinot.spi.filesystem.NoClosePinotFS.copyToLocalFile(NoClosePinotFS.java:98)
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner$1.call(SparkSegmentGenerationJobRunner.java:263)
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner$1.call(SparkSegmentGenerationJobRunner.java:212)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1047)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1047)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2539)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
at org.apache.spark.scheduler.Task.run(Task.scala:152)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
My job spec:
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'
  extraConfigs:
    stagingDir: s3://aws-logs-686118793080-us-east-1/metadata/staging/
jobType: SegmentCreationAndMetadataPush
inputDirURI: 's3://aws-logs-686118793080-us-east-1/metadata/basic-attribute/date=2025-04-10/'
outputDirURI: 's3://aws-logs-686118793080-us-east-1/metadata/Output_Metadata/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'us-east-1'
recordReaderSpec:
  dataFormat: 'json'
  className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
tableSpec:
  tableName: 'bat_backfill_emr_REALTIME'
  schemaURI: 's3://aws-logs-686118793080-us-east-1/metadata/batSchema.json'
  tableConfigURI: 's3://aws-logs-686118793080-us-east-1/metadata/batTable.json'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9000'
pushJobSpec:
  pushParallelism: 2
  pushAttempts: 20
  pushRetryIntervalMillis: 1000
My spark-submit command:
spark-submit --deploy-mode cluster --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand --master yarn --conf spark.driver.extraJavaOptions=-Dplugins.dir=s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/ --conf spark.driver.extraClassPath=s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-batch-ingestion-spark-3-1.1.0-shaded.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-all-1.1.0-jar-with-dependencies.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-s3-1.1.0-shaded.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-json-1.1.0-shaded.jar --jars s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-all-1.1.0-jar-with-dependencies.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-batch-ingestion-spark-3-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-json-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-s3-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-tools-1.1.0.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-spi-1.1.0.jar --files s3://aws-logs-686118793080-us-east-1/metadata/executionFrameworkSpec4.yaml local://pinot-all-0.11.0-jar-with-dependencies.jar -jobSpecFile executionFrameworkSpec4.yaml
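One possible direction rather than a confirmed fix: the stack trace shows S3PinotFS inside the Spark executors exhausting the entire AWS default credentials chain (including IMDS), so the executors cannot read the input files from S3. If instance-profile credentials are not reachable from the executors, credentials can be supplied directly in the job spec's pinotFSSpecs; the values below are placeholders, not real keys.
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'us-east-1'
      accessKey: '<AWS_ACCESS_KEY_ID>'       # placeholder - inject via a secure mechanism
      secretKey: '<AWS_SECRET_ACCESS_KEY>'   # placeholder
Alternatively, making the instance profile reachable from the executors (for example by checking the IMDS hop limit and role permissions) avoids putting keys in the spec at all.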
Georgi Andonov
04/11/2025, 2:51 PM
{
"REALTIME": {
"tableName": "TestRollup_REALTIME",
"tableType": "REALTIME",
"segmentsConfig": {
"schemaName": "TestRollup",
"replication": "1",
"replicasPerPartition": "1",
"timeColumnName": "ValueTimestamp",
"minimizeDataMovement": false
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant",
"tagOverrideConfig": {}
},
"tableIndexConfig": {
"invertedIndexColumns": [],
"noDictionaryColumns": [],
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.topic.name": "rollup-price-aggregation",
"stream.kafka.broker.list": "kafka:9092",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.segment.rows": "500",
"realtime.segment.flush.threshold.time": "20m"
},
"aggregateMetrics": false,
"enableDefaultStarTree": false,
"nullHandlingEnabled": false,
"bloomFilterColumns": [],
"onHeapDictionaryColumns": [],
"rangeIndexColumns": [],
"sortedColumn": [],
"varLengthDictionaryColumns": [],
"rangeIndexVersion": 2,
"optimizeDictionaryForMetrics": false,
"optimizeDictionary": false,
"autoGeneratedInvertedIndex": false,
"createInvertedIndexDuringSegmentGeneration": false,
"loadMode": "MMAP",
"enableDynamicStarTreeCreation": false,
"columnMajorSegmentBuilderEnabled": true,
"optimizeDictionaryType": false,
"noDictionarySizeRatioThreshold": 0.85
},
"metadata": {},
"quota": {},
"task": {
"taskTypeConfigsMap": {
"MergeRollupTask": {
"5m_2m.mergeType": "rollup",
"5m_2m.bucketTimePeriod": "5m",
"5m_2m.bufferTimePeriod": "2m",
"5m_2m.roundBucketTimePeriod": "1m",
"10m_2m.mergeType": "rollup",
"10m_2m.bucketTimePeriod": "10m",
"10m_2m.bufferTimePeriod": "12m",
"10m_2m.roundBucketTimePeriod": "5m",
"Price.aggregationType": "sum",
"schedule": "0 * * * * ?"
}
}
},
"routing": {},
"query": {},
"ingestionConfig": {
"continueOnError": false,
"rowTimeValueCheck": false,
"segmentTimeValueCheck": true
},
"isDimTable": false
}
}
{
"schemaName": "TestRollup",
"enableColumnBasedNullHandling": false,
"dimensionFieldSpecs": [
{
"name": "Id",
"dataType": "INT",
"fieldType": "DIMENSION",
"notNull": false
}
],
"metricFieldSpecs": [
{
"name": "Price",
"dataType": "DOUBLE",
"fieldType": "METRIC",
"notNull": false
}
],
"dateTimeFieldSpecs": [
{
"name": "ValueTimestamp",
"dataType": "TIMESTAMP",
"fieldType": "DATE_TIME",
"notNull": false,
"format": "TIMESTAMP",
"granularity": "1:MILLISECONDS"
}
]
}
I am ingesting data from a Kafka topic and all the records have a ValueTimestamp equal to now. From my understanding, with this configuration the merge rollup tasks will be executed once a minute and create merged segments based on the provided configuration: if there are records whose timestamp is within the timeframe of (bucketStart -> bucketStart + 5m, or bucketStart -> bucketStart + 10m) and that are older than 2m (or 12m respectively), they will be added to the merged segment. Is that correct? Also, for the rounding of the timestamp: will the records in the merged segment have timestamps rounded to 1m or to 5m (for example, timestamps like 2025-04-11 11:50/11:51 or 2025-04-11 11:50/11:55)?
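A small worked example of the rounding part, assuming roundBucketTimePeriod aligns each record's time value down to the bucket boundary before rollup (worth double-checking against the MergeRollupTask docs for your Pinot version):
ValueTimestamp in the raw segment:                    2025-04-11 11:53:27.123
after the 5m_2m level (roundBucketTimePeriod = 1m):   2025-04-11 11:53:00.000
after the 10m_2m level (roundBucketTimePeriod = 5m):  2025-04-11 11:50:00.000
Rows that end up with the same rounded ValueTimestamp and the same Id are then rolled up into one row, with Price summed as configured.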
San Kumar
04/13/2025, 10:30 AM
San Kumar
04/13/2025, 10:31 AM
Georgi Varbanov
04/14/2025, 12:04 PM
kranthi kumar
04/15/2025, 12:23 PM
java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) ~[scala-library-2.12.18.jar:?]
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) ~[scala-library-2.12.18.jar:?]
at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:48) ~[spark-common-utils_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310) ~[spark-core_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:509) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:268) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:937) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:936) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
at java.security.AccessController.doPrivileged(AccessController.java:712) [?:?]
at javax.security.auth.Subject.doAs(Subject.java:439) [?:?]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953) [hadoop-client-api-3.4.0-amzn-2.jar:?]
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:936) [spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) [spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
I have almost 200k files with 5,000 records each. Is there any limitation on the Pinot side that's causing this issue?
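Not a definitive diagnosis, but the 100000 ms matches the default spark.yarn.am.waitTime (100s), which is how long the YARN ApplicationMaster waits in cluster mode for the driver to initialize the SparkContext; with ~200k input files the ingestion command can spend longer than that listing S3 before Spark even starts. If that is what is happening, raising the wait time (the 600s below is an arbitrary example) and/or splitting the input into smaller batches may help:
--conf spark.yarn.am.waitTime=600s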
Yarden Rokach
Mannoj
04/18/2025, 8:07 AM
Saravanan Subburayal
04/19/2025, 9:01 AM
Mannoj
04/21/2025, 9:12 AM