# ingestion
b
When ingesting from S3 with data-lake type, the getFileStatus for s3a:// is executed for the object of s3:// and an error occurs.
Py4JJavaError: An error occurred while calling o42.parquet.
: java.nio.file.AccessDeniedException: s3a://xxxxx/part-00000-6a55c272-ff55-45b8-b60c-18bd27d799f2-c000.snappy.parquet: getFileStatus on s3a://xxxxx/part-00000-6a55c272-ff55-45b8-b60c-18bd27d799f2-c000.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden;
recipe:
source:
  type: data-lake
  config:
    env: "PROD"
    platform: "S3"
    base_path: "s3://xxxxx/"
    profiling:
      enabled: false
    aws_config:
      aws_region: "xxx"

sink: 
  type: "datahub-rest"
  config: 
    server: http://localhost:8080
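For reference, a minimal sketch of running this same recipe programmatically through DataHub's Pipeline API instead of the datahub ingest CLI (the config dict is just the recipe above; the Pipeline usage itself is my assumption about the installed CLI version):
from datahub.ingestion.run.pipeline import Pipeline

# The recipe above expressed as a dict; bucket, region, and server are the same placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "data-lake",
            "config": {
                "env": "PROD",
                "platform": "S3",
                "base_path": "s3://xxxxx/",
                "profiling": {"enabled": False},
                "aws_config": {"aws_region": "xxx"},
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()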
n
This looks like an access-denied issue more than a protocol issue
👍 1
review your IAM role!
b
The IAM role was properly configured for the EC2 instance. Why is DataHub accessing s3a:// when I have specified s3:// in the recipe and there are only s3:// objects in S3?
l
@chilly-holiday-80781 ^
c
Hmm this isn’t supposed to happen—will fix
b
Thank you for your support. Additional log:
---- (full traceback above) ----
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/entrypoints.py", line 105, in main
    sys.exit(datahub(standalone_mode=False, **kwargs))
File "/home/ec2-user/.local/lib/python3.7/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
File "/home/ec2-user/.local/lib/python3.7/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
File "/home/ec2-user/.local/lib/python3.7/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/ec2-user/.local/lib/python3.7/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/ec2-user/.local/lib/python3.7/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
File "/home/ec2-user/.local/lib/python3.7/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/telemetry/telemetry.py", line 196, in wrapper
    raise e
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/telemetry/telemetry.py", line 190, in wrapper
    res = func(*args, **kwargs)
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/cli/ingest_cli.py", line 87, in run
    pipeline.run()
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/ingestion/run/pipeline.py", line 182, in run
    self.source.get_workunits(), 10 if self.preview_mode else None
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/ingestion/source/data_lake/__init__.py", line 538, in get_workunits
    yield from self.get_workunits_s3()
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/ingestion/source/data_lake/__init__.py", line 507, in get_workunits_s3
    yield from self.ingest_table(aws_file, relative_path)
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/ingestion/source/data_lake/__init__.py", line 386, in ingest_table
    table = self.read_file(full_path)
File "/home/ec2-user/.local/lib/python3.7/site-packages/datahub/ingestion/source/data_lake/__init__.py", line 252, in read_file
    df = self.spark.read.parquet(file)
File "/home/ec2-user/.local/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 353, in parquet
    return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
File "/home/ec2-user/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
File "/home/ec2-user/.local/lib/python3.7/site-packages/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
File "/home/ec2-user/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o42.parquet.
: java.nio.file.AccessDeniedException: s3a://xxxxx/part-00000-6a55c272-ff55-45b8-b60c-18bd27d799f2-c000.snappy.parquet: getFileStatus on s3a://xxxxx/part-00000-6a55c272-ff55-45b8-b60c-18bd27d799f2-c000.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: xxxxx; S3 Extended Request ID: xxxxx), S3 Extended Request ID: xxxxx
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:174)
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:117)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1887)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:1854)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1794)
	at org.apache.hadoop.fs.FileSystem.isDirectory(FileSystem.java:1700)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.isDirectory(S3AFileSystem.java:2572)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:47)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:376)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:758)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: xxxxx; S3 Extended Request ID: xxxxx), S3 Extended Request ID: xxxxx
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1264)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1045)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1872)
	... 22 more
c
This PR should clarify and fix it: https://github.com/linkedin/datahub/pull/4263. For standard crawling and schema inference, just s3:// will be fine. But for profiling, you’ll still need to enable permissions for the s3a protocol, because Spark relies on it to read S3 files (Hadoop 3.0+ only supports s3a; see https://cwiki.apache.org/confluence/display/HADOOP2/AmazonS3).
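To illustrate that last point, here is a minimal sketch (not from this thread; the bucket, object key, and hadoop-aws version are placeholders) of the kind of read Spark performs during profiling, which is why the role needs s3a access:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("s3a-read-check")
    # hadoop-aws (and its bundled AWS SDK) must match your Hadoop build (placeholder version)
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.0.3")
    .getOrCreate()
)

# On EC2, S3A picks up credentials from the instance profile via its default provider chain.
# Spark/Hadoop go through the s3a:// connector, so S3A's getFileStatus (a HEAD request on
# the object) is what the IAM role must allow, even though the recipe and the bucket
# contents are written with s3:// paths.
df = spark.read.parquet("s3a://my-bucket/part-00000.snappy.parquet")
df.printSchema()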
b
Thank you! I'll try again once the PR is merged.
👍 1
In this PR, the following sentence was added, but what exactly are the permissions needed for s3a? I had set AmazonS3FullAccess in my IAM policy, but I still got a 403 error. Is there any further policy I need to add?
If profiling, make sure that permissions for **s3a://** access are set because Spark and Hadoop use the s3a:// protocol to interface with AWS (schema inference outside of profiling requires s3:// access).
c
Hmm can you see if you have enabled s3:ListBucket, s3:GetObject, and s3:ListBucketMultipartUploads?
These should be the specific ones required
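If it helps, a quick way to sanity-check the bucket- and object-level calls from the same EC2 instance is a short boto3 script (the bucket, key, and region below are placeholders, not from this thread):
import boto3

# Uses the same EC2 instance-profile credentials as the ingestion run
s3 = boto3.client("s3", region_name="us-east-1")  # placeholder region

bucket = "my-bucket"                        # placeholder
key = "path/to/part-00000.snappy.parquet"   # placeholder

# s3:ListBucket
print(s3.list_objects_v2(Bucket=bucket, MaxKeys=1).get("KeyCount"))
# s3:GetObject; HeadObject is the same metadata call that S3A's getFileStatus makes
print(s3.head_object(Bucket=bucket, Key=key)["ContentLength"])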
b
I use this official policy. I think these permissions are included in "s3:*". AmazonS3FullAccess:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:*",
        "s3-object-lambda:*"
      ],
      "Resource": "*"
    }
  ]
}
c
Hmm this should be sufficient
I’ll do some more digging
b
@chilly-holiday-80781 I tried with v0.8.28 and ingested the same recipe as before and it worked! Thank you!👍
c
Incredible! Glad it worked out