# ingestion
f
Hi All, I am trying to import Airflow jobs data from a JSON dump using a Python script and found that only the following classes are available:
DataJobSnapshotClass
DataJobInfoClass
DataJobInputOutputClass
Could you please explain how you imported links to Airflow or any custom properties in your demo environment? Will these missing classes be available soon?
m
Hi @fast-leather-13054: we integrate with Airflow using the lineage backend. (https://datahubproject.io/docs/metadata-ingestion/#using-datahubs-airflow-lineage-backend)
All you need to do is configure it in your instance, and it will automatically emit metadata for every DAG when it runs
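For reference, a minimal sketch of wiring that lineage backend up through Airflow's standard AIRFLOW__<SECTION>__<KEY> environment variables; the backend class path and kwargs below are assumptions based on the linked docs and may differ between datahub package versions, so treat the linked page as authoritative:
import os

# Sketch only: point Airflow's [lineage] backend config at DataHub's lineage
# backend. The class path here is an assumption -- check the linked docs for
# the one matching your datahub package version.
os.environ["AIRFLOW__LINEAGE__BACKEND"] = (
    "datahub_provider.lineage.datahub.DatahubLineageBackend"
)
# Backend options (e.g. which Airflow connection to emit through) are passed
# the same way; the key name and accepted options are also version-dependent.
os.environ["AIRFLOW__LINEAGE__DATAHUB_KWARGS"] = (
    '{"datahub_conn_id": "datahub_rest_default", "capture_ownership_info": true}'
)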
m
we are not looking to do it this way
m
Are you looking to "pull" the DAG metadata from the airflow files checked into source?
m
we already have all of the metadata that we need extracted in a separate process. What does the lineage backend use to write to MCEs?
we are pulling this data via the Airflow HTTP APIs
basically
DataJobSnapshotClass
DataJobInfoClass
DataJobInputOutputClass
do not seem to have all the properties available
in particular… we want to pass a URL for the Airflow job details during ingestion, but do not see a property for it
m
record DataJobInfo includes CustomProperties, ExternalReference {
the ExternalReference includes that url property
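For illustration, here is a minimal sketch of passing that URL by hand when building the MCE with these generated classes. The URNs, URL, and property values are hypothetical, the import path assumes the generated datahub.metadata.schema_classes module, and the commented-out REST emitter may not exist under that name in every version (the same MCE can also be serialized to JSON and ingested via the file source, like examples/mce_files/bootstrap_mce.json).
from datahub.metadata.schema_classes import (
    DataJobInfoClass,
    DataJobInputOutputClass,
    DataJobSnapshotClass,
    MetadataChangeEventClass,
)

# Hypothetical task URN: dataJob URNs are scoped to a dataFlow (the DAG).
job_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_xyz)"

snapshot = DataJobSnapshotClass(
    urn=job_urn,
    aspects=[
        DataJobInfoClass(
            name="task_xyz",
            type="COMMAND",  # a string matching an AzkabanJobType symbol
            description="Example task ingested from an Airflow metadata dump",
            # externalUrl is the ExternalReference field mentioned above --
            # point it at the Airflow task/DAG page (hypothetical URL).
            externalUrl="https://airflow.example.com/tree?dag_id=dag_abc",
            customProperties={"owner": "data-eng", "schedule": "@daily"},
        ),
        DataJobInputOutputClass(
            inputDatasets=[
                "urn:li:dataset:(urn:li:dataPlatform:hive,db.source_table,PROD)"
            ],
            outputDatasets=[
                "urn:li:dataset:(urn:li:dataPlatform:hive,db.target_table,PROD)"
            ],
        ),
    ],
)

mce = MetadataChangeEventClass(proposedSnapshot=snapshot)

# One way to push it, if your datahub version ships the REST emitter:
# from datahub.emitter.rest_emitter import DatahubRestEmitter
# DatahubRestEmitter("http://localhost:8080").emit_mce(mce)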
f
I see only
name: str,
type: Union[str, "AzkabanJobTypeClass"],
description: Union[None, str]=None,
in DataJobInfoClass
m
are you on the latest code-base?
g
also, you may need to rerun the gradle build + scripts/codegen.sh before you see it
f
maybe that's the reason
m
@fast-leather-13054: did that solve the problem?
f
No, I updated to the latest master and ran scripts/codegen.sh, but I still have only 3 Python classes related to jobs.
Also, in examples/mce_files/bootstrap_mce.json only these classes are mentioned.
@mammoth-bear-12532 Could you please help
m
@fast-leather-13054: do you see this snippet in your bootstrap_mce.json?
"com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot": {
                 "urn": "urn:li:dataFlow:(airflow,dag_abc,PROD)",
g
The external link and custom properties are inside the DataJobInfoClass
f
I see only name, type, description inside DataJobInfoClass
m
what does
git log | head -n 1
give?
g
also want to confirm that you’ve run
(cd .. && ./gradlew :metadata-events:mxe-schemas:build) && ./scripts/codegen.sh
m
here is mine after running it:
class DataJobInfoClass(DictWrapper):
    """Information about a Data processing job"""
    
    RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.datajob.DataJobInfo")
    def __init__(self,
        customProperties: Dict[str, str],
        name: str,
        type: Union[str, "AzkabanJobTypeClass"],
        externalUrl: Union[None, str]=None,
        description: Union[None, str]=None,
    ):
m
yep, I got this after destroying and recreating my venv
class DataFlowInfoClass(DictWrapper):
    """Information about a Data processing flow"""
    
    RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.datajob.DataFlowInfo")
    def __init__(self,
        name: str,
        customProperties: Optional[Dict[str, str]]=None,
        externalUrl: Union[None, str]=None,
        description: Union[None, str]=None,
        project: Union[None, str]=None,
    ):
and
class DataJobInfoClass(DictWrapper):
    """Information about a Data processing job"""
    
    RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.datajob.DataJobInfo")
    def __init__(self,
        name: str,
        type: Union[str, "AzkabanJobTypeClass"],
        customProperties: Optional[Dict[str, str]]=None,
        externalUrl: Union[None, str]=None,
        description: Union[None, str]=None,
    ):
@fast-leather-13054 let me know if you are still having issues
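As a quick usage sketch against those regenerated signatures (the DAG name, URL, and properties below are made up), the flow-level aspect can carry the link to the Airflow DAG page the same way:
from datahub.metadata.schema_classes import DataFlowInfoClass

# Hypothetical DAG-level example: externalUrl points at the Airflow UI page
# for the DAG, customProperties carries arbitrary key/value metadata.
flow_info = DataFlowInfoClass(
    name="dag_abc",
    description="Nightly ingestion DAG",
    externalUrl="https://airflow.example.com/tree?dag_id=dag_abc",
    customProperties={"team": "data-platform", "schedule": "@daily"},
)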
@gray-shoe-75895, @mammoth-bear-12532 why
type: Union[str, "AzkabanJobTypeClass"],
? we have many more job types
f
@millions-engineer-56536 yes, (cd .. && ./gradlew :metadata-events:mxe-schemas:build) fixes the issue. @mammoth-bear-12532 I guess it's better to move this command from the Requirements section to the Set up your Python environment section.
g
@millions-engineer-56536 It’s a limitation in our modeling right now - we just haven’t gone through and added all the types. It should be pretty easy to add to
metadata-models/src/main/pegasus/com/linkedin/datajob/DataJobInfo.pdl
- any specific job types you’d want to be included?
@fast-leather-13054 good call, I’ll put it in both spots - https://github.com/linkedin/datahub/pull/2479
m
@gray-shoe-75895 Re specific job types that we are missing… we are missing Spark but would like to differentiate between PySpark and ScalaSpark. We also have just python scripts as jobs but can prob use something from existing ecosystem to indicate that
Quite honestly, I don’t think it should be an enum. It can just be text.
m
+1 to what @millions-engineer-56536 said .. the AzkabanJobType was an unnecessary specialization IMO
We can just create a StringJobType to preserve backward compatibility
oh, looks like this is already a union of str and Azkaban…
so one can already provide just a string
g
the union of str and Azkaban is just to get around some Python typing weirdness with enums
but agree that freeform string would be good
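For example, a quick sketch of what passing a plain string for the type looks like today (the name and value below are made up):
from datahub.metadata.schema_classes import DataJobInfoClass

# The Union[str, AzkabanJobTypeClass] annotation lets callers pass a plain
# string, but the value still has to match the underlying Avro enum when the
# record is serialized, so a truly freeform string may be rejected downstream.
info = DataJobInfoClass(name="transform_users", type="COMMAND")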
f
Also, I noted that the link to Airflow jobs does not work in the demo env https://demo.datahubproject.io/tasks/urn:li:dataJob:(urn:li:dataFlow:(airflow,datahub_superset_ingestion,prod),ingest_from_superset)/ownership It's because Airflow expects the run date in the task URL, but it is missing.
m
it used to work… maybe it’s a good thing that it does not work now… I don’t think Acryl needs others to see internals of their airflow jobs 😉
actually links to pipeline do work
g
What’s not working in that link?
f
the link to the pipeline works, but for jobs it does not
g
oh interesting - I never saw that since it only fails if you’re not logged into airflow