Hi amazing pinot team. If I read well the docs the...
# general
f
Hi amazing Pinot team. If I read the docs correctly, there is a mechanism for GDPR compliance. Do you have any example to share around this? E.g. I have a customer_Id I want to purge. How can I do that? And what happens to the currently consuming segment? Thanks for your help 😉
m
There is a SegmentPurger minion job that you may want to customize to query what records to purge.
f
OK, I’ve found the associated file on GitHub. But is there a doc anywhere that shows how it works with an example? I’m a bit lost on how to implement the recordPurger.
m
@User ^^
j
When starting up the minion service, you can register your custom factory like this:
MinionContext minionContext = MinionContext.getInstance();
minionContext.setRecordPurgerFactory(new YourCustomRecordPurgerFactory());
f
OK. I’m quite far from being a Pinot dev ... I will look around at what has to be implemented in this factory. Thanks for your feedback.
Other questions linked to this one: 1- How does this affect consuming segments and committed segments? Are both purged? 2- What is the best way to grab the list of IDs to purge? Any best practice to share?
m
1. Only committed segments are purged. Consuming segments are typically in that state for only a few hours anyway, so it doesn’t make sense to purge them; that can happen once they are committed.
2. Typically, data platforms have some sort of micro-service that provides the list of IDs to purge for a given data set.
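For point 2, here is a minimal sketch of the ID check itself, assuming the list of IDs has already been fetched from such a service. `CustomerIdPurgeFilter` is a hypothetical name, and a plain `Map` stands in for Pinot's `GenericRow` so the example runs without Pinot on the classpath. In a real minion the same test would presumably sit inside `RecordPurger.shouldPurge(GenericRow row)`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper: decides whether a row belongs to a customer whose data must be purged.
// A plain Map stands in for Pinot's GenericRow (assumption made to keep the sketch standalone).
final class CustomerIdPurgeFilter {
  private final String idColumn;
  private final Set<String> idsToPurge;

  CustomerIdPurgeFilter(String idColumn, Set<String> idsToPurge) {
    this.idColumn = idColumn;
    // Immutable copy so later changes to the caller's set do not affect a running purge.
    this.idsToPurge = Set.copyOf(idsToPurge);
  }

  // Returns true when the row's ID column holds one of the IDs to purge.
  // Values are compared as strings so numeric and string IDs are treated alike.
  boolean shouldPurge(Map<String, Object> row) {
    Object value = row.get(idColumn);
    return value != null && idsToPurge.contains(String.valueOf(value));
  }
}
```

Rows without the ID column are kept, which is the conservative choice when a segment's schema differs from what the purge list expects.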
f
OK, I’ve managed to put a few more pieces of the puzzle together ... not sure I’m at the end yet 😄 So I’ve crafted my own minion version by calling setRecordPurgerFactory with my logic. On the table side I’ve added a task to my table spec like this
"task": {
  "taskTypeConfigsMap": {
    "PurgeTask": {
      "schedule": "0 */10 * * * ?"
    }
  }
},
But the controller says
Caused by: java.lang.IllegalStateException: Task type: PurgeTask is not registered
😕 Any idea is welcome 😕 Once I understand all this I will write a document to help future people like me 😄
Digging a bit more ... when I go to the API and check http://IP:9000/tasks/tasktypes I only get RealtimeToOfflineSegmentsTask 😕
@User or @User any clues to get it working? I’m a bit lost. I’ve checked the code: PurgeTask is registered in the minion constants, but I’ve literally lost a piece somewhere 😕
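The same check can be scripted. The sketch below builds the request for the `/tasks/tasktypes` endpoint mentioned above and tests a response body for a task type. `TaskTypeCheck` and its method names are hypothetical, and the matching assumes the endpoint returns a JSON array of quoted task-type names, which is not confirmed in this thread.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper for checking which task types the controller knows about.
final class TaskTypeCheck {
  // Builds a GET request for the controller's task-type listing.
  static HttpRequest taskTypesRequest(String controllerBaseUrl) {
    return HttpRequest.newBuilder(URI.create(controllerBaseUrl + "/tasks/tasktypes"))
        .GET()
        .build();
  }

  // Crude check: assumes the body is a JSON array of quoted names, e.g. ["PurgeTask"].
  static boolean isRegistered(String responseBody, String taskType) {
    return responseBody.contains("\"" + taskType + "\"");
  }
}
```

The request can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, passing the returned body to `isRegistered`.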
m
Where have you implemented your purge task, and how are you providing it to Pinot?
f
For a first PoC, directly in the BaseMinionStarter, in the start() method.
j
@User you can implement your own customized RecordPurgerFactory, which overrides the getRecordPurger() method, in your pinot-minion code base. Then, before initializing MinionStarter, you need to register your customized RecordPurgerFactory like this:
// Prepare your own PinotConfig
PinotConfiguration config = new PinotConfiguration();

...

// Set your own RecordPurgerFactory
MinionContext minionContext = MinionContext.getInstance();
minionContext.setRecordPurgerFactory(new YourCustomRecordPurgerFactory());

// Initialize minion starter
BaseMinionStarter minionStarter = new MinionStarter(helixClusterName, zkStr, config);

minionStarter.start();
In your pinot-controller code base, you’d have to implement your own PurgeTaskGenerator, which implements the PinotTaskGenerator interface. The most important logic is in the public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) method, i.e. given a list of table configs, it generates the list of PinotTaskConfig for your tables. Then in your controller code base:
// Initialize a controller starter
ControllerStarter controllerStarter = new ControllerStarter(pinotConfiguration);
controllerStarter.start();

// Register Purge Task
PinotTaskManager taskManager = controllerStarter.getTaskManager();
PurgeTaskGenerator purgeTaskGenerator = PurgeTaskGenerator.getInstance();
purgeTaskGenerator.init(taskManager.getClusterInfoAccessor());
taskManager.registerTaskGenerator(purgeTaskGenerator);
f
OK, it seems that the PurgeTaskGenerator is the missing part 😕 I’ve already done the minion part (just a stupid logic to test 😉)
public void start()
    throws Exception {
  LOGGER.info("Starting Pinot minion: {}", _instanceId);
  Utils.logVersions();
  MinionContext minionContext = MinionContext.getInstance();

  minionContext.setRecordPurgerFactory(new SegmentPurger.RecordPurgerFactory() {
    @Override
    public SegmentPurger.RecordPurger getRecordPurger(String rawTableName) {
      SegmentPurger.RecordPurger p = new SegmentPurger.RecordPurger() {
        @Override
        public boolean shouldPurge(GenericRow row) {
          return true;
        }
      };
      return p;
    }
  });
j
Awesome! So the remaining thing is to register your purgeTaskGenerator with the controller once it’s implemented.
f
Wow, many thanks for the help. It’s still not trivial: public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) seems to be really complex 😕
It’s still not working. Maybe I’m putting things in the wrong place. My ControllerStarter.java looks like
//INIT CONTROLLER STARTER
    ControllerStarter starter = new ControllerStarter(conf);
    starter.start();
    //Register PurgeTask
    PinotTaskManager taskManager = starter.getTaskManager();
    PurgeTaskGenerator purgeTaskGenerator = new PurgeTaskGenerator();
    purgeTaskGenerator.init(taskManager.getClusterInfoAccessor());
    taskManager.registerTaskGenerator(purgeTaskGenerator);
    return starter;
And I keep getting exactly the same error: Task type: PurgeTask is not registered. @User
OK, I’ve managed to register the task. But now I’m facing a
2022/04/07 15:40:00.021 ERROR [ErrorLogger] [DefaultQuartzScheduler_Worker-4] Job (PurgeTask.actions_REALTIME threw an exception.
org.quartz.SchedulerException: Job threw an unhandled exception.
	at org.quartz.core.JobRunShell.run(JobRunShell.java:213) [pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
	at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor.getClusterConfig(String)" because "this._clusterInfoAccessor" is null
	at org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator.getTaskTimeoutMs(BaseTaskGenerator.java:46) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
	at org.apache.pinot.controller.helix.core.minion.PinotTaskManager.scheduleTask(PinotTaskManager.java:457) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
	at org.apache.pinot.controller.helix.core.minion.PinotTaskManager.scheduleTask(PinotTaskManager.java:515) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
	at org.apache.pinot.controller.helix.core.minion.CronJobScheduleJob.execute(CronJobScheduleJob.java:53) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
	at org.quartz.core.JobRunShell.run(JobRunShell.java:202) ~[pinot-all-0.11.0-SNAPSHOT-jar-with-dependencies.jar:0.11.0-SNAPSHOT-5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351]
	... 1 more
Learning it the hard way 😕
OK, I managed to get the task running, but it throws a parseLongException in BaseSingleSegmentConversionExecutor.java
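A parse failure at this point is consistent with a task config that lacks the segment CRC, which the recap in this thread also names as the earlier error. Below is a minimal sketch of a per-segment config map that includes it. `PurgeTaskConfigSketch` is a hypothetical class, and the key names (`tableName`, `segmentName`, `downloadURL`, `uploadURL`, `crc`) are assumptions modeled on Pinot's `MinionConstants`; verify them against the Pinot version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder for the per-segment config map handed to a purge task.
// Key names are assumptions modeled on Pinot's MinionConstants, not confirmed in this thread.
final class PurgeTaskConfigSketch {
  static Map<String, String> buildConfigs(String tableNameWithType, String segmentName,
      String downloadUrl, String uploadUrl, long crc) {
    Map<String, String> configs = new LinkedHashMap<>();
    configs.put("tableName", tableNameWithType);
    configs.put("segmentName", segmentName);
    configs.put("downloadURL", downloadUrl);
    configs.put("uploadURL", uploadUrl);
    // Stored as a decimal string because task configs are string-to-string maps.
    configs.put("crc", Long.toString(crc));
    return configs;
  }

  // Mirrors the suspected failure mode: Long.parseLong throws NumberFormatException
  // when the "crc" entry is missing (null) or not a number.
  static long readCrc(Map<String, String> configs) {
    return Long.parseLong(configs.get("crc"));
  }
}
```

In a real generator each map would be wrapped in a `PinotTaskConfig` for the purge task type, one per segment to purge.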
All right ... a quick recap of this long story 🙂
1- I’ve implemented PurgeTaskGenerator (with the correct annotation so it’s picked up at controller launch)
2- Tasks are registered
3- I’ve generated a correct task list with crc (my previous error)
4- On the minion side I’ve implemented the recordPurgerFactory
All the mechanics around tasks seem to be OK, but I keep getting errors on segment upload, so the purge isn’t working. If I check the task in ZooKeeper I get
"INFO": "org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 5 attempts",
After a bit more log checking I’ve found that the segment upload is trying to push to the offline table 😕 Is there any way to get it to push to the realtime table?
Processing upload request for segment: actions__0__0__20220405T0951Z of table: actions_OFFLINE from client: localhost, ingestion descriptor: null
j
@User May I know what is the retention for your realtime table? IIRC there is an upper time limit for GDPR. If the retention of your realtime table is under that limit you don’t need to submit purge task for realtime segments
f
It wasn’t linked to that. I was hoping to purge a realtime table, which is not possible for now. Many thanks for the help provided 😉
Hi 😉 As promised, I’ve submitted a PR that at least implements the purge task as a normal built-in minion task: https://github.com/apache/pinot/pull/8589
m
Thanks @User
v
@francoisa can you please help me with how I can use this purge task?
f
You will first have to set a RecordPurgerFactory (in this factory you will have to define your own business logic) in the BaseMinionStarter. Then, in the table definition, you will have to set the schedule of this task. For now this only works on offline tables. I’ve made an implementation which works on realtime, but I have no more time to work on it 😕 The PR is still on GitHub 😉
v
okay thanks @francoisa
can you please share your PR with me?
f
v
Thanks @francoisa
@francoisa how can I use your implemented class for an offline table? I only need to add a task to the table config, right?
f
For offline it's already merged but not for realtime
v
yeah, I’m only asking about offline
"task": {
  "taskTypeConfigsMap": {
    "PurgeTask": {
      "schedule": "0 */10 * * * ?"
    }
  }
},
is this enough?