# troubleshooting
p
Hi team, I was exploring PyFlink with KDA. Dependencies: Kafka (in-house Kafka, not on MSK) and PyFlink. I have packaged all required Python packages as per "How to use an external package within PyFlink", and have also created an uber jar for the Kafka and Flink connectors. Properties (consumer - FlinkKafkaConsumer):
```
"bootstrap.servers": "kafka-server:9094",
"group.id": "fin-topic-name",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='passwd';",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_PLAINTEXT"
```
My program reads from a Kafka topic and does some transformations. I followed this doc https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-python-createapp.html for configuring KDA. Getting the below error when deployed on KDA (but it works locally), any idea? Exception: "exception-classifier.filters.user-exception-stack-regex.configuration, Caused by. org.apache.flink.runtime.checkpoint.CheckpointException.+Caused by. java.lang.InterruptedException. sleep interrupted.+at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run;Caused by. org.apache.flink.runtime.checkpoint.CheckpointException.+Caused by. org.apache.flink.kinesis.shaded.com.amazonaws.AbortedException.+at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run.+Caused by. java.lang.InterruptedException. sleep interrupted;Caused by. org.apache.kafka.common.errors.TimeoutException. Timed out waiting for a node assignment;Caused by. org.apache.kafka.common.errors.SaslAuthenticationException. Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512;Caused by. org.apache.kafka.common.errors.TopicAuthorizationException. Topic authorization failed"
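For context, a minimal sketch of how these consumer properties are typically wired into a PyFlink job; the topic name, jar path, and string schema below are assumptions, not taken from the thread:

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# The uber jar with the Kafka connector classes must be on the classpath
# (hypothetical path):
env.add_jars("file:///opt/flink/usrlib/uber-connectors.jar")

properties = {
    "bootstrap.servers": "kafka-server:9094",
    "group.id": "fin-topic-name",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule "
                        "required username='<username>' password='passwd';",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_PLAINTEXT",
}

# Hypothetical topic name; the schema depends on the actual message format.
consumer = FlinkKafkaConsumer(
    topics="fin-topic",
    deserialization_schema=SimpleStringSchema(),
    properties=properties,
)
stream = env.add_source(consumer)
```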
j
this is an auth error -- can you validate with the same credentials from an EC2 box?
p
@Jeremy Ber any idea why this says SCRAM-SHA-512 even though I am passing PLAIN (which is also what the Kafka server expects)?
j
9094 is typically the TLS port, are you sure the port is correct?
9092 is used for plaintext
p
@Jeremy Ber with the below config and a Python consumer (from confluent_kafka import Consumer), it worked from EC2:
```
{
    'bootstrap.servers': 'kafka-server:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'username',
    'sasl.password': 'passwd',
    'group.id': 'group-id',
    'compression.type': 'gzip',
    'message.max.bytes': 10000000
}
```
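For reference, a minimal sketch of that EC2-side check with confluent_kafka (the topic name is an assumption); polling a few records confirms both connectivity and SASL auth:

```python
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'kafka-server:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'username',
    'sasl.password': 'passwd',
    'group.id': 'group-id',
}

consumer = Consumer(conf)
consumer.subscribe(['fin-topic'])  # hypothetical topic name
try:
    for _ in range(10):
        msg = consumer.poll(5.0)
        if msg is None:
            continue  # no message within the timeout
        if msg.error():
            print('consumer error:', msg.error())
            break
        print(msg.topic(), msg.partition(), msg.offset())
finally:
    consumer.close()
```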
j
in same VPC?
p
yup
j
might need to provide truststore info
p
this is not MSK, we have in-house hosted Kafka (most probably on EKS... not so sure). Shouldn't it throw a connectivity-related error instead of auth?
j
I see lots of errors in your stack actually
one of which mentions Kinesis?
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.
```
Caused by. org.apache.kafka.common.errors.TimeoutException. Timed out waiting for a node assignment;
```
```
Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
```
p
"Timed out waiting for a node assignment" might mean connectivity problem. Kafka and KDA is in different VPCs, we have done VPC peering. Client EC2 and KDA is in same VPC, this is working from EC2. Regarding port 9094, i suspect we are doing port forwarding to 9092 (not sure).
j
Yes, appears this is a network issue then. Might be worth creating a ticket through AWS support
p
yes, the errors/trace from Flink are a little confusing in this case.
a
Hi, what you’re referring to is not an exception, but logged configuration of exception classification. It should not affect your application execution. You can safely ignore this in your logs. Is your application failing to run? If yes - try to look at the logs that have `messageType` equal to `ERROR` and have a `throwableInformation` field containing the exception stacktrace.
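One way to pull only those records is to filter on the JSON fields of the application's CloudWatch log group; a hedged sketch with boto3, where the log group name is an assumption:

```python
import boto3

logs = boto3.client("logs")
# Hypothetical log group name; use the one configured for your KDA app.
response = logs.filter_log_events(
    logGroupName="/aws/kinesis-analytics/my-kda-app",
    filterPattern='{ $.messageType = "ERROR" }',  # JSON filter pattern syntax
    limit=50,
)
for event in response["events"]:
    print(event["message"])
```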
πŸ‘ 1
p
yes, I also suspected the same thing, because I can see some MySQL-related errors apart from this. Thanks.
```
/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/sqlalchemy/orm/query.py:196: SyntaxWarning: "is not" with a literal. Did you mean "!="?
  if entities is not ():
Traceback (most recent call last):
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/flink_transformation_job.py", line 2, in <module>
    from lib.MessageProcessor.TransformationMessageProcessors.transformation_message_processor import \
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/lib/MessageProcessor/__init__.py", line 1, in <module>
    from lib.utils.helpers import *
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/lib/utils/helpers.py", line 6, in <module>
    from lib.utils._redis import *
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/lib/utils/_redis.py", line 4, in <module>
    from lib.utils.custom_decorators import provide_session
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/lib/utils/custom_decorators.py", line 1, in <module>
    from database import Session
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/database.py", line 39, in <module>
    engine = create_engine(
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/sqlalchemy/engine/__init__.py", line 450, in create_engine
    return strategy.create(*args, **kwargs)
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/sqlalchemy/engine/strategies.py", line 87, in create
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/sqlalchemy/dialects/mysql/mysqldb.py", line 118, in dbapi
    return __import__("MySQLdb")
  File "/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/MySQLdb/__init__.py", line 18, in <module>
    from . import _mysql
ImportError: cannot import name '_mysql' from partially initialized module 'MySQLdb' (most likely due to a circular import) (/tmp/flink-web-4d42f811-bcf9-4b1b-806b-e9a30344e280/flink-web-upload/04054398-949f-43b0-947e-832da02bff91_code/MySQLdb/__init__.py)
```
this could be the actual error, @Aleksandr Pilipenko @Jeremy Ber. I am packaging all Python packages in my parent directory, referring to this doc: https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/commit/70a1372b7d0106e9fe8518119924ecf3d481f91c Is this enough to connect to MySQL directly (not via a data stream)? I am using MySQL to get some metadata. Using the below Python package versions:
mysql-connector-python==8.0.33
mysqlclient==1.4.4
PyMySQL==0.9.3
SQLAlchemy==1.3.7
Also, I have created a fat jar with mysql-connector-java, kafka-clients and flink-connector-kafka.
a
The error you’ve posted indicates an issue with dependencies. Does this code run locally? I’m not too familiar with SQLAlchemy, but I think this might help: https://stackoverflow.com/questions/59156895/cannot-import-name-mydb-from-partially-initialized-module-connection-in-pyth
p
yeah it works locally, I will try with the pyRequirements config now instead of copying all packages. Will update here.
@Aleksandr Pilipenko if it is really a code issue then it should fail locally also, I guess
@Aleksandr Pilipenko Looks like the MySQL driver is missing in KDA. Any idea how we can install this in KDA, or pass it with the .zip? Note: I am trying to connect to MySQL directly, not via the pyflink lib (this works in the local env). Getting the below error with pip install in KDA:
```
Collecting mysqlclient==1.4.4
  Downloading mysqlclient-1.4.4.tar.gz (86 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 86.2/86.2 kB 127.2 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'error'
  error: subprocess-exited-with-error

  Γ— python setup.py egg_info did not run successfully.
  β”‚ exit code: 1
  ╰─> [13 lines of output]
      /bin/sh: mysql_config: command not found
      /bin/sh: mariadb_config: command not found
      /bin/sh: mysql_config: command not found
      Traceback (most recent call last):
        File "<string>", line 2, in <module>
        File "<pip-setuptools-caller>", line 34, in <module>
        File "/tmp/pip-install-u3fa174n/mysqlclient_35ce17197f6648378dc1d41dab36562f/setup.py", line 16, in <module>
          metadata, options = get_config()
        File "/tmp/pip-install-u3fa174n/mysqlclient_35ce17197f6648378dc1d41dab36562f/setup_posix.py", line 61, in get_config
          libs = mysql_config("libs")
        File "/tmp/pip-install-u3fa174n/mysqlclient_35ce17197f6648378dc1d41dab36562f/setup_posix.py", line 29, in mysql_config
          raise EnvironmentError("%s not found" % (_mysql_config_path,))
      OSError: mysql_config not found
      [end of output]
```
a
Looks like `mysqlclient` requires native MySQL libraries to be present on the host OS - you can’t install such dependencies with KDA. You will need to look at how to connect to your DB without such dependencies. Unfortunately I’m not familiar enough with this subject, so I can’t recommend any alternatives. If you have further questions related to KDA - please contact AWS support.
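Since the thread's requirements already include PyMySQL, a pure-Python driver, one possible workaround (a sketch, with hypothetical connection details) is to point SQLAlchemy at PyMySQL instead of the native mysqlclient:

```python
import pymysql
# Optional shim so existing code that does `import MySQLdb` keeps working:
pymysql.install_as_MySQLdb()

from sqlalchemy import create_engine

# Hypothetical host/credentials/database:
engine = create_engine("mysql+pymysql://user:passwd@mysql-host:3306/metadata_db")
with engine.connect() as conn:
    print(conn.execute("SELECT 1").fetchall())
```

Because PyMySQL has no compiled C extension, it can be shipped inside the code zip without needing mysql_config on the host.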
p
Yes, thanks
p
"This connector provides a sink that writes data to a JDBC database." Looks like it can just push data to mysql.
j
I can't confirm the compatibility between the Table and DataStream APIs, but the JDBC Table API connector certainly supports use as a source as well
So you could leverage the Table API for this piece
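A hedged sketch of what that could look like in PyFlink's Table API, assuming the flink-connector-jdbc jar and the MySQL JDBC driver are in the fat jar; the table schema and connection details are assumptions:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Register the MySQL table as a JDBC source (hypothetical schema/credentials):
t_env.execute_sql("""
    CREATE TABLE metadata (
        id INT,
        name STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://mysql-host:3306/metadata_db',
        'table-name' = 'metadata',
        'username' = 'user',
        'password' = 'passwd'
    )
""")

# Read the metadata table through the JDBC source.
t_env.execute_sql("SELECT * FROM metadata").print()
```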
p
let me check