James Watkins
06/08/2023, 10:12 AM
I'm joining a transactions stream with a users stream, both Kafka sources, using the avro-confluent value.format so that I can deserialise the Avro messages using a schema registry. It looks like this format is only available in the Table API, so I have defined my source connectors using that.
I’m trying to join the streams using an Event-Time Temporal Table Join. I added a primary key to the users
table but I get the following error message:
The Kafka table 'default_catalog.default_database.users' with 'avro-confluent' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
Is it possible to use an Event-Time Temporal Table Join with the 'avro-confluent' data format?
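A minimal sketch of one possible workaround, with illustrative topic, field, and registry names: the upsert-kafka connector requires a PRIMARY KEY and also accepts avro-confluent as its value format, so the users topic can be modeled as a versioned table and used as the build side of an event-time temporal join (this assumes a transactions table with a watermarked txn_time attribute is defined elsewhere):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UsersVersionedTable {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Versioned table: PRIMARY KEY + event-time watermark. Unlike the
        // plain 'kafka' connector with avro-confluent, 'upsert-kafka'
        // accepts (and requires) a primary key.
        tEnv.executeSql(
                "CREATE TABLE users (" +
                "  user_id STRING," +
                "  tier STRING," +
                "  update_time TIMESTAMP(3)," +
                "  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'users'," +                              // illustrative
                "  'properties.bootstrap.servers' = 'kafka:9092'," +  // illustrative
                "  'key.format' = 'raw'," +
                "  'value.format' = 'avro-confluent'," +
                "  'value.avro-confluent.url' = 'http://schema-registry:8081'" +
                ")");

        // Event-time temporal join; assumes a 'transactions' table whose
        // txn_time column is a watermarked time attribute.
        tEnv.executeSql(
                "SELECT t.*, u.tier " +
                "FROM transactions AS t " +
                "JOIN users FOR SYSTEM_TIME AS OF t.txn_time AS u " +
                "ON t.user_id = u.user_id");
    }
}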
Or Keren
06/08/2023, 11:11 AM
Abhishek Gupta
06/08/2023, 11:17 AM
Abhishek Gupta
06/08/2023, 11:27 AM
Raghunadh Nittala
06/08/2023, 2:32 PM
CREATE TABLE sink_table_s3 (
event_id STRING NOT NULL,
event_type STRING NOT NULL,
event_name STRING NOT NULL,
eventId STRING NOT NULL,
eventName STRING NOT NULL,
`date` STRING
) PARTITIONED BY (eventId, eventName, `date`) WITH (
'connector' = 'filesystem',
'path' = '<path>',
'format' = 'parquet',
'auto-compaction' = 'true'
);
Insert query:
INSERT INTO sink_table_s3
SELECT event_id, event_type, event_name,
       -- positions must match the sink schema: eventId, eventName, then `date`
       event_id AS eventId, event_name AS eventName,
       DATE_FORMAT(proc_time, 'yyyy-MM-dd') AS `date`
FROM source_table;
I'm adding eventId and eventName just to make sure those columns are also available in the Parquet file in S3. How can we avoid small files being created?
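A sketch of the knobs that usually control small files for the filesystem sink (option values illustrative): files only become visible on checkpoints, so a short checkpoint interval combined with a three-level partition key multiplies file counts, and auto-compaction can merge the per-checkpoint files up to compaction.file-size before committing them:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactedS3Sink {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Bulk formats like Parquet also roll on every checkpoint, so the
        // checkpoint interval is the main lever; compaction then merges the
        // small per-checkpoint files before they become visible.
        tEnv.executeSql(
                "CREATE TABLE sink_table_s3 (" +
                "  event_id STRING NOT NULL," +
                "  event_type STRING NOT NULL," +
                "  event_name STRING NOT NULL," +
                "  eventId STRING NOT NULL," +
                "  eventName STRING NOT NULL," +
                "  `date` STRING" +
                ") PARTITIONED BY (eventId, eventName, `date`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 's3://bucket/path'," +                     // illustrative
                "  'format' = 'parquet'," +
                "  'auto-compaction' = 'true'," +
                "  'compaction.file-size' = '128MB'," +                // merge target
                "  'sink.rolling-policy.file-size' = '128MB'," +
                "  'sink.rolling-policy.rollover-interval' = '30 min'" +
                ")");
    }
}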
Elizaveta Batanina
06/08/2023, 3:53 PM
We use the watermark latency metric in our pipeline to track e2e latency (is it the correct metric to use with metrics.latency.granularity: "single"? 🤔). Sometimes the latency is negative; what would that mean? In the Flink code, watermark latency is defined as processing_time - watermark.
Another question: when using Python UDFs, the PythonGroupAggregate operator is used, which doesn't track watermark latency. How can we use this metric with Python?
P.S. We are using Flink 1.17.
Gaurav Gupta
06/08/2023, 6:22 PM
package org.example;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class FlinkCEPExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(
                new Event(1, "start"),
                new Event(2, "middle"),
                new Event(3, "end")
        );

        Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
                .where(SimpleCondition.of(event -> event.getName().equals("start")))
                .followedBy("middle")
                .where(SimpleCondition.of(event -> event.getName().equals("middle")))
                .followedBy("end")
                .where(SimpleCondition.of(event -> event.getName().equals("end")));

        DataStream<String> output = CEP.pattern(events, pattern)
                .select(new PatternSelectFunction<Event, String>() {
                    @Override
                    public String select(Map<String, List<Event>> pattern) throws Exception {
                        StringBuilder result = new StringBuilder();
                        for (Event event : pattern.get("start")) {
                            result.append(event.getId()).append("-");
                        }
                        result.append(pattern.get("middle").get(0).getId()).append("-");
                        result.append(pattern.get("end").get(0).getId());
                        return result.toString();
                    }
                });

        output.print();

        // Execute the job
        env.execute("Flink CEP Pattern Example");
    }
}
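The snippet references an Event class that wasn't posted; a minimal POJO matching the calls above might look like this (hypothetical, since the original class wasn't shared):

// Hypothetical POJO for the example above; the real class wasn't posted.
public class Event {
    private int id;
    private String name;

    public Event() {}  // Flink POJOs need a public no-arg constructor

    public Event(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() { return id; }
    public String getName() { return name; }
    public void setId(int id) { this.id = id; }
    public void setName(String name) { this.name = name; }
}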
Pankaj Singh
06/08/2023, 6:24 PM
"bootstrap.servers": "kafka-server:9094",
"group.id": "fin-topic-name",
"sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='passwd';",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_PLAINTEXT"
My program reads from a Kafka topic and does some transformations.
I followed this doc https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-python-createapp.html for configuring KDA.
I get the error below when deployed on KDA (it works locally), any ideas?
Exception:
"exception-classifier.filters.user-exception-stack-regex.configuration, Caused by. org.apache.flink.runtime.checkpoint.CheckpointException.+Caused by. java.lang.InterruptedException. sleep interrupted.+at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run;Caused by. org.apache.flink.runtime.checkpoint.CheckpointException.+Caused by. org.apache.flink.kinesis.shaded.com.amazonaws.AbortedException.+at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run.+Caused by. java.lang.InterruptedException. sleep interrupted;Caused by. org.apache.kafka.common.errors.TimeoutException. Timed out waiting for a node assignment;Caused by. org.apache.kafka.common.errors.SaslAuthenticationException. Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512;Caused by. org.apache.kafka.common.errors.TopicAuthorizationException. Topic authorization failed"Nihar Rao
Nihar Rao
06/08/2023, 7:06 PM
Ankam Praveen
06/09/2023, 8:19 AM
Juan Carlos Gomez Pomar
06/09/2023, 8:31 AM
We are getting an UnrecoverableS3OperationException:
AmazonS3Exception: The specified key does not exist.
When we look in S3, the corresponding key does not exist.
It happens after a restart, as in this post: https://lists.apache.org/thread/qtwho32yzhqhbx7jhnr556bslhpc0x9k. However, there is no error before the above-mentioned exception.
I can also see it pointed out in prestodb https://github.com/prestodb/presto/issues/18154#issuecomment-1284415321.
• Have you seen this error?
• Is there a way to debug this error?
Note: Apparently, this happens in jobs that use the Ververica Platform High Availability feature; however, it is difficult to pinpoint whether that is the true root cause since the error does not happen consistently.
Any advice or insights would be greatly appreciated!
Felix Hildén
06/09/2023, 12:01 PM
I'm running a PyFlink job locally with execute, but for debugging I'd like to change logging levels. Currently I only see warning and up, and I haven't found a way to show more. In a proper deployment on K8s all logs are visible thanks to configuring logConfiguration:log4j-console.properties. Is there a way to change the level with some configuration, either in code or in a config file? The Flink deployment points to the same exact entry point in Python, so surely there must be a way, but merely having a log4j-console.properties file didn't do the job. Where am I going wrong?
Hussain Abbas
06/09/2023, 1:35 PM
Gaurav Gupta
06/09/2023, 2:15 PM
Clen Moras
06/09/2023, 2:30 PM
Oscar Perez
06/09/2023, 3:14 PM
val accountChangedStream = accountDataStreamFactory.get().keyBy { it.accountUserId }
val accountActivityStream = accountActivityDataStreamFactory.get().keyBy { it.userId }
val csvStream = factory.get().keyBy { it.userId }.process(ReadFirstTimeProcessor())
val joinedStream = accountActivityStream.union(csvStream)
val clusterChanges: SingleOutputStreamOperator<CustomerClusterEvent> =
    accountChangedStream
        .connect(joinedStream)
        .process(ActivityClusteringProcessor())
        .name(dataStreamName)
Stefan Bumpus-Barnett
06/09/2023, 4:00 PM
We can run SELECT COUNT(*) queries successfully, but when we try to run SELECT * queries we get records with all null fields. The Kafka topic uses Confluent Schema Registry to hold the schemas. I was wondering if the fields are not being deserialized properly. Any help would be greatly appreciated!
Query:
CREATE TABLE account
(
`ACCT_COND_ID` INT,
`ACCT_ID` INT,
`ACCT_COND_NM` STRING,
`ACCT_COND_STRT_TS` TIMESTAMP(3),
`ACCT_COND_STOP_TS` TIMESTAMP(3),
`SRC_ACCT_ID` STRING,
`UPDATE_TS` TIMESTAMP(3),
`SRC_ACCT_ID2` STRING,
`SRC_SYS_CD` STRING,
`SRC_ACCT_SYS_CD` STRING
) WITH (
'connector' = 'kafka',
'topic' = '**********',
'properties.bootstrap.servers' = '**********',
'key.format' = 'avro-confluent',
'key.fields' = 'ACCT_COND_ID',
'key.avro-confluent.url' = 'https://**********',
'key.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'key.avro-confluent.basic-auth.user-info' = '**********',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'https://**********',
'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
'value.avro-confluent.basic-auth.user-info' = '**********',
'scan.startup.mode' = 'earliest-offset',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.jaas.config' = '**********'
);
SELECT *
FROM account
LIMIT 10;
Output:
See attached pic
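One pattern that can yield all-null rows with avro-confluent (an assumption, not a confirmed diagnosis): Flink derives a nullable reader schema from the DDL, so a column whose name doesn't exactly match a field in the registered value schema can silently resolve to NULL. It's worth comparing the DDL names, including case, against the registry, and, if the key field is not repeated inside the value schema, setting value.fields-include accordingly. A sketch, assuming the account table from the question exists in the session:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AccountTableFix {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'EXCEPT_KEY' tells the connector that the value schema does not
        // repeat the key fields; with the default 'ALL', every physical
        // column, including ACCT_COND_ID, is looked up in the value schema.
        // (Illustrative; whether it applies depends on the actual schemas.)
        tEnv.executeSql(
                "ALTER TABLE account SET ('value.fields-include' = 'EXCEPT_KEY')");
    }
}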
Amir Hossein Sharifzadeh
06/10/2023, 7:31 PM
Caused by: org.apache.flink.util.FlinkException: Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.
at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement(IntervalJoinOperator.java:227)
at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement1(IntervalJoinOperator.java:196)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:88)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
Amir Hossein Sharifzadeh
06/10/2023, 7:33 PM
Amir Hossein Sharifzadeh
06/10/2023, 8:41 PM
I applied assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()) to both streams!
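For reference, interval joins require event-time timestamps on both inputs; the Long.MIN_VALUE error means records still carry no timestamp when they reach the join. One common fix is a WatermarkStrategy with an explicit timestamp assigner that reads the event's own time field, applied before keyBy. A minimal sketch with a hypothetical Sensor type (field names are illustrative):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {
    // Hypothetical event with an embedded event-time field.
    public static class Sensor {
        public long key;
        public long timestampMillis;
        public double value;
        public Sensor() {}
        public Sensor(long key, long timestampMillis, double value) {
            this.key = key;
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        WatermarkStrategy<Sensor> wm = WatermarkStrategy
                .<Sensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // Without this assigner, records keep Long.MIN_VALUE
                // timestamps and the interval join throws the exception above.
                .withTimestampAssigner((event, ts) -> event.timestampMillis);

        DataStream<Sensor> left = env.fromElements(new Sensor(1, 1_000L, 1.0))
                .assignTimestampsAndWatermarks(wm);
        DataStream<Sensor> right = env.fromElements(new Sensor(1, 1_500L, 2.0))
                .assignTimestampsAndWatermarks(wm);

        left.keyBy(s -> s.key)
                .intervalJoin(right.keyBy(s -> s.key))
                .between(Time.seconds(-5), Time.seconds(5))
                .process(new ProcessJoinFunction<Sensor, Sensor, String>() {
                    @Override
                    public void processElement(Sensor l, Sensor r, Context ctx,
                                               Collector<String> out) {
                        out.collect(l.key + ": " + l.value + " / " + r.value);
                    }
                })
                .print();

        env.execute("interval-join-sketch");
    }
}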
guenter hipler
06/11/2023, 7:30 AM
Parmveer Randhawa
06/12/2023, 2:37 AM
Parmveer Randhawa
06/12/2023, 2:37 AM
dino bin
06/12/2023, 8:36 AM
Zin
06/12/2023, 8:45 AM
Asish Upadhyay
06/12/2023, 9:09 AM
吉春
06/12/2023, 9:27 AM
org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:120) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:528) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:236) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:160) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1028) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:123) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2197) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:121) ~[?:?]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ~[flink-table-api-java-uber-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at com.dlink.executor.Executor.executeStatementSet(Executor.java:351) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
at com.dlink.executor.Executor.submitStatementSet(Executor.java:367) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
at com.dlink.app.flinksql.Submitter.submit(Submitter.java:143) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
at com.dlink.app.MainApp.main(MainApp.java:34) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_121]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:303) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:256) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_121]
at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_121]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_121]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_121]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_121]
Ari Huttunen
06/12/2023, 9:39 AM
We keep getting errors like java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration. They get solved by adding the following to the lib directory with Ansible, but is this how we're supposed to fix this? Edit: version 1.17.1, the plugins were copied as instructed by the manual, and we're using PyFlink.
flink_dependencies:
- group_id: org.apache.flink
artifact_id: flink-sql-parquet
version: "{{ flink_version }}"
- group_id: org.apache.hadoop
artifact_id: hadoop-common
version: "{{ hadoop_version }}"
- group_id: org.apache.hadoop
artifact_id: hadoop-mapreduce-client-core
version: "{{ hadoop_version }}"
- group_id: com.fasterxml.woodstox
artifact_id: woodstox-core
version: 5.3.0
- group_id: org.codehaus.woodstox
artifact_id: stax2-api
version: 4.2.1
- group_id: com.google.guava
artifact_id: guava
version: 11.0.2
- group_id: commons-logging
artifact_id: commons-logging
version: 1.1.3
Zhang Zhao
06/12/2023, 10:17 AM
slowratatoskr
06/12/2023, 2:58 PM
SUM0 in flink sql?
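For context, a hedged note: SUM0 (it shows up as $SUM0 in EXPLAIN output) is a planner-internal variant of SUM that returns 0 instead of NULL when the input is empty; the planner rewrites aggregates such as AVG in terms of it, and it generally isn't meant to be called directly. COALESCE reproduces its behavior in user SQL; a small sketch:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class Sum0Demo {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // An intentionally empty view: the filter removes all rows.
        tEnv.executeSql("CREATE TEMPORARY VIEW t AS " +
                "SELECT * FROM (VALUES (1), (2)) AS v(x) WHERE x > 10");

        // SUM over zero rows is NULL; $SUM0 would return 0 instead.
        // COALESCE(SUM(x), 0) gives the $SUM0 behavior in user SQL.
        tEnv.executeSql("SELECT SUM(x), COALESCE(SUM(x), 0) FROM t").print();
    }
}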