Angelo Kastroulis
10/11/2022, 1:33 AM

czchen
10/11/2022, 2:36 AM
Does the future returned from SourceReader.isAvailable need to be thread-safe? We use a static variable in SourceReader.isAvailable to pass the available data to pollNext. From https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#isAvailable--, there can be more than one outstanding future, so we wonder whether the future returned from SourceReader.isAvailable needs to be thread-safe.
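A minimal sketch of the pattern the interface expects, assuming nothing about czchen's actual reader: keep the availability future as a per-reader instance field (not a static) and complete it from the fetcher thread. CompletableFuture.complete() itself is thread-safe, so the future can be handed to the runtime and completed from another thread:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical reader internals, not czchen's code: one future per reader instance.
public class ReaderAvailability {
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    private volatile CompletableFuture<Void> availability = new CompletableFuture<>();

    // Returned from SourceReader.isAvailable(): already completed if data is queued.
    public CompletableFuture<Void> isAvailable() {
        return queue.isEmpty() ? availability : CompletableFuture.completedFuture(null);
    }

    // Called by the fetcher thread; complete() may safely race with isAvailable().
    public void enqueue(String record) {
        queue.add(record);
        availability.complete(null);
    }

    // Called from pollNext() once the queue drains, before reporting NOTHING_AVAILABLE.
    public void resetAvailability() {
        availability = new CompletableFuture<>();
    }
}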
Sucheth Shivakumar
10/11/2022, 5:31 AM
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:321)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
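This trace is the classic symptom of an Avro GenericRecord falling back to Kryo, which cannot handle org.apache.avro.Schema's immutable internals. A hedged sketch of the usual fix, assuming flink-avro is on the classpath and the schema is known where the stream is produced: give the stream explicit Avro type information so Kryo is bypassed.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

public final class AvroTypeHint {
    // Re-type a GenericRecord stream so Flink serializes it with Avro instead of Kryo.
    public static DataStream<GenericRecord> withAvroTypeInfo(
            DataStream<GenericRecord> in, Schema schema) {
        return in.map(r -> r).returns(new GenericRecordAvroTypeInfo(schema));
    }
}

Applying this right after the operator that emits GenericRecord keeps the Kryo copy path out of the chained operators in the trace above.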
Aqib Mehmood
10/11/2022, 10:32 AM" SELECT \n" +
" sku,\n" +
" name,\n" +
" price,\n" +
" updatedAt,\n" +
" LAG(price) OVER (PARTITION BY sku ORDER BY updatedAt ASC) as last_price); "
But I keep getting this error
Caused by: org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
But the updatedAt column already has the TIMESTAMP_LTZ data type.
How can I fix this? TIA
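For what it's worth: a TIMESTAMP_LTZ column only becomes a time attribute once a watermark is declared on it; the type alone is not enough. A sketch of the idea (the table name, the datagen connector, and the 5-second delay are assumptions; the CAST sidesteps the LAG-over-DECIMAL codegen limit that shows up in the follow-up message below):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class LagOverTimeAttribute {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Declaring a watermark on updatedAt turns it into a time attribute,
        // which is what OVER ... ORDER BY requires in streaming mode.
        tableEnv.executeSql(
                "CREATE TABLE products (" +
                "  sku STRING," +
                "  name STRING," +
                "  price DECIMAL(15, 4)," +
                "  updatedAt TIMESTAMP_LTZ(3)," +
                "  WATERMARK FOR updatedAt AS updatedAt - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'datagen')");
        tableEnv.executeSql(
                "SELECT sku, name, price, updatedAt, " +
                "LAG(CAST(price AS DOUBLE)) OVER (" +
                "  PARTITION BY sku ORDER BY updatedAt) AS last_price " +
                "FROM products").print();
    }
}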
Troy Coombs
10/11/2022, 11:54 AM

Aqib Mehmood
10/11/2022, 12:47 PM" SELECT \n" +
" sku,\n" +
" name,\n" +
" price,\n" +
" updatedAt,\n" +
" LAG(price) AS last_price FROM orders"
I'm getting this error,
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: LAG(DECIMAL(15, 4))
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateCallExpression$10(ExprCodeGenerator.scala:837) ~[?:?]
at scala.Option.getOrElse(Option.scala:121) ~[flink-scala_2.12-1.15.0.jar:1.15.0]
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateCallExpression$8(ExprCodeGenerator.scala:834) ~[?:?]
at scala.Option.getOrElse(Option.scala:121) ~[flink-scala_2.12-1.15.0.jar:1.15.0]
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:838) ~[?:?]
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:487) ~[?:?]
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58) ~[?:?]
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[?:?]
Is there something I'm doing wrong?

Kyle Ahn
10/11/2022, 1:27 PM
Can I use java.net.http.HttpClient in a WindowProcessFunction?
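Assuming WindowProcessFunction means Flink's ProcessWindowFunction: java.net.http.HttpClient is thread-safe but not Serializable, so the usual pattern is to hold it in a transient field and build it in open() rather than in the constructor. A sketch (the endpoint and element types are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class EnrichWindow
        extends ProcessWindowFunction<String, String, String, TimeWindow> {
    // Not serialized with the function; rebuilt on each task instance.
    private transient HttpClient client;

    @Override
    public void open(Configuration parameters) {
        client = HttpClient.newHttpClient();
    }

    @Override
    public void process(String key, Context ctx, Iterable<String> elements,
                        Collector<String> out) throws Exception {
        // One blocking call per window pane; hypothetical endpoint.
        HttpRequest req = HttpRequest.newBuilder(
                URI.create("https://example.com/lookup?key=" + key)).build();
        HttpResponse<String> resp =
                client.send(req, HttpResponse.BodyHandlers.ofString());
        out.collect(key + ":" + resp.statusCode());
    }
}

For per-element calls at any real throughput, Flink's Async I/O (AsyncDataStream) is usually a better fit than blocking inside the window function.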
Tiansu Yu
10/11/2022, 2:28 PM

Jeremy Ber
10/11/2022, 3:23 PM

Prasanth Kothuri
10/11/2022, 6:24 PM

Steven Zhang
10/11/2022, 7:47 PM
Is there a reason the flink service account needs to be in the default namespace? I deployed the helm chart into the flink-cloud namespace and I see the service account present in that namespace. I would assume the operator would check the same namespace it's deployed in for the service account, not always the default namespace.

Lydian Lee
10/12/2022, 12:33 AM
Has anyone gotten writes working with WebIdentityTokenCredentialsProvider?
1. I've tested with the presto plugin, but it failed to init at all: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1665009572470739
2. Therefore I am testing with Hadoop; it does seem to authenticate successfully, but it always writes 0-length data. The only suggestion I got was to use presto, but because of 1, I am unable to use presto.
Now I am stuck, and I have no idea how to fix it. Would someone please help me out of this situation? Thanks!

Tim Bauer
10/12/2022, 8:55 AM
Is there a way to treat Scala Option types as nullable columns in the Table API?
I have a DataStream[Row] with one column of Option[Instant]. I have provided the return type information explicitly with Types.OPTION(Types.INSTANT).
When I print the resulting table schema, I get RAW('scala.Option', '...') and subsequent operations fail.
I found this SO question and have contemplated simply using empty strings as a workaround and re-casting everything on the Table API side, but I wonder whether people here who had similar problems might have found better ways? Thank you 🙏
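One workaround sketch that avoids both the RAW type and the empty-string detour: unwrap the Option into a plain nullable Instant before handing the stream to the Table API (the single-column row and the field index are assumptions):

import java.time.Instant;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public final class UnwrapOption {
    // Replace Option[Instant] in field 0 with Instant-or-null so the Table API
    // sees a nullable TIMESTAMP column instead of RAW('scala.Option', '...').
    @SuppressWarnings("unchecked")
    public static DataStream<Row> unwrap(DataStream<Row> rows) {
        return rows
                .map(row -> {
                    Row out = Row.copy(row);
                    scala.Option<Instant> opt =
                            (scala.Option<Instant>) row.getField(0);
                    out.setField(0, opt == null || opt.isEmpty() ? null : opt.get());
                    return out;
                })
                .returns(Types.ROW(Types.INSTANT));
    }
}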
Jeesmon Jacob
10/12/2022, 11:49 AM
jackson-databind
Duc Anh Khu
10/12/2022, 12:23 PM
We have tasks stuck in the Initializing state indefinitely, which in turn causes the application to stall (it stops consuming from its data sources). Has anyone come across this issue before? At the moment the application runs (with backpressure) at a low parallelism per KPU, but if we scale it up by increasing parallelism or parallelism per KPU, the issue occurs. AWS KDA aside, what can cause a task to stay in the Initializing state, and how can we debug it? Thanks

Hannah Hagen
10/12/2022, 11:41 PM
When I run ./bin/flink run --python examples/python/datastream/word_count.py it runs successfully the first time, but then fails after that with the error: Failed to create stage bundle factory! INFO:root:Initializing Python harness:
I'm not sure how to interpret this error or what the problem might be. Anyone have suggestions? It's also strange to me that the job succeeds the first time I submit it but fails only on subsequent runs. 🤔
The only info I was able to find online is this post, which suggests it is caused by a bug that was fixed in 1.15.0 (I'm on 1.15.2, so it should be fixed).
I'm using Flink 1.15.2, Python 3.8 and JDK 11. Thanks in advance for any help.

Krish Narukulla
10/13/2022, 2:50 AM
public static void registerScalaUDF(String classPath, String funcName, TableEnvironment tableEnv,
                                    ClassLoader classLoader) {
    try {
        ScalarFunction udfFunc = Class.forName(classPath, false, classLoader)
                .asSubclass(ScalarFunction.class).newInstance();
        tableEnv.registerFunction(funcName, udfFunc);
        logger.info("register scala function:{} success.", funcName);
    } catch (Exception e) {
        logger.error("", e);
        throw new RuntimeException("register UDF exception:" + e.getMessage());
    }
}
Exception:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature dim_lookup(<CHARACTER>, <CHARACTER>, <CHARACTER>)
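One hedged suggestion worth ruling out: registerFunction is the legacy API; on recent Flink versions createTemporarySystemFunction is the recommended replacement and avoids some name-resolution quirks with catalogs. A sketch of the same registration against the newer API (names mirror the snippet above):

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public final class UdfRegistration {
    public static void registerScalaUdf(String classPath, String funcName,
            TableEnvironment tableEnv, ClassLoader classLoader) {
        try {
            // Let Flink instantiate the function from its class.
            Class<? extends ScalarFunction> udfClass = Class
                    .forName(classPath, false, classLoader)
                    .asSubclass(ScalarFunction.class);
            tableEnv.createTemporarySystemFunction(funcName, udfClass);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("register UDF exception: " + e.getMessage(), e);
        }
    }
}

If the signature mismatch persists, checking that dim_lookup's eval method actually accepts three String arguments is the other usual culprit.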
Carl Choi
10/13/2022, 4:11 AM
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints
in my jobmanager and taskmanager.
After that, my jobs, which had no issues before, constantly restart with a NullPointerException but no specific error message.
I guess the messages below might be related to my problem.
2022-10-13 12:53:27,806 ERROR org.apache.beam.runners.fnexecution.control.FnApiControlClient [] - FnApiControlClient closed, clearing outstanding requests {1=java.util.concurrent.CompletableFuture@6b1a3cf9[Not completed, 1 dependents]}
2022-10-13 12:53:27,814 WARN org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Hanged up for unknown endpoint.
2022-10-13 12:53:27,977 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics [] - Metrics scheduler closed
2022-10-13 12:53:27,977 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
2022-10-13 12:53:27,977 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics [] - Metrics reporters closed
2022-10-13 12:53:27,978 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - App info kafka.consumer for consumer-labs_lmlt08_enroll_cdc-6940 unregistered
2022-10-13 12:53:27,978 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Map -> *anonymous_datastream_source$1*[1] -> LookupJoin[2] -> Calc[3] -> LookupJoin[4] -> Calc[5] -> LookupJoin[6] -> Calc[7] -> LookupJoin[8] -> Calc[9] -> LookupJoin[10] -> Calc[11] -> ConstraintEnforcer[12] (1/1)#100 (ab8c8b80fe52ba2dd38838873d5b76d3) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
The stranger thing is that the Java version works properly but the Python version causes the problem. Does anyone have knowledge to share?

dario bonino
10/13/2022, 10:08 AM
the rpc invocation size 27551907 exceeds the maximum akka framesize
While we initially increased the akka.framesize parameter, we also investigated the causes by following some of the advice found in the Flink mailing list.
In particular, we found that some states in one of our operators take less space than the minimum required to generate a data file in checkpoints and are therefore inlined in the root checkpoint metadata file. This behavior is probably at the root of the above error (this hypothesis seems confirmed by the size of the metadata file as well as by direct inspection of its contents).
Although one possible solution could be reducing the value of state.storage.fs.memory-threshold, we are wondering if there are mitigation actions we can take at the operator level. Any suggestions?
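For reference, the knob mentioned above in flink-conf.yaml form; any state handle smaller than this threshold is inlined into the root _metadata file, so lowering it (the default is 20kb) trades more small checkpoint files for a smaller RPC payload. The 1kb value is purely illustrative:

state.storage.fs.memory-threshold: 1kb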
Jin Yi
10/13/2022, 11:54 AM
in a KeyedBroadcastProcessFunction, the processBroadcastElement function receives a tuple of the form (T element, boolean keep?) and accordingly keeps or evicts the element from the broadcast state. from there, the processElement method simply checks whether the broadcast state contains the element to decide whether to pass the element through.
issue:
the interesting thing is that the stream used to populate the broadcast stream needs to keep a global count of all elements and a heap of the top n elements as state. the idea is that this stream should emit those (element, keep?) tuples whenever elements are added to or removed from the heap. i implemented this as a RichFlatMapFunction and am relying on MapState (keyed state) to keep track of the counts and the heap.
will this work? iiuc, it will not, because the heap is keyed state, so the heap will NOT be global across partitions.
can i be weird and explicitly set the parallelism to 1, or effectively do so by using a key function that maps everything to the same key? are these a bad idea? is there a way to make this work?
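On the constant-key idea: yes, keying everything to one constant funnels the whole stream through a single subtask, so the keyed MapState becomes effectively global, at the cost of serializing that operator's throughput. A sketch (the count threshold is a stand-in for the real top-n heap logic):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Usage sketch: stream.keyBy(e -> 0).flatMap(new GlobalCountFlatMap(100)).broadcast(desc)
public class GlobalCountFlatMap
        extends RichFlatMapFunction<String, Tuple2<String, Boolean>> {
    private final long threshold; // stand-in for "is this element in the top n?"
    private transient MapState<String, Long> counts;

    public GlobalCountFlatMap(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public void open(Configuration parameters) {
        counts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("counts", Types.STRING, Types.LONG));
    }

    @Override
    public void flatMap(String element, Collector<Tuple2<String, Boolean>> out)
            throws Exception {
        Long c = counts.get(element);
        long next = (c == null ? 0L : c) + 1;
        counts.put(element, next);
        // A real top-n heap would also emit (evicted, false) for elements
        // that drop out of the top n when a new element displaces them.
        out.collect(Tuple2.of(element, next >= threshold));
    }
}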
Jirawech Siwawut
10/13/2022, 12:40 PM

Tommy Gunnarsson
10/13/2022, 12:44 PM

Yegna Subramanian Jambunath
10/13/2022, 5:30 PM

karthik
10/13/2022, 5:49 PM
Caused by: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-bbc27e21a2305f48c44c62091549f3b61d7cbcd2-01a55b50631d6f7791a0cf0600392315]
new:[p-bbc27e21a2305f48c44c62091549f3b61d7cbcd2-14c2626071f3700e7b7328358464cb8d]
Jonathan Hoyt
10/13/2022, 8:57 PM
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.example.SimpleTest
I’ve put up an example with all of the steps I’ve taken in https://github.com/jonmagic/flink-experiments/pull/1. I used all of the example code from the documentation at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/.
Has anyone successfully used flink-protobuf in the release candidate, and do you have tips for getting it to recognize the Java protobuf descriptor classes compiled into the referenced jar?

Yaroslav Bezruchenko
10/13/2022, 9:20 PM
Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @544a2ea6
which basically means that it can't serialize String, somehow. Any tips on what to do in this case?
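That exception is the JDK module system (Java 16+) refusing reflective access to String internals, usually from a Kryo-style serializer, rather than anything Flink-specific. The standard remedy is an --add-opens flag on the JVM that runs the serializing code; since this is thrown in thread "main", that likely means the JVM launching your main(). A hedged one-liner (the jar name is a placeholder, and the full set of modules to open depends on what gets serialized):

java --add-opens java.base/java.lang=ALL-UNNAMED -jar your-job.jar

For cluster processes the same flag can go into flink-conf.yaml via env.java.opts.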
Jin Yi
10/13/2022, 10:07 PM
how do i pin flink-core on the version i want? this package is transitively provided from flink-streaming-java afaict. my typical way of overriding the package for explicit dependencies doesn't seem to be working

Avinash
10/14/2022, 6:07 AM
batch_table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/avinashkumar/dev/flinky/flink-sql-connector-hive-2.3.6_2.12-1.15.2.jar")
batch_table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///avinashkumar/***/dev/flinky/flink-sql-connector-hive-2.3.6_2.12-1.15.2.jar")
catalog_name = "flink_hive"
default_database = "test_catalog"
hive_conf_dir = "/opt/hive-conf"
hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
batch_table_env.register_catalog(catalog_name, hive_catalog)
Getting the following error
py4j.protocol.Py4JError: org.apache.flink.table.catalog.hive.HiveCatalog does not exist in the JVM
Should I be using a different jar, bundling the jar a different way, or am I not setting the path right? Any help is greatly appreciated.

ding bei
10/14/2022, 1:30 PM

Varun Sayal
10/14/2022, 2:39 PM