milk
05/22/2023, 8:24 AM
Sumit Singh
05/22/2023, 8:57 AM
2023-05-22 06:32:10,019 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@private_ip:6123/user/rpc/resourcemanager_*>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@private_ip:6123/user/rpc/resourcemanager_*>.
sagar shinde
05/22/2023, 10:23 AM
sagar shinde
05/22/2023, 11:24 AM
Palani
05/22/2023, 2:31 PM
Jirawech Siwawut
05/22/2023, 4:15 PM
Amir Hossein Sharifzadeh
05/22/2023, 4:34 PM
ValueState in my application. Does anybody have experience working with ValueState, ListState, and EmbeddedRocksDBStateBackend?
Dylan Meissner
05/22/2023, 11:42 PM
public class CustomValidator implements FlinkResourceValidator {
    @Override
    public Optional<String> validateDeployment(FlinkDeployment deployment) {
        var namespace = deployment.getMetadata().getNamespace();
        // Compare string contents with equals(), not ==, which compares references.
        if ("foo".equals(namespace)) {
            //...
        }
        return Optional.empty();
    }
}
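An editorial aside on the snippet above: in Java, `==` on `String`s compares object references, not contents, so `namespace == "foo"` can be `false` even when the text matches; `equals` (or the null-safe `Objects.equals`) is what such a check needs. A minimal, self-contained demonstration:

```java
import java.util.Objects;

public class StringCompare {
    public static void main(String[] args) {
        // new String(...) builds a distinct object from the "foo" literal.
        String namespace = new String("foo");

        System.out.println(namespace == "foo");          // false: different objects
        System.out.println(namespace.equals("foo"));     // true: same contents
        System.out.println(Objects.equals(null, "foo")); // false, and no NPE on null
    }
}
```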
Bharathkrishna G M
05/23/2023, 12:28 AM
2023-05-23 00:26:48,863 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2023-05-23 00:26:48,931 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2023-05-23 00:26:49,101 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application.
I don't see any stack trace in the logs or in the UI; this error only appears in the jobmanager log.
Any idea how to find the root cause for this?
Samrat Deb
05/23/2023, 3:04 AM
Sonika Singla
05/23/2023, 5:38 AM
Slackbot
05/23/2023, 5:44 AM
behrooz razzaghi
05/23/2023, 5:46 AM
Mohan M
05/23/2023, 5:55 AM
my_table.postgres.sink_table
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:915)
at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:652)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1000)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:208)
at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$11(CatalogManager.java:663)
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:909)
... 14 more
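An editorial note on the trace above: the `Caused by: java.lang.UnsupportedOperationException` is thrown from `AbstractJdbcCatalog.createTable`, meaning Flink's JDBC (Postgres) catalog does not support `CREATE TABLE` through the catalog — DDL against it fails this way. A hedged workaround sketch (every name, the URL, and the credentials below are placeholders): create the table in Postgres itself, then declare it in Flink's default catalog via the JDBC connector:

```sql
-- Sketch only: register an existing Postgres table in Flink's default
-- in-memory catalog through the JDBC connector. Columns, URL, and
-- credentials are hypothetical placeholders.
CREATE TABLE sink_table (
    id BIGINT,
    payload STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<host>:5432/<database>',
    'table-name' = 'sink_table',
    'username' = '<user>',
    'password' = '<password>'
);
```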
Appreciate any help here.
Thanks
Dheeraj Panangat
05/23/2023, 8:27 AM
Oscar Perez
05/23/2023, 11:18 AM
Shiv Desai
05/23/2023, 2:42 PM
The resultTable column contains the complete JSON in one column of the table. Can someone please help me with how to create the table so that each field of the event is mapped to one column?
DataStream<GenericRecord> dataStream = ...;
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table attribution = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView("attribution", attribution);
Table resultTable = tableEnv.sqlQuery("SELECT f0 FROM attribution");
tableEnv.createTemporaryView("resultTable", resultTable);
tableEnv.sqlQuery("SELECT `field_name` from resultTable").execute().collect().forEachRemaining(System.out::println); // Throwing Exception.
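An editorial sketch related to the question above (not part of the thread): if the whole event lands as a JSON string in `f0`, one option in Flink SQL is to project individual fields out with `JSON_VALUE`; the field names here are made up for illustration:

```sql
-- Sketch only: extract fields from a JSON string column.
-- 'user_id' and 'amount' are hypothetical field names.
SELECT
    JSON_VALUE(f0, '$.user_id') AS user_id,
    JSON_VALUE(f0, '$.amount' RETURNING DOUBLE) AS amount
FROM resultTable;
```

Alternatively, declaring an explicit `Schema` in `fromDataStream`, or ingesting the source with the `json` format and a declared schema, maps fields to columns at ingestion time.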
Madhu Sudhan
05/23/2023, 3:12 PM
Jirawech Siwawut
05/23/2023, 3:30 PM
Kyle Godfrey
05/23/2023, 5:09 PM
FIRE_AND_PURGE to the next global window, instead of the event being the last event in the global window that it’s closing. It looks like an evictor might be able to do this, but the stipulation that an evictor requires all events to stay in state until the window is closed and the evictor runs isn’t feasible (we’re aggregating data for thousands of assets, each with thousands of events per asset, so it’s just too much state to keep). Is there another way of pushing this event off to the next window?
Shiv Desai
05/23/2023, 8:17 PM
Alex Brekken
05/24/2023, 2:22 AM
metadata in the CRD, but it doesn’t seem to get discovered by Prometheus. Example:
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  namespace: flink
  name: my-flink-job
  labels:
    customLabel: my-flink-application
But “customLabel” does not show up in Prometheus. Maybe I’m going about this the wrong way? Basically, I just need a way to distinguish between multiple jobs running on the same FlinkDeployment so I can toggle between them in a Grafana dashboard. Thanks for any help!
Or Keren
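An editorial note on the labels question above: labels under `metadata:` label the Kubernetes resource itself, not the pods that Prometheus typically scrapes. One approach (a sketch, assuming Prometheus discovers pod labels) is to set labels through the `podTemplate` of the `FlinkDeployment`; for telling jobs apart on one deployment, the `job_name` label that Flink's Prometheus reporter attaches to job metrics may be the more direct handle:

```yaml
# Sketch only: pod-level labels via the operator's podTemplate;
# the resource name is a placeholder.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-session-cluster
spec:
  podTemplate:
    metadata:
      labels:
        customLabel: my-flink-application
```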
05/24/2023, 10:13 AM
Tilman Krokotsch
05/24/2023, 11:22 AM
import os.path
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment

JAR_PATH = "<Path to Fat Jar>"

def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.add_jars(JAR_PATH)
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(
        """
        CREATE CATALOG iceberg_catalog WITH (
            'type'='iceberg',
            'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
            'warehouse'='s3://<My Bucket>',
            'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
            'client.factory'='org.apache.iceberg.aws.AssumeRoleAwsClientFactory',
            'client.assume-role.region'='<My Region>',
            'client.assume-role.arn'='<My Role>'
        )
        """
    )
    source_table = t_env.from_path(
        "iceberg_catalog.<My Database>.<My Table>"
    )
    t_env.to_data_stream(source_table).print()
    env.execute()

if __name__ == "__main__":
    example()
This is the error message:
  File "<My venv>/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 58, in decode_from_stream
    return self._value_coder.decode_from_stream(data_input_stream)
TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)
Did anyone encounter something similar before? Thanks in advance.
Oscar Perez
05/24/2023, 12:47 PM
Oscar Perez
05/24/2023, 1:58 PM
Matthew Kerian
05/24/2023, 6:47 PM
Failed: 1. The checkpoint monitoring doc page says there should be a way to view more information about this, but I can't find that option. Would appreciate a pointer, thanks.
Zhong Chen
05/24/2023, 10:56 PM
HJK nomad
05/25/2023, 1:18 AM
Sumit Nekar
05/25/2023, 7:03 AM