# ingestion
n
Has anyone ingested metadata from a Delta Lake on S3? I am stuck and need some help.
m
@plain-guitar-45103 might be able to help!
p
What error are you getting, @numerous-bird-27004? Or do you need some help getting started?
n
First of all, I thought I could ingest an S3 Delta Lake using UI Ingestion (by creating a Custom Source). I'm not sure whether that is supposed to work.
Secondly, this is what I have in the UI Custom Source:
```yaml
source:
    type: delta-lake
    config:
        env: PROD
        base_path: 's3://path/to/delta/tables'
        s3:
            aws_config:
                aws_access_key_id: aws_access_key_id
                aws_secret_access_key: aws_secret_access_key
sink:
    type: datahub-rest
    config:
        token: 'ey......'
        server: 'https://abc.io/api/gms'
```
Third, when I run the above, I am getting the following stack trace:
```
~~~~ Execution Summary ~~~~

RUN_INGEST - {'errors': [],
 'exec_id': 'b3d1f773-3794-4192-accb-f94bfc89622b',
 'infos': ['2022-07-12 20:05:48.616065 [exec_id=b3d1f773-3794-4192-accb-f94bfc89622b] INFO: Starting execution for task with name=RUN_INGEST',
..................................
.........................................
           'xmltodict-0.13.0\n'
           '[2022-07-12 20:06:43,882] INFO     {datahub.cli.ingest_cli:99} - DataHub CLI version: 0.8.40\n'
           '[2022-07-12 20:06:45,029] ERROR    {logger:26} - Please set env variable SPARK_VERSION\n'
           '[2022-07-12 20:06:45,643] ERROR    {datahub.entrypoints:167} - File '
           '"/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/cli/ingest_cli.py", line 106, in run\n'
           '    88   def run(\n'
           '    89       ctx: click.Context,\n'
           '    90       config: str,\n'
           '    91       dry_run: bool,\n'
           '    92       preview: bool,\n'
           '    93       strict_warnings: bool,\n'
           '    94       preview_workunits: int,\n'
           '    95       suppress_error_logs: bool,\n'
           '    96   ) -> None:\n'
           ' (...)\n'
           '    102      pipeline_config = load_config_file(config_file)\n'
           '    103  \n'
           '    104      try:\n'
           '    105          logger.debug(f"Using config: {pipeline_config}")\n'
           '--> 106          pipeline = Pipeline.create(pipeline_config, dry_run, preview, preview_workunits)\n'
           '    107      except ValidationError as e:\n'
           '\n'
           'File "/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line '
           '204, in create\n'
           '    196  def create(\n'
           '    197      cls,\n'
           '    198      config_dict: dict,\n'
           '    199      dry_run: bool = False,\n'
           '    200      preview_mode: bool = False,\n'
           '    201      preview_workunits: int = 10,\n'
           '    202  ) -> "Pipeline":\n'
           '    203      config = PipelineConfig.parse_obj(config_dict)\n'
           '--> 204      return cls(\n'
           '    205          config,\n'
           '\n'
           'File "/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line '
           '152, in __init__\n'
           '    125  def __init__(\n'
           '    126      self,\n'
           '    127      config: PipelineConfig,\n'
           '    128      dry_run: bool = False,\n'
           '    129      preview_mode: bool = False,\n'
           '    130      preview_workunits: int = 10,\n'
           '    131  ):\n'
           ' (...)\n'
           '    148      logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")\n'
           '    149  \n'
           '    150      source_type = self.config.source.type\n'
           '    151      source_class = source_registry.get(source_type)\n'
           '--> 152      self.source: Source = source_class.create(\n'
           '    153          self.config.source.dict().get("config", {}), self.ctx\n'
           '\n'
           'File '
           '"/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/source/delta_lake/source.py", '
           'line 99, in create\n'
           '    97   @classmethod\n'
           '    98   def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":\n'
           '--> 99       config = DeltaLakeSourceConfig.parse_obj(config_dict)\n'
           '    100      return cls(config, ctx)\n'
           '\n'
           'File "pydantic/main.py", line 521, in pydantic.main.BaseModel.parse_obj\n'
           '\n'
           'File "pydantic/main.py", line 339, in pydantic.main.BaseModel.__init__\n'
           '\n'
           'File "pydantic/main.py", line 1064, in pydantic.main.validate_model\n'
           '\n'
           'File '
           '"/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/source/delta_lake/config.py", '
           'line 79, in validate_config\n'
           '    75   @pydantic.root_validator()\n'
           '    76   def validate_config(cls, values: Dict) -> Dict[str, Any]:\n'
           '    77       values["_is_s3"] = is_s3_uri(values["base_path"])\n'
           '    78       if values["_is_s3"]:\n'
           '--> 79           if values["s3"] is None:\n'
           '    80               raise ValueError("s3 config must be set for s3 path")\n'
           '\n'
           '---- (full traceback above) ----\n'
           'File "/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/cli/ingest_cli.py", line 106, in '
           'run\n'
           '    pipeline = Pipeline.create(pipeline_config, dry_run, preview, preview_workunits)\n'
           'File "/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line '
           '204, in create\n'
           '    return cls(\n'
           'File "/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line '
           '152, in __init__\n'
           '    self.source: Source = source_class.create(\n'
           'File '
           '"/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/source/delta_lake/source.py", '
           'line 99, in create\n'
           '    config = DeltaLakeSourceConfig.parse_obj(config_dict)\n'
           'File "pydantic/main.py", line 521, in pydantic.main.BaseModel.parse_obj\n'
           'File "pydantic/main.py", line 339, in pydantic.main.BaseModel.__init__\n'
           'File "pydantic/main.py", line 1064, in pydantic.main.validate_model\n'
           'File '
           '"/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/ingestion/source/delta_lake/config.py", '
           'line 79, in validate_config\n'
           '    if values["s3"] is None:\n'
           '\n'
           "KeyError: 's3'\n"
           '[2022-07-12 20:06:45,643] INFO     {datahub.entrypoints:176} - DataHub CLI version: 0.8.40 at '
           '/tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/lib/python3.9/site-packages/datahub/__init__.py\n'
           '[2022-07-12 20:06:45,643] INFO     {datahub.entrypoints:179} - Python version: 3.9.9 (main, Dec 21 2021, 10:03:34) \n'
           '[GCC 10.2.1 20210110] at /tmp/datahub/ingest/venv-b3d1f773-3794-4192-accb-f94bfc89622b/bin/python3 on '
           'Linux-5.13.0-1021-aws-x86_64-with-glibc2.31\n'
           "[2022-07-12 20:06:45,643] INFO     {datahub.entrypoints:182} - GMS config {'models': {}, 'versions': {'linkedin/datahub': {'version': "
           "'v0.8.40', 'commit': '11356e37ba84d45568811c1304826009c7207e9a'}}, 'managedIngestion': {'defaultCliVersion': '0.8.40', 'enabled': True}, "
           "'statefulIngestionCapable': True, 'supportsImpactAnalysis': False, 'telemetry': {'enabledCli': True, 'enabledIngestion': False}, "
           "'datasetUrnNameCasing': False, 'retention': 'true', 'datahub': {'serverType': 'prod'}, 'noCode': 'true'}\n",
           "2022-07-12 20:06:46.803085 [exec_id=b3d1f773-3794-4192-accb-f94bfc89622b] INFO: Failed to execute 'datahub ingest'",
           '2022-07-12 20:06:46.804137 [exec_id=b3d1f773-3794-4192-accb-f94bfc89622b] INFO: Caught exception EXECUTING '
           'task_id=b3d1f773-3794-4192-accb-f94bfc89622b, name=RUN_INGEST, stacktrace=Traceback (most recent call last):\n'
           '  File "/usr/local/lib/python3.9/site-packages/acryl/executor/execution/default_executor.py", line 121, in execute_task\n'
           '    self.event_loop.run_until_complete(task_future)\n'
           '  File "/usr/local/lib/python3.9/site-packages/nest_asyncio.py", line 89, in run_until_complete\n'
           '    return f.result()\n'
           '  File "/usr/local/lib/python3.9/asyncio/futures.py", line 201, in result\n'
           '    raise self._exception\n'
           '  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 256, in __step\n'
           '    result = coro.send(None)\n'
           '  File "/usr/local/lib/python3.9/site-packages/acryl/executor/execution/sub_process_ingestion_task.py", line 115, in execute\n'
           '    raise TaskError("Failed to execute \'datahub ingest\'")\n'
           "acryl.executor.execution.task.TaskError: Failed to execute 'datahub ingest'\n"]}
Execution finished with errors.
```
p
Oh yeah, I was getting this previously.
You will need to specify the aws_region.
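For reference, here is a minimal sketch of the same recipe with aws_region added under aws_config, run through DataHub's Python Pipeline API (the same Pipeline.create call that appears in the traceback above) instead of the UI. The bucket path, region, credentials, token, and GMS URL are placeholders, not real values.

```python
# Minimal sketch: the delta-lake recipe from above as a Python dict, with the
# missing aws_region added, run programmatically via DataHub's Pipeline API.
from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {
        "type": "delta-lake",
        "config": {
            "env": "PROD",
            "base_path": "s3://path/to/delta/tables",  # placeholder path
            "s3": {
                "aws_config": {
                    "aws_access_key_id": "aws_access_key_id",
                    "aws_secret_access_key": "aws_secret_access_key",
                    # The field whose absence triggers the validation failure above;
                    # "us-east-1" is a placeholder region.
                    "aws_region": "us-east-1",
                },
            },
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {
            "server": "https://abc.io/api/gms",  # placeholder GMS URL
            "token": "ey......",                  # placeholder token
        },
    },
}

pipeline = Pipeline.create(recipe)  # same call as line 204 of pipeline.py in the trace
pipeline.run()
pipeline.raise_from_status()
```

The aws_region field goes in the same place (under s3.aws_config) in the YAML recipe pasted into the UI Custom Source.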
But once you do, you might get another error, which I am chasing.
Here is a link to the other error I am chasing.
n
So no successful ingestion from S3 Delta Lake yet?
c
Can you please confirm whether you have download permissions on the S3 bucket's actual path? I have had an issue where I had access to the bucket but not to a specific Delta file in the bucket. Can you try to download one of the JSON files from that location to confirm access? Also, please let me know how you are creating the Delta table (using Databricks, Spark, or standalone).
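A small sketch of that access check using boto3, assuming default credential resolution inside the container; the bucket name, prefix, and local path are placeholders. Delta tables keep their JSON commit files under the table's _delta_log/ prefix, so that is a natural object to try downloading.

```python
# Sketch: verify both list and download access to a Delta table's _delta_log files.
import boto3

s3 = boto3.client("s3")
bucket = "my-bucket"                          # placeholder bucket
prefix = "path/to/delta/table/_delta_log/"    # placeholder table prefix

# Listing only needs s3:ListBucket on the bucket...
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=5)
keys = [obj["Key"] for obj in resp.get("Contents", [])]
print(keys)

# ...but downloading also needs s3:GetObject on the objects themselves,
# which can be missing even when listing works.
if keys:
    s3.download_file(bucket, keys[0], "/tmp/delta_log_commit.json")
    print("download ok")
```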
p
So I tried to download from within the acryldata/datahub-actions:head container and got a permission error. But when I download from outside the container, the file downloads fine. I think it's a permission issue with the file system and not with S3; I can list objects in S3 just fine from within the container.
We are using Databricks and saving files as snappy.parquet.
I am new to the Delta Lake stuff too, so if I am not making sense, let me know.
c
@plain-guitar-45103 Can we get on a quick call to see what's happening?
p
Hi @careful-pilot-86309, sure, my schedule is pretty open during PST hours.
c
Will 10:00 am PST work for you?
w
Hi, I am also getting the same error. How did you solve it?