francoisa
03/25/2022, 4:26 PM
Mayank
SegmentPurger minion job that you may want to customize to query what records to purge.
francoisa
03/28/2022, 9:07 AM
recordPurger
Mayank
Jack
03/29/2022, 4:50 PM
MinionContext minionContext = MinionContext.getInstance();
minionContext.setRecordPurgerFactory(new YourCustomRecordPurgerFactory());
francoisa
03/30/2022, 6:32 AM
francoisa
03/30/2022, 8:57 AM
Mayank
francoisa
04/05/2022, 3:40 PM
"task": {
  "taskTypeConfigsMap": {
    "PurgeTask": {
      "schedule": "0 */10 * * * ?"
    }
  }
},
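For reference, the schedule value is a Quartz cron expression with six fields (seconds, minutes, hours, day-of-month, month, day-of-week; "?" means "no specific value"), so "0 */10 * * * ?" should fire at second 0 of every tenth minute. The small sketch below expands only the minutes field to show which minutes that is. It is a simplified illustration handling just "*", "*/n" and a plain number, not a Quartz parser, and the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.List;

public class CronMinutes {
  // Expands one Quartz minutes field into the minutes of the hour at which it fires.
  // Simplified: only "*", "*/n" and a single number are handled; ranges ("a-b"),
  // lists ("a,b") and offsets ("a/n") are not.
  public static List<Integer> expandMinutes(String field) {
    List<Integer> minutes = new ArrayList<>();
    if (field.equals("*")) {
      for (int m = 0; m < 60; m++) {
        minutes.add(m);
      }
    } else if (field.startsWith("*/")) {
      int step = Integer.parseInt(field.substring(2));
      if (step <= 0) {
        throw new IllegalArgumentException("Step must be positive: " + field);
      }
      for (int m = 0; m < 60; m += step) {
        minutes.add(m);
      }
    } else {
      minutes.add(Integer.parseInt(field));
    }
    return minutes;
  }

  public static void main(String[] args) {
    String schedule = "0 */10 * * * ?";
    // Quartz field order: seconds minutes hours day-of-month month day-of-week
    String[] fields = schedule.split(" ");
    System.out.println("second " + fields[0] + ", minutes " + expandMinutes(fields[1]));
  }
}
```

Running main prints "second 0, minutes [0, 10, 20, 30, 40, 50]", i.e. six runs per hour.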
francoisa
04/05/2022, 3:42 PM
Caused by: java.lang.IllegalStateException: Task type: PurgeTask is not registered
😕 Any ideas welcome 😕 Once I understand all this, I will write a document to help future people like me 😄
francoisa
04/05/2022, 4:01 PM
francoisa
04/06/2022, 9:18 AM
Mayank
francoisa
04/06/2022, 1:16 PM
BaseMinionStarter in the start() method
Jack
04/06/2022, 5:07 PM
RecordPurgerFactory which overrides the getRecordPurger() method in your pinot-minion code base. Then, before initializing MinionStarter, you need to register your customized RecordPurgerFactory like this:
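import org.apache.pinot.minion.BaseMinionStarter;
import org.apache.pinot.minion.MinionContext;
import org.apache.pinot.minion.MinionStarter;
import org.apache.pinot.spi.env.PinotConfiguration;

// Prepare your own PinotConfiguration
PinotConfiguration config = new PinotConfiguration();
...
// Set your own RecordPurgerFactory
MinionContext minionContext = MinionContext.getInstance();
minionContext.setRecordPurgerFactory(new YourCustomRecordPurgerFactory());
// Initialize minion starter
BaseMinionStarter minionStarter = new MinionStarter(helixClusterName, zkStr, config);
minionStarter.start();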
In your pinot-controller code base, you’d have to implement your own PurgeTaskGenerator, which implements the PinotTaskGenerator interface. The most important logic is in the public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) method, i.e. given a list of table configs, it generates the list of PinotTaskConfig for your tables.
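To make the generateTasks contract more concrete, here is a minimal sketch of one possible policy: emit one purge task per segment for every table whose taskTypeConfigsMap contains PurgeTask. Assumptions: TableConfig and PinotTaskConfig below are simplified stand-ins (not Pinot's classes) so the code runs without Pinot on the classpath, the segmentsByTable map replaces whatever segment lookup a real generator would do through its ClusterInfoAccessor, and the "tableName"/"segmentName" config keys are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PurgeTaskGeneratorSketch {
  public static final String TASK_TYPE = "PurgeTask";

  // Simplified stand-in for Pinot's TableConfig (hypothetical shape).
  public static final class TableConfig {
    public final String tableNameWithType;
    public final Map<String, Map<String, String>> taskTypeConfigsMap;

    public TableConfig(String tableNameWithType, Map<String, Map<String, String>> taskTypeConfigsMap) {
      this.tableNameWithType = tableNameWithType;
      this.taskTypeConfigsMap = taskTypeConfigsMap;
    }
  }

  // Simplified stand-in for Pinot's PinotTaskConfig: a task type plus string configs.
  public static final class PinotTaskConfig {
    public final String taskType;
    public final Map<String, String> configs;

    public PinotTaskConfig(String taskType, Map<String, String> configs) {
      this.taskType = taskType;
      this.configs = configs;
    }
  }

  // One task per segment for every table that enables PurgeTask in its config.
  // segmentsByTable is a hypothetical input standing in for a cluster lookup.
  public static List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs,
      Map<String, List<String>> segmentsByTable) {
    List<PinotTaskConfig> tasks = new ArrayList<>();
    for (TableConfig tableConfig : tableConfigs) {
      if (!tableConfig.taskTypeConfigsMap.containsKey(TASK_TYPE)) {
        continue; // this table did not opt in to purging
      }
      for (String segment : segmentsByTable.getOrDefault(tableConfig.tableNameWithType, List.of())) {
        Map<String, String> configs = new HashMap<>();
        configs.put("tableName", tableConfig.tableNameWithType);
        configs.put("segmentName", segment);
        tasks.add(new PinotTaskConfig(TASK_TYPE, configs));
      }
    }
    return tasks;
  }

  public static void main(String[] args) {
    TableConfig actions = new TableConfig("actions_OFFLINE",
        Map.of(TASK_TYPE, Map.of("schedule", "0 */10 * * * ?")));
    TableConfig other = new TableConfig("other_OFFLINE", Map.of());
    Map<String, List<String>> segments = Map.of(
        "actions_OFFLINE", List.of("actions_0", "actions_1"),
        "other_OFFLINE", List.of("other_0"));
    for (PinotTaskConfig task : generateTasks(List.of(actions, other), segments)) {
      System.out.println(task.taskType + " -> " + task.configs.get("segmentName"));
    }
  }
}
```

Scheduling every segment on every run is just the simplest policy to illustrate the shape of the method; a real generator would probably filter segments first.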
Then in your controller codebase:
// Initialize a controller starter
ControllerStarter controllerStarter = new ControllerStarter(pinotConfiguration);
controllerStarter.start();
// Register Purge Task
PinotTaskManager taskManager = controllerStarter.getTaskManager();
PurgeTaskGenerator purgeTaskGenerator = PurgeTaskGenerator.getInstance();
purgeTaskGenerator.init(taskManager.getClusterInfoAccessor());
taskManager.registerTaskGenerator(purgeTaskGenerator);
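One way to read the "Task type: PurgeTask is not registered" error is as a failed lookup in a map keyed by task type. The sketch below is a simplified model of such a registry, not Pinot's code: a generator is stored under whatever its getTaskType() returns (PinotTaskGenerator declares that method), so that value presumably has to match the PurgeTask key in taskTypeConfigsMap exactly, and the registration has to reach the same controller that runs the schedule. The TaskRegistrySketch and TaskGenerator names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class TaskRegistrySketch {
  // Minimal stand-in for a task generator: only its task type matters here.
  public interface TaskGenerator {
    String getTaskType();
  }

  private final Map<String, TaskGenerator> generators = new HashMap<>();

  // Stores the generator under the name its getTaskType() returns.
  public void register(TaskGenerator generator) {
    generators.put(generator.getTaskType(), generator);
  }

  // Looks up the generator for a task type named in a table config.
  public TaskGenerator lookup(String taskType) {
    TaskGenerator generator = generators.get(taskType);
    if (generator == null) {
      throw new IllegalStateException("Task type: " + taskType + " is not registered");
    }
    return generator;
  }

  public static void main(String[] args) {
    TaskRegistrySketch registry = new TaskRegistrySketch();
    // A different spelling or casing in getTaskType() leaves "PurgeTask" unregistered.
    registry.register(() -> "purgeTask");
    try {
      registry.lookup("PurgeTask");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Task type: PurgeTask is not registered
    }
    registry.register(() -> "PurgeTask");
    System.out.println(registry.lookup("PurgeTask").getTaskType()); // PurgeTask
  }
}
```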
francoisa
04/06/2022, 5:36 PM
PurgeTaskGenerator is the missing part 😕 I’ve already done the minion part (just dummy logic for testing 😉):
public void start()
    throws Exception {
  LOGGER.info("Starting Pinot minion: {}", _instanceId);
  Utils.logVersions();
  MinionContext minionContext = MinionContext.getInstance();
  minionContext.setRecordPurgerFactory(new SegmentPurger.RecordPurgerFactory() {
    @Override
    public SegmentPurger.RecordPurger getRecordPurger(String rawTableName) {
      SegmentPurger.RecordPurger p = new SegmentPurger.RecordPurger() {
        @Override
        public boolean shouldPurge(GenericRow row) {
          // Test-only logic: purges every row
          return true;
        }
      };
      return p;
    }
  });
  // ... rest of start() unchanged
}
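Since shouldPurge above returns true for every row, this test purger would drop every record of any segment it runs on. As a sketch of what real business logic might look like, assuming a hypothetical rule (purge rows of the actions table whose userId is in a set of deleted users), here is a self-contained version. RecordPurger and RecordPurgerFactory are stand-ins mirroring SegmentPurger's nested interfaces, a plain Map replaces GenericRow so it runs without Pinot, and the userId column is an assumption.

```java
import java.util.Map;
import java.util.Set;

public class RecordPurgerSketch {
  // Stand-ins mirroring SegmentPurger.RecordPurger / RecordPurgerFactory;
  // a Map replaces Pinot's GenericRow so the sketch is self-contained.
  public interface RecordPurger {
    boolean shouldPurge(Map<String, Object> row);
  }

  public interface RecordPurgerFactory {
    RecordPurger getRecordPurger(String rawTableName);
  }

  // Hypothetical rule: purge "actions" rows whose userId asked to be deleted.
  public static RecordPurgerFactory userDeletionFactory(Set<String> deletedUserIds) {
    return rawTableName -> {
      if (!"actions".equals(rawTableName)) {
        // Other tables: keep every row
        return row -> false;
      }
      // actions table: purge rows whose userId is in the deletion set
      return row -> deletedUserIds.contains(String.valueOf(row.get("userId")));
    };
  }

  public static void main(String[] args) {
    RecordPurgerFactory factory = userDeletionFactory(Set.of("u42"));
    RecordPurger purger = factory.getRecordPurger("actions");
    System.out.println(purger.shouldPurge(Map.of("userId", "u42"))); // true
    System.out.println(purger.shouldPurge(Map.of("userId", "u7")));  // false
  }
}
```

Returning a keep-everything purger for other tables is one design choice; the point is that the factory receives the raw table name, so per-table rules live in one place.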
Jack
04/06/2022, 5:37 PM
francoisa
04/06/2022, 6:11 PM
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) seems to be really complex 😕
francoisa
04/07/2022, 7:31 AM
ControllerStarter.java looks like this:
//INIT CONTROLLER STARTER
ControllerStarter starter = new ControllerStarter(conf);
starter.start();
//Register PurgeTask
PinotTaskManager taskManager = starter.getTaskManager();
PurgeTaskGenerator purgeTaskGenerator = new PurgeTaskGenerator();
purgeTaskGenerator.init(taskManager.getClusterInfoAccessor());
taskManager.registerTaskGenerator(purgeTaskGenerator);
return starter;
And I still get exactly the same error: Task type: PurgeTask is not registered. @User
francoisa
04/07/2022, 1:54 PM
2022/04/07 15:40:00.021 ERROR [ErrorLogger] [DefaultQuartzScheduler_Worker-4] Job (PurgeTask.actions_REALTIME threw an exception.
org.quartz.SchedulerException: Job threw an unhandled exception.
at org.quartz.core.JobRunShell.run(JobRunShell.java:213) [pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor.getClusterConfig(String)" because "this._clusterInfoAccessor" is null
at org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator.getTaskTimeoutMs(BaseTaskGenerator.java:46) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
at org.apache.pinot.controller.helix.core.minion.PinotTaskManager.scheduleTask(PinotTaskManager.java:457) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
at org.apache.pinot.controller.helix.core.minion.PinotTaskManager.scheduleTask(PinotTaskManager.java:515) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
at org.apache.pinot.controller.helix.core.minion.CronJobScheduleJob.execute(CronJobScheduleJob.java:53) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
at org.quartz.core.JobRunShell.run(JobRunShell.java:202) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
... 1 more
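The trace shows BaseTaskGenerator.getTaskTimeoutMs reading a _clusterInfoAccessor field that is null, even though the controller code above calls init(...) before registering the generator. One plausible cause, not confirmed in this thread, is a custom generator that overrides init() without calling super.init(...) (for example, storing the accessor in its own field of the same name), which leaves the base class's field unset. The stand-in classes below are hypothetical simplifications, not Pinot's, and reproduce that pattern alongside the fix.

```java
public class InitShadowingSketch {
  // Stand-in for the accessor the base generator needs.
  public static final class ClusterInfoAccessor {
    public long getTaskTimeoutMs() {
      return 3_600_000L;
    }
  }

  // Stand-in for a base generator that reads a protected field set by init().
  public abstract static class BaseGenerator {
    protected ClusterInfoAccessor _clusterInfoAccessor;

    public void init(ClusterInfoAccessor accessor) {
      _clusterInfoAccessor = accessor;
    }

    public long getTaskTimeoutMs() {
      // Throws NullPointerException if init() never set this class's field.
      return _clusterInfoAccessor.getTaskTimeoutMs();
    }
  }

  // Buggy subclass: its own field hides the base field and super.init() is skipped.
  public static final class ShadowingGenerator extends BaseGenerator {
    private ClusterInfoAccessor _clusterInfoAccessor;

    @Override
    public void init(ClusterInfoAccessor accessor) {
      _clusterInfoAccessor = accessor;
    }
  }

  // Fixed subclass: delegates to the base init() so the inherited field is set.
  public static final class DelegatingGenerator extends BaseGenerator {
    @Override
    public void init(ClusterInfoAccessor accessor) {
      super.init(accessor);
      // any extra setup for the custom generator goes here
    }
  }

  public static void main(String[] args) {
    ClusterInfoAccessor accessor = new ClusterInfoAccessor();
    BaseGenerator buggy = new ShadowingGenerator();
    buggy.init(accessor);
    try {
      buggy.getTaskTimeoutMs();
    } catch (NullPointerException e) {
      System.out.println("shadowed field: base accessor is still null");
    }
    BaseGenerator fixed = new DelegatingGenerator();
    fixed.init(accessor);
    System.out.println(fixed.getTaskTimeoutMs()); // 3600000
  }
}
```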
francoisa
04/07/2022, 1:55 PM
francoisa
04/07/2022, 2:33 PM
BaseSingleSegmentConversionExecutor.java
francoisa
04/08/2022, 8:01 AM
"INFO": "org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 5 attempts",
After a bit more log checking, I found that the segment upload is trying to push to the offline table 😕 Is there any way to get it to push to the realtime table?
Processing upload request for segment: actions__0__0__20220405T0951Z of table: actions_OFFLINE from client: localhost, ingestion descriptor: null
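The job key in the earlier trace was PurgeTask.actions_REALTIME, while this upload went to actions_OFFLINE, and the segment name actions__0__0__20220405T0951Z follows Pinot's LLC realtime naming (table__partitionId__sequenceNumber__creationTime). As a sketch of keeping the table type when picking the upload target, using hypothetical helpers rather than Pinot's TableNameBuilder or LLCSegmentName, the code below prefers _REALTIME when the task's table or the segment name indicates a realtime table. Whether the controller in this version accepts such an upload for a REALTIME table is a separate question the thread does not settle.

```java
public class UploadTargetSketch {
  // Hypothetical helper: strips an _OFFLINE or _REALTIME suffix if present.
  public static String rawTableName(String tableName) {
    for (String suffix : new String[] {"_OFFLINE", "_REALTIME"}) {
      if (tableName.endsWith(suffix)) {
        return tableName.substring(0, tableName.length() - suffix.length());
      }
    }
    return tableName;
  }

  // Heuristic for the LLC naming seen in the log:
  // table__partitionId__sequenceNumber__creationTime (four parts joined by "__").
  public static boolean isLlcSegmentName(String segmentName) {
    return segmentName.split("__", -1).length == 4;
  }

  // Keeps the type of the table the task was scheduled for, and treats LLC-named
  // segments as realtime, instead of always appending _OFFLINE.
  public static String uploadTableName(String taskTableName, String segmentName) {
    if (taskTableName.endsWith("_REALTIME") || isLlcSegmentName(segmentName)) {
      return rawTableName(taskTableName) + "_REALTIME";
    }
    return rawTableName(taskTableName) + "_OFFLINE";
  }

  public static void main(String[] args) {
    System.out.println(uploadTableName("actions_REALTIME", "actions__0__0__20220405T0951Z")); // actions_REALTIME
  }
}
```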
Jack
04/08/2022, 6:10 PM
francoisa
04/08/2022, 6:53 PM
francoisa
04/25/2022, 2:38 PM
Mayank
vishal
01/03/2023, 9:44 AM
francoisa
01/03/2023, 9:55 AM
RecordPurgerFactory (in this factory you will have to define your own business logic) in the BaseMinionStarter.
Then, in the table definition, you will have to set the schedule of this task. For now, this only works on offline tables. I’ve made an implementation which works on realtime tables, but I have no more time to work on it 😕 The PR is still on GitHub 😉
vishal
01/03/2023, 9:56 AM
vishal
01/03/2023, 10:49 AM
francoisa
01/03/2023, 10:55 AM
vishal
01/03/2023, 12:12 PM
vishal
01/03/2023, 12:16 PM
francoisa
01/03/2023, 12:29 PM
vishal
01/03/2023, 12:32 PM
vishal
01/03/2023, 12:33 PM
"task": {
  "taskTypeConfigsMap": {
    "PurgeTask": {
      "schedule": "0 */10 * * * ?"
    }
  }
},
Is this enough?