Felipe Mendonça
10/05/2023, 12:51 PM
errorCode
CodeError.InvalidApplicationCode
message
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:248)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 6 more
Caused by: org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:239)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 6 more
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
... 17 more
messageSchemaVersion
1
messageType
ERROR
zip file content:
• main.py (contains main function reading kinesis stream using table_env)
• lib/flink-sql-connector-kinesis-1.15.2.jar
Runtime Properties:
consumer.config.0 input.stream.name <INPUT_STREAM_NAME>
consumer.config.0 scan.stream.initpos LATEST
kinesis.analytics.flink.run.options jarfile lib/flink-sql-connector-kinesis-1.15.2.jar
kinesis.analytics.flink.run.options python main.py
producer.config.0 output.stream.name <OUTPUT_STREAM_NAME>
producer.config.0 shard.count 1
main.py
from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json
# 1. Build the streaming-mode Table Environment shared by the whole script
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(environment_settings=env_settings)
# Statement set bound to table_env; main() adds the INSERT to it and runs it
statement_set = table_env.create_statement_set()
# Where the application properties JSON is read from when running on KDA
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"
def get_application_properties(path=None):
    """Load the application properties JSON file and return its parsed content.

    Args:
        path: Optional path of the properties file. When omitted (the way
            existing callers invoke this function) the module-level
            ``APPLICATION_PROPERTIES_FILE_PATH`` is used.

    Returns:
        The object produced by parsing the file content as JSON.

    Raises:
        FileNotFoundError: If no regular file exists at the path. The
            message text is unchanged from the generic ``Exception`` that
            was raised before, and ``except Exception`` still catches it.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    if path is None:
        path = APPLICATION_PROPERTIES_FILE_PATH
    # Guard clause: report a missing file with its path instead of letting
    # open() fail with a less specific message.
    if not os.path.isfile(path):
        raise FileNotFoundError(
            'A file at "{}" was not found'.format(path))
    # Decode explicitly as UTF-8 so parsing does not depend on the locale.
    with open(path, "r", encoding="utf-8") as file:
        return json.load(file)
def property_map(props, property_group_id):
    """Return the ``PropertyMap`` of the first matching property group.

    Args:
        props: Iterable of mappings, each providing a ``"PropertyGroupId"``
            and a ``"PropertyMap"`` entry.
        property_group_id: Value compared against each ``"PropertyGroupId"``.

    Returns:
        The ``"PropertyMap"`` value of the first entry whose id equals
        ``property_group_id``, or ``None`` when no entry matches.

    Raises:
        KeyError: If an inspected entry lacks ``"PropertyGroupId"``, or the
            matching entry lacks ``"PropertyMap"``.
    """
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]
    # No group with that id: make the "not found" result explicit so callers
    # know they have to check for None before indexing.
    return None
def create_source_table(table_name, stream_name, region, stream_initpos):
    """Build the CREATE TABLE statement for the Kinesis source table.

    Args:
        table_name: Name given to the table.
        stream_name: Value placed in the ``'stream'`` option.
        region: Value placed in the ``'aws.region'`` option.
        stream_initpos: Value placed in the ``'scan.stream.initpos'`` option.

    Returns:
        The SQL text with leading and trailing whitespace stripped.

    Note:
        The arguments are substituted with ``str.format`` and are not
        escaped or validated here.
    """
    # The SQL lines are kept flush-left on purpose: they are part of the
    # returned string and must stay byte-for-byte what they were.
    statement = """
CREATE TABLE {0} (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
PARTITIONED BY (ticker)
WITH (
'connector' = 'kinesis',
'stream' = '{1}',
'aws.region' = '{2}',
'scan.stream.initpos' = '{3}',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601')
"""
    return statement.format(
        table_name, stream_name, region, stream_initpos).strip()
def create_sink_table(table_name, stream_name, region):
    """Build the CREATE TABLE statement for the Kinesis sink table.

    Args:
        table_name: Name given to the table.
        stream_name: Value placed in the ``'stream'`` option.
        region: Value placed in the ``'aws.region'`` option.

    Returns:
        The SQL text, including the newline that precedes and the newline
        that follows the statement (the result is not stripped).

    Note:
        The arguments are substituted with ``str.format`` and are not
        escaped or validated here.
    """
    # The SQL lines are kept flush-left on purpose: they are part of the
    # returned string and must stay byte-for-byte what they were.
    statement = """
CREATE TABLE {0} (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
PARTITIONED BY (ticker)
WITH (
'connector' = 'kinesis',
'stream' = '{1}',
'aws.region' = '{2}',
'sink.partitioner-field-delimiter' = ';',
'sink.batch.max-size' = '100',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
"""
    return statement.format(table_name, stream_name, region)
def create_print_table(table_name):
    """Build the CREATE TABLE statement for a table using the print connector.

    Args:
        table_name: Name given to the table.

    Returns:
        The SQL text, including the newline that precedes and the newline
        that follows the statement (the result is not stripped).

    Note:
        ``table_name`` is substituted with ``str.format`` and is not
        escaped or validated here.
    """
    # The SQL lines are kept flush-left on purpose: they are part of the
    # returned string and must stay byte-for-byte what they were.
    statement = """
CREATE TABLE {0} (
ticker VARCHAR(6),
price DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
WITH (
'connector' = 'print')
"""
    return statement.format(table_name)
def _required_property(group, group_id, *keys):
    """Return the value stored under the first of ``keys`` found in ``group``.

    Args:
        group: Property map returned by ``property_map`` for ``group_id``,
            or ``None`` when that group does not exist.
        group_id: Property group id; used only to build error messages.
        *keys: Candidate key names, tried in the order given.

    Returns:
        The value of the first candidate key present in ``group``.

    Raises:
        KeyError: If ``group`` is ``None`` or contains none of ``keys``.
    """
    if group is None:
        raise KeyError(
            'Property group "{}" was not found in the application '
            'properties'.format(group_id))
    for key in keys:
        if key in group:
            return group[key]
    wanted = " or ".join('"{}"'.format(key) for key in keys)
    raise KeyError(
        'Property group "{}" is missing required key {}; keys present: {}'
        .format(group_id, wanted, sorted(group)))


def main():
    """Create the source and sink tables and submit the INSERT job.

    Reads the stream names, regions and start position from the application
    properties, creates both tables through the module-level ``table_env``,
    adds the INSERT statement to the module-level ``statement_set``, prints
    its plan and executes it.

    Raises:
        KeyError: If a property group or a required key is missing. The
            message names the group and the key, so the Python traceback
            shows which runtime property has to be added.
    """
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"
    input_stream_key = "input.stream.name"
    input_region_key = "aws.region"
    # The start position is accepted under either name: the key this script
    # has always read, or the connector option name used in the source DDL.
    input_starting_position_keys = (
        "flink.stream.initpos", "scan.stream.initpos")
    output_stream_key = "output.stream.name"
    output_region_key = "aws.region"
    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"
    # get application properties
    props = get_application_properties()
    print(props)
    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)
    # Fail with a message naming the group and key instead of a bare
    # KeyError / "NoneType is not subscriptable" from direct indexing.
    input_stream = _required_property(
        input_property_map, input_property_group_key, input_stream_key)
    input_region = _required_property(
        input_property_map, input_property_group_key, input_region_key)
    stream_initpos = _required_property(
        input_property_map, input_property_group_key,
        *input_starting_position_keys)
    output_stream = _required_property(
        output_property_map, producer_property_group_key, output_stream_key)
    output_region = _required_property(
        output_property_map, producer_property_group_key, output_region_key)
    # 2. Creates a source table from a Kinesis Data Stream
    create_source_sql = create_source_table(
        input_table_name, input_stream, input_region, stream_initpos)
    table_env.execute_sql(create_source_sql)
    # 3. Creates a sink table writing to a Kinesis Data Stream
    create_sink_sql = create_sink_table(
        output_table_name, output_stream, output_region)
    table_env.execute_sql(create_sink_sql)
    # 4. Inserts the source table data into the sink table
    insert_sql = "INSERT INTO {0} SELECT * FROM {1}" \
        .format(output_table_name, input_table_name)
    statement_set.add_insert_sql(insert_sql)
    print(statement_set.explain())
    statement_set.execute()
# Run the job only when this file is executed as a script, not on import
if __name__ == "__main__":
    main()
Any ideas?
Aleksandr Pilipenko
10/05/2023, 12:56 PM
PythonDriver at INFO level - you should see the traceback logged there
Joey Resuento
10/05/2023, 12:59 PM
Felipe Mendonça
10/05/2023, 1:03 PM
Jeremy Ber
10/05/2023, 1:40 PM
Jeremy Ber
10/05/2023, 1:40 PM
Felipe Mendonça
10/05/2023, 1:50 PM
Jeremy Ber
10/05/2023, 1:51 PM
Felipe Mendonça
10/05/2023, 3:13 PM