Thomas Steinholz
10/18/2022, 4:02 PM
time_string while the new (empty) column I added is message_time.
I set the new message_time column as the table's time column and defined transforms and filters for it in both the realtime and the offline table configs. The transform checks whether the datetime in time_string is valid. If it is, the value is used as-is. If it isn't, the transform falls back to a secondary timestamp from the row (__prop.t_ingest), or to a default value if that is also null. The filter then drops rows whose message_time ended up as the default.
"transformFunction": "Groovy({
def default_time = '2014-01-01 00:00:00.000000';
def t_ingest = new groovy.json.JsonSlurper().parseText(message_str).get('__prop.t_ingest');
!time_string || time_string < default_time ?
(!t_ingest ? default_time : new Date(Long.valueOf(t_ingest)).format(\"yyyy-MM-dd'T'HH:mm:ss.SSS\")) :
time_string;
}, time_string, message_str)"
"filterFunction": "Groovy({
message_time=='2014-01-01 00:00:00.000000'
}, message_time)"
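To make the fallback order in the transform easier to follow, here is a minimal Java sketch that mirrors the Groovy logic outside Pinot. It is not how Pinot evaluates the function internally; the class and method names are hypothetical, and it assumes `t_ingest` arrives as an epoch-millisecond string. One thing it makes visible: the default value `2014-01-01 00:00:00.000000` uses a space and six fractional digits, while the `t_ingest` branch emits `yyyy-MM-dd'T'HH:mm:ss.SSS`, which is the format reported for the time column in the warning.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical Java mirror of the Groovy transformFunction, for reasoning/testing only.
class MessageTimeTransform {
    // Same literal the Groovy code uses as both its default and its cutoff.
    static final String DEFAULT_TIME = "2014-01-01 00:00:00.000000";

    // Pattern passed to Date.format(); Groovy formats in the JVM's default time zone.
    static final DateTimeFormatter OUT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

    // Assumption: t_ingest is an epoch-millisecond value passed as a string.
    static String messageTime(String timeString, String tIngest) {
        // Groovy's !x is true for null or ""; '<' on Strings is a lexicographic compareTo.
        boolean invalid = timeString == null || timeString.isEmpty()
            || timeString.compareTo(DEFAULT_TIME) < 0;
        if (!invalid) {
            return timeString;                 // valid: keep the original value
        }
        if (tIngest == null || tIngest.isEmpty()) {
            return DEFAULT_TIME;               // no fallback timestamp: use the default
        }
        return OUT.format(Instant.ofEpochMilli(Long.parseLong(tIngest)));
    }

    public static void main(String[] args) {
        System.out.println(messageTime("2022-10-18T15:49:34.760", null)); // kept as-is
        System.out.println(messageTime(null, null));                      // default value
        System.out.println(messageTime(null, "1666108161685"));           // formatted t_ingest
    }
}
```

Note that only the `t_ingest` branch is guaranteed to produce the column's declared format; the default branch and the pass-through branch return whatever layout those strings already have.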
However, when I run the RealtimeToOfflineSegmentsTask, I get the following warning and error:
Default time: null does not comply with format: 1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSS, using current time: 2022-10-18T15:49:34.760 as the default time for table: uplinkpayloadevent_OFFLINE
Caught exception while executing task: Task_RealtimeToOfflineSegmentsTask_a2250009-a3b5-47c0-b495-085d15b405cf_1666108161685_0
java.lang.IllegalArgumentException: Invalid format: "null"
at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.spi.data.DateTimeFormatSpec.fromFormatToMillis(DateTimeFormatSpec.java:303) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.core.segment.processing.timehandler.EpochTimeHandler.handleTime(EpochTimeHandler.java:56) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.core.segment.processing.mapper.SegmentMapper.writeRecord(SegmentMapper.java:143) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.core.segment.processing.mapper.SegmentMapper.map(SegmentMapper.java:126) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework.process(SegmentProcessorFramework.java:96) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments.RealtimeToOfflineSegmentsTaskExecutor.convert(RealtimeToOfflineSegmentsTaskExecutor.java:163) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor.executeTask(BaseMultipleSegmentsConversionExecutor.java:165) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor.executeTask(BaseMultipleSegmentsConversionExecutor.java:62) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.minion.taskfactory.TaskFactoryRegistry$1.runInternal(TaskFactoryRegistry.java:113) [pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.pinot.minion.taskfactory.TaskFactoryRegistry$1.run(TaskFactoryRegistry.java:89) [pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at org.apache.helix.task.TaskRunner.run(TaskRunner.java:75) [pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
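The exception says the parser received the literal string `null`. A quick way to check which values comply with the column format is sketched below in Java. It uses `java.time.format.DateTimeFormatter` as a stand-in for the Joda-Time parser in the stack trace, so edge-case behavior may differ slightly; the class name is hypothetical. Under that assumption, neither `null` nor the transform's default `2014-01-01 00:00:00.000000` parses, while a value like `2022-10-18T15:49:34.760` does.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper: checks values against the pattern from the log's format spec
// 1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSS (java.time stands in for Joda-Time).
class TimeFormatCheck {
    static final DateTimeFormatter COLUMN_FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    /** Returns true if the value parses under the column's date pattern. */
    static boolean complies(String value) {
        if (value == null) {
            return false;
        }
        try {
            LocalDateTime.parse(value, COLUMN_FORMAT);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] samples = {"null", "2014-01-01 00:00:00.000000", "2022-10-18T15:49:34.760"};
        for (String s : samples) {
            // prints: null -> false, 2014-01-01 00:00:00.000000 -> false, 2022-10-18T15:49:34.760 -> true
            System.out.println(s + " -> " + complies(s));
        }
    }
}
```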
I can't tell whether the error happens because the transform runs and returns a null value (which shouldn't be possible), or because the new time column really is null: the rows aren't being ingested, only processed, so the transforms may not be running on them.
Thomas Steinholz
10/18/2022, 4:55 PM
Filtering out the bad timestamps on the time_string column seemed to work:
"filterFunction": "Groovy({ !time_string || time_string < '2014-01-01 00:00:00.000000' }, time_string)"
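Pinot skips any record for which the filter function returns true, so this filter drops rows with a missing or pre-2014 time_string. Below is a minimal Java sketch of the same predicate (the class and method names are hypothetical). It assumes time_string values share a zero-padded, year-first layout, so that the lexicographic string comparison Groovy performs matches chronological order.

```java
// Hypothetical Java mirror of the Groovy filterFunction on time_string.
class TimeStringFilter {
    static final String CUTOFF = "2014-01-01 00:00:00.000000";

    /** Returns true when the row should be dropped (Pinot filters out records where this is true). */
    static boolean shouldFilter(String timeString) {
        // Groovy's !x covers null and ""; '<' on Strings is a lexicographic compareTo.
        return timeString == null || timeString.isEmpty() || timeString.compareTo(CUTOFF) < 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldFilter(null));                      // true
        System.out.println(shouldFilter("1970-01-01T00:00:00.000")); // true
        System.out.println(shouldFilter("2022-10-18T15:49:34.760")); // false
        System.out.println(shouldFilter("null"));                    // false
    }
}
```

Because the comparison is a plain string comparison, a row whose time_string holds the literal text `null` sorts after any digit and would pass this filter.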