worried-branch-76677
12/02/2022, 5:35 AM
pipeline_name instead?
The reason is that I am building a connector that creates a checkpoint for each workspace/namespace.
Because of how the JobID is created, I am not able to create a checkpoint for each individual workspace/namespace in one recipe.
I think one workaround is to run one workspace per recipe, each in a separate Python process.
worried-branch-76677
12/02/2022, 5:45 AM
StaleEntityRemovalHandler and change how JobID is generated… just wondering which is easier to maintain.
gray-shoe-75895
12/02/2022, 11:54 PM
gray-shoe-75895
12/02/2022, 11:54 PM
gray-shoe-75895
12/02/2022, 11:57 PM
worried-branch-76677
12/05/2022, 3:25 AM
worried-branch-76677
12/05/2022, 3:30 AM
job_checkpoint_aspects[job_name] = checkpoint_aspect
Because job_name is fixed, we aren’t able to store a separate checkpoint; each new one overwrites the previous one.
worried-branch-76677
12/05/2022, 3:31 AM
job_name will always be PLATFORMNAME_stale_entity_removal
worried-branch-76677
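A minimal sketch of the overwrite problem, assuming checkpoints are kept in a plain dict keyed by job name as in the job_checkpoint_aspects[job_name] = checkpoint_aspect line. The save_checkpoint helper and the URN strings are hypothetical. Because the job name is built from the platform alone, every workspace writes to the same key and only the last workspace ingested keeps a checkpoint.

```python
from typing import Dict, List

# Hypothetical stand-in for the provider's job_checkpoint_aspects mapping.
job_checkpoint_aspects: Dict[str, List[str]] = {}


def save_checkpoint(platform: str, urns: List[str]) -> None:
    # The job name depends only on the platform, never on the workspace.
    job_name = f"{platform}_stale_entity_removal"
    job_checkpoint_aspects[job_name] = urns


# Ingest two workspaces within one recipe run (placeholder URNs).
save_checkpoint("powerbi", ["ws1-dashboard-urn"])
save_checkpoint("powerbi", ["ws2-dashboard-urn"])

print(job_checkpoint_aspects)
# {'powerbi_stale_entity_removal': ['ws2-dashboard-urn']}
```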
12/06/2022, 5:43 PM
gray-shoe-75895
12/06/2022, 8:43 PM
dict[workspace_id, urns]. That way, when the full recipe runs, it does a full replacement of the state. When the “limited” recipe runs for a subset of the workspaces, it updates the workspace IDs that it actually ingested from and copies the rest from the previous checkpoint.
gray-shoe-75895
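The dict[workspace_id, urns] idea can be sketched in plain Python. This is an illustration under assumptions, not DataHub's checkpoint API: build_new_state and stale_urns are hypothetical helpers, the state is a dict from workspace ID to a set of URN strings, and the stale comparison is an assumed reading of how the existing stale-entity diffing would apply per workspace.

```python
from typing import Dict, Optional, Set

# Checkpoint state keyed by workspace ID: dict[workspace_id, urns].
State = Dict[str, Set[str]]


def build_new_state(previous: Optional[State], ingested: State, full_run: bool) -> State:
    """Return the state to save as the next checkpoint."""
    if full_run or previous is None:
        # Full run: the new state replaces the old one entirely.
        return {ws: set(urns) for ws, urns in ingested.items()}
    # Limited run: start from the previous checkpoint and overwrite only
    # the workspaces that were actually ingested this time.
    merged = {ws: set(urns) for ws, urns in previous.items()}
    for ws, urns in ingested.items():
        merged[ws] = set(urns)
    return merged


def stale_urns(previous: Optional[State], ingested: State, full_run: bool) -> Set[str]:
    """URNs present in the previous checkpoint but not seen in this run."""
    if previous is None:
        return set()
    # A limited run only compares the workspaces it ingested, so entities in
    # workspaces it skipped are never treated as stale.
    scope = previous.keys() if full_run else ingested.keys()
    old: Set[str] = set().union(*(previous.get(ws, set()) for ws in scope))
    new: Set[str] = set().union(*ingested.values())
    return old - new


previous = {"ws1": {"a", "b"}, "ws2": {"c"}}
ingested = {"ws1": {"a"}}  # limited run over ws1 only; "a", "b", "c" stand in for URNs
print(build_new_state(previous, ingested, full_run=False))  # {'ws1': {'a'}, 'ws2': {'c'}}
print(stale_urns(previous, ingested, full_run=False))  # {'b'}
```

Since a limited run never compares workspaces it skipped, their entities are carried forward instead of being treated as stale, and one handler instance with one job name is enough.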
12/06/2022, 8:45 PM
StaleEntityRemovalHandler would continue to be useful.
gray-shoe-75895
12/06/2022, 8:49 PM
_init_job_id method. If you do that, you’d need multiple instances of the StaleEntityRemovalHandler, which is why I’m less of a fan of that idea.
gray-shoe-75895
12/06/2022, 8:50 PM
worried-branch-76677
12/07/2022, 10:48 AM
from typing import Optional, Type

# StaleEntityRemovalHandler, StaleEntityCheckpointStateBase, StatefulIngestionSourceBase,
# StatefulIngestionConfigBase and JobId are imported from the datahub package.


class PowerBiStaleEntityRemovalHandler(StaleEntityRemovalHandler):
    def __init__(
        self,
        source: StatefulIngestionSourceBase,
        config: Optional[StatefulIngestionConfigBase],
        state_type_class: Type[StaleEntityCheckpointStateBase],
        pipeline_name: Optional[str],
        run_id: str,
    ):
        super().__init__(source, config, state_type_class, pipeline_name, run_id)

    def _init_job_id(self) -> JobId:
        job_name_suffix = "stale_entity_removal"
        platform: Optional[str] = getattr(self.source, "platform", None)
        # Suffix the job name with the workspace currently being ingested so that
        # each workspace gets its own checkpoint instead of sharing one.
        workspace_id = getattr(self.source, "current_workspace_id", None)
        if workspace_id:
            job_name_suffix += f"_{workspace_id}"
        return JobId(f"{platform}_{job_name_suffix}" if platform else job_name_suffix)
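To see which checkpoint keys this override produces, here is a dependency-free sketch of the same naming rule. It returns a plain str instead of DataHub's JobId, and the SimpleNamespace objects and the workspace ID ws1 are hypothetical stand-ins for PowerBiDashboardV2Source.

```python
from types import SimpleNamespace
from typing import Any, Optional


def init_job_id(source: Any) -> str:
    # Same naming rule as _init_job_id, returning a plain str so it
    # runs without datahub installed.
    job_name_suffix = "stale_entity_removal"
    platform: Optional[str] = getattr(source, "platform", None)
    workspace_id = getattr(source, "current_workspace_id", None)
    if workspace_id:
        job_name_suffix += f"_{workspace_id}"
    return f"{platform}_{job_name_suffix}" if platform else job_name_suffix


# Hypothetical sources: one without and one with a current workspace.
print(init_job_id(SimpleNamespace(platform="powerbi")))
# powerbi_stale_entity_removal
print(init_job_id(SimpleNamespace(platform="powerbi", current_workspace_id="ws1")))
# powerbi_stale_entity_removal_ws1
```

Each workspace gets its own job name, which is why the handler's _job_id has to be recomputed every time current_workspace_id changes.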
In the class PowerBiDashboardV2Source(StatefulIngestionSourceBase), I run the segment below whenever I start ingesting a new workspace_id:
# Recompute the job ID so the handler checkpoints under the current workspace
self.stale_entity_removal_handler._job_id = (
    self.stale_entity_removal_handler._init_job_id()
)
# TODO: this is getting hacky
self.register_stateful_ingestion_usecase_handler(
    self.stale_entity_removal_handler
)
gray-shoe-75895
12/07/2022, 8:10 PM