w
Hi Team, regarding the stateful ingestion checkpoint: I realized that the job ID is pretty rigid. We aren't really able to change this value from the recipe config. Does it make more sense for it to behave like `pipeline_name` instead? The reason is that I am building a connector that creates a checkpoint for each workspace/namespace. Because of how the `JobId` is created, I am not able to create a checkpoint for each individual workspace/namespace in one recipe. One workaround is to run one workspace per recipe, each in a separate Python process.
I guess I could subclass `StaleEntityRemovalHandler` and change how the `JobId` is generated… just wondering which is easier to maintain.
g
The naming here is a bit confusing - the pipeline name is already used as part of the state’s storage key. The job ID is more like a task ID, and is used in case a single source has to commit multiple pieces of state.
To clarify, here’s how we’re fetching existing state from datahub https://github.com/datahub-project/datahub/blob/817406eadbaa459342f481183097885037[…]n/src/datahub/ingestion/source/state/stateful_ingestion_base.py - note that it uses the pipeline_name, platform ID, and job ID
I’d love to learn a bit more about what your use case is, and why it’s not possible to use a single checkpoint to store all of the state in your case
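As a rough sketch of that lookup (the function and key shape below are illustrative, not DataHub's actual API), a fixed job ID means every workspace in the same recipe resolves to the same checkpoint key:

```python
# Rough sketch: existing state is looked up by the combination of
# pipeline name, platform instance ID, and job ID. These names are
# illustrative, not DataHub's actual API.
def checkpoint_key(pipeline_name: str, platform_instance_id: str, job_id: str) -> tuple:
    return (pipeline_name, platform_instance_id, job_id)

# With a fixed job ID, two different workspaces in the same recipe
# map to the same key, so there is only one checkpoint to commit to:
key_ws1 = checkpoint_key("powerbi_recipe", "powerbi", "powerbi_stale_entity_removal")
key_ws2 = checkpoint_key("powerbi_recipe", "powerbi", "powerbi_stale_entity_removal")
assert key_ws1 == key_ws2
```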
w
Hey Harshal. I think the turning point is that sometimes we only want to ingest data that has been modified/updated in a given namespace/workspace. For example, PowerBI currently has an API to fetch the workspaces modified in the last 10 days. We can still run the same recipe to iterate through the workspaces and commit each whole workspace to its respective checkpoint. This way we achieve one recipe with multiple checkpoints, instead of lumping all the data into one checkpoint. Otherwise, the stale-entity diff will keep soft-deleting entities that simply weren't part of that run.
I think there will be a lot of this use case, especially for large corporations, where we don't always want to ingest/run all namespaces. One easy workaround is of course creating multiple recipe runs. Sorry, I didn't mention that this line is the problematic one: https://github.com/datahub-project/datahub/blob/ebd685d40d11373270c83309704f2fe29c[…]n/src/datahub/ingestion/source/state/stateful_ingestion_base.py
job_checkpoint_aspects[job_name] = checkpoint_aspect
Because `job_name` is fixed, we aren't able to commit a new checkpoint per workspace. So for example, if in this recipe I create 3 different checkpoints, one for each namespace, it won't work, since `job_name` will always be `PLATFORMNAME_stale_entity_removal`.
Let me know your opinion 🙇🏽
g
Another approach here would be to have the state be a single `dict[workspace_id, urns]`. That way, when the full recipe runs, it does a full replacement of the state. When the "limited" recipe runs for a subset of the workspaces, it updates the workspace IDs that it actually ingested from and copies the rest from the previous checkpoint.
It'd still require some custom code within the source, but the `StaleEntityRemovalHandler` would continue to be useful.
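A minimal sketch of that merge behavior, with hypothetical names (`previous`, `current`, `full_run` are not DataHub APIs):

```python
from typing import Dict, Set

def merge_state(
    previous: Dict[str, Set[str]],  # workspace_id -> urns from the last checkpoint
    current: Dict[str, Set[str]],   # workspace_id -> urns seen in this run
    full_run: bool,
) -> Dict[str, Set[str]]:
    if full_run:
        # A full recipe run replaces the entire state.
        return current
    # A "limited" run updates only the workspaces it actually ingested
    # and carries the rest forward from the previous checkpoint.
    merged = dict(previous)
    merged.update(current)
    return merged
```

With this shape, stale-entity removal for a workspace only compares against that workspace's entry, so a limited run never soft-deletes entities from workspaces it didn't touch.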
The other option is to subclass `StaleEntityRemovalHandler` and override the `_init_job_id` method. If you do that, you'd need multiple instances of the `StaleEntityRemovalHandler`, which is why I'm less of a fan of that idea.
Overall this is definitely an interesting use case, and you're right that we'll probably see this pattern re-emerge in other places. At the same time, I want to make sure that we keep the simple cases simple, and hence don't want to add too much complexity to an already overly complex stateful ingestion implementation. That's why I'm hesitant to make changes here without careful consideration.
w
I agree, though I don't think stateful ingestion is too complicated. I did the below to hack around the problem. Not sure if it's recommended.
```python
class PowerBiStaleEntityRemovalHandler(StaleEntityRemovalHandler):
    def _init_job_id(self) -> JobId:
        job_name_suffix = "stale_entity_removal"
        platform: Optional[str] = getattr(self.source, "platform", None)
        # Make the job ID unique per workspace, so each workspace gets
        # its own checkpoint.
        workspace_id = getattr(self.source, "current_workspace_id", None)
        if workspace_id:
            job_name_suffix += f"_{workspace_id}"
        return JobId(f"{platform}_{job_name_suffix}" if platform else job_name_suffix)
```
Then in `PowerBiDashboardV2Source(StatefulIngestionSourceBase)`, I run the below segment whenever I ingest a new workspace_id:
```python
# Re-derive the job ID for the new workspace and re-register the handler.
self.stale_entity_removal_handler._job_id = (
    self.stale_entity_removal_handler._init_job_id()
)
# TODO: this is getting hacky
self.register_stateful_ingestion_usecase_handler(
    self.stale_entity_removal_handler
)
```
g
This approach definitely should work. I don’t think we’d be able to merge that into OSS as-is, but it should work for you