Mohit Aggarwal
01/03/2023, 9:57 AM
Liubov
01/03/2023, 11:09 AM
Jean-René Robin
01/03/2023, 1:03 PM
Gaurav Miglani
01/03/2023, 1:04 PM
Alex Cramer
01/03/2023, 4:52 PM
GET /jobs/{jobid} endpoint (specifically the nested JobDetailsVertexInfo). The HTML version matches the JSON response I see when I hit the endpoint, so I think that’s the right one.
Can someone who knows more quickly sanity-check me? It’s an experimental feature, but I wasn’t sure if other people were using it without issues and whether I’m just misinterpreting something.
René
01/03/2023, 7:44 PM
Jason Politis
01/03/2023, 10:11 PM
Chen-Che Huang
01/04/2023, 1:54 AM
Suparn Lele
01/04/2023, 5:32 AM
Soumya Ghosh
01/04/2023, 5:53 AM
Kafka Source --> 1 min tumble window --> Elasticsearch table connector sink
We are observing some intermittent exceptions (posted in the thread below); the Flink job restarts and then works fine until the exception is encountered again after a few hours.
There are two types of exceptions logged, one or the other, never both at the same time.
The root exception is: Caused by: org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection is closed
Any thoughts on this?
Rohan Kumar
01/04/2023, 6:41 AM
Doğaç Eldenk
01/04/2023, 7:47 AM
Slackbot
01/04/2023, 8:01 AM
Amenreet Singh Sodhi
01/04/2023, 12:40 PM
Abdelhakim Bendjabeur
01/04/2023, 1:04 PM
Flink SQL> SELECT *, ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn FROM ticket;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode.
...
Flink SQL> SELECT *, ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` ASC) rn FROM ticket;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
The field is a timestamp though!
But using a CTE or a sub-query solved it.
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn FROM ticket
) s
WHERE rn = 1;
---------------------------------------------------------------------------------------------------------
WITH ticket_dedup AS (
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn
FROM ticket
) s
WHERE rn = 1)
SELECT * FROM ticket_dedup;
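The sub-query form above works because Flink recognizes it as its deduplication pattern: keep only the newest row per (id, accountId). The same keep-latest logic, sketched in plain Python purely as an illustration of the semantics (field names taken from the query):

```python
# Equivalent of ROW_NUMBER() OVER (PARTITION BY id, accountId
# ORDER BY `timestamp` DESC) ... WHERE rn = 1: keep the newest
# row per (id, accountId) key.
def dedup_latest(rows):
    """Return one row per (id, accountId): the one with the newest timestamp."""
    latest = {}
    for row in rows:
        key = (row["id"], row["accountId"])
        if key not in latest or row["timestamp"] > latest[key]["timestamp"]:
            latest[key] = row
    return list(latest.values())

tickets = [
    {"id": 1, "accountId": "a", "timestamp": 100, "status": "open"},
    {"id": 1, "accountId": "a", "timestamp": 200, "status": "closed"},
    {"id": 2, "accountId": "a", "timestamp": 150, "status": "open"},
]
print(dedup_latest(tickets))  # two rows: (1, 'a') at ts 200, (2, 'a') at ts 150
```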
Is this a bug somehow, or is there a reason behind it?
bharat chaudhury
01/04/2023, 2:07 PM
Adriana Beltran
01/04/2023, 3:41 PM
Sami Badawi
01/04/2023, 6:18 PM
obj_path = "public.model_counts"
catalog_path = ObjectPath("postgres", obj_path)
print(catalog_path)
table_exists = catalog.table_exists(catalog_path)
print(table_exists)
table: CatalogBaseTable = catalog.get_table(catalog_path)
print("table:", table)
table_schema: Schema = table.get_unresolved_schema()
print("table_schema:", table_schema)
I can get the database table, but only as a CatalogBaseTable, and I am not sure how to query the data in the table.
Is there a simple JDBC example showing how to run the query?
Joris Basiglio
01/04/2023, 6:26 PM
Sami Badawi
01/04/2023, 8:00 PM
sql = "SELECT entity_id FROM event_files"
table2 = t_env.sql_query(sql)
table2.execute().print()
I get this error despite not selecting the column that has the jsonb type:
File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o8.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation failed. Failed getting table postgres.event_files
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
... 15 more
Caused by: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'jsonb' yet
at org.apache.flink.connector.jdbc.dialect.psql.PostgresTypeMapper.mapping(PostgresTypeMapper.java:173)
at org.apache.flink.connector.jdbc.catalog.PostgresCatalog.fromJDBCType(PostgresCatalog.java:147)
at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.getTable(AbstractJdbcCatalog.java:271)
... 40 more
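One common workaround for the unsupported jsonb mapping (my suggestion, not from the thread) is to bypass the Postgres catalog's automatic type discovery and declare the table explicitly in the default catalog, exposing the jsonb column as STRING. A minimal sketch that only builds such a DDL string; the column names here are assumed for illustration:

```python
# Sketch: declare the table by hand so Flink never asks the Postgres
# catalog to map the unsupported 'jsonb' type; expose it as STRING.
# Column names (entity_id, payload) are assumed, not from the thread.
columns = {
    "entity_id": "STRING",
    "payload": "STRING",  # jsonb column read as plain JSON text
}

col_defs = ",\n  ".join(f"{name} {ftype}" for name, ftype in columns.items())
ddl = f"""CREATE TABLE event_files (
  {col_defs}
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'event_files'
)"""
print(ddl)
```

The resulting DDL would then go to t_env.execute_sql(...) while the default in-memory catalog is active, instead of reading the table through the JDBC catalog.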
Emmanuel Leroy
01/04/2023, 10:20 PM
sap1ens
01/04/2023, 11:27 PM
upgradeMode: last-state) and some of them have failed with:
java.lang.IllegalStateException: There is no operator for the state c0bfb6158e09ccdb03a634ce9b2009b0
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:733)
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1670)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1598)
...
However, I haven’t changed anything Flink-related (I was only updating some K8s configs); the jobs were using exactly the same Docker image, JAR, etc.
Is this a known issue (maybe addressed in later versions)?
Deryl Rodrigues
01/05/2023, 5:48 AM
SELECT
a.column1,
a.column2,
b.column1
FROM tableA AS a
JOIN hive_catalog.tableB /*+ OPTIONS('streaming-source.enable'='true') */ FOR SYSTEM_TIME AS OF a.proctime AS b
ON a.column1 = b.column1
I encounter the following error
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
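The error above is about the versioned (build) side of the temporal join: Flink needs a primary key there so it can keep exactly one current version per key. In plain Python terms (an illustration of the semantics, not Flink's implementation), the versioned table behaves like a dict keyed by that primary key; the dict key is exactly what is missing without a PRIMARY KEY declaration:

```python
# Illustration: a versioned table is a keyed "latest version" map.
# Without a primary key there is nothing to index the versions by,
# which is what the planner error is complaining about.
versioned = {}  # primary key (column1) -> latest row from tableB

def upsert(row):
    """A new version for a key replaces the previous one."""
    versioned[row["column1"]] = row

def temporal_join(probe_row):
    """Join a tableA row against the version current at processing time."""
    match = versioned.get(probe_row["column1"])
    return None if match is None else {**probe_row, "b_column1": match["column1"]}

upsert({"column1": "k1"})
print(temporal_join({"column1": "k1", "column2": 42}))
print(temporal_join({"column1": "k2", "column2": 7}))  # no version yet -> None
```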
Sami Badawi
01/05/2023, 8:33 AM
catalog: JdbcCatalog = JdbcCatalog(
name, default_database, username, password, base_url)
t_env.register_catalog("my_catalog", catalog)
t_env.use_catalog("my_catalog")
ddl = """
CREATE TABLE shipments_cdc (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'localhost',
'port' = '5432',
'username' = 'postgres',
'password' = 'postgres',
'database-name' = 'postgres',
'schema-name' = 'public',
'table-name' = 'shipments'
);
"""
t_env.execute_sql(ddl)
with stacktrace:
File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco
raise java_exception
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CreateTable in path `my_catalog`.`postgres`.`shipments_cdc`
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:888)
at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:649)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:929)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:310)
at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$11(CatalogManager.java:660)
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:882)
... 14 more
Ari Huttunen
01/05/2023, 8:35 AM
jaiprasad
01/05/2023, 9:16 AM
2023-01-05 09:07:47,566 ERROR org.apache.flink.runtime.metrics.ReporterSetup [] - Could not instantiate metrics reporter prom. Metrics might not be exposed/reported.
java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusReporter
at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Unknown Source) ~[?:?]
at org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:456) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:223) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:525) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:525) [flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66) [flink-dist-1.16.0.jar:1.16.0]
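The ClassNotFoundException above usually means the Prometheus reporter jar is not visible to the JVM. In Flink 1.16 the reporter is meant to be loaded as a plugin via its factory class rather than via reflection on the reporter class. A hedged sketch of the relevant flink-conf.yaml, assuming flink-metrics-prometheus-1.16.0.jar is placed under plugins/metrics-prometheus/ in the image (the port value here is only the conventional default, not from the thread):

```yaml
# flink-conf.yaml (sketch): use the plugin factory instead of the
# deprecated metrics.reporter.prom.class; requires the
# flink-metrics-prometheus jar under plugins/metrics-prometheus/.
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
```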
Ari Huttunen
01/05/2023, 9:42 AM
sql-client.sh. I have an init script that creates a table that reads streaming data from Kafka. If I try to select some rows like select * from the_table limit 10;
I get nothing, just an empty result.
The same SQL statement embedded in Python provides data just fine. In PyFlink I have a .wait().
Does the sql-client support streaming from Kafka, and how should I fix this?
Arun Lakra
01/05/2023, 10:27 AM
Emmanuel Leroy
01/05/2023, 9:08 PM
Emmanuel Leroy
01/06/2023, 12:34 AM