# ingestion
b
hello, I am new to DataHub. I am getting an error when ingesting metadata from the S3 source. Would you please help me with these questions? 1. Can I use a "file" type sink for the s3 source? I got an error saying that I can't use a file type sink. 2. When I use gms as the sink, it's failing. Please help me. Thanks
g
You should be able to use both the file sink and the gms (datahub-rest) sink with the s3 source
Could you provide some more details on the error that you’re running into?
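For reference, the two sink configs look roughly like this (a sketch, not taken from your recipe - the gms sink's type name is datahub-rest, and the server URL assumes a local quickstart on port 8080):

    # file sink: writes the emitted metadata to a local JSON file
    sink:
      type: file
      config:
        filename: ./s3-ingest-output.json

    # datahub-rest ("gms") sink: pushes metadata straight to the GMS endpoint
    sink:
      type: datahub-rest
      config:
        server: "http://localhost:8080"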
b
hi @gray-shoe-75895 Here is the YAML that I am using to ingest metadata from s3 to a sink of type "file":

source:
  type: "s3"
  config:
    platform: s3
    path_spec:
      include: "s3://imo-datalake-dev-gold20201022182214781400000004/rhubarb/2022/08/29/dataset/LEXICAL_DATASET_CORE/part-00000-8497b5ec-f63e-4f33-a78a-a8795b4201bc-c000.snappy.parquet"
    aws_config:
      aws_access_key_id: XXX
      aws_secret_access_key: XXXX
      aws_region: us-east-1
    env: "PROD"
    profiling:
      enabled: false
sink:
  type: file
  config:
    filename: /Users/hgopu/datahub/s3-injest-output.json
I am receiving the below error:
---- (full traceback above) ----
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/entrypoints.py", line 149, in main
sys.exit(datahub(standalone_mode=False, **kwargs))
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
return self.main(*args, **kwargs)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1053, in main
rv = self.invoke(ctx)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 754, in invoke
return __callback(*args, **kwargs)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/decorators.py", line 26, in new_func
return f(get_current_context(), *args, **kwargs)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/telemetry/telemetry.py", line 347, in wrapper
raise e
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/telemetry/telemetry.py", line 299, in wrapper
res = func(*args, **kwargs)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/utilities/memory_leak_detector.py", line 91, in wrapper
return func(*args, **kwargs)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/cli/ingest_cli.py", line 212, in run
loop.run_until_complete(run_func_check_upgrade(pipeline))
File "/Users/hgopu/opt/anaconda3/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/cli/ingest_cli.py", line 166, in run_func_check_upgrade
ret = await the_one_future
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/cli/ingest_cli.py", line 157, in run_pipeline_async
return await loop.run_in_executor(
File "/Users/hgopu/opt/anaconda3/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/cli/ingest_cli.py", line 148, in run_pipeline_to_completion
raise e
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/cli/ingest_cli.py", line 134, in run_pipeline_to_completion
pipeline.run()
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 348, in run
for wu in itertools.islice(
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/ingestion/source/s3/source.py", line 728, in get_workunits
assert self.source_config.path_specs
AssertionError
[2022-09-14 10:37:24,949] DEBUG    {datahub.entrypoints:198} - DataHub CLI version: 0.8.44.2 at /Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/__init__.py
[2022-09-14 10:37:24,949] DEBUG    {datahub.entrypoints:201} - Python version: 3.8.8 (default, Apr 13 2021, 12:59:45)
[Clang 10.0.0 ] at /Users/hgopu/opt/anaconda3/bin/python3 on macOS-10.16-x86_64-i386-64bit
[2022-09-14 10:37:24,949] DEBUG    {datahub.entrypoints:204} - GMS config {}
More information below:
(base) HGOPU-MAC:datahub hgopu$ datahub --debug  ingest   -c s3-datahub.yaml  --dry-run
[2022-09-14 10:37:22,735] DEBUG {datahub.telemetry.telemetry:210} - Sending init Telemetry
[2022-09-14 10:37:23,093] DEBUG {datahub.telemetry.telemetry:243} - Sending Telemetry
[2022-09-14 10:37:23,247] INFO {datahub.cli.ingest_cli:182} - DataHub CLI version: 0.8.44.2
[2022-09-14 10:37:23,250] DEBUG {datahub.cli.ingest_cli:196} - Using config: {'source': {'type': 's3', 'config': {'platform': 's3', 'path_spec': {'include': 's3://imo-datalake-dev-gold20201022182214781400000004/rhubarb/2022/08/29/dataset/LEXICAL_DATASET_CORE/part-00000-8497b5ec-f63e-4f33-a78a-a8795b4201bc-c000.snappy.parquet'}, 'aws_config': {'aws_access_key_id': 'xxx', 'aws_secret_access_key': 'xxx', 'aws_region': 'us-east-1'}, 'env': 'PROD', 'profiling': {'enabled': False}}}, 'sink': {'type': 'file', 'config': {'filename': '/Users/hgopu/datahub/s3-injest-output.json'}}}
[2022-09-14 10:37:23,252] DEBUG {datahub.ingestion.run.pipeline:174} - Sink type:file,<class 'datahub.ingestion.sink.file.FileSink'> configured
[2022-09-14 10:37:23,252] INFO {datahub.ingestion.run.pipeline:175} - Sink configured successfully.
[2022-09-14 10:37:23,252] WARNING {datahub.ingestion.run.pipeline:276} - Failed to configure reporter: datahub
Traceback (most recent call last):
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 264, in _configure_reporting
reporter_class.create(
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py", line 92, in create
raise ValueError(
ValueError: Datahub ingestion reporter will be disabled because sink type file is not supported
[2022-09-14 10:37:23,562] INFO {numexpr.utils:159} - NumExpr defaulting to 8 threads.
[2022-09-14 10:37:23,846] ERROR {logger:26} - Please set env variable SPARK_VERSION
[2022-09-14 10:37:23,846] INFO {logger:27} - Using deequ: com.amazon.deequ:deequ:1.2.2-spark-3.0
[2022-09-14 10:37:24,170] DEBUG {datahub.telemetry.telemetry:243} - Sending Telemetry
[2022-09-14 10:37:24,315] DEBUG {datahub.ingestion.run.pipeline:199} - Source type:s3,<class 'datahub.ingestion.source.s3.source.S3Source'> configured
[2022-09-14 10:37:24,315] INFO {datahub.ingestion.run.pipeline:200} - Source configured successfully.
[2022-09-14 10:37:24,317] INFO {datahub.cli.ingest_cli:129} - Starting metadata ingestion
[2022-09-14 10:37:24,319] INFO {datahub.cli.ingest_cli:136} - Source (s3) report: {'events_produced': '0', 'events_produced_per_sec': '0', 'event_ids': [], 'warnings': {}, 'failures': {}, 'filtered': [], 'start_time': '2022-09-14 10:37:24.170245 (now).', 'running_time': '0.15 seconds'}
[2022-09-14 10:37:24,319] INFO {datahub.cli.ingest_cli:139} - Sink (file) report: {'total_records_written': '0', 'records_written_per_second': '0', 'warnings': [], 'failures': [], 'start_time': '2022-09-14 10:37:23.251355 (1.07 seconds ago).', 'current_time': '2022-09-14 10:37:24.319242 (now).', 'total_duration_in_seconds': '1.07'}
[2022-09-14 10:37:24,376] DEBUG {datahub.upgrade.upgrade:124} - server_config:{'models': {}, 'versions': {'linkedin/datahub': {'version': 'v0.8.44', 'commit': 'c606abdb4033b3a88059da6a94c1ee043de4db7d'}}, 'managedIngestion': {'defaultCliVersion': '0.8.42', 'enabled': True}, 'statefulIngestionCapable': True, 'supportsImpactAnalysis': True, 'telemetry': {'enabledCli': True, 'enabledIngestion': False}, 'datasetUrnNameCasing': False, 'retention': 'true', 'datahub': {'serverType': 'quickstart'}, 'noCode': 'true'}
[2022-09-14 10:37:24,583] DEBUG {datahub.telemetry.telemetry:243} - Sending Telemetry
[2022-09-14 10:37:24,947] DEBUG {datahub.entrypoints:168} -
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/datahub/entrypoints.py", line 149, in main
146  def main(**kwargs):
147      # This wrapper prevents click from suppressing errors.
148      try:
--> 149          sys.exit(datahub(standalone_mode=False, **kwargs))
150      except click.exceptions.Abort:
..................................................
kwargs = {}
datahub = <Group datahub>
click.exceptions.Abort = <class 'click.exceptions.Abort'>
..................................................
File "/Users/hgopu/opt/anaconda3/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
@gray-shoe-75895 please see above
g
Thanks for the logs - we've actually deprecated path_spec in favor of path_specs (which is just a list of path specs). I think you've run into a bug with how we do the translation internally, but the fix should be to use the path_specs field instead.
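Concretely, your recipe with that one change applied should look something like this (an untested sketch of your own recipe - only path_spec has been renamed to path_specs and turned into a list):

    source:
      type: "s3"
      config:
        platform: s3
        path_specs:  # plural, and a list of path spec objects
          - include: "s3://imo-datalake-dev-gold20201022182214781400000004/rhubarb/2022/08/29/dataset/LEXICAL_DATASET_CORE/part-00000-8497b5ec-f63e-4f33-a78a-a8795b4201bc-c000.snappy.parquet"
        aws_config:
          aws_access_key_id: XXX
          aws_secret_access_key: XXXX
          aws_region: us-east-1
        env: "PROD"
        profiling:
          enabled: false
    sink:
      type: file
      config:
        filename: /Users/hgopu/datahub/s3-injest-output.json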
h
Hey @bland-sundown-49496 did it work for you after using path_specs?
b
@hundreds-photographer-13496 - Would you please provide a couple of sample YAML examples for ingesting from S3? I am a little confused by the syntax. If you have any docs, please share. Thanks so much for the follow-up!
Also, I see my gms container getting automatically terminated, and I am not able to access the service on port 8080 from the browser. FYI - I am installing on my Mac.
h
@bland-sundown-49496 - here is one example: https://datahubspace.slack.com/archives/CUMUWQU66/p1652608155866799. We definitely need more examples; I'll try to update the s3 data lake doc with more of them. Meanwhile, if you can share your bucket structure, I can help here.
b
Thank you. What does {table} represent in that example?
@hundreds-photographer-13496
h
{table} represents the folder for which a dataset will be created.
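For example, given a hypothetical bucket laid out like this (my-bucket and the folder names below are made up just to illustrate the placeholder):

    # s3://my-bucket/data/orders/part-0000.snappy.parquet
    # s3://my-bucket/data/customers/part-0000.snappy.parquet

a path spec along these lines would create one dataset per folder matched by {table}, i.e. datasets named orders and customers:

    source:
      type: s3
      config:
        platform: s3
        path_specs:
          - include: "s3://my-bucket/data/{table}/*.parquet"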