# troubleshooting
j
Hey folks, currently seeing this kind of error:
Copy code
Sink: Committer (1/1)#0 (0c5abffd2f065e1edef3036b20ec42a5) switched from RUNNING to FAILED with failure cause: java.nio.file.AccessDeniedException: <path>/part-8ad796e1-3c85-4b54-9eed-eaaf06621185-0.snappy.parquet: initiate MultiPartUpload on <path>/part-8ad796e1-3c85-4b54-9eed-eaaf06621185-0.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: KC4SFHVXBGJC6XFW; S3 Extended Request ID: VQXppEZpXzKyDoc7u32AHkMAr608cMLxLOh4QIWwZVWIg0JBYgkMTxAT4M6+xmyVrzmMmnyMcUucHlBZmFBzSs/GrYJ+LmmlADwr4wjTjDs=; Proxy: null), S3 Extended Request ID: VQXppEZpXzKyDoc7u32AHkMAr608cMLxLOh4QIWwZVWIg0JBYgkMTxAT4M6+xmyVrzmMmnyMcUucHlBZmFBzSs/GrYJ+LmmlADwr4wjTjDs=:AccessDenied
My sink is configured like so:
Copy code
return DeltaSink.forRowData(
                        new Path("s3a://<bucket>/<path>"),
                        new Configuration() {
                            {
                                set(
                                        "spark.hadoop.fs.s3a.aws.credentials.provider",
                                        "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
                            }
                        },
                        FULL_SCHEMA_ROW_TYPE)
                .withMergeSchema(true)
                .build();
Writes to my local filesystem work, but writes to S3 are failing. The credentials on the machine are admin for the account. Any thoughts?
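Side note on the credentials key: a minimal variant of the snippet above, on the assumption that Hadoop's S3AFileSystem reads the unprefixed fs.s3a.aws.credentials.provider property (the spark.hadoop. prefix is a Spark convention that Flink and Hadoop don't strip), so the prefixed setting may be getting ignored and s3a would fall back to its default providers:
Copy code
// Sketch only: same sink as above, but with the unprefixed Hadoop S3A key.
// Hadoop's S3AFileSystem looks up "fs.s3a.aws.credentials.provider" directly;
// the "spark.hadoop." prefix is only stripped by Spark, so under Flink the
// prefixed key is most likely ignored and s3a uses its default provider list
// (env vars, instance profile, ...).
return DeltaSink.forRowData(
                new Path("s3a://<bucket>/<path>"),
                new Configuration() {
                    {
                        set(
                                "fs.s3a.aws.credentials.provider",
                                "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
                    }
                },
                FULL_SCHEMA_ROW_TYPE)
        .withMergeSchema(true)
        .build();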
j
is this s3 bucket in a different region than your app’s default credential region by any chance?
j
It should not be, let me check.
Copy code
$ $AWS_REGION
us-east-1: command not found
And the bucket is in us-east-1
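For what it's worth, a small sketch (class name and bucket placeholder are illustrative) that prints the region the v1 SDK resolves and where the bucket actually lives, using the SDK that hadoop-aws already pulls in:
Copy code
import com.amazonaws.regions.DefaultAwsRegionProviderChain;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

// Sketch only: compare the region the SDK resolves (env var, profile,
// instance metadata) with the region the bucket actually lives in.
public class RegionCheck {
    public static void main(String[] args) {
        System.out.println("SDK default region: "
                + new DefaultAwsRegionProviderChain().getRegion());

        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        // Note: getBucketLocation returns "US" for us-east-1 (legacy naming).
        System.out.println("Bucket location: " + s3.getBucketLocation("<my_bucket>"));
    }
}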
j
and you’re just running flink off your local machine, or is this on a hosted environment?
j
This is running out of an EC2 instance of mine
I am able to do things like upload my jar file to that same bucket from the instance.
j
in a VPC by any chance?
j
The EC2 instance is in a VPC, but it is able to access S3 (it seems)
(as well as my data sources and our confluent sink).
j
i wonder if it's just the permissions--i see it specifically calls out MultiPartUpload--do you have permissions to do so?
j
The credentials configured in ~/.aws are for that account. The bucket owner (the account) has List and Write ACLs. The role I am authing as has AdministratorAccess:
Copy code
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "*",
      "Resource": "*"
    }
  ]
}
So my thought is I should have the permissions.
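One way to test the MultiPartUpload piece in isolation is to reproduce the failing call with a plain S3 client and the same credential chain; a minimal sketch, with the class name made up and bucket/key as placeholders:
Copy code
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;

// Sketch only: reproduce the exact call the stack trace fails on
// ("initiate MultiPartUpload") outside Flink, with the same credential chain.
public class MultipartProbe {
    public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        InitiateMultipartUploadResult result = s3.initiateMultipartUpload(
                new InitiateMultipartUploadRequest("<my_bucket>", "<path>/mpu-probe"));
        System.out.println("Initiated multipart upload: " + result.getUploadId());

        // Abort so no incomplete upload is left lying around.
        s3.abortMultipartUpload(new AbortMultipartUploadRequest(
                "<my_bucket>", "<path>/mpu-probe", result.getUploadId()));
    }
}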
Flink, running locally, will ultimately lean on the config in my ~/.aws folder, right?
It seems to for the AWS clients I spin up for things like Secrets Manager.
j
it depends on the connector--i know for FileSink you need to specify AWS credentials in the flink config somewhere
might be worthwhile also going through this checklist: https://repost.aws/knowledge-center/emr-s3-403-access-denied
j
So I checked the VPC endpoint permissions and that all seems sorted; when I run AWS CLI commands that seems sorted too. I wonder if the issue is that the account the EC2 instance is in is different from the one the S3 bucket is in. That shouldn't be an issue, since it is supposed to use the DefaultAWSCredentialsProviderChain, which should ultimately use the SAML creds I have on the host, but maybe something funky is going on.
j
Yeah, if you can test with a bucket in the same account, that might be a quick way to prove it.
j
Not as much of an option, unfortunately. What I did do was apply this policy to the bucket:
Copy code
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Dev Desk Access",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<other_aws_account>:root"
      },
      "Action": "s3:*",
      "Resource": "arn:aws:s3:::<my_bucket>/*"
    }
  ]
}
But am still seeing the same issue.
k
@Jalil Alchy You are using environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY - right?
j
No, I have tried to set spark.hadoop.fs.s3a.aws.credentials.provider instead, so that it can pick up my IAM creds using the DefaultAWSCredentialsProviderChain.
If I spin up an AmazonS3 client and write a file directly, that works.
k
isn't DefaultAWSCredentialsProviderChain the AWS credentials provider chain that looks for credentials in this order: environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET), or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)?
j
It also looks in your ~/.aws/ config and can use the env value AWS_PROFILE to pick a profile from that config.
👍 1
At least that is how all the AWS SDK clients I am using are configured (and working).
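A small sketch for checking that from inside the JVM (class name is made up); it prints which access key the default chain resolves and which profile is set:
Copy code
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

// Sketch only: print which credentials the default chain resolves inside
// this JVM, to compare against the working AmazonS3 / Secrets Manager clients.
public class CredentialsCheck {
    public static void main(String[] args) {
        AWSCredentials creds = new DefaultAWSCredentialsProviderChain().getCredentials();
        String keyId = creds.getAWSAccessKeyId();
        // Log only a prefix of the key id, never the secret.
        System.out.println("Resolved access key id: "
                + keyId.substring(0, Math.min(4, keyId.length())) + "...");
        System.out.println("AWS_PROFILE = " + System.getenv("AWS_PROFILE"));
    }
}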
k
i have a demo app on my local machine that writes data to S3 using DeltaSink; I was using ENV variables though, and had no issues there. I have not tried AWS_PROFILE.
Can try tomorrow maybe, since I'm running tests for SQL support for the Delta connector anyways: https://github.com/delta-io/connectors/issues/238
j
I can try using ENV variables, but the end state needs to be able to write from a deployed application using IAM.
👍 1
Even using the env variables, same issue.
Something seems very wrong. Mind sharing with me the code you used to create the delta sink? Just want to make sure there isn't a major discrepancy.
Could this be caused by anything related to shading?
k
@Jalil Alchy I will try to find some time to take a look. Could you tell me a few things:
1. what delta-connector version are you using?
2. what flink version are you using?
3. do you run Flink from docker/k8s on EC2, or as a "standalone" app?
I run it using connector 0.6.0 on Flink 1.15.3 and 1.16.1. Regarding my setup, I was running this on local docker, from docker-compose -> writes to S3. The docker-compose setup, for both Task and Job managers:
Copy code
environment:
    - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
    - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
The DeltaSink setup is very simple:
Copy code
public static DeltaSink<RowData> createDeltaSink(
        String deltaTablePath,
        RowType rowType) {

    Configuration conf = new Configuration();
    conf.set("delta.checkpointInterval", "1000");
    return DeltaSink
        .forRowData(
            new Path(deltaTablePath),
            conf,
            rowType)
        .withPartitionColumns("age")
        .build();
}
deltaTablePath is a path to S3. I'm using a "fat jar" to submit my demo app. My dependencies in pom.xml look like this (I'm using the aws profile when building the fat jar):
Copy code
<dependencies>
		<dependency>
			<groupId>io.delta</groupId>
			<artifactId>delta-flink</artifactId>
			<version>${delta.version}</version>
		</dependency>
		<dependency>
			<groupId>io.delta</groupId>
			<artifactId>delta-standalone_${scala.main.version}</artifactId>
			<version>${delta.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-parquet</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop-version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-hadoop-fs</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>

	<profiles>
		<profile>
			<id>aws</id>
			<properties>
				<flink.scope>compile</flink.scope>
			</properties>
			<dependencies>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-aws</artifactId>
					<version>3.1.0</version>
				</dependency>
				<dependency>
					<groupId>org.apache.hadoop</groupId>
					<artifactId>hadoop-client</artifactId>
					<version>${hadoop-version}</version>
				</dependency>
				<dependency>
					<groupId>org.apache.flink</groupId>
					<artifactId>flink-file-sink-common</artifactId>
					<version>${flink.version}</version>
				</dependency>
			</dependencies>
		</profile>
	</profiles>
Copy code
<hadoop-version>3.1.0</hadoop-version>
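For reference, a sketch of how a sink built by createDeltaSink above would typically be attached to a stream; the method name, stream, table path, and job name here are illustrative, not from the demo app:
Copy code
// Sketch only: wiring a sink built by createDeltaSink(...) above into a job.
// "rows" stands for whatever DataStream<RowData> the app already builds;
// the table path and job name are placeholders.
public static void attachDeltaSink(
        StreamExecutionEnvironment env,
        DataStream<RowData> rows,
        RowType rowType) throws Exception {

    rows.sinkTo(createDeltaSink("s3a://<bucket>/<table_path>", rowType));
    env.execute("delta-sink-demo");
}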
Could this be caused by anything related to shading?
I don't think so; if there were a problem with shading/versions you would have ClassNotFound, NoClassDefFound, or MethodNotFound exceptions, I think.
j
1. what is the delta-connector version you are using? a. 0.6.0
2. what is the flink version you are using? a. 1.15.2, maybe this is the issue
3. do you run Flink from docker/k8s on EC2 or "standalone" app? a. standalone
Also using a thick jar; most of our dependencies are similar, although I am building with Bazel.
k
so if this is a standalone process in your case, i wonder if the ENV variables are visible to the flink process 🤔
j
They should be, but I can log them.
By standalone I mean I am running start-cluster.sh, then flink run <path_to_jar>.
👍 1
k
yeah, logging them would be a good thing to check
log them from the Flink process, from your main method for example. Also, are all Flink nodes on the same EC2?
For the Delta Sink/Source, both TM and JM will have interactions with S3.
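Something like this, pasted into the job class and called from main() (the method name and println output are illustrative), would show what the Flink JVM actually sees:
Copy code
// Sketch only: call this from the job's main() (JobManager side) and, if
// needed, from an operator's open() to cover the TaskManager JVMs too.
static void logAwsEnv() {
    System.out.println("AWS_PROFILE = " + System.getenv("AWS_PROFILE"));
    System.out.println("AWS_REGION  = " + System.getenv("AWS_REGION"));
    System.out.println("AWS_ACCESS_KEY_ID set? "
            + (System.getenv("AWS_ACCESS_KEY_ID") != null));
    System.out.println("AWS_SECRET_ACCESS_KEY set? "
            + (System.getenv("AWS_SECRET_ACCESS_KEY") != null));
}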
j
Yes
👍 1
It does seem the env variable is not available.
k
how did you set them on that EC2?
j
I believe they were set via exports
in this terminal
k
maybe this will help: https://docs.aws.amazon.com/cloud9/latest/user-guide/env-vars.html - I bet you need something like ~/.bashrc or ~/.bash_profile
"set via exports" <- this will work only for the current "bash" session I think. Are you running flink from that same session? Maybe try to set them "globally"
j
Yes, but I have set them globally and am still seeing the same issue (primarily for AWS_PROFILE and AWS_REGION). I wonder if it is more or less running against a different .aws config.
Which is strange because the aws clients I manually create work.
I think at the very least it is becoming more clear that Hadoop isn't using my ~/.aws/config
You would think that running this on Kinesis Data Analytics using IAM would pretty much insulate me from all the credential issues with S3 I was seeing, but even then I am getting the same access denied 😅 Going to try attaching a secret directly in the env.
k
but even then I am getting the same access denied 😂
@Jalil Alchy any luck/progress with that one?
j
Not exactly yet, but funny you mention it, it did seem to be a permission issue when running in KDA. I think the reason my EC2 instance wasn't working (this just came up) was that the S3 bucket used S3-managed KMS encryption, so it didn't necessarily have access to the KMS keys (although it should have). Still tweaking to get this working.
So I did get it working from KDA, but not from EC2 yet.
Although I think it may be KMS-related permissions. It isn't the first time I have seen S3 effectively hide KMS permission issues, but it wasn't on my mind until today.
👍 1
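A quick way to poke at the KMS theory from the EC2 instance: a sketch (class name, bucket, and key are placeholders) that writes one object with the same credential chain. If the bucket default-encrypts with SSE-KMS, this write needs kms:GenerateDataKey on the bucket key, and the multipart path additionally needs kms:Decrypt, so a 403 here points at KMS permissions rather than s3:*.
Copy code
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

// Sketch only: one plain put from the instance's credentials. With SSE-KMS
// default bucket encryption this exercises kms:GenerateDataKey, so an
// AccessDenied here would point at KMS rather than the S3 permissions.
public class KmsWriteProbe {
    public static void main(String[] args) {
        byte[] body = "kms probe".getBytes(StandardCharsets.UTF_8);
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(body.length);

        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        s3.putObject(new PutObjectRequest(
                "<my_bucket>", "<path>/kms-probe.txt",
                new ByteArrayInputStream(body), metadata));
        System.out.println("Write succeeded; check the object's encryption settings.");
    }
}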