Ankur Aggarwal
03/28/2024, 6:13 PM
java.lang.ArrayIndexOutOfBoundsException
Code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.avro import AvroRowDeserializationSchema
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from pyflink.common.serialization import DeserializationSchema
from confluent_kafka import Consumer
import os
import sys
import time
import json

schema_registry_url = 'http://schema-registry:8081'
schema_registry_client = SchemaRegistryClient({'url': schema_registry_url})

consumer_config: dict = {
    'bootstrap.servers': 'broker:29092',
    'group.id': 'av-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
}
kafka_topic: str = 'postgres_avro.public.transactions'
kafka_consumer: Consumer = Consumer(consumer_config)
kafka_consumer.subscribe([kafka_topic])

def fetch_schema():
    try:
        # lines removed to reduce length
        schema_layout = schema_registry_client.get_schema(schema_id).schema_str
        return schema_layout
    except Exception as e:
        print(f"schema issue - {e}")

def flink_avro_consumer(env: StreamExecutionEnvironment):
    try:
        schema_layout = fetch_schema()
        print(f"schema - {schema_layout}")
        avro_deserializer = AvroRowDeserializationSchema(avro_schema_string=schema_layout)
        flink_consumer = FlinkKafkaConsumer(
            topics='postgres_avro.public.transactions',
            deserialization_schema=avro_deserializer,
            properties={
                'bootstrap.servers': 'broker:29092',
                'group.id': 'avro_group',
                'enable.auto.commit': 'false',
                'request.timeout.ms': '120000',
                'session.timeout.ms': '60000'
            }
        )
        flink_consumer.set_start_from_earliest()
        env.add_source(flink_consumer).print()
        env.execute()
    except Exception as e:
        print(f"Consumer error - {e}")
        raise e

if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    kafka_jar = os.path.abspath('flink-sql-connector-kafka-1.17.2.jar')
    avro_jar = os.path.abspath('flink-sql-avro-1.17.2.jar')
    kafka_jar_struct = f"file://{kafka_jar}"
    avro_jar_struct = f"file://{avro_jar}"
    print(kafka_jar_struct)
    env.add_jars(kafka_jar_struct, avro_jar_struct)
    flink_avro_consumer(env)
    print("records are picked from topic")
Adas Kavaliauskas
03/29/2024, 11:46 PM
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import FsStateBackend
# 1. Create a StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
# start a checkpoint every 1000 ms
env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(interval=1000)
t_env.execute_sql("""
CREATE TABLE metrics (
name STRING,
type STRING,
`timestamp` BIGINT,
`dimensions` ROW<domain STRING>,
`values` ROW<doubleValue DOUBLE>
) WITH (
'connector' = 'kafka',
'topic' = 'metrics',
'properties.bootstrap.servers' = 'xxx:9092',
'format' = 'json',
'key.format' = 'json',
'key.fields' = 'name',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx";'
)
""")
t_env.execute_sql("""
CREATE TABLE metrics_print (
name STRING,
type STRING,
`timestamp` BIGINT,
`dimensions` ROW<domain STRING>,
`values` ROW<doubleValue DOUBLE>
) WITH (
'connector' = 'print'
)
""")
t_env.execute_sql(f"""
CREATE TABLE kafka_source (
vin STRING,
sale_location ROW<country STRING, address STRING>,
spec_powertrain_displacement ROW<unit STRING, `value` INT>,
meta ROW<`date` STRING, domain STRING>,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'completed_jobs',
'properties.bootstrap.servers' = 'xxx:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxxx";'
)
""")
# 3. Query the table and print the output to the console
metrics_query = t_env.sql_query("""
SELECT
'flink_data.domain_count' as name ,
'count' as type,
window_end AS `timestamp`,
ROW(domain) as dimensions,
ROW(doubleValue) as `values`
FROM (
SELECT
meta.domain,
EXTRACT(EPOCH FROM TUMBLE_START(proctime, INTERVAL '10' SECONDS)) AS window_start,
EXTRACT(EPOCH FROM TUMBLE_END(proctime, INTERVAL '10' SECONDS)) AS window_end,
COUNT(*) AS doubleValue
FROM kafka_source
WHERE meta.domain is not null
GROUP BY
TUMBLE(proctime, INTERVAL '10' SECONDS), meta.domain
)
""")#.print()
metrics_query.print_schema()
statement_set = t_env.create_statement_set()
statement_set.add_insert("metrics", metrics_query)
statement_set.add_insert("metrics_print", metrics_query)
statement_set.execute().wait()
env.execute()
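Note: enable_checkpointing above is applied to a StreamExecutionEnvironment that is never passed to StreamTableEnvironment.create, so the SQL job runs without checkpointing (and the trailing env.execute() has no DataStream sources to execute). A minimal sketch of wiring the two together, keeping the rest of the setup unchanged:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(1000)  # checkpoint every 1000 ms

# Bind the table environment to the same stream environment so the
# checkpoint configuration applies to the SQL job.
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# ... CREATE TABLE statements and statement_set as above ...
# statement_set.execute().wait() submits the job; no separate env.execute() is needed.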
Deepyaman Datta
04/01/2024, 4:11 PM
The cloudpickle version is different between the client and remote worker. I've shared a bit more detail in a GitHub issue on the Ibis repo, including how this didn't matter for Python 3.9.
I got a couple of things working, but they're both quite heavy and hard to maintain from a user perspective. The first option that worked was to build the right Python version on top of the Flink image: https://github.com/ibis-project/ibis/pull/8725/commits/06bcf739ae896b1b57ce96b6d098290462ca54f8
A second option, perhaps a bit more lightweight, was to install the Java SDK and Flink on top of the appropriate Python image: https://github.com/ibis-project/ibis/pull/8725/commits/7085159e05feb01227b9069e4f3ee42e06c0258c (although this had the extra downside that I copied the docker-entrypoint.sh script from the official Docker build).
Since we are basically just providing a unified Python dataframe API and don't want to change the user workflow, I'd like to understand how the Flink team would recommend users configure their images. My knowledge is likely also lacking in this area; for example, only last night I found out about the python.executable config option on https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/python_config/. Is the recommended answer to install Python using something like the deadsnakes PPA and use this option? Any guidance would be much appreciated, and I'm also happy to help add context to the docs if this makes sense anywhere.
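For reference, a minimal sketch of the python.executable option mentioned above (interpreter paths are hypothetical):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Interpreter used by the Python workers on the cluster side.
t_env.get_config().set("python.executable", "/usr/bin/python3.10")
# Interpreter used on the client side when compiling the job.
t_env.get_config().set("python.client.executable", "/usr/bin/python3.10")

The same pair can be passed at submit time via flink run -pyexec and -pyclientexec.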
Takous
04/02/2024, 8:04 AM
David Bryson
04/03/2024, 5:35 PM
Sumit Kashyap
04/04/2024, 11:36 AM
setBulkFlushMaxActions and setBulkFlushInterval functionality.
If anyone has any insights or suggestions on how to achieve this, or if there are any alternative solutions available, I'd greatly appreciate your input!
Caroline McKee
04/04/2024, 7:09 PM
res_table = (penalties_table
    .window(Tumble.over(lit(1).milli).on(col('rowtime1')).alias("w"))
    .group_by(col("frame_time"), col("id1"), col("id2"), col("w"))
    .select(col("frame_time"), col("penalty").sum.alias("total_penalty"), col("id1"), col("id2")))
These are the metrics for the subtask after failure. Increasing the total TaskManager Flink memory does help, i.e. the job will run for longer before failing, but it still shows the same pattern of slowing down and eventually failing:
{
    "id": "3bd0d25bb596363b287ebce5bfa9f22b",
    "name": "GroupWindowAggregate[13] -> Calc[14]",
    "maxParallelism": 128,
    "parallelism": 1,
    "status": "CANCELED",
    "start-time": 1712255215749,
    "end-time": 1712256000278,
    "duration": 784529,
    "tasks": {
        "FINISHED": 0,
        "DEPLOYING": 0,
        "CANCELED": 0,
        "CREATED": 0,
        "CANCELING": 0,
        "FAILED": 1,
        "RUNNING": 0,
        "RECONCILING": 0,
        "SCHEDULED": 0,
        "INITIALIZING": 0
    },
    "metrics": {
        "read-bytes": 76581735,
        "read-bytes-complete": true,
        "write-bytes": 15695872,
        "write-bytes-complete": true,
        "read-records": 860344,
        "read-records-complete": true,
        "write-records": 214875,
        "write-records-complete": true,
        "accumulated-backpressured-time": 0,
        "accumulated-idle-time": 815152,
        "accumulated-busy-time": 0
    }
}
I am confused because these metrics seem to indicate that the subtask is idle the entire time, yet the pyflink script does yield an output for a while (subsequent tasks in the job graph are not idle the whole time).
Can anyone offer any insight? Thanks in advance!
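Note: one possible reading (an assumption, not a confirmed diagnosis) is that 1 ms tumbling windows over high-cardinality keys accumulate a very large number of window states, and with the default heap state backend that matches the observed slow-down-then-fail pattern. A sketch of moving that state off-heap to RocksDB:

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
# On older Flink versions the key is 'state.backend' instead.
config.set_string("state.backend.type", "rocksdb")
env = StreamExecutionEnvironment.get_execution_environment(config)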
Takous
04/04/2024, 8:21 PM
Adas Kavaliauskas
04/08/2024, 3:43 PM
t_env.execute_sql(f"""
    CREATE TABLE kafka_source (
        `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
        `topic` STRING METADATA FROM 'topic',
        `proc_time` AS PROCTIME(),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'failures'
    )
""")
metrics_query = t_env.execute_sql("""
SELECT
topic,
EXTRACT(EPOCH FROM TUMBLE_START(proc_time, INTERVAL '10' SECONDS)) AS window_start,
EXTRACT(EPOCH FROM TUMBLE_END(proc_time, INTERVAL '10' SECONDS)) AS window_end,
COUNT(*) AS topic_count
FROM kafka_source
GROUP BY
TUMBLE(proc_time, INTERVAL '10' SECONDS), topic
""").print()
+----+--------------------------------+----------------------+----------------------+----------------------+
| op | topic | window_start | window_end | topic_count |
+----+--------------------------------+----------------------+----------------------+----------------------+
| +I | failures | 1712566460 | 1712566470 | 3 |
David Bryson
04/08/2024, 8:20 PM
Traceback (most recent call last):
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
response = task()
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 580, in do_instruction
return getattr(self, request_type)(
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle
op.finish()
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 152, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.finish
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 153, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.finish
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 155, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.finish
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 124, in finish
self.keyed_state_backend.commit()
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 1150, in commit
self.commit_internal_state(internal_state)
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 1214, in commit_internal_state
internal_state.commit()
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 566, in commit
to_await = self._state_handler.extend(
File "/Users/david/.pyenv/versions/pyflink-3.8-1.16.2/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1190, in extend
coder.encode_to_stream(element, out, True)
File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 53, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkFieldCoderBeamWrapper.encode_to_stream
File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 54, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkFieldCoderBeamWrapper.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 528, in pyflink.fn_execution.coder_impl_fast.IntCoderImpl.encode_to_stream
TypeError: an integer is required
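Note: the failing coder above is IntCoderImpl, i.e. a value written to keyed state declared as Types.INT() was not a Python int; a None or a float produces exactly this "an integer is required" error when the bundle commits. A sketch of the pattern, with a hypothetical descriptor name:

from pyflink.common import Types
from pyflink.datastream.state import ValueStateDescriptor

descriptor = ValueStateDescriptor("counter", Types.INT())  # hypothetical state

# Inside the process function, coerce explicitly before writing:
# self.counter_state.update(int(new_value))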
Adas Kavaliauskas
04/09/2024, 4:23 PM
Vladimir
04/10/2024, 9:56 AM
flink run -py kafka_csv_format.py
where I'm changing
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")
to
env.add_jars("file:///flink-sql-connector-kafka-3.1.0-1.18.jar")
and I'm getting an error; the job doesn't even start. The full code and error message will be provided in the thread.
Please help me fix this error. Another hello-world word-count job runs fine.
Version: 1.19.0, Commit ID: eaffd22
Python 3.8.19 (I've also tried 3.10, 3.9, 3.12)
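Note: flink-sql-connector-kafka-3.1.0-1.18.jar targets Flink 1.18, while the runtime above is 1.19.0; externalized connector jars should match the running Flink version (an assumption about the cause, not a confirmed diagnosis). The jar can also be attached at submit time instead of via add_jars:

flink run -py kafka_csv_format.py --jarfile /path/to/flink-sql-connector-kafka-<matching-version>.jar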
Frank Gilroy
04/10/2024, 12:57 PM
soumilshah1995
04/10/2024, 10:00 PM
from pyflink.table import EnvironmentSettings, TableEnvironment
import os

# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(env_settings)

# Get the current working directory
CURRENT_DIR = os.getcwd()

# Define a list of JAR file names you want to add
jar_files = [
    "flink-jar/flink-sql-avro-confluent-1.19.0.jar",
    "flink-jar/flink-sql-connector-kafka_2.11-1.12.0.jar"  # Update the connector version
]

# Build the list of JAR URLs by prepending 'file:///' to each file name
jar_urls = [f"file:///{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)
# Define the customer_source table with Debezium Avro format
customer_source = """
CREATE TABLE IF NOT EXISTS customer_source (
    salesid STRING,
    invoiceid STRING,
    itemid STRING,
    category STRING,
    price DECIMAL(38, 18),   -- Adjust the type according to your schema
    quantity INT,            -- Adjust the type according to your schema
    orderdate DATE,          -- Adjust the type according to your schema
    destinationstate STRING,
    shippingtype STRING,
    referral STRING,
    updated_at TIMESTAMP(3), -- Adjust the type according to your schema
    ts_ms BIGINT             -- Adjust the type according to your schema
) WITH (
    'connector' = 'kafka',
    'topic' = 'hive.public.sales',
    'properties.bootstrap.servers' = 'localhost:9092',  -- Default Kafka port
    'properties.group.id' = 'sales',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.url' = 'http://localhost:8081'
)
"""
try:
    # Execute the SQL to create the sources
    table_env.execute_sql(customer_source)
    print("Created customer_source table.")

    # Execute the query to select data from the table
    result = table_env.execute_sql("SELECT * FROM customer_source")
    result.print()
    print("Job completed successfully.")
except Exception as e:
    print("An error occurred:", str(e))
Error:
Created customer_source table.
An error occurred: An error occurred while calling o122.print.
: java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:129)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:100)
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:247)
at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:163)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:187)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:123)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:126)
... 15 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2028)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:185)
... 17 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2100)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
... 17 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:421)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = hive.public.sales, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1712784191798, serialized key size = 6, serialized value size = 156, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@6d8cca2, value = [B@58aac1d1).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: java.io.IOException: Can't deserialize Debezium Avro message.
at org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:165)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
... 15 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:143)
at org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.deserialize(DebeziumAvroDeserializationSchema.java:133)
... 17 more
Caused by: org.apache.flink.avro.shaded.org.apache.avro.AvroTypeException: Found hive.public.sales.Value, expecting union
at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
at org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.a
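Note: the AvroTypeException above ("Found hive.public.sales.Value, expecting union") suggests the topic carries a bare row record rather than a full Debezium envelope, e.g. because the connector applies the ExtractNewRecordState SMT; that is an assumption, not confirmed here. If it holds, the plain avro-confluent format would match the payload. A sketch, reusing the table body from above:

customer_source_plain = """
CREATE TABLE IF NOT EXISTS customer_source (
    salesid STRING,
    invoiceid STRING
    -- remaining columns as in the original definition
) WITH (
    'connector' = 'kafka',
    'topic' = 'hive.public.sales',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'sales',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://localhost:8081'
)
"""
table_env.execute_sql(customer_source_plain)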
Koushik Thiagarajan
04/12/2024, 10:07 PM
Shou Mik
04/13/2024, 8:35 AM
Józef Nabski
04/13/2024, 8:31 PM
Ricardo Cordeiro
04/15/2024, 10:57 AM
java.lang.ClassCastException: class [B cannot be cast to class org.apache.flink.types.Row ([B is in module java.base of loader 'bootstrap'; org.apache.flink.types.Row is in unnamed module of loader 'app')
at org.apache.flink.formats.json.JsonRowSerializationSchema.serialize(JsonRowSerializationSchema.java:73)
at org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder$KafkaRecordSerializationSchemaWrapper.serialize(KafkaRecordSerializationSchemaBuilder.java:312)
at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
at org.apache.flink.streaming.api.operators.python.process.collector.RunnerOutputCollector.collect(RunnerOutputCollector.java:52)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalTwoInputPythonFunctionOperator.emitResult(AbstractExternalTwoInputPythonFunctionOperator.java:124)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:100)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:292)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalTwoInputPythonFunctionOperator.processElement(AbstractExternalTwoInputPythonFunctionOperator.java:138)
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonCoProcessOperator.processElement1(ExternalPythonCoProcessOperator.java:116)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
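Note: "[B cannot be cast to org.apache.flink.types.Row" at the Kafka sink usually means the Python operator upstream emits pickled bytes because no produced type was declared, so the JSON serializer receives byte arrays instead of Rows; that is an assumption based on the ExternalPythonCoProcessOperator frames above. A sketch of declaring the produced type (field names and types hypothetical):

from pyflink.common import Types

row_type = Types.ROW_NAMED(['id', 'value'], [Types.STRING(), Types.DOUBLE()])

# Declare the produced type on the Python co-process operator so elements
# reach the Kafka sink as Row objects rather than pickled bytes.
result = connected_stream.process(MyCoProcessFunction(), output_type=row_type)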
David Bryson
04/22/2024, 5:18 PM
Vincent Laurenzano
04/22/2024, 6:42 PM
Ryan Goldenberg
04/22/2024, 11:40 PM
With flink-sql-connector-hive-2.3.9_2.12-1.18.1.jar, Flink is unable to recognize tables with the connector option 'connector' = 'hive'. How can I register 'connector' = 'hive'?
(Edit) Partially solved: 'connector' = 'hive' is recognized if the current catalog is a HiveCatalog.
Details: gist
Command: flink run --python /tmp/hive-dwh-3/scripts/run10.py (hive jar in /opt/flink/lib/)
flink run --python <script> --jarfile <hive_connector> also gives the same error.
Errors:
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='hive'
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
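As the edit above notes, the hive connector is resolved through the catalog rather than discovered as a DynamicTableFactory, so the current catalog must be a HiveCatalog. A minimal sketch of registering one in PyFlink (catalog name, database, and conf dir are hypothetical):

from pyflink.table.catalog import HiveCatalog

hive_catalog = HiveCatalog("hive", "default", "/opt/hive-conf")
t_env.register_catalog("hive", hive_catalog)
t_env.use_catalog("hive")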
Nataraj T
04/24/2024, 12:30 PM
Nataraj T
04/24/2024, 12:32 PM
soumilshah1995
04/25/2024, 3:30 PM
# Set AWS access and secret keys
table_env.get_config().get_configuration().set_string(
    "s3.access-key", "admin"
)
table_env.get_config().get_configuration().set_string(
    "s3.secret-key", "password"
)
# Set the MinIO endpoint
table_env.get_config().get_configuration().set_string(
    "s3.endpoint", "http://127.0.0.1:9000"
)
# Enable path style access if needed (for MinIO)
table_env.get_config().get_configuration().set_boolean(
    "s3.path.style.access", True
)
import os
os.environ['AWS_ACCESS_KEY_ID'] = "admin"
os.environ['AWS_ACCESS_KEY'] = "admin"
os.environ['AWS_SECRET_ACCESS_KEY'] = "password"
os.environ['AWS_SECRET_KEY'] = "password"
I am able to read the data, and MinIO is running in a Docker container.
hudi_output_path = 's3a://warehouse/'

hudi_sink = f"""
CREATE TABLE customers(
    uuid VARCHAR(36) PRIMARY KEY NOT ENFORCED,
    first_name VARCHAR(50),
    city VARCHAR(50),
    state VARCHAR(50)
)
PARTITIONED BY (`state`)
WITH (
    'connector' = 'hudi',
    'path' = '{hudi_output_path}',
    'table.type' = 'MERGE_ON_READ'
);
"""
# Execute the SQL to create the Hudi table
table_env.execute_sql(hudi_sink)
# Define the data to be inserted into the Hudi table
table_env.execute_sql("""
INSERT INTO customers
SELECT * FROM source_table
""").wait()
Table options are:
'connector'='hudi'
'path'='s3a://warehouse/'
'table.type'='MERGE_ON_READ'
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:331)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:451)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:227)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:177)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:177)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.HoodieIOException: Get table config error
at org.apache.hudi.util.StreamerUtil.getTableConfig(StreamerUtil.java:319)
at org.apache.hudi.table.HoodieTableFactory.setupTableOptions(HoodieTableFactory.java:115)
at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSink(HoodieTableFactory.java:103)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:328)
... 29 more
Caused by: java.nio.file.AccessDeniedException: s3a://warehouse/.hoodie/hoodie.properties: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3799)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
at org.apache.hudi.util.StreamerUtil.getTableConfig(StreamerUtil.java:315)
... 32 more
Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:216)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1257)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:833)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:783)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:6220)
at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:6193)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5244)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1360)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
... 40 more
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:177)
... 61 more
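Note: os.environ set in the client process does not propagate to the TaskManager JVMs, and Hudi builds its own Hadoop S3A FileSystem, which ignores Flink's s3.* options; that is consistent with the NoAuthWithAWSException above, though it is an assumption. One sketch of a fix is a core-site.xml visible to the TaskManagers (with HADOOP_CONF_DIR pointing at its directory), carrying the MinIO settings from the snippet above:

<configuration>
    <property><name>fs.s3a.access.key</name><value>admin</value></property>
    <property><name>fs.s3a.secret.key</name><value>password</value></property>
    <property><name>fs.s3a.endpoint</name><value>http://127.0.0.1:9000</value></property>
    <property><name>fs.s3a.path.style.access</name><value>true</value></property>
</configuration>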
Wouter Zorgdrager
04/26/2024, 12:34 PM
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set_local_timezone("UTC")
input_table = t_env.from_elements(
    [
        (
            "elementA",
            datetime(year=2024, month=4, day=12, hour=8, minute=35, tzinfo=pytz.utc),
        ),
        (
            "elementB",
            datetime(year=2024, month=4, day=12, hour=9, minute=35, tzinfo=pytz.utc),
        ),
    ],
    DataTypes.ROW(
        [
            DataTypes.FIELD("name", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3)),
        ]
    ),
)
input_table.execute().print()
With the output:
+----+--------------------------------+-------------------------+
| op | name | timestamp |
+----+--------------------------------+-------------------------+
| +I | elementA | 2024-04-12 10:35:00.000 |
| +I | elementB | 2024-04-12 11:35:00.000 |
+----+--------------------------------+-------------------------+
2 rows in set
If I change the timestamp to DataTypes.TIMESTAMP_LTZ(3)
then the output is still shifted by 1h compared to the UTC timestamp input element.
+----+--------------------------------+-------------------------+
| op | name | timestamp |
+----+--------------------------------+-------------------------+
| +I | elementA | 2024-04-12 09:35:00.000 |
| +I | elementB | 2024-04-12 10:35:00.000 |
+----+--------------------------------+-------------------------+
2 rows in set
Is this a bug in PyFlink, or am I misunderstanding something here?
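Note: one reading (an assumption, not a confirmed diagnosis) is that from_elements converts timezone-aware datetimes through the client's local zone before storing them in TIMESTAMP(3), which carries no zone; the +2h and +1h shifts would match a CEST client zone. A sketch of a workaround for TIMESTAMP(3): pass naive datetimes already in the intended wall-clock time.

from datetime import datetime
from pyflink.table import DataTypes

rows = [
    ("elementA", datetime(2024, 4, 12, 8, 35)),
    ("elementB", datetime(2024, 4, 12, 9, 35)),
]
schema = DataTypes.ROW([
    DataTypes.FIELD("name", DataTypes.STRING()),
    DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3)),
])
input_table = t_env.from_elements(rows, schema)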
Adas Kavaliauskas
04/26/2024, 1:42 PM
lookup.cache parameters, but so far no luck.
I would appreciate any help.
t_env.execute_sql(
f"""
CREATE TABLE completed (
meta ROW<domain STRING, url STRING>
) WITH (
'connector' = 'kafka',
'topic' = '{SOURCE_TOPIC}',
'properties.bootstrap.servers' = '{confluent_secret.bootstrap_servers}',
'properties.group.id' = 'flink_{APP_NAME}',
'format' = 'json',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{confluent_secret.sasl_username}" password="{confluent_secret.sasl_password}";'
)
"""
)
t_env.execute_sql(
f"""
CREATE TABLE configuration (
domain_name STRING,
enabled BOOLEAN,
country STRING,
created_at TIMESTAMP
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://{rds_secret.host}:{rds_secret.port}/{DB_SCHEMA}',
'table-name' = 'liquidity.configuration',
'username' = '{rds_secret.username}',
'password' = '{rds_secret.password}',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '0',
'lookup.partial-cache.expire-after-write' = '0',
'lookup.max-retries' = '3'
);
"""
)
t_env.execute_sql(
f"""
CREATE TABLE jobs (
id STRING,
url STRING,
domain STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://{rds_secret.host}:{rds_secret.port}/{DB_SCHEMA}',
'table-name' = 'liquidity.jobs',
'username' = '{rds_secret.username}',
'password' = '{rds_secret.password}'
);
"""
)
join_query = t_env.sql_query(
"""
SELECT
MD5(CONCAT(completed.url, completed.domain)) AS id,
completed.url,
completed.domain
FROM completed JOIN configuration ON completed.domain = configuration.domain_name
WHERE configuration.enabled = TRUE;
"""
)
join_query.print_schema()
statement_set = t_env.create_statement_set()
statement_set.add_insert("jobs", join_query)
statement_set.execute().wait()
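Note: the 'lookup.cache' options only take effect in a lookup join (FOR SYSTEM_TIME AS OF); a plain JOIN against a JDBC table is executed as a one-off bounded scan, which may be why the parameters show no effect (an assumption, not confirmed here). A sketch of the lookup-join form, assuming a proc_time AS PROCTIME() column is added to the completed table:

join_query = t_env.sql_query(
    """
    SELECT
        MD5(CONCAT(c.meta.url, c.meta.domain)) AS id,
        c.meta.url,
        c.meta.domain
    FROM completed AS c
    JOIN configuration FOR SYSTEM_TIME AS OF c.proc_time AS cfg
        ON c.meta.domain = cfg.domain_name
    WHERE cfg.enabled = TRUE
    """
)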
Michiel Vandromme
04/26/2024, 7:41 PM
Xianyao Chen
04/27/2024, 5:01 PM
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 83, in __init__
) = extract_stateless_function(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 176, in extract_stateless_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
ModuleNotFoundError: No module named 'processor'
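Note: "No module named 'processor'" on the worker means the module defining the pickled function was never shipped to the cluster. A sketch of the two usual ways to ship it (paths and module names are hypothetical, following the error above):

# When building the job:
env.add_python_file("/path/to/processor.py")

# Or at submit time:
#   flink run --python main.py --pyFiles processor.py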
Xianyao Chen
04/27/2024, 5:09 PM
Nawaz Nayeem
04/28/2024, 6:43 AM