Antonio Manuel
07/21/2022, 4:40 PM
Emily Morgan
07/22/2022, 7:52 AM
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism SCRAM-SHA-256, enabled mechanisms are []
which is preventing me from reading from my Kafka source topic. The Kafka instance requires the SASL_PLAINTEXT protocol with the SCRAM-SHA-256 mechanism. When I run Flink locally (with only Kafka running in a Docker container) I do not get this error; however, when both services run in their respective Docker containers, it occurs. The Flink and Kafka services are on the same network, since they are defined in the same docker-compose file. This is what the Flink docker-compose setup looks like:
services:
  jobmanager:
    image: flink:1.15-scala_2.12
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:1.15-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 1
        parallelism.default: 1
...
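That "enabled mechanisms are []" message typically comes from the broker side: the listener the client ends up connecting to has no SASL mechanisms enabled, which can happen when the in-network bootstrap address/port differs from the one used from the host. For reference, a minimal sketch of how the client-side SASL settings are typically passed to the Flink KafkaSource; the bootstrap address, topic name and credentials below are placeholders, not values from this thread:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class SaslScramKafkaSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")                 // must point at a SASL-enabled listener
                .setTopics("source-topic")                         // placeholder topic
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Client-side SASL settings; the broker listener must also have SCRAM-SHA-256 enabled.
                .setProperty("security.protocol", "SASL_PLAINTEXT")
                .setProperty("sasl.mechanism", "SCRAM-SHA-256")
                .setProperty("sasl.jaas.config",
                        "org.apache.kafka.common.security.scram.ScramLoginModule required "
                                + "username=\"user\" password=\"secret\";")
                .build();
    }
}

Since it works from the host but not from inside the compose network, it may also be worth checking the broker's advertised.listeners and confirming that SCRAM is enabled on the listener the containers actually connect to.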
Jaya Ananthram
07/22/2022, 8:22 AM
aws-msk-iam-auth. As per this, I think I should not add aws-msk-iam-auth in the plugin because Kafka comes in my shaded jar. Any idea?
Jeesmon Jacob
07/22/2022, 2:06 PM
flink cli to a cluster created through FlinkDeployment? Trying to see if there will be any conflicts in the Kubernetes operator when it sees a job not managed by the operator.
Also, in this scenario, is there a plan (in future) to sync an unmanaged job to a FlinkSessionJob so that it can be managed by the operator? I have seen a similar approach in the Strimzi Kafka operator for a two-way sync of Topics, so I am trying to see if anyone has thought about it 🙂
Yahor Paulikau
07/22/2022, 6:20 PM
Sucheth Shivakumar
07/23/2022, 5:20 AMpublic class AvroDataSerializationSchema implements SerializationSchema<GenericRecord> {
private final String registryUrl;
private final String topic;
private transient KafkaAvroSerializer kafkaAvroSerializerClient = null;
public AvroDataSerializationSchema(String registryUrl, String topic) {
this.registryUrl = registryUrl;
this.topic = topic;
}
@Override
public void open(InitializationContext context) throws Exception {
SerializationSchema.super.open(context);
}
@Override
public byte[] serialize(GenericRecord element) {
checkInitialized();
return kafkaAvroSerializerClient.serialize(topic, element);
}
private void checkInitialized() {
if (kafkaAvroSerializerClient == null) {
Map<String, Object> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
// props.put("schema.compatibility.level", "forward_transitive");
// props.put("auto.register.schemas", false);
SchemaRegistryClient client = new CachedSchemaRegistryClient(
registryUrl,
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
kafkaAvroSerializerClient = new KafkaAvroSerializer(client, props);
}
}
The problem is that the schema in the GenericRecord is a new reference for every record, and CachedSchemaRegistryClient stores the schema reference as a map key with the schema id as the value. With a new Schema instance in every GenericRecord, this overflows the map capacity of 1000. Below is the code from CachedSchemaRegistryClient:
public synchronized int register(String subject, Schema schema, int version, int id) throws IOException, RestClientException {
    Map<Schema, Integer> schemaIdMap = (Map)this.schemaCache.computeIfAbsent(subject, (k) -> {
        return new HashMap();
    });
    Integer cachedId = (Integer)schemaIdMap.get(schema);
    if (cachedId != null) {
        if (id >= 0 && id != cachedId) {
            throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + id);
        } else {
            return cachedId;
        }
    } else if (schemaIdMap.size() >= this.identityMapCapacity) {
        throw new IllegalStateException("Too many schema objects created for " + subject + "!");
    } else {
        int retrievedId = id >= 0 ? this.registerAndGetId(subject, schema, version, id) : this.registerAndGetId(subject, schema);
        schemaIdMap.put(schema, retrievedId);
        ((Map)this.idCache.get((Object)null)).put(retrievedId, schema);
        return retrievedId;
    }
}
Has anyone encountered this before? Can someone please point out what I am doing wrong here?
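One possible mitigation, sketched below under the assumption that the records are currently built with a freshly parsed Schema each time: parse the writer schema once and reuse that single Schema instance for every GenericRecord, so the registry client's per-subject map only ever sees one key. The class name and SCHEMA_JSON value are illustrative placeholders, not code from this job:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public final class WriterSchemas {
    // Placeholder schema; in practice this would be loaded from resources or generated classes.
    private static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

    // Parsed exactly once per JVM; every record built from it shares the same Schema object.
    public static final Schema WRITER_SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

    public static GenericRecord newRecord() {
        return new GenericData.Record(WRITER_SCHEMA);
    }
}

Raising the identityMapCapacity passed to CachedSchemaRegistryClient would only postpone the IllegalStateException, so deduplicating the Schema instances (or switching to Flink's flink-avro-confluent-registry serialization schema) seems like the more robust direction.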
Ali Zia
07/23/2022, 3:13 PM
Exception:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Functions implemented for the old type system are not supported.
from pyflink.table import TableEnvironment, EnvironmentSettings, CsvTableSource
from pyflink.table.expressions import col
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
kinesis_source_ddl = """
CREATE TABLE source1 (
    a STRING,
    b STRING,
    datetime TIMESTAMP(3),
    test ARRAY<STRING>
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'test',
    'aws.region' = 'us-west-2',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
)
"""
# The UDF is called with one column and returns a list, so it takes a parameter
# and declares an array result type.
@udf(result_type=DataTypes.ARRAY(DataTypes.FLOAT()))
def semantic_analysis(text):
    return [0.0, 0.0, 0.0]

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql(kinesis_source_ddl)
t_env.from_path("source1").select(
    col("a"),
    col("b"),
    col("datetime"),
    col("test"),
    semantic_analysis(col("a")).alias("semantic_scores")
).execute()
Jaya Ananthram
07/24/2022, 11:41 AM
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException
Jeff Levesque
07/25/2022, 12:50 AM
MemorySink [1] and Databricks has memory tables [2]. Can I do something similar in (Py)Flink, or should I sink to AWS S3 or AWS Kinesis?
---
[1] https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-MemorySink.html
[2] https://docs.databricks.com/getting-started/spark/streaming.html#start-the-streaming-job
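Not an answer from the thread, but the closest built-in analogue I know of is pulling query results back into client memory with TableResult.collect(), which PyFlink exposes as well via execute().collect(). A minimal Java sketch; the datagen table and the query are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectToMemorySketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholder source; any streaming table works here.
        tEnv.executeSql(
                "CREATE TABLE src (id BIGINT, v DOUBLE) WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

        TableResult result = tEnv.executeSql("SELECT id, v FROM src");

        // Streams rows back to the client; for an unbounded query this iterates until cancelled.
        try (CloseableIterator<Row> it = result.collect()) {
            for (int i = 0; i < 5 && it.hasNext(); i++) {
                System.out.println(it.next());
            }
        }
    }
}

For anything beyond local testing, sinking to S3 or Kinesis is still the more typical pattern.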
Sucheth Shivakumar
07/25/2022, 2:57 AM
env.java.opts: -DDATA_STREAM_TENANT=local
I can set this in flink-conf.yaml and access it in the main class where I set up the ExecutionEnvironment, but System.getProperty("DATA_STREAM_TENANT") returns null elsewhere. Can someone please help me with how to do this?
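One common alternative to relying on JVM system properties is to read the value in main() and register it as a global job parameter, which every rich function can then read from its runtime context. A minimal sketch, assuming the value can be passed as a job argument (the pipeline below is just a placeholder):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class GlobalParamsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // e.g. submitted with: --DATA_STREAM_TENANT local
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        env.fromElements("a", "b")
                .map(new TenantAwareMapper())
                .print();

        env.execute("global-params-sketch");
    }

    static class TenantAwareMapper extends RichMapFunction<String, String> {
        private transient String tenant;

        @Override
        public void open(Configuration parameters) {
            // Global job parameters are shipped to every task, unlike driver-side system properties.
            ParameterTool params = (ParameterTool)
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            tenant = params.get("DATA_STREAM_TENANT", "local");
        }

        @Override
        public String map(String value) {
            return tenant + ":" + value;
        }
    }
}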
chunilal kukreja
07/25/2022, 7:34 AM
numRecordsOutPerSecond
chunilal kukreja
07/25/2022, 9:11 AM
Dieter
07/25/2022, 12:01 PM
Jin Yi
07/25/2022, 2:48 PM
Dylan Meissner
07/25/2022, 3:08 PM
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.1.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
Which works. But in Operations > Helm it invites us to
helm install flink-kubernetes-operator helm/flink-kubernetes-operator
Which fails because the repo doesn’t exist. Is there a plan for the second method to work?
Sucheth Shivakumar
07/25/2022, 4:29 PM
Giannis Polyzos
07/25/2022, 4:40 PM
Erik Wickstrom
07/25/2022, 9:54 PM
A), and I’m performing a LEFT JOIN against a Kafka source (table B) that contains a few hundred GB of slowly changing state (my intention is for that state to get cached into Flink’s RocksDB backend). I’m using a Versioned Table View to accomplish this.
The task reading in the large state for table B from Kafka is 100% backpressured, and the task populating the Versioned Table View is busy at 100% (using the ROW_NUMBER() SQL window function pattern described in the docs linked above).
I tried updating my FlinkDeployment parallelism from 1 to 2, but the Flink UI is still reporting “Parallelism: 1” for all tasks. Am I dialing the right knob to fix this issue? Or should I be looking at something else?
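For reference, in the flink-kubernetes-operator CRD the job parallelism is set under spec.job rather than at the top level of the FlinkDeployment, and (as far as I know) it only takes effect once the operator upgrades/restarts the job. A minimal sketch with placeholder names, image and jarURI:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-deployment              # placeholder
spec:
  image: flink:1.15
  flinkVersion: v1_15
  serviceAccount: flink
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar   # placeholder
    parallelism: 2                                 # job-level parallelism lives here
    upgradeMode: stateless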
Echo Lee
07/26/2022, 12:48 AM
rohit jangid
07/26/2022, 1:56 AM
chunilal kukreja
07/26/2022, 6:42 AM
Ali AIT-BACHIR
07/26/2022, 12:33 PM
Jeesmon Jacob
07/26/2022, 1:01 PM
Some caveats (and explanations)
There is no support at this time for upgrading or deleting CRDs using Helm. This was an explicit decision after much community discussion due to the danger for unintentional data loss. Furthermore, there is currently no community consensus around how to handle CRDs and their lifecycle. As this evolves, Helm will add support for those use cases.
https://helm.sh/docs/chart_best_practices/custom_resource_definitions/
Interested to see how others are tackling this issue for automation.
Ali AIT-BACHIR
07/26/2022, 3:13 PM
chunilal kukreja
07/26/2022, 4:49 PM
Roman Bohdan
07/26/2022, 10:41 PM
Erik Wickstrom
07/27/2022, 12:36 AM
FlinkDeployment.spec.podTemplate indicates that the operator wants to manage the pod directly.
salvalcantara
07/27/2022, 8:38 AM
cpu.utilization > 75% and mem.utilization > 75%, which can be easily translated into SQL. However, I'm not totally convinced that alerting is a good use case for Flink SQL. My concern is more on the deployment/management side, namely:
• Users are expected to define their own alerts, so at the end of the day we will have one sql job per alert, which might be very expensive and/or difficult to manage...
In general, can anyone point me to existing alerting applications built on top of Flink? The best I could find for now is this blog post: https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html, which is based on the DataStream API instead, maybe confirming my Flink SQL unsuitability hypothesis...
Parthiban PR
07/27/2022, 8:46 AMCaused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
I get this error when deploying my project JAR to the Flink machine.
Flink version: 1.14.0
Kafka version: kafka_2.12-3.1.1
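That ValidationException usually means the Kafka table connector (the factory behind 'kafka') is not visible on the cluster classpath, e.g. not bundled into the fat jar and not present in Flink's lib/ directory. A hedged sketch of the Maven dependency for Flink 1.14.0 (version, Scala suffix and packaging are assumptions; adjust to your build):

<!-- Bundle the Kafka connector (including its DynamicTableFactory) into the shaded job jar. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.14.0</version>
</dependency>

Alternatively, drop flink-sql-connector-kafka_2.12-1.14.0.jar into the cluster's lib/ directory. If the connector is already in the shaded jar, it is worth checking that the shade plugin merges META-INF/services files (ServicesResourceTransformer); otherwise the factory cannot be discovered.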
Hunter
07/27/2022, 9:04 AM