Avi Sanwal
05/13/2024, 2:20 PM
Kirill Ternovsky
05/13/2024, 7:47 PM
catalog.listViews()
returns an empty list.
TCGUAN
05/14/2024, 2:50 AM
TCGUAN
05/14/2024, 2:51 AM
Schema schema = new Schema.Parser().parse(new File(SinkToFileTest2.class.getClassLoader().getResource("user.avsc").getFile()));
GenericRecord user1 = new GenericData.Record(schema);
user1.put("name", "Alyssa");
user1.put("favorite_number", 256);
// Leave favorite color null
GenericRecord user2 = new GenericData.Record(schema);
user2.put("name", "Ben");
user2.put("favorite_number", 7);
user2.put("favorite_color", "red");
GenericRecord user3 = new GenericData.Record(schema);
user3.put("name", "Alyssa1");
user3.put("favorite_number", 2561);
// Leave favorite color null
GenericRecord user4 = new GenericData.Record(schema);
user4.put("name", "Ben1");
user4.put("favorite_number", 71);
user4.put("favorite_color", "red1");
String hdfsPath = "hdfs://node01:8020/flink/output";
DataStream<GenericRecord> input = env.fromElements(user1, user2, user3, user4);
final FileSink<GenericRecord> sink =
FileSink.forBulkFormat(new Path(hdfsPath), AvroParquetWriters.forGenericRecord(schema))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.build();
input.sinkTo(sink);
env.execute();
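A common cause of empty output with this sink (a hedged aside, not confirmed in the thread): OnCheckpointRollingPolicy only finalizes in-progress part files when a checkpoint completes, so no visible files appear unless checkpointing is enabled, e.g. via a flink-conf.yaml fragment (the 10 s interval is an arbitrary example value):

```yaml
# Checkpointing must be enabled for OnCheckpointRollingPolicy to roll files;
# the interval below is an arbitrary example value.
execution.checkpointing.interval: 10s
```

Equivalently, env.enableCheckpointing(10000) in the job code.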
TCGUAN
05/14/2024, 2:52 AM
{
  "namespace": "com.test.sink",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
TCGUAN
05/14/2024, 2:53 AM
TCGUAN
05/14/2024, 3:24 AM
Aditya Pratap Singh
05/14/2024, 6:47 AM
Joshua England
05/14/2024, 8:54 AM
The main class can't be found, even though it's in the /opt/flink/usrlib/artifacts directory in a jar file:
org.apache.flink.util.FlinkException: Could not load the provided entrypoint class.
at org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:230)
at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.getPackagedProgram(StandaloneApplicationClusterEntryPoint.java:149)
at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.lambda$main$0(StandaloneApplicationClusterEntryPoint.java:90)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:89)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'XXXXXXXXX' was not found in the jar file.
The Job Manager logs also show this classpath which doesn't include the usrlib. I don't know if that's related or not.
Classpath: /opt/flink/lib/flink-cep-1.19.0.jar:/opt/flink/lib/flink-connector-files-1.19.0.jar:/opt/flink/lib/flink-csv-1.19.0.jar:/opt/flink/lib/flink-json-1.19.0.jar:/opt/flink/lib/flink-scala_2.12-1.19.0.jar:/opt/flink/lib/flink-table-api-java-uber-1.19.0.jar:/opt/flink/lib/flink-table-planner-loader-1.19.0.jar:/opt/flink/lib/flink-table-runtime-1.19.0.jar:/opt/flink/lib/log4j-1.2-api-2.17.1.jar:/opt/flink/lib/log4j-api-2.17.1.jar:/opt/flink/lib/log4j-core-2.17.1.jar:/opt/flink/lib/logback-classic-1.5.6.jar:/opt/flink/lib/logback-core-1.5.6.jar:/opt/flink/lib/slf4j-api-2.0.13.jar:/opt/flink/lib/flink-dist-1.19.0.jar::::
Any hints on how to debug this would be very welcome!
David Kjerrumgaard
05/14/2024, 8:00 PM
JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS
JOB ALBERTH FLORES MAMANI
05/15/2024, 3:32 AM
Tudor Plugaru
05/15/2024, 8:36 AM
Peter Klauke
05/15/2024, 8:52 AM
Matan Perelmuter
05/15/2024, 9:18 AM
Lital Ron
05/15/2024, 12:18 PM
Aleksandr Stepuchev
05/15/2024, 12:22 PM
Saurabh Singh
05/15/2024, 1:21 PM
process.size/container limit by 2GB and is getting killed by K8s, as it exceeds the defined K8s container limits. We do not see any issues with Heap or Network Memory exhaustion as such; we still have plenty of buffer memory left. What could be causing this extra memory request to K8s? How can we debug and find this area? Also, we see this pattern repeat every 4-5 hours, but in separate TMs.
Aniruddh Jhavar
05/15/2024, 6:27 PM
We are running flink-kubernetes-operator v1.7.0 on OCP in a namespace which starts with a number, for example 2024-flink. When we create a CR we noticed the below error in the operator pod:
2024-05-15 16:25:02,838 i.j.o.p.e.ReconciliationDispatcher [ERROR][123-flink/flink-test] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-test', namespace='2024-flink'}, version: 838807} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.lang.IllegalArgumentException: hostname can't be null
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:148)
Upon further investigation we found https://issues.apache.org/jira/browse/FLINK-33917, which seemed similar to what we faced. After updating our operator code with the fix, the error went away, but we saw another error:
2024-05-15 16:30:52,455 o.a.f.k.o.o.d.SessionObserver [ERROR][2202prod/flink-ssl] REST service in session cluster timed out
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.listJobs(AbstractFlinkService.java:261)
at org.apache.flink.kubernetes.operator.observer.deployment.SessionObserver.observeFlinkCluster(SessionObserver.java:39)
and this is causing the job-manager and task-manager to fail with below error:
Caused by: java.lang.Exception: Could not retrieve InetSocketAddress from Pekko URL pekko.tcp://flink@session-cluster-quick-start.2024-flink:6123/user/rpc/resourcemanager_*
at org.apache.flink.runtime.rpc.pekko.PekkoUtils.getInetSocketAddressFromRpcURL(PekkoUtils.java:567) ~[?:?]
at org.apache.flink.runtime.rpc.pekko.PekkoRpcSystem.getInetSocketAddressFromRpcUrl(PekkoRpcSystem.java:48) ~[?:?]
Not sure if the operand also requires some kind of update where the URI parser is failing, or if there's some other issue? Thanks!
Ralph Wadner Blaise
05/15/2024, 9:26 PM
[
  {
    "BankID": "B0136",
    "LoanNumber": 331751,
    "SpvType": "SNR",
    "Workflow": 1,
    "TerminationDate": "2024-05-13T15:00:48.702424-04:00"
  }
]
Thanks in advance for any help.
Maxim Senin
05/15/2024, 11:24 PM
upgradeMode: savepoint, allowNonRestoredState: false, takeSavepointOnUpgrade: true
), can I start it again from savepoint (initialSavepointPath
in config) with parallelism of 1 (1 TM pod) and let the autoscaler scale it up again? I've been experimenting and it seems like the job dies, not being able to fully (or correctly) restore state from the original parallelism of 10 (10 TM pods). Is it correct to expect that the JM will be able to resume the job from a savepoint if the number of pods has changed after restart?
Glauber Dantas
05/16/2024, 1:20 AM
I'm getting an error when using the FOR SYSTEM_TIME AS OF statement on Flink SQL. Error: "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field".
Table joinedTable = sedona.sqlQuery(
"SELECT i.proc_time, i.customer_id, ca.area_id " +
"FROM Input AS i " +
// FIXME the lookup should be made on Postgres but it's not working using the 'FOR SYSTEM_TIME AS OF' statement
"INNER JOIN CustomerArea FOR SYSTEM_TIME AS OF i.proc_time AS ca ON ca.customer_id = i.customer_id " +
"WHERE ST_Intersects(i.geometry, ST_GeomFromWKB(ca.area_geometry))"
);
Without using that statement, the application runs, but the external database is queried only once (the referenced table gets traversed only once) and after finishing the query gives no more results (in fact the operator from the JDBC source is shown as `FINISHED` on the Flink Web UI, while other operators are still RUNNING
).
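A hedged sketch of the usual fix (table and column names assumed from the query above, connector options elided): the error says the left table's proc_time must be a declared time attribute, which can be done with a computed PROCTIME() column in the Input table's DDL; the JDBC table is then read as a lookup table on each join rather than scanned once:

```sql
-- Sketch with assumed names/types; connector options elided.
CREATE TABLE Input (
  customer_id STRING,
  geometry    BYTES,
  proc_time AS PROCTIME()  -- time attribute usable in FOR SYSTEM_TIME AS OF i.proc_time
) WITH ( ... );
```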
I also created a discussion on StackOverflow, in case it helps others with the same issue.
Have you faced this issue already? Can someone give me any advice on how to solve it?
Flink's version is 1.18.1.
Vishva Mahadevan
05/16/2024, 5:46 AM
Terrence Tian
05/16/2024, 6:50 AM
java.lang.RuntimeException: java.lang.reflect.InaccessibleObjectException: Unable to make field private final java.lang.Object[] java.util.Arrays$ArrayList.a accessible: module java.base does not "opens java.util" to unnamed module @2a2843ec
at com.twitter.chill.java.ArraysAsListSerializer.<init>(ArraysAsListSerializer.java:69)
at org.apache.flink.api.java.typeutils.runtime.kryo.FlinkChillPackageRegistrar.registerSerializers(FlinkChillPackageRegistrar.java:67)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:502)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:511)
Tilo Wiklund
05/16/2024, 9:29 AM
Roman Bohdan
05/16/2024, 11:11 AM
AvroKryoSerializerUtils.getAvroUtils().addAvroSerializersIfRequired(env.getConfig().getSerializerConfig(), GenericData.Record.class);
com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.avro.Schema$RecordSchema
Serialization trace:
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) ~[flink-dist-1.19.0.jar:1.19.0]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[flink-dist-1.19.0.jar:1.19.0]
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[flink-dist-1.19.0.jar:1.19.0]
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99) ~[flink-dist-1.19.0.jar:1.19.0]
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist-1.19.0.jar:1.19.0]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:311) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copyField(PojoSerializer.java:326) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:262) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52) ~[flink-dist-1.19.0.jar:1.19.0]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source) ~[?:?]
at java.util.stream.ReferencePipeline$2$1.accept(Unknown Source) ~[?:?]
at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
at java.util.HashMap$EntrySpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source) ~[?:?]
at java.util.concurrent.CountedCompleter.exec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) ~[?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.avro.Schema$RecordSchema
at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Unknown Source) ~[?:?]
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[flink-dist-1.19.0.jar:1.19.0]
... 27 more
Pedro Mázala
05/16/2024, 2:58 PM
After upgrading Flink from 1.17 to 1.18 and upgrading the Flink operator from 1.7 to 1.8, I started seeing some weird behaviors regarding savepoint/checkpoint restoration.
Despite having newer savepoints in the storage (GCS in this case), Flink was restarting from an older savepoint/checkpoint instead. It looks like the operator is not updating the latest data for savepoints and checkpoints. Has anyone experienced that?
I had the same issue with the Operator on version 1.6, but this fixed it.
Caroline McKee
05/16/2024, 4:18 PM
res_table = input_table.join(event_table) \
.where((col("rowtime2") >= col("rowtime1") - lit(750).millis) & \
(col("rowtime2") <= col("rowtime1") + lit(col("interval_millis")).millis))
Is this possible? If so, what would be the syntax?
Camilo Marin
05/16/2024, 10:20 PM
Jirawech Siwawut
05/17/2024, 3:06 AM
metrics.reporter.prom.filter.includes: *.Status.JVM.Memory*,*.job:uptime*
Maybe I did something wrong, but my goal is to filter only the necessary metrics.
Christos Hadjinikolis
05/17/2024, 7:24 AM
Does anyone have a working PYFLINK example of using KafkaSink() (1.17.2) to produce a keyed-messages stream to a Kafka topic?
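No answer appears in the thread; for what it's worth, a hedged sketch of such a sink (builder method names assumed to mirror the Java KafkaSink API; the topic/server values and the encode_key helper are illustrative only, not from the thread):

```python
# Minimal, unverified sketch of a keyed KafkaSink in PyFlink (~1.17).

def encode_key(key: str) -> bytes:
    """Pure helper mirroring what SimpleStringSchema does to an element:
    UTF-8 encode it. Shown only to make the keying explicit."""
    return key.encode("utf-8")


def build_keyed_kafka_sink(topic: str, bootstrap_servers: str):
    """Build a KafkaSink whose Kafka records carry a key.

    Caveat: both the key and value serialization schemas receive the
    *whole* stream element, so with SimpleStringSchema on both sides the
    record key equals the record value. Producing a different key per
    record needs a custom (Java-side) KafkaRecordSerializationSchema.
    """
    # pyflink imports deferred so the pure helper above works without Flink.
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream.connectors.kafka import (
        KafkaRecordSerializationSchema,
        KafkaSink,
    )

    record_serializer = (
        KafkaRecordSerializationSchema.builder()
        .set_topic(topic)
        .set_key_serialization_schema(SimpleStringSchema())    # record key
        .set_value_serialization_schema(SimpleStringSchema())  # record value
        .build()
    )
    return (
        KafkaSink.builder()
        .set_bootstrap_servers(bootstrap_servers)
        .set_record_serializer(record_serializer)
        .build()
    )
```

Wired up as stream.sink_to(build_keyed_kafka_sink("events", "localhost:9092")) on a DataStream of strings; keying here means Kafka partitions by the element itself, which may or may not be enough for the use case.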