# troubleshooting
f
I'm having issues using PyFlink on an AWS Managed Flink deployment:
```
errorCode: CodeError.InvalidApplicationCode
message: org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:248)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 6 more
Caused by: org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 6 more
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
... 17 more
messageSchemaVersion: 1
messageType: ERROR
```
zip file contents:
• main.py (contains the main function, which reads a Kinesis stream using table_env)
• lib/flink-sql-connector-kinesis-1.15.2.jar

Runtime Properties:
```
consumer.config.0	input.stream.name	<INPUT_STREAM_NAME>
consumer.config.0	scan.stream.initpos	LATEST
kinesis.analytics.flink.run.options	jarfile	lib/flink-sql-connector-kinesis-1.15.2.jar
kinesis.analytics.flink.run.options	python	main.py
producer.config.0	output.stream.name	<OUTPUT_STREAM_NAME>
producer.config.0	shard.count	1
```

main.py:
```python
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json

# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(
    environment_settings=env_settings)
statement_set = table_env.create_statement_set()

# on kda
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        raise Exception('A file at "{}" was not found'.format(
            APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def create_source_table(table_name, stream_name, region, stream_initpos):
    return """
    CREATE TABLE {0} (
        ticker VARCHAR(6),
        price DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
    PARTITIONED BY (ticker)
    WITH (
        'connector' = 'kinesis',
        'stream' = '{1}',
        'aws.region' = '{2}',
        'scan.stream.initpos' = '{3}',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601')
    """.format(
        table_name, stream_name, region, stream_initpos).strip()


def create_sink_table(table_name, stream_name, region):
    return """
    CREATE TABLE {0} (
        ticker VARCHAR(6),
        price DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
    PARTITIONED BY (ticker)
    WITH (
        'connector' = 'kinesis',
        'stream' = '{1}',
        'aws.region' = '{2}',
        'sink.partitioner-field-delimiter' = ';',
        'sink.batch.max-size' = '100',
        'format' = 'json',
        'json.timestamp-format.standard' = 'ISO-8601'
    )
    """.format(table_name, stream_name, region)


def create_print_table(table_name):
    return """
    CREATE TABLE {0} (
        ticker VARCHAR(6),
        price DOUBLE,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
    WITH (
        'connector' = 'print')
    """.format(table_name)


def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    input_starting_position_key = "flink.stream.initpos"

    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()
    print(props)

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    create_source_sql = create_source_table(
            input_table_name, input_stream, input_region, stream_initpos)
    table_env.execute_sql(create_source_sql)

    # 3. Creates a sink table writing to a Kinesis Data Stream
    create_sink_sql = create_sink_table(
        output_table_name, output_stream, output_region)
    table_env.execute_sql(create_sink_sql)

    # 4. Inserts the source table data into the sink table
    insert_sql = "INSERT INTO {0} SELECT * FROM {1}" \
        .format(output_table_name, input_table_name)
    statement_set.add_insert_sql(insert_sql)

    print(statement_set.explain())
    statement_set.execute()
    # print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()
```
Any ideas?
a
Hi, the exception you've provided shows that the Python process failed. To see what caused this failure, check the logs produced by `PythonDriver` at `INFO` level - you should see the traceback logged there.
🙌 1
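If the traceback is hard to spot among the Flink logs, one way to surface it is to catch and log it explicitly at the entry point before re-raising. A minimal sketch against the main.py above (the logging calls are an addition, not part of the original code):

```python
import logging
import traceback

if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Log the full Python traceback so it shows up in the application
        # logs (on Managed Flink these are forwarded to CloudWatch).
        logging.error(traceback.format_exc())
        raise
```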
j
We had the same problem with KDA not being very descriptive in its error messages when running the Python DataStream API. Running the application locally first, via a session cluster on Docker, allowed us to see and debug the errors. https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#starting-a-sessi[…]luster-on-docker
🙌 1
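If Docker itself is a problem, a lighter-weight check (not the session-cluster route from the link above) is to run main.py directly against a locally installed apache-flink, pointing the table environment at the connector jar. A minimal sketch, assuming the jar sits in lib/ next to the script:

```python
import os
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(environment_settings=env_settings)

# Point the local job at the same Kinesis connector jar the zip ships in lib/.
connector_jar = os.path.abspath("lib/flink-sql-connector-kinesis-1.15.2.jar")
table_env.get_config().get_configuration().set_string(
    "pipeline.jars", "file://" + connector_jar)
```

Note that main.py reads /etc/flink/application_properties.json, which only exists on KDA, so the property lookup would need to be stubbed out (or the file created) when running outside the service.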
f
will check those points, thank you guys 😃
will try the running-locally examples ^^
f
@Jeremy Ber I followed it a few times, but I always get errors on the numpy build on my Mac M1
🥲 3
j
ahh yes, I have seen that error with M1. @Lorenzo Nicora
f
@Aleksandr Pilipenko @Joey Resuento @Jeremy Ber I found the issue and it's working now, thanks for the support ❤️
👍 1
🙌 1
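For anyone landing on this thread with the same error: one thing worth checking in the material above is that the consumer.config.0 group defines scan.stream.initpos, while main() looks up flink.stream.initpos (and aws.region, which neither property group defines). Whether that was the actual fix here isn't stated, but a missing key makes the prop_map[key] lookups raise a KeyError, and the Python process then exits with code 1 exactly as in the stack trace. A defensive lookup like this hypothetical helper turns that into a readable error message:

```python
def get_required(prop_map, key, group_id):
    # Fail with a message naming the missing runtime property,
    # rather than a bare KeyError buried in the Flink launcher logs.
    if prop_map is None or key not in prop_map:
        raise KeyError('Runtime property "{0}" not found in group "{1}"'
                       .format(key, group_id))
    return prop_map[key]


# Usage inside main(), e.g.:
# stream_initpos = get_required(
#     input_property_map, "scan.stream.initpos", "consumer.config.0")
```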