Emile Alberts
06/21/2022, 11:51 AM
defaultConfiguration:
create: true
append: true
flink-conf.yaml: |+
kubernetes.operator.metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
kubernetes.operator.metrics.reporter.stsd.host: localhost
kubernetes.operator.metrics.reporter.stsd.port: 8125
However, I want to dynamically resolve the host using the value from the field path status.hostIP. Any suggestions about how this can be achieved?
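A possible direction (a sketch only, not from the thread; the HOST_IP variable name and the idea of substituting it at container start are my assumptions): expose the node address to the operator container via the Kubernetes downward API, since flink-conf.yaml itself does not interpolate environment variables as far as I know.
env:
  - name: HOST_IP                 # assumed variable name, filled by the downward API
    valueFrom:
      fieldRef:
        fieldPath: status.hostIP
# An entrypoint script or pod template can then substitute $HOST_IP into
# kubernetes.operator.metrics.reporter.stsd.host before the operator starts.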
Zain Haider Nemati
06/21/2022, 12:38 PM
aws emr add-steps --cluster-id j-XXXXXX --steps Type=CUSTOM_JAR,Name=Name,Jar=<s3://abc.jar>,Args="bash","-c"," /usr/lib/flink/bin/flink run-application -t yarn-application",MainClass=org.main.classname
However, it gives the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.util.RunJar.run(RunJar.java:237)
at org.apache.hadoop.util.RunJar.main(RunJar.java:158)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.functions.source.SourceFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 4 more
Can someone point me in the right direction or what is wrong here?
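One possible cause, judging from org.apache.hadoop.util.RunJar in the trace: EMR is executing the application JAR directly with hadoop jar, so Flink's classes are never on the classpath. A sketch of submitting through the Flink CLI instead, using EMR's command-runner.jar (the bucket, paths and main class are placeholders, and I have not verified this on EMR):
aws emr add-steps --cluster-id j-XXXXXX --steps \
  Type=CUSTOM_JAR,Name=SubmitFlinkJob,Jar=command-runner.jar,\
  Args="bash","-c","aws s3 cp s3://bucket/abc.jar /tmp/abc.jar && /usr/lib/flink/bin/flink run-application -t yarn-application -c org.main.classname /tmp/abc.jar"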
Chengxuan Wang
06/21/2022, 8:43 PM
addDefaultKryoSerializer? RichAsyncFunction?
Jeff Levesque
06/21/2022, 11:06 PM
sliding_window_table = table_env.sql_query('''
SELECT
ticker,
MIN(price) as min_price,
MAX(price) as max_price,
window_start,
window_end
FROM TABLE(
HOP(TABLE {1}, DESCRIPTOR(utc), INTERVAL {3}, INTERVAL {4}))
GROUP BY
ticker,
window_start,
window_end
'''.format(
sliding_window_alias,
input_table_name,
sliding_window_on,
sliding_window_every,
sliding_window_over
))
Which produces the following output:
+I[AAPL, 0.64, 99.77, 2022-06-21T10:19, 2022-06-21T18:19]
+I[TSLA, 0.33, 99.5, 2022-06-21T10:19, 2022-06-21T18:19]
+I[MSFT, 0.33, 99.66, 2022-06-21T10:19, 2022-06-21T18:19]
+I[AMZN, 0.5, 99.78, 2022-06-21T10:19, 2022-06-21T18:19]
+I[AMZN, 0.12, 99.83, 2022-06-21T10:20, 2022-06-21T18:20]
+I[AAPL, 0.07, 99.96, 2022-06-21T10:20, 2022-06-21T18:20]
+I[TSLA, 0.13, 99.88, 2022-06-21T10:20, 2022-06-21T18:20]
+I[MSFT, 0.05, 99.91, 2022-06-21T10:20, 2022-06-21T18:20]
An alternative syntax also produces identical results:
sliding_window_table = table_env.sql_query("""
SELECT
ticker,
MIN(price) as min_price,
MAX(price) as max_price,
HOP_START({2}, INTERVAL {3}, INTERVAL {4}) AS utc_start,
HOP_END({2}, INTERVAL {3}, INTERVAL {4}) AS utc_end
FROM {1}
GROUP BY
HOP({2}, INTERVAL {3}, INTERVAL {4}),
ticker
""".format(
sliding_window_alias,
input_table_name,
sliding_window_on,
sliding_window_every,
sliding_window_over
))
I would like to integrate some syntax to get the first and last price within the HOP range. When I try to expand either of the above sliding windows (requires a minor sink DDL adjustment) by adding `FIRST_VALUE()`:
sliding_window_table = table_env.sql_query('''
SELECT
ticker,
FIRST_VALUE(price) OVER (ORDER BY utc ASC) as first_price,
...
GROUP BY
...
'''.format(
sliding_window_alias,
input_table_name,
sliding_window_on,
sliding_window_every,
sliding_window_over
))
Both approaches arrive at the same error:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.java:159)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:104)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Process finished with exit code 1
Just tried the following:
sliding_window_table = table_env.sql_query('''
SELECT
ticker,
FIRST_VALUE(price) as first_price,
MIN(price) as min_price,
MAX(price) as max_price,
window_start,
window_end
FROM TABLE(
HOP(TABLE {1}, DESCRIPTOR(utc), INTERVAL {3}, INTERVAL {4}))
GROUP BY
ticker,
window_start,
window_end
'''.format(
sliding_window_alias,
input_table_name,
sliding_window_on,
sliding_window_every,
sliding_window_over
))
Got a merge method error:
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.ValidationException: Could not find an implementation method 'merge' in class 'org.apache.flink.table.planner.functions.aggfunctions.FirstValueAggFunction' for function 'FIRST_VALUE' that matches the following signature:
void merge(org.apache.flink.table.data.RowData, java.lang.Iterable)
at org.apache.flink.table.functions.UserDefinedFunctionHelper.validateClassForRuntime(UserDefinedFunctionHelper.java:280)
at org.apache.flink.table.planner.codegen.agg.ImperativeAggCodeGen.checkNeededMethods(ImperativeAggCodeGen.scala:464)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1222)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator$$anonfun$checkNeededMethods$1.apply(AggsHandlerCodeGenerator.scala:1222)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.checkNeededMethods(AggsHandlerCodeGenerator.scala:1221)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.genMerge(AggsHandlerCodeGenerator.scala:987)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:615)
at org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.generateNamespaceAggsHandler(AggsHandlerCodeGenerator.scala:587)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate.createAggsHandler(StreamExecWindowAggregate.java:217)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate.translateToPlanInternal(StreamExecWindowAggregate.java:153)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:104)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Process finished with exit code 1
The above is interesting, since I've used SQL syntax yet received a Table API error.
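A possible workaround to sketch (untested; the table name prices, the column names, and the interval sizes stand in for the variables above, and window Top-N support for HOP depends on the Flink version): FIRST_VALUE/LAST_VALUE do not provide the merge method that the hopping-window aggregation asks for in the error above, but ranking the rows inside each window with ROW_NUMBER() and keeping the first row per ticker yields the first price; ORDER BY utc DESC would yield the last.
SELECT ticker, price AS first_price, window_start, window_end
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY window_start, window_end, ticker
            ORDER BY utc ASC) AS rownum
    FROM TABLE(
        HOP(TABLE prices, DESCRIPTOR(utc), INTERVAL '1' MINUTE, INTERVAL '5' MINUTES))
)
WHERE rownum = 1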
licho
06/22/2022, 2:59 AM
Table taskTable =
tableEnv.fromDataStream(
taskDS,
Schema.newBuilder()
.columnByExpression("taskId", "JSON_VALUE(f0, '$.taskId')")
.columnByExpression("taskStatus", "JSON_VALUE(f0, '$.taskStatus')")
.columnByExpression("proc_time", "PROCTIME()")
.primaryKey("taskId")
.build());
I want to define a table whose columns are calculated from JSON data, and set a column as the primary key (only for a temporal table join), but I get this exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Invalid primary key 'PK_taskId'. Column 'taskId' is not a physical column.
at org.apache.flink.table.catalog.DefaultSchemaResolver.validatePrimaryKey(DefaultSchemaResolver.java:372)
at org.apache.flink.table.catalog.DefaultSchemaResolver.resolvePrimaryKey(DefaultSchemaResolver.java:340)
at org.apache.flink.table.catalog.DefaultSchemaResolver.resolve(DefaultSchemaResolver.java:89)
at org.apache.flink.table.api.Schema.resolve(Schema.java:121)
at org.apache.flink.table.catalog.CatalogManager.resolveCatalogTable(CatalogManager.java:919)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.fromStreamInternal(AbstractStreamTableEnvironmentImpl.java:144)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:180)
at zdpx.Simulate2.sqlStream(Simulate2.java:69)
at zdpx.Simulate2.main(Simulate2.java:45)
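For what it's worth, the primary key has to sit on a physical column, and columnByExpression always produces computed columns. One possible direction, sketched and untested (the Row layout and the Jackson parsing are my assumptions, and this assumes taskDS carries the raw JSON string; if it is wrapped in a Tuple, read f0 instead): parse the JSON in the DataStream first so taskId arrives as a physical column, then for the temporal join either declare the key on that column or use a temporal table function, which does not require a primary key at all.
// Sketch, untested: parse the JSON upstream so taskId/taskStatus become physical columns.
// (needs com.fasterxml.jackson.databind.ObjectMapper/JsonNode and org.apache.flink.types.Row)
DataStream<Row> parsed = taskDS
        .map(json -> {
            JsonNode n = new ObjectMapper().readTree(json);
            return Row.of(n.get("taskId").asText(), n.get("taskStatus").asText());
        })
        .returns(Types.ROW_NAMED(
                new String[] {"taskId", "taskStatus"}, Types.STRING, Types.STRING));

Table taskTable = tableEnv.fromDataStream(
        parsed,
        Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()")   // physical columns are derived from the Row
                .build());

// For the temporal join itself, a temporal table function keyed on taskId
// avoids the primary-key requirement entirely:
TemporalTableFunction taskHistory =
        taskTable.createTemporalTableFunction($("proc_time"), $("taskId"));
tableEnv.createTemporarySystemFunction("task_history", taskHistory);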
Maciej Obuchowski
06/22/2022, 9:42 AM
As a rule of thumb, we suggest packaging the application code and all its required dependencies into one fat/uber JAR. This includes packaging connectors, formats, and third-party dependencies of your job. This rule does not apply to Java APIs, DataStream Scala APIs, and the aforementioned runtime modules, which are already provided by Flink itself and should not be included in a job uber JAR. This job JAR can be submitted to an already running Flink cluster, or added to a Flink application container image easily without modifying the distribution.
Jacob Tolar
06/22/2022, 7:28 PM
addDefaultKryoSerializer). Would disableGenericTypes() do that, or does it disable Kryo completely?
Eric Sammer
06/23/2022, 12:20 AM
lag(x) over can cause an NPE when accessing RowData. It's not immediately clear to me what the smallest reproducible case is just yet. Here's a query:
select
window_time,
window_start,
window_end,
req_path,
cnt,
lag(cnt) over ( order by window_time ) as cnt_prev -- <-- problem
from (
select
window_time,
window_start,
window_end,
req_path,
count(1) cnt
from table(
tumble(table http_events, descriptor(_time), interval '1' second)
)
group by window_time, window_start, window_end, req_path
)
Exception:
java.lang.NullPointerException: null
at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:249)
at org$apache$flink$table$planner$functions$aggfunctions$LagAggFunction$LagAcc$8$Converter.toExternal(Unknown Source)
at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
at UnboundedOverAggregateHelper$1757.setAccumulators(Unknown Source)
at org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:219)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:603)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
I'll try and come up with a simpler repro. Just a heads up. This is Flink 1.14.2 I think. cc/ @Timo Walther
Francis Conroy
06/23/2022, 6:27 AM
Error from server: request to convert CR from an invalid group/version: flink.apache.org/v1alpha1
Do I need to re-install the old CRD?
Yoel Benharrous
06/23/2022, 8:29 AM
stateless function (I accumulate records and flush every X records; records are accumulated in a transient list in memory). The motivation is to create batches without the need for keyBy. How can I flush records when a checkpoint barrier is received and guarantee exactly-once semantics?
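If "flush" means writing the batch to an external system, this is essentially the BufferingSink pattern from the Flink docs: implement CheckpointedFunction and store whatever has not been flushed yet in operator ListState when the barrier triggers snapshotState(), so a restore neither loses nor duplicates buffered records (exactly-once towards the external system additionally needs idempotent or transactional writes). If "flush" means emitting the buffered records downstream, note that snapshotState() cannot emit elements; that would need an operator-level hook such as prepareSnapshotPreBarrier in a custom StreamOperator. A sketch along those lines (class name, threshold, and the String element type are mine):
public class BufferingFlusher implements SinkFunction<String>, CheckpointedFunction {

    private final int threshold;
    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> checkpointedState;

    public BufferingFlusher(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            flush(buffer);      // write the batch to the external system
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called when the checkpoint barrier reaches this task:
        // persist whatever has not been flushed yet
        checkpointedState.clear();
        checkpointedState.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("buffered-elements", Types.STRING));
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }

    private void flush(List<String> batch) {
        // external write goes here
    }
}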
Ananya Goel
06/23/2022, 1:03 PM
I am using ParquetProtoWriters for writing to an S3 sink. I am now querying over S3 via Athena. I am able to query over all the fields except the array fields. Getting an "org.apache.parquet.io.PrimitiveColumnIO cannot be cast to org.apache.parquet.io.GroupColumnIO" message. Any idea how I can resolve this?
Omar Izhar
06/23/2022, 5:13 PM
Flink version = 1.13.1
"data": {
"id": 107707,
"shop_name": "redacted_shopname",
"shop_location": "{\"latitude\":33.6992544,\"longitude\":73.0651125}",
"shop_type_id": 1,
"created_at": "2022-06-10T14:57:03Z",
"updated_at": "2022-06-10T14:57:03Z"
}
Hello all, I am trying to read messages in the above format from a Kafka source using the Flink Table API. I am having difficulty parsing the above json structure to extract latitude and longitude. I am using the following code to try to fetch id and latitude/longitude from this message.
tEnv.executeSql("CREATE TABLE customer_retailer_shop_details (\n" +
" data ROW<`customer_id` BIGINT, `shop_location` ROW<latitude FLOAT, longitude FLOAT>>\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'customer-retailer-shop-details',\n" +
" 'properties.bootstrap.servers' = 'redacted',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");
The problem is that when I print out the result, customer_id is correctly populated but latitude and longitude are null. Could this be happening due to escape characters in the latitude/longitude nested field? If so, how would we deal with this in the Table API?
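One observation, in case it helps: shop_location arrives as a JSON-encoded string (note the escaped quotes), not as a nested object, so the json format cannot map it onto ROW<latitude FLOAT, longitude FLOAT>. A sketch of one way around it, untested: declare the field as STRING and parse it in computed columns. JSON_VALUE may not exist in 1.13.1 (it was added in a later release, if I remember correctly), in which case a small scalar UDF doing the same extraction would be the fallback.
tEnv.executeSql("CREATE TABLE customer_retailer_shop_details (\n" +
    "  data ROW<`customer_id` BIGINT, `shop_location` STRING>,\n" +
    "  latitude  AS CAST(JSON_VALUE(data.shop_location, '$.latitude')  AS FLOAT),\n" +
    "  longitude AS CAST(JSON_VALUE(data.shop_location, '$.longitude') AS FLOAT)\n" +
    ") WITH (\n" +
    "  'connector' = 'kafka',\n" +
    "  'topic' = 'customer-retailer-shop-details',\n" +
    "  'properties.bootstrap.servers' = 'redacted',\n" +
    "  'scan.startup.mode' = 'earliest-offset',\n" +
    "  'format' = 'json',\n" +
    "  'json.ignore-parse-errors' = 'true'\n" +
    ")");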
Jeff Levesque
06/23/2022, 7:53 PM
FIRST_VALUE, and `LAST_VALUE`:
CREATE TABLE test
(id integer, session_ID integer, value integer, utc timestamp)
;
INSERT INTO test
(id, session_ID, value, utc)
VALUES
(0, 2, 100, '2022-06-23 16:38:00'),
(1, 2, 120, '2022-06-20 16:38:00'),
(2, 2, 140, '2022-06-21 16:38:00'),
(3, 1, 900, '2022-06-11 16:38:00'),
(4, 1, 800, '2022-06-29 16:38:00'),
(5, 1, 500, '2022-06-23 12:38:00')
;
SELECT
jeff.first_value,
jeff2.last_value
FROM (
SELECT
id,
value AS first_value,
NULL AS last_value
FROM (
SELECT
1 AS id,
value,
dense_rank() OVER (ORDER BY utc) AS first_value
FROM test
) inner_jeff
WHERE first_value = 1
) AS jeff
INNER JOIN (
SELECT
id,
NULL AS first_value,
value AS last_value
FROM (
SELECT
1 AS id,
value,
dense_rank() OVER (ORDER BY utc DESC) AS last_value
FROM test
) inner_jeff2
) AS jeff2
ON jeff.id = jeff2.id
LIMIT 1;
However, this is contingent on DENSE_RANK support in PyFlink 1.13 (when I attempt to migrate the above syntax to be streaming based). When I checked the documentation, it seems to portray the same "N/A" pattern as FIRST_VALUE and `LAST_VALUE`:
DENSE_RANK() N/A Returns the rank of a value in a group of values. The result is one plus the previously assigned rank value. Unlike the function rank, dense_rank will not produce gaps in the ranking sequence.
I'm going to hunch that it is not supported in PyFlink 1.13. Further, alternative approaches, namely RANK and ROW_NUMBER, each seem to have "N/A" in the same documentation. If the above general SQL syntax doesn't work in PyFlink, does anyone have an alternative approach they could recommend?
Gaurav Miglani
06/24/2022, 7:16 AM
Gaurav Miglani
06/24/2022, 7:16 AM
{"errors":["Internal server error.","<Exception on server side:\norg.apache.flink.client.program.ProgramInvocationException: The program caused an error: \n\nClasspath: [file:/tmp/flink-web-8bc46ccd-f545-474c-8605-d084950afed1/flink-web-upload/38118da6-41f1-4e0b-9bb1-c69ee9662f3a_data-streamverse-core-1.0-SNAPSHOT-jar-with-dependencies.jar]\n\nSystem.out: (none)\n\nSystem.err: (none)
at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:159)
at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:107)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NoClassDefFoundError: org/apache/commons/math3/util/ArithmeticUtils
at org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil$.mergeMiniBatchInterval(FlinkRelOptUtil.scala:439)
at org.apache.flink.table.planner.plan.rules.physical.stream.MiniBatchIntervalInferRule.onMatch(MiniBatchIntervalInferRule.scala:81)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
Emile Alberts
06/24/2022, 8:38 AM
val hadoopConfig = Configuration()
hadoopConfig.set(
"fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.WebIdentityTokenCredentialsProvider"
)
hadoopConfig.set("fs.s3a.assumed.role.arn", "<role-arn>")
val deltaSink = DeltaSink.forRowData(
Path(sinkConfiguration.deltaTablePath),
hadoopConfig,
rowType,
).withPartitionColumns("column1", "column2")
.build()
dataStream.sinkTo(deltaSink)
However, the role is not assumed then. The AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables are added. It works fine when I'm using the AWS credentials.
I'm using Flink 1.15.0 and version 0.4.1 of the Flink Delta connector. I've also added the flink-s3-fs-hadoop-1.15.0.jar to $FLINK_HOME/plugins/s3/.
What am I missing to be able to use the WebIdentityTokenCredentialsProvider?
Thank you in advance for the assistance.
Emily Morgan
06/24/2022, 8:57 AM
The heartbeat of JobManager with id [] timed out.
Disconnect job manager [] for job [] from the resource manager.
The heartbeat of TaskManager with id [] timed out.
Closing TaskExecutor connection [] because: The heartbeat of TaskManager with id [] timed out.
and finally
Could not fulfill resource requirements of job []. Free slots: 0
and the job fails.
Note that I don't have checkpointing enabled, and I can't see any out of memory errors in the logs either. Any ideas on what the issue might be?
Thanks 🙏
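Heartbeat timeouts like these are usually a symptom (long GC pauses, CPU throttling, or network trouble on the JobManager/TaskManager) rather than the root cause. While investigating, one common mitigation, which is my suggestion rather than anything from this thread, is to relax the heartbeat settings in flink-conf.yaml:
heartbeat.interval: 20000   # default is 10000 ms
heartbeat.timeout: 120000   # default is 50000 ms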
Jeesmon Jacob
06/24/2022, 8:09 PM
shikai ng
06/26/2022, 3:34 PM
INITIALIZING to FAILED with failure cause: java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction retrieved invalid state.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
I assumed this was a parallelism issue previously and have set maxParallelism to 1 (which reactive mode should respect), but to no avail. Does anyone have any suggestions? 🙂 🙏
czchen
06/27/2022, 2:11 PM
Failed sync attempt to 597d35a7434bede526f526852c33a65262765219: one or more objects failed to apply, reason: Internal error occurred: failed calling webhook "flinkoperator.flink.apache.org": Post "https://flink-operator-webhook-service.flink-operator.svc:443/validate?timeout=10s": x509: certificate signed by unknown authority (possibly because of "x509: invalid signature: parent certificate cannot sign this kind of certificate" while trying to verify candidate authority certificate "FlinkDeployment Validator") (retried 3 times).
This problem can be solved by restarting (rolling out) the webhook service. However, it would be better if the operator itself could reload the certificate automatically.
Aitozi
06/27/2022, 2:12 PM
ROW_NUMBER() supports the batch and stream modes, but my job using the same query run with "execution.runtime-mode": "batch" does not translate to a Deduplicate operator but to a Sort operator. After checking the code, there is only a rule for StreamPhysicalDeduplicateRule and there is no similar rule for batch. Is the doc misleading here?
Krishna Chaithanya M A
06/27/2022, 8:57 PM
Kishore Pola
06/28/2022, 4:13 AM
Theo Diefenthal
06/28/2022, 7:02 AM
I see INFO log messages: ProducerId set to -1 with epoch -1 and ProducerId set to 2359014 with epoch 0. Investigating a bit into this shows me that those log events are not errors, can be ignored, and are indeed polluting my logs on purpose 🙂 In an old Flink mailing list post, Tzu Li (Gordon) wrote: With that in mind, when using exactly-once semantics for the FlinkKafkaProducer, there is a fixed-sized pool of short-living Kafka producers that are created for each concurrent checkpoint. When a checkpoint begins, the FlinkKafkaProducer creates a new producer for that checkpoint. Once said checkpoint completes, the producer for that checkpoint is attempted to be closed and recycled. So, it is normal to see logs of Kafka producers being closed if you're using an exactly-once transactional FlinkKafkaProducer.
This more or less describes what I see in the Flink 1.14 codebase despite being an older post: for each new checkpoint, Flink will reuse a producer but assigns a new transaction.id, leading to reinitializing a new transaction id (a producer-broker roundtrip) and assigning a new producerId and epoch. My question is: why does Flink do this? From a Confluent blog post about transactions, the "default" way to use transactions seems to be to assign a transactional.id once at application start and keep it from then on. I understand that the pattern used by Flink works as well, but is there a specific purpose in creating fully new transactional ids with each checkpoint in a fully running application? Was it easier to implement? Or are there details requiring Flink to work that way? Is there a design doc or something where thoughts around all the internal stuff with regards to the Flink Kafka source/writer are shared?
Gaurav Miglani
06/28/2022, 10:57 AM
Trystan
06/28/2022, 6:13 PM
copy & delete, and that this mechanism is slow in s3 itself.
are there any obvious metrics to confirm this? any configurations that might speed this up, or any suggestions to look for? at this point, we are leaning towards not using flink to sink to s3 at any sort of scale … but surely we are missing something 🙂
Kishore Pola
06/28/2022, 8:07 PM
serviceAccount: stream-test-sa@test-project.iam.gserviceaccount.com
deployment is failing with error
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://x.y.z.1/apis/apps/v1/namespaces/default/deployments. Message: Deployment.apps "flink-cluster-test-097" is invalid: spec.template.spec.serviceAccountName: Invalid value: "stream-test-sa@test-project.iam.gserviceaccount.com": a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*').
I think serviceAccount is copied to serviceAccountName.
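That matches the error: spec.serviceAccount is used as the pod's serviceAccountName, so it must be the name of a Kubernetes ServiceAccount, not a GCP IAM email. A sketch of the usual GKE pattern (this assumes Workload Identity is enabled on the cluster; the names are taken from the error message):
apiVersion: v1
kind: ServiceAccount
metadata:
  name: stream-test-sa              # valid RFC 1123 name, referenced below
  annotations:
    iam.gke.io/gcp-service-account: stream-test-sa@test-project.iam.gserviceaccount.com
---
# in the FlinkDeployment spec:
#   serviceAccount: stream-test-sa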
Sergey Postument
06/29/2022, 8:43 AM
// SIDE OUTPUT DOESNT WORK
val enrichedPacketData: DataStream[EnrichedData] = rawPackets
.keyBy(SOME_KEY_HERE)
.connect(SOME_CONNECT_EXP_HERE)
.process(new AssetDataEnrichment)
.map(_.packetAssetData.get)
val packetsNoStateData = enrichedPacketData
.getSideOutput(new OutputTag[AssetStatusUpdateStreamMessage]("no-state-tags-output"))
// WORKS GOOD
val enrichedPacketData: DataStream[EnrichedData] = rawPackets
.keyBy(SOME_KEY_HERE)
.connect(SOME_CONNECT_EXP_HERE)
.process(new AssetDataEnrichment)
val packetsNoStateData = enrichedPacketData
.getSideOutput(new OutputTag[AssetStatusUpdateStreamMessage]("no-state-tags-output"))
The only diff is having .map(_.packetAssetData.get). I would be very grateful if someone could explain why this works 🙏 I did not find information in the documentation.
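A likely explanation (based on how side outputs are documented to work, not on anything else in this thread): the side output is attached to the operator created by .process(), and getSideOutput() has to be called on the stream returned by that operation. After .map(...) you are asking the map operator for a side output it never emits, so the resulting stream is empty. A sketch that keeps both streams, reusing the names from the snippet above:
val processed = rawPackets
  .keyBy(SOME_KEY_HERE)
  .connect(SOME_CONNECT_EXP_HERE)
  .process(new AssetDataEnrichment)    // the side output lives on this operator

// pull the side output from the process() result, before any further transformation
val packetsNoStateData = processed
  .getSideOutput(new OutputTag[AssetStatusUpdateStreamMessage]("no-state-tags-output"))

// derive the main enriched stream separately
val enrichedPacketData: DataStream[EnrichedData] = processed
  .map(_.packetAssetData.get)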
Saket Sinha
06/29/2022, 1:40 PM
Gaurav Miglani
06/29/2022, 2:06 PM