Slackbot
11/04/2022, 9:34 AM
Vincent canuel
11/04/2022, 10:09 AM
Emmanuel Leroy
11/04/2022, 4:50 PM
Prasaanth Neelakandan
11/04/2022, 9:07 PM
Emmanuel Leroy
11/04/2022, 9:08 PM
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INIT_PRODUCER_ID
and I’m told it is a feature used for transactions.
On the same kafka platform, with a simple kafka consumer, I’m able to subscribe to multiple topics, so is the Flink KafkaSource making use of the transaction feature somehow?
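INIT_PRODUCER_ID is a producer-side request used for idempotent and transactional writes, so the KafkaSource (a consumer) is an unlikely culprit; if the same job also writes to Kafka, the sink’s delivery guarantee is the more probable trigger. A minimal sketch of a non-transactional sink, assuming the KafkaSink API from Flink 1.14+; broker address and topic are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// AT_LEAST_ONCE (or NONE) uses a plain producer; EXACTLY_ONCE uses Kafka
// transactions, which older brokers reject with UnsupportedVersionException.
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                    // placeholder
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                      // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();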
Steven Zhang
11/04/2022, 10:39 PM
RM
11/05/2022, 1:55 PM
Adrian Chang
11/05/2022, 4:11 PM
Dan Hill
11/05/2022, 10:05 PM
Bill G
11/06/2022, 10:34 PM
schema. It will throw an exception Expecting type to be a PojoTypeInfo. The problem is that the avro schema compiler generates the getter getSchema$() because getSchema() is always generated to return the field SCHEMA$. Is it possible for Flink's deserializer to ever be generalized for this case or is this not worthy of an issue? I don't necessarily have control of the schemas. In particular, messages generated by Debezium include a source schema with the field schema included. It is a nested field so I can't drop or rename this field using the ReplaceField transform for Kafka, and I do not want to flatten the structure.
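One way to sidestep the POJO analysis entirely is to consume the records as Avro GenericRecords, so the generated getSchema$() getter never comes into play. A sketch assuming the writer schema is available as a JSON string; broker, topic, and schemaJson are placeholders:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.AvroDeserializationSchema;

// GenericRecord routes through Avro's own type information rather than
// Flink's POJO analyzer, so a field named "schema" no longer collides.
Schema schema = new Schema.Parser().parse(schemaJson);         // placeholder
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
        .setBootstrapServers("broker:9092")                    // placeholder
        .setTopics("debezium-topic")                           // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(AvroDeserializationSchema.forGeneric(schema))
        .build();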
Arnon
11/07/2022, 6:06 AM
2022-11-06T18:11:49.010167928Z Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.transactions'.
2022-11-06T18:11:49.010170313Z
2022-11-06T18:11:49.010172619Z Table options are:
2022-11-06T18:11:49.010174700Z
2022-11-06T18:11:49.010176956Z 'connector'='kafka'
2022-11-06T18:11:49.010179236Z 'format'='json'
2022-11-06T18:11:49.010181470Z 'properties.bootstrap.servers'='kafka:9092'
2022-11-06T18:11:49.010183706Z 'scan.startup.mode'='earliest-offset'
2022-11-06T18:11:49.010185846Z 'topic'='transactions'
:
:
2022-11-06T18:11:49.010475640Z Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
2022-11-06T18:11:49.010478276Z
2022-11-06T18:11:49.010480410Z Available factory identifiers are:
2022-11-06T18:11:49.010482603Z
2022-11-06T18:11:49.010484875Z blackhole
2022-11-06T18:11:49.010486997Z datagen
2022-11-06T18:11:49.010489065Z filesystem
2022-11-06T18:11:49.010491014Z print
Matteo De Martino
11/07/2022, 10:06 AM
CREATE TABLE and I set up a watermark column called _ingested_timestamp.
CREATE TABLE IF NOT EXISTS MY_TABLE
(
id INT NOT NULL,
...
...
_ingested_timestamp TIMESTAMP(3),
WATERMARK FOR _ingested_timestamp AS _ingested_timestamp - INTERVAL '10' SECOND,
PRIMARY KEY(id) NOT ENFORCED
)
WITH(...);
I then have a SELECT using data from the above table, something like this:
SELECT
item.id,
...
...
item._ingested_timestamp as event_time
FROM MY_TABLE as item JOIN ....;
However, this fails with an exception around Conversion to relational algebra failed to preserve datatypes.
Specifically, there seems to be a mismatch due to the fact that event_time is expected to be ROWTIME but it is not (or the other way around):
validated type:
RecordType(INTEGER NOT NULL id, ..., TIMESTAMP(3) *ROWTIME* event_time) NOT NULL
converted type:
RecordType(INTEGER NOT NULL id, ..., TIMESTAMP(3) event_time) NOT NULL
As you can see, the only difference in the exception message seems to be about the type of event_time.
Can anyone help me out here?
What exactly is the issue here, and how can I use a rowtime field in a SQL query without facing this issue?
Thanks
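The usual workaround for “Conversion to relational algebra failed to preserve datatypes” is to cast the rowtime column in the projection: casting to a plain TIMESTAMP(3) drops the *ROWTIME* marker, so the validated and converted row types agree. A sketch, assuming tEnv is the TableEnvironment where MY_TABLE is registered, with the join elided as in the original:

import org.apache.flink.table.api.Table;

// CAST strips the time-attribute flag from _ingested_timestamp.
Table result = tEnv.sqlQuery(
        "SELECT " +
        "  item.id, " +
        "  CAST(item._ingested_timestamp AS TIMESTAMP(3)) AS event_time " +
        "FROM MY_TABLE AS item");  // JOIN elided as in the original

The trade-off is that event_time is then an ordinary timestamp, not a time attribute, so any downstream windowing or interval join would need the watermark re-declared.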
Kosta Sovaridis
11/07/2022, 10:50 AM
Sumit Nekar
11/07/2022, 12:27 PM
Arnon
11/07/2022, 12:30 PM
{
"order_id": 1,
"price": 2.99,
"product": {
"name": "p1",
"size": "big"
}
How can map them to Flink table?
Since all the nested objects are key-value I can store them in Map data type, but can't find how to map it, a sample in java api will be even greater
ThnaksPedro Mázala
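Nested JSON objects can be declared either as a ROW type (fixed keys, as in the sample) or as MAP&lt;STRING, STRING&gt; when the keys vary per record. A sketch in the Java Table API, assuming tEnv is a TableEnvironment; the connector options are placeholders:

import org.apache.flink.table.api.TableEnvironment;

// product is a ROW, so product.name and product.`size` are addressable;
// declare it MAP<STRING, STRING> instead if the nested keys are not fixed.
tEnv.executeSql(
        "CREATE TABLE orders (" +
        "  order_id INT," +
        "  price DECIMAL(10, 2)," +
        "  product ROW<name STRING, `size` STRING>" +
        ") WITH (" +
        "  'connector' = 'kafka'," +                           // placeholder options
        "  'topic' = 'orders'," +
        "  'properties.bootstrap.servers' = 'kafka:9092'," +
        "  'format' = 'json'" +
        ")");
tEnv.executeSql("SELECT order_id, product.name, product.`size` FROM orders").print();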
Pedro Mázala
11/07/2022, 2:23 PM
Is sink a final step on Flink or not?
I mean, can I have a stream that sinks to ES and, after sinking (only when it gets stored), run an extra step? Just like a callback.
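A sink is terminal in the DataStream API, so there is no built-in post-write callback on the Elasticsearch sink. One workaround is to perform the write in an async operator and emit each element only once the write is acknowledged; everything downstream then runs only for stored records. A hedged sketch; orders, Order, esClient, and runExtraStep are hypothetical stand-ins, not Flink’s ES connector:

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Each element is emitted only after the (hypothetical) async ES write
// completes, so the map below behaves like a per-record callback.
DataStream<Order> stored = AsyncDataStream.orderedWait(
        orders,
        new RichAsyncFunction<Order, Order>() {
            @Override
            public void asyncInvoke(Order order, ResultFuture<Order> result) {
                esClient.indexAsync(order,                     // hypothetical client
                        ack -> result.complete(Collections.singleton(order)));
            }
        },
        30, TimeUnit.SECONDS);                                 // per-write timeout
stored.map(order -> runExtraStep(order));                      // the "callback" step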
Abhinav sharma
11/07/2022, 3:26 PM
Emmanuel Leroy
11/07/2022, 4:12 PM
oci://, much like the s3-fs-hadoop connector abstracts S3, but as a 3rd party component.
Using that, the Flink Hadoop fs is able to authenticate and get set up, but then it fails on the scheme check and I get the error:
Recoverable writers on Hadoop are only supported for HDFS
I have tried the S3 connector (flink-s3-fs-hadoop) with the S3 compatibility mode, which works fine, but it has some limitations on OCI, and I’d rather use the native HDFS connector.
I’m trying to understand what the RecoverableWriter functionality / limitation is, to see if the OCI HDFS connector actually fits the requirements and it’s merely the check on the scheme that is preventing me from using it (without having to fork Flink and try to bypass the check myself).
Is anyone familiar with this check and what it is really checking for by enforcing the hdfs:// scheme?
Thanks
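For what it’s worth, the guard sits on the recoverable-writer path (used by the file sink for exactly-once semantics) and keys on the filesystem scheme rather than on actual capabilities. Illustrative only, not a verbatim copy of Flink’s source:

// A sketch of the kind of check that produces the error above: any Hadoop
// FileSystem whose scheme is not hdfs:// is rejected outright, even if it
// could support the truncate/recover semantics RecoverableWriter relies on.
if (!"hdfs".equalsIgnoreCase(fs.getUri().getScheme())) {
    throw new UnsupportedOperationException(
            "Recoverable writers on Hadoop are only supported for HDFS");
}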
Jason Politis
11/07/2022, 7:05 PM
RICHARD JOY
11/07/2022, 7:55 PM
Caused by: org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://10.19.0.1/apis/apps/v1/namespaces/ns-1/deployments. Admission webhook "validation.gatekeeper.sh" denied the request: [deployment-must-have-costcenter] All deployments must have a 'app_id', 'app_class', 'env' label set.
Any help is so much appreciated. Thanks!
Emmanuel Leroy
11/07/2022, 10:00 PM
Krish Narukulla
11/07/2022, 10:16 PM
scylladb or cassandra, let me know, we would need such capability.
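There is an Apache Flink Cassandra connector (flink-connector-cassandra), and since ScyllaDB is wire-compatible with Cassandra it should work against Scylla as well. A minimal sketch for a tuple stream; host, keyspace, and query are placeholders:

import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

// Assuming results is a DataStream<Tuple2<String, Long>> built earlier:
// each tuple is bound positionally to the two placeholders in the CQL query.
CassandraSink.addSink(results)
        .setHost("127.0.0.1")                                       // placeholder
        .setQuery("INSERT INTO ks.counts (word, n) VALUES (?, ?);") // placeholder
        .build();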
Emmanuel Leroy
11/07/2022, 11:39 PM
Matt Fysh
11/08/2022, 1:49 AM
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
Jason Politis
11/08/2022, 4:50 AM
Suna Yin
11/08/2022, 9:11 AM
Adrian Chang
11/08/2022, 3:32 PM
AvroRowDeserializationSchema will fail because of the first 5 bytes of the record containing the Schema ID.
Is it a good option to use the SQL connector and then convert the table into a stream? I know this will add some latency.
Is there any other option? We use Python.
thanks
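On the DataStream side, the usual answer is the Confluent-aware deserializer in flink-avro-confluent-registry, which strips the magic byte and the 4-byte schema ID before decoding. A Java sketch (PyFlink does not necessarily expose this class, which is why the SQL connector with the avro-confluent format is a reasonable route from Python); schemaJson and the registry URL are placeholders:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

// Handles the Confluent wire format (1 magic byte + 4-byte schema ID + payload)
// by fetching the writer schema from the registry before decoding.
Schema readerSchema = new Schema.Parser().parse(schemaJson);   // placeholder
DeserializationSchema<GenericRecord> deser =
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
                readerSchema, "http://schema-registry:8081");  // placeholder URL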
Emmanuel Leroy
11/08/2022, 4:59 PM
local:// scheme errors with
Error: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'local'.
and when using file:// I get:
java.io.FileNotFoundException: /opt/flink/examples/streaming/flink-demo-1.0-SNAPSHOT.jar
when this is where I put the jar in my Docker image, and that works in a FlinkDeployment template.
I had to use a remote URI to make it work.
So where are the jars supposed to be located if I want to put them in the image, when using session mode?
Sachin Saikrishna Manikandan
11/08/2022, 5:32 PM
Yaroslav Bezruchenko
11/08/2022, 7:07 PM