Aviv Dozorets
08/28/2022, 8:48 AM
java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy40.submitTask(Unknown Source)
at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: The rpc invocation size 39076205 exceeds the maximum akka framesize.
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:308)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
... 11 more
And what looks like the root cause is that sometimes the latest checkpoint is well over 100 MB, sometimes even 1 GB.
I’d appreciate any ideas on what to look for.
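A possible starting point, as a sketch rather than anything from the thread: the immediate failure is the submitTask RPC payload (~39 MB) exceeding Akka's default 10 MB frame size, controlled by akka.framesize. On a real cluster the option belongs in flink-conf.yaml on the JobManager and TaskManagers; the 100 MB value below is an arbitrary assumption chosen only to clear the reported payload size.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FrameSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // equivalent to `akka.framesize: 104857600b` in flink-conf.yaml (100 MB)
        conf.setString("akka.framesize", "104857600b");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the job as usual, then env.execute();
    }
}

If the large checkpoints mentioned above are what inflate the task deployment payload, raising the frame size only buys headroom; the state size itself still needs attention.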
Raghunadh Nittala
08/29/2022, 12:09 AM
sinkTo(kafkaSink). I'm trying to come up with an end-to-end integration test and want to use a simple sink for it. I came across CollectSink, where I can add results to a list and run matchers on them. But CollectSink being a SinkFunction, I am not able to use it in sinkTo; it can only be used with addSink. Is there a simple Sink interface implementation I can use for tests within a sinkTo call?
Thank you.
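One option, sketched against the Flink 1.15 sink2 API (the class name and the static-list trick are assumptions, mirroring what CollectSink does for addSink; this is not an official test sink):

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

// Test-only sink usable with sinkTo(): every element lands in a shared static list.
public class CollectingSink<T> implements Sink<T> {

    // static so parallel writer instances and the test code see the same list
    public static final List<Object> VALUES = new CopyOnWriteArrayList<>();

    @Override
    public SinkWriter<T> createWriter(InitContext context) throws IOException {
        return new SinkWriter<T>() {
            @Override
            public void write(T element, Context context) {
                VALUES.add(element);
            }

            @Override
            public void flush(boolean endOfInput) {
                // nothing buffered
            }

            @Override
            public void close() {
                // nothing to release
            }
        };
    }
}

The test would then call stream.sinkTo(new CollectingSink<>()), run env.execute(), and assert on CollectingSink.VALUES (clearing it between tests).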
Aeden Jameson
08/29/2022, 3:02 AM

haim ari
08/29/2022, 5:56 AM

Nipuna Shantha
08/29/2022, 10:06 AM

Jirawech Siwawut
08/29/2022, 11:02 AM
scan option? https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_read_write/#streaming-source-partition-include
I got this error:
The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'
I checked the code, but it seems the only option is all at the moment:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flin[…]in/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java (edited)

Pedro Mázala
08/29/2022, 11:27 AM

Juan Carlos Gomez Pomar
08/29/2022, 12:20 PM

Bastien DINE
08/29/2022, 3:27 PM

Adrian Chang
08/29/2022, 4:20 PM
to_append_stream
But it seems Flink does this operation in batches of rows and I am getting some delay, about 2 seconds.
I haven't found how to configure that behaviour.
Could anyone guide me to the right configuration parameters, please?

Prasanth Kothuri
08/29/2022, 7:54 PM
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction, and Scala needs org.apache.flink.streaming.api.scala.function.ProcessWindowFunction. This brings a few problems, e.g. our code is in Scala, and for writing unit tests I was using the Flink test harness (KeyedOneInputStreamOperatorTestHarness), which is in Java; for testing the ProcessWindowFunction I had to write it in Java to get the test harness to work, otherwise it was giving type mismatch errors.
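For reference, a sketch of the Java-side function the Java harness expects; the counting logic and type parameters here are made up, not the code from this project:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Java variant: ProcessWindowFunction<IN, OUT, KEY, W>, here counting elements per key and window.
public class CountPerWindow extends ProcessWindowFunction<String, Long, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<String> elements, Collector<Long> out) {
        long count = 0;
        for (String ignored : elements) {
            count++;
        }
        out.collect(count);
    }
}

The Scala org.apache.flink.streaming.api.scala.function.ProcessWindowFunction has a different process signature (scala.Iterable instead of java.lang.Iterable), which is why the Java-based harness reports type mismatches when handed the Scala variant.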
Yahor Paulikau
08/29/2022, 9:22 PM
AvroDeserializationSchema.forGeneric(schema), where schema is a string representing the parsed Avro schema. This works fine until I try to convert the stream into a Flink table using
val inputTable = tableEnv.fromDataStream(
  kafka_stream,
  Schema.newBuilder()
    .columnByExpression("proctime", "PROCTIME()")
    .build())
inputTable.printSchema()
tableEnv.createTemporaryView(table_name, kafka_stream)
it’s printing this schema
(
`f0` RAW('org.apache.avro.generic.GenericRecord', '...'),
`proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
)
So this tells me I need to further expand the RAW data, but I have no idea how to do it. Also, it looks like I need to convert from the Avro data types.
If I print a Kafka message directly from the stream object I can see it is being decoded with the correct schema, so the deserialization step into GenericRecord seems to work.
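A common workaround, sketched in Java with made-up field names (userId, amount): the Table API cannot derive columns from GenericRecord, so it falls back to RAW; mapping the stream to Row with explicit type information first gives fromDataStream real columns to work with. kafkaStream and tableEnv stand in for the DataStream[GenericRecord] and table environment from the snippet above.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Flatten GenericRecord into a named Row before handing it to the Table API.
DataStream<Row> rows = kafkaStream
        .map(record -> Row.of(
                record.get("userId").toString(),   // Avro strings arrive as Utf8
                (Long) record.get("amount")))
        .returns(Types.ROW_NAMED(
                new String[] {"userId", "amount"},
                Types.STRING, Types.LONG));

Table inputTable = tableEnv.fromDataStream(
        rows,
        Schema.newBuilder()
                .columnByExpression("proctime", "PROCTIME()")
                .build());
tableEnv.createTemporaryView("my_table", inputTable);

(Also note the snippet above registers the view from kafka_stream rather than from inputTable, so even with proper columns the view would still be built on the raw stream.)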
Haim Ari
08/30/2022, 3:53 AM
readiness/liveness checks of the apps.
So if the Flink app status is, for example, Failed, k8s will not kill the pods, and in Argo CD the app is considered synced, when in fact it should be considered failed.
Is there a native (generic) way to handle that so that each app will self-check?
Sylvia Lin
08/30/2022, 4:24 AM
job 00000000000000000000000000000000, is there a way to get a unique id every time a new job is started?
Rashmin Patel
08/30/2022, 10:28 AM
val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
bEnv
  .createInput(
    HadoopInputs
      .readHadoopFile(new ParquetInputFormat[T](), classOf[Void], classOf[T], "s3://<bucket-name>/<some-path>"))
Piero Gerardo Torres Robatty
08/30/2022, 1:23 PM
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("test1")
    .setGroupId("test-group1")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://localhost:8081"))
    .build();
I still don't get what value I should put as the Schema when I want to extract the schema from the schema registry and use that info to deserialize the topic.
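If the intent is simply to use whatever the registry currently holds for the topic, one approach (a sketch; the "-value" suffix assumes the default TopicNameStrategy) is to fetch the latest registered schema and pass the parsed result to forGeneric:

import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

// Look up the newest value schema registered for a topic and parse it into an Avro Schema.
static Schema fetchLatestValueSchema(String registryUrl, String topic) throws Exception {
    CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
    SchemaMetadata latest = client.getLatestSchemaMetadata(topic + "-value");
    return new Schema.Parser().parse(latest.getSchema());
}

The result is the reader schema handed to ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, registryUrl); the registry URL passed to forGeneric is still used at runtime to resolve the writer schema embedded in each record.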
Satya
08/30/2022, 2:41 PM
flink-1.14.5, and based on my use case I want to read files from S3. For that I am adding flink-connector-filesystem, but it seems like there is no such connector for flink-1.14.*, although there are docs for it for flink-1.14.*. Is my understanding correct?

Krish Narukulla
08/30/2022, 4:08 PMString targetTable = """
CREATE TEMPORARY TABLE orders_agg(
order_id STRING,
total BIGINT
)
WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'orders_agg'
)
""";
Aeden Jameson
08/30/2022, 5:11 PM
public class TimeoutReporter extends KeyedProcessFunction<KEY, IN, OUT> {

    private transient ValueState<Long> sessionState;

    @Override
    public void processElement(final IN event, final Context ctx, final Collector<OUT> out) throws Exception {
        final Long timeout = sessionState.value();
        if (timeout != null) {
            ctx.timerService().deleteEventTimeTimer(timeout);
        }
        final long newTimer = (event.timestamp() - [WatermarkInMillis]) + sessionTimeoutInMillis;
        ctx.timerService().registerEventTimeTimer(newTimer);
        sessionState.update(newTimer);
        out.collect(event);
    }

    @Override
    public void onTimer(final long timestamp,
                        final OnTimerContext ctx,
                        final Collector<OUT> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        final Long result = sessionState.value();
        if (result != null) {
            final KEY key = ctx.getCurrentKey();
            // I believe I need to add back the watermark because timestamp is in Flink event time
            out.collect(TimeoutEvent.build(timestamp + [WatermarkInMillis], key));
        } else {
            // ... log something
        }
    }
}
With this implementation I’d expect to see the same results regardless of how I set the watermark, but that’s not the case. Thanks in advance.
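For comparison, the more common shape of this pattern (a sketch with made-up String types, not the code above) registers the timer directly at the element's event timestamp plus the timeout and treats the timestamp in onTimer as already being event time, with no watermark offset added or removed; when the timer fires then shifts automatically with whatever watermark strategy is configured:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SessionTimeout extends KeyedProcessFunction<String, String, String> {

    private static final long SESSION_TIMEOUT_MS = 30_000L;
    private transient ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) {
        timerState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long previous = timerState.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous);
        }
        // ctx.timestamp() is the element's event timestamp as assigned by the watermark strategy
        long newTimer = ctx.timestamp() + SESSION_TIMEOUT_MS;
        ctx.timerService().registerEventTimeTimer(newTimer);
        timerState.update(newTimer);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        timerState.clear();
        // timestamp is already event time; only *when* it fires depends on watermark progress
        out.collect("timeout for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}

With this shape, a larger out-of-orderness only delays when the timer fires, not the event-time value it fires at.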
Krish Narukulla
08/30/2022, 5:14 PM
datagen source and jdbc sink. Is it that Flink 1.16 is not free from Scala?
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:92)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1723)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:811)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:904)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at com.roku.common.frameworks.airstream.examples.sql.Datagen.main(Datagen.java:46)
Kyle Ahn
08/30/2022, 5:30 PM

Adrian Chang
08/30/2022, 8:39 PM

Ikvir Singh
08/30/2022, 8:54 PM

Stephan Weinwurm
08/31/2022, 12:03 AM

Jirawech Siwawut
08/31/2022, 1:23 AM
{
  "name": "david",
  "events": [
    {
      "timestamp": "2022-08-01 00:00:00",
      "id": "1"
    },
    {
      "timestamp": "2022-08-01 00:00:01",
      "id": "2"
    }
  ]
}
I would like to create a watermark on the timestamp column.
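For reference, a sketch of how that JSON shape maps onto a Flink SQL schema (table name, topic, and connector options are made up; tableEnv stands for a TableEnvironment). The catch is that WATERMARK FOR has to reference a top-level TIMESTAMP column, so the timestamp nested inside the events array cannot carry the watermark directly; it would first need to be surfaced as a top-level field, or the array flattened (e.g. with UNNEST), before declaring one.

tableEnv.executeSql(
    "CREATE TABLE user_events (\n"
        + "  name   STRING,\n"
        + "  events ARRAY<ROW<`timestamp` TIMESTAMP(3), id STRING>>\n"
        // a WATERMARK clause would have to point at a top-level column, not at the nested field
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'user-events',\n"
        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
        + "  'scan.startup.mode' = 'earliest-offset',\n"
        + "  'format' = 'json'\n"
        + ")");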
Donatien Schmitz
08/31/2022, 8:02 AM

Sylvia Lin
09/01/2022, 12:43 AM
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
"org.apache.flink" % "flink-metrics-datadog" % flinkMetricVersion,
"org.apache.flink" % "flink-metrics-dropwizard" % flinkMetricVersion,
"org.json4s" %% "json4s-ast" % "4.0.1",
"org.json4s" %% "json4s-jackson" % "4.0.1",
"com.fasterxml.jackson.core" % "jackson-core" % "2.12.4"
Any suggestion here? We'll have to use Hadoop, since FileSink only supports Hadoop.

Krish Narukulla
09/01/2022, 6:23 AM

Jirawech Siwawut
09/01/2022, 8:46 AM
latest option for Hive read? https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_read_write/#streaming-source-partition-include
I got this error:
The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'
I checked the code, but it seems the only option is all at the moment:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flin[…]in/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java

Robin Cassan
09/01/2022, 9:05 AM