Gerald Schmidt
02/16/2023, 6:17 PM
Reme Ajayi
02/16/2023, 6:42 PM
.withTimestampAssigner()
on my Confluent Kafka sources. Bonus question: how do I know if Flink already extracted timestamps from the records? The documentation says that if timestamps are set in the headers of my records, Flink automatically retrieves them. Does anyone know how I can check this?
Herat Acharya
02/16/2023, 11:09 PM
Enabling required built-in plugins
Linking flink-s3-fs-hadoop-1.16.1.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.16.1.jar
sed: couldn't open temporary file /opt/flink/conf/sedKxVpXP: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sed2jL9cQ: Read-only file system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Permission denied
/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting kubernetes-session as a console application on host media-streaming-e1np-5c99487d59-5vwn2.
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See <http://www.slf4j.org/codes.html#noProviders> for further details.
SLF4J: Class path contains SLF4J bindings targeting slf4j-api versions prior to 1.8.
SLF4J: Ignoring binding found at [jar:file:/opt/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Ignoring binding found at [jar:file:/opt/flink/lib/log4j-slf4j-impl-2.19.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See <http://www.slf4j.org/codes.html#ignoredBindings> for an explanation.
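A side note on the log above: "No SLF4J providers were found" together with the two "Ignoring binding" lines suggests both log4j-slf4j-impl-2.17.1.jar and log4j-slf4j-impl-2.19.0.jar sit in /opt/flink/lib, and an slf4j-api 2.x on the classpath ignores pre-1.8 bindings entirely, so logging falls back to NOP. A small hypothetical helper (names are illustrative) to spot duplicated artifacts in a lib directory:

```python
import re
from collections import Counter

def duplicate_artifacts(jar_names):
    """Group jar file names by artifact (the name minus the trailing
    -<version>.jar) and return the artifacts that appear more than once."""
    stripped = [re.sub(r"-\d[\w.]*\.jar$", "", name) for name in jar_names]
    return sorted(a for a, n in Counter(stripped).items() if n > 1)

# The jars visible in the log above: two copies of log4j-slf4j-impl.
lib = [
    "flink-dist-1.16.1.jar",
    "log4j-slf4j-impl-2.17.1.jar",
    "log4j-slf4j-impl-2.19.0.jar",
]
print(duplicate_artifacts(lib))  # ['log4j-slf4j-impl']
```

Keeping a single binding jar whose version matches the slf4j-api on the classpath is usually the cleanup here.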
Tony Wang
02/16/2023, 11:45 PM
Flink SQL> SET 'table.exec.mini-batch.enabled' = 'true';
Flink SQL> SET 'table.exec.mini-batch.allow-latency' = '5s';
Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
Does a watermark set like this:
create TABLE users_table (ts TIMESTAMP(3), user_id INT, category INT, price FLOAT, featureA INT, WATERMARK FOR ts AS ts) WITH ('connector'='filesystem','path'='/home/ziheng/streaming-stuff/datagen/users.csv','format'='csv');
even work? When I printed out that table, it showed a watermark column, but that watermark is based on the processing time on my computer instead of the ts column defined.
Anjiang Wei
02/17/2023, 1:34 AM
Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
We have tried hard to resolve the dependency issue, but still failed. We have written down a readme to describe the problem: <https://github.com/Anjiang-Wei/flink/blob/tony/dataset/README.md>
Any feedback will be appreciated, many thanks!
Anjiang Wei
02/17/2023, 1:37 AM
Tony Yeung
02/17/2023, 2:09 AM
Madan
02/17/2023, 3:23 AM
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteFencedMessage(00000000000000000000000000000000, RemoteRpcInvocation(null.updateTaskExecutionState(TaskExecutionState)))] from sender [Actor[akka.tcp://flink@ip-10-121-176-58.vpc.internal:42393/temp/jobmanager_2$Rs]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#-1101702724]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
Tony Wang
02/17/2023, 5:02 AM
肖文浩
02/17/2023, 9:08 AM
Dheeraj Panangat
02/17/2023, 11:01 AM
Sai Sharath Dandi
02/17/2023, 8:11 PM
Jeremy Ber
02/17/2023, 10:02 PM
.jpg
in Amazon S3, and I want to use the FileSource to read those in. Is there any out-of-the-box way to do this? Currently I am reading them in using the TextLineInputFormat and trying to convert this to an image.
Rohan Kumar
02/18/2023, 11:49 AM
Krish Narukulla
02/18/2023, 8:32 PM
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.16.1.jar:1.16.1]
... 13 more
Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:101) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
at org.apache.flink.table.api.brid
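Both this stack trace and the ParquetColumnarRowInputFormat ClassNotFoundException earlier in the thread come down to the same question: does any jar actually on the classpath contain the class the error names? Since jars are plain zip archives, a generic check is possible; this is a hypothetical helper, and the directory and class names in the comment are only examples:

```python
import pathlib
import zipfile

def jars_containing(lib_dir, class_name):
    """List the jars under lib_dir whose archive contains the given class."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(pathlib.Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar.name)
    return hits

# e.g. jars_containing("/opt/flink/lib",
#     "org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat")
```

An empty result for the class in question means the matching jar never made it into the image or the fat jar, rather than a wiring problem in the job itself.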
Jagan Nalla
02/19/2023, 11:17 PM
Tyler Wood
02/20/2023, 12:27 AM
%flink.ssql
in a Zeppelin notebook on AWS. I have a field created_at that is a double and is getting read into Zeppelin as e.g. 1.293848576E9.
When I try to apply FROM_UNIXTIME to cast it to a timestamp, I get the following error:
SQL validation failed. From line 1, column 8 to line 1, column 49: Cannot apply 'FROM_UNIXTIME' to arguments of type 'FROM_UNIXTIME()'. Supported form(s): 'FROM_UNIXTIME()'
'FROM_UNIXTIME(, )'
the query is
%flink.ssql(type=update)
SELECT FROM_UNIXTIME(created_at) FROM table;
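A plain-Python sketch of what this query is after, assuming created_at holds epoch seconds: since the validator is rejecting the double-typed argument, casting first, as in FROM_UNIXTIME(CAST(created_at AS BIGINT)), is one common workaround (a hedged suggestion, not verified against this notebook):

```python
from datetime import datetime, timezone

# created_at as Zeppelin renders it: epoch seconds in scientific notation
created_at = 1.293848576e9

# Conceptually what FROM_UNIXTIME(CAST(created_at AS BIGINT)) computes
ts = datetime.fromtimestamp(int(created_at), tz=timezone.utc)
print(ts.strftime("%Y-%m-%d %H:%M:%S"))  # 2011-01-01 02:22:56
```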
What does that error message mean? It looks like I'm following exactly the function signature…
Raghunadh Nittala
02/20/2023, 4:30 AM
file is too short
exceptions in the environment.
Error while opening RocksDB instance.
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.loadDb(RocksDBHandle.java:134) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBHandle.openDB(RocksDBHandle.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:243) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:222) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:189) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:169) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:325) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:503) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:98) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) [flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) [flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) [flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) [flink-dist-1.16.1.jar:1.16.1]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.rocksdb.RocksDBException: file is too short (4044 bytes) to be an sstable: /opt/flink/rocksdb/<jobId>
Can someone please provide inputs on this?
Abdelhakim Bendjabeur
02/20/2023, 11:32 AM
outer
!
To my understanding, the outer keyword is superfluous and should not change the result of the join. Am I missing something?
with t1 AS (
select
...
from `table1` t1
),
t2 AS (
select
...
from `table1` t2
)
select
*
FROM t1
left [OUTER] join t2
ON t1.accountId = t2.accountId
AND t1.id = t2.id
The change contains only the create operation when using the outer, whereas it contains create-delete-create in a simple left join (with null on the second part of the data
Ari Huttunen
02/20/2023, 12:47 PM
Benjamin Wootton
02/20/2023, 1:22 PM
Benjamin Wootton
02/20/2023, 1:23 PM
Benjamin Wootton
02/20/2023, 1:23 PM
Nathanael England
02/20/2023, 8:20 PM
datastream.add_sink(<one test sink>)
and then datastream.get_side_output(<output tag>).add_sink(<another test sink>).
However, my side outputs are showing up in the first sink. I followed the example in https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/ which seemed quite simple, so I'm not sure what I could do wrong.
Jeremy Ber
02/20/2023, 9:34 PM
Nathanael England
02/21/2023, 1:37 AM
state.MapStateDescriptor('example', Types.STRING(),
Types.MAP(Types.INT(), Types.STRING()))
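One thing worth noting about a MAP-typed value like the descriptor above: a backend that serializes values (RocksDB, for instance) hands back deserialized copies, so mutating the inner map in place does not persist it; the value has to be written back. A toy sketch of that read-modify-write pattern, with a plain class standing in for state rather than any real PyFlink API:

```python
import copy

class FakeMapState:
    """Toy stand-in for MapState[str -> map[int, str]]: get() returns a
    deserialized copy, the way a serializing backend would."""
    def __init__(self):
        self._store = {}

    def get(self, key):
        return copy.deepcopy(self._store.get(key))

    def put(self, key, value):
        self._store[key] = copy.deepcopy(value)

state = FakeMapState()
state.put("example", {1: "a"})

inner = state.get("example")
inner[2] = "b"               # mutates only the local copy...
state.put("example", inner)  # ...so the value must be put back

print(state.get("example"))  # {1: 'a', 2: 'b'}
```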
Nathanael England
02/21/2023, 2:01 AM
MapState.items()
from my process_element function, but I get the following error when calling it from open:
TypeError: None has type NoneType, but expected one of: bytes, unicode
Does that just mean the state hasn't actually been created yet?
Ting Yin
02/21/2023, 2:36 AM
Ting Yin
02/21/2023, 2:41 AM
Nathanael England
02/21/2023, 4:55 AM
is_empty() returned False, but list(keys()) and list(values()) are both empty lists.
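One hedged hypothesis for that last symptom, purely illustrative and not a confirmed diagnosis of the PyFlink runtime: state iterators are typically one-shot, so if the object returned by keys() has already been advanced, listing it yields [] even though the state is not empty:

```python
def keys():
    # stand-in for a state-backend iterator; not a real PyFlink call
    return iter(["k1", "k2"])

it = keys()
print(list(it))  # ['k1', 'k2']: the first pass drains the iterator
print(list(it))  # []: a second pass over the same iterator looks empty

# requesting a fresh iterator for each traversal avoids the trap
print(list(keys()))  # ['k1', 'k2']
```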