Eric Xiao
09/23/2022, 6:26 PM

Michael LeGore
09/23/2022, 8:43 PM

Hari Krishna Poturi
09/24/2022, 8:54 AM

Kristian Grimsby
09/24/2022, 4:52 PM

Kristian Grimsby
09/24/2022, 8:05 PM
AttributeError: 'NoneType' object has no attribute 'message_types_by_name'. Is this familiar to anyone? Sorry if this is obvious, but I’m fairly new to Flink, so I'm trying to figure out the way…
Traceback (most recent call last):
File "/Users/grimsby/Dev/stimle/flink/wc.py", line 134, in <module>
word_count(known_args.input, known_args.output)
File "/Users/grimsby/Dev/stimle/flink/wc.py", line 66, in word_count
env = StreamExecutionEnvironment.get_execution_environment()
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 837, in get_execution_environment
return StreamExecutionEnvironment(j_stream_exection_environment)
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 65, in __init__
self._open()
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 1033, in _open
startup_loopback_server()
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 1023, in startup_loopback_server
from pyflink.fn_execution.beam.beam_worker_pool_service import \
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py", line 44, in <module>
from pyflink.fn_execution.beam import beam_sdk_worker_main # noqa # pylint: disable=unused-import
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py", line 21, in <module>
import pyflink.fn_execution.beam.beam_operations # noqa # pylint: disable=unused-import
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 24, in <module>
from pyflink.fn_execution import flink_fn_execution_pb2
File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
_INPUT = DESCRIPTOR.message_types_by_name['Input']
AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
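This traceback (DESCRIPTOR ending up None inside flink_fn_execution_pb2.py) is commonly reported as a mismatch between PyFlink's generated protobuf modules and the installed protobuf runtime, notably with protobuf 4.x or on Apple Silicon, where the C extension fails to load. A hedged sketch of one frequently suggested workaround, forcing the pure-Python protobuf implementation before the first pyflink import (pinning protobuf to an older 3.x release is the other common fix):

```python
# Hedged workaround sketch: if DESCRIPTOR is None because the native
# protobuf extension failed to load or mismatches the generated code,
# forcing the pure-Python implementation can help. This must run
# before anything imports pyflink.
import os

os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python"

# Import pyflink only after the variable is set (commented out here so
# the sketch runs without a Flink installation):
# from pyflink.datastream import StreamExecutionEnvironment
```

Whether this resolves the error depends on the actual protobuf version installed; checking `pip show protobuf` against PyFlink's pinned requirement is a reasonable first step.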
Isaac Pohl-Zaretsky
09/26/2022, 3:32 AM

Abhinav sharma
09/26/2022, 7:35 AM

Sevvy Yusuf
09/26/2022, 10:22 AM
flink.jobmanager.job.numberOfFailedCheckpoints and flink.jobmanager.job.downtime, and neither of them solves our problem. Thank you in advance.

Kristian Grimsby
09/26/2022, 11:45 AM

Felix Angell
09/27/2022, 7:38 PM
The KeyedProcessFunction docs mention that elements can be output via the Collector param, though I don't see any methods in any of the functions specified here that take a Collector arg. How can I go about writing out an element in this case?
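For context on the question above: in the Java API, processElement(value, ctx, out) does receive a Collector<O> directly, but in PyFlink there is no Collector argument. Instead, process_element is written as a generator and every value you yield is emitted downstream. A minimal sketch, with KeyedProcessFunction stubbed and a hypothetical OnlyFirstSeen function, so it runs without a Flink installation:

```python
# Sketch (assumption: PyFlink DataStream API). KeyedProcessFunction is
# stubbed here so the example runs without Flink; in real code it would
# be pyflink.datastream.KeyedProcessFunction.
class KeyedProcessFunction:  # stand-in for the real PyFlink base class
    pass

class OnlyFirstSeen(KeyedProcessFunction):
    """Toy function: emits each value the first time it appears."""

    def __init__(self):
        self.seen = set()  # real code would use Flink keyed state instead

    def process_element(self, value, ctx):
        if value not in self.seen:
            self.seen.add(value)
            yield value  # the PyFlink equivalent of out.collect(value)

fn = OnlyFirstSeen()
out = [v for x in ["a", "b", "a"] for v in fn.process_element(x, None)]
print(out)  # ['a', 'b']
```

Yielding zero, one, or many values per input element corresponds to calling the Collector zero, one, or many times in the Java API.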
Hunter
09/27/2022, 6:30 AM

Adesh Dsilva
09/27/2022, 7:30 AM

Prathit Malik
09/27/2022, 8:36 AM
◦ Source : kafka (upsert-kafka connector due to source being cdc type)
◦ Destination : postgres
◦ Issue : if the flink job was down for, let's say, 3 hours, then when the pipeline is resumed, 3 hours of data is backfilled, but we suspect that records are not coming in order from kafka (in the backfill case) and the watermark is not behaving as expected. Due to some records with a greater event timestamp, the watermark is progressed way further and the records before that are dropped.
▪︎ watermark strategy : WATERMARK FOR event_created_at AS event_created_at - INTERVAL '5' MINUTE
• Things we have tried so far :
◦ setting SET 'pipeline.auto-watermark-interval' = '0'; to get punctuated watermarks.
Can someone please help with some insights if anyone has faced this kind of issue before?
Thanks in advance 🙏
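The dropping behavior described above can be reproduced outside Flink. A toy simulation in plain Python (not Flink code; the timestamps are invented) of a bounded-out-of-orderness watermark, where a single out-of-order record with a large event timestamp advances the watermark and causes older backfilled records to be treated as late:

```python
# Toy model: watermark = max event time seen so far - 5 minutes.
# During a backfill, one "future" record advances the watermark and
# every later-arriving record older than the watermark is dropped.
from datetime import datetime, timedelta

DELAY = timedelta(minutes=5)
watermark = datetime.min
dropped = []

for ts in [datetime(2022, 9, 27, 8, 0),
           datetime(2022, 9, 27, 11, 0),   # out-of-order record far ahead
           datetime(2022, 9, 27, 8, 1)]:   # now older than the watermark
    if ts < watermark:
        dropped.append(ts)                 # treated as late, dropped
        continue
    watermark = max(watermark, ts - DELAY)

print([t.strftime("%H:%M") for t in dropped])  # ['08:01']
```

This is why a fixed 5-minute bound is not enough during a multi-hour backfill if Kafka delivers across partitions out of event-time order; the watermark jumps with the newest record regardless of how much older data is still in flight.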
Paul Lam
09/27/2022, 8:37 AM

toe
09/27/2022, 9:55 AM

Pouria Modaresi
09/27/2022, 2:33 PM

Adrian Chang
09/27/2022, 3:45 PM

Justin
09/27/2022, 8:16 PM
LIMIT 5. The Flink job’s topology seems to acknowledge this, yet it continues to receive records (1,269 when I took the screenshot below) and the job continues to run (screenshot 3). Output from the logs is in screenshots 4 and 5.

Aeden Jameson
09/27/2022, 11:05 PM

Sergio Sainz
09/28/2022, 1:01 AM
conf/flink-conf.yaml with environment variables? Like jobmanager.rpc.port=$MYPORT?
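On the question above: as far as I know, flink-conf.yaml has no built-in environment-variable substitution in Flink 1.15, so a common approach is to render the file before startup, e.g. with envsubst in a container entrypoint. A minimal Python sketch of the same idea (the jobmanager.rpc.port line and the MYPORT value are purely illustrative):

```python
# Sketch: expand ${VAR} placeholders in a flink-conf template from the
# environment before starting Flink (assumption: flink-conf.yaml itself
# does not substitute environment variables natively).
import os
import string

template = "jobmanager.rpc.port: ${MYPORT}\n"
os.environ["MYPORT"] = "6123"  # example value only

rendered = string.Template(template).substitute(os.environ)
print(rendered, end="")  # jobmanager.rpc.port: 6123
```

In a real deployment the rendered text would be written over conf/flink-conf.yaml before the JobManager/TaskManager processes start.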
Krish Narukulla
09/28/2022, 3:27 AM
avro versions across jars: flink-avro and flink-connector-hive for the same Flink version, 1.15. Any idea how to resolve this when building a FAT jar?
flink-avro brings in avro version 1.10.0. flink-connector-hive brings in hive-exec as a FAT jar, which contains avro classes for 1.8.2.
1. https://mvnrepository.com/artifact/org.apache.flink/flink-avro/1.15.2
2. https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.12/1.15.2
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/overview/#user-defined-dependencies
Is there a hive-exec slim jar which only brings in the hive udf interface?

Rashmin Patel
09/28/2022, 5:51 AM
KafkaSourceFetcherManager implements SingleThreadFetcherManager, so a single-threaded kafka consumer will be assigned multiple partitions, and in KafkaRecordEmitter it emits records via output.collect(element.f0, element.f2), where f0 is the actual data record and f2 is the kafka ingestion timestamp. So we are not maintaining any partition info along with the record, which will not allow us to induce a partition-aware watermark in the subsequent execution flow.
Can someone confirm whether my understanding of this is correct?

Zhiqiang Du
09/28/2022, 7:33 AM
1. flink run -s /chk-1
2. And enable checkpointing every 2 minutes.
3. My understanding is that if the task fails while it’s running, Flink will recover the task from the latest successful checkpoint.
4. But why does Flink recover the job from the savepoint chk-1?

Hartmut
09/28/2022, 2:49 PM
data class, but it resolves as a RAW type…
+------+--------------------------------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------+--------------------------------+------+-----+--------+-----------+
| f0 | RAW('com.example.User', '...') | TRUE | | | |
+------+--------------------------------+------+-----+--------+-----------+
Questions:
1. Is this a known issue?
2. Is Kotlin support anywhere on the roadmap?
3. If so, where can I create a feature request for this specific use case?

Jin Yi
09/28/2022, 5:56 PM

Yaroslav Bezruchenko
09/28/2022, 8:33 PM

Avinash
09/28/2022, 8:54 PM
timestamp

input_topic_schema_builder = Schema.new_builder() \
    .watermark("timestamp", "timestamp - INTERVAL '5' SECOND")\

When I try to create this schema, it tells me there is a parsing error:
org.apache.flink.sql.parser.impl.ParseException: Encountered "timestamp -" at line 1, column 1.
Is timestamp a reserved word? What can I do if the underlying kafka topic field name is timestamp? Should I rename it somehow? TIA
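On the reserved-word question: TIMESTAMP is indeed a reserved keyword in Flink SQL, and identifiers can be escaped with backticks in SQL expressions, so the watermark expression can refer to `timestamp` without renaming the Kafka field. A sketch with the Schema builder stubbed out (stand-in classes, not the real pyflink.table API) so it runs without Flink; the point is only the backtick-escaped column name:

```python
# Stubbed sketch (assumption: escaping the reserved word fixes the
# ParseException; these classes merely mimic the builder's shape).
class _Builder:
    def __init__(self):
        self.watermarks = []

    def watermark(self, col, expr):
        self.watermarks.append((col, expr))
        return self

class Schema:  # minimal stand-in for pyflink.table.Schema
    @staticmethod
    def new_builder():
        return _Builder()

input_topic_schema_builder = Schema.new_builder() \
    .watermark("timestamp", "`timestamp` - INTERVAL '5' SECOND")

print(input_topic_schema_builder.watermarks[0][1])  # `timestamp` - INTERVAL '5' SECOND
```

With the real PyFlink Schema builder, only the expression string changes: backtick-quote the column inside the watermark expression.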
Sumit Nekar
09/29/2022, 3:38 AM

Taruj Goyal
09/29/2022, 3:48 AM
Async operators interacting with external services. We are currently trying to do batching here via KeyBy and Windowing operations, and would like to know some good ways to minimize the external network calls while evenly distributing the load across all the task-slots.
What are some obvious ways to do this, and good patterns to follow?
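One generic pattern for the batching question above, sketched outside Flink (toy Python, no Flink APIs): buffer elements per key and flush the whole buffer with a single external call when it reaches a size threshold (a real job would add a time trigger as well), which is roughly what a keyed window with a count/time trigger gives you:

```python
# Toy sketch (not Flink code) of per-key batching: one external call
# per full batch instead of one call per record.
from collections import defaultdict

BATCH_SIZE = 3
buffers = defaultdict(list)
calls = []  # stands in for calls to the external service

def on_record(key, value):
    buffers[key].append(value)
    if len(buffers[key]) >= BATCH_SIZE:
        calls.append((key, list(buffers[key])))  # one call for the whole batch
        buffers[key].clear()

for i in range(7):
    on_record("k", i)

print(calls)  # [('k', [0, 1, 2]), ('k', [3, 4, 5])]
```

Keying also gives the even spread across task-slots, provided the key space is large and not skewed; the remaining trade-off is batch size versus latency, which the time trigger bounds.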
Taruj Goyal
09/29/2022, 3:51 AM