# troubleshooting
  • j

    Jaehyeon Kim

    02/28/2023, 2:11 AM
Hello, I'm testing a simple aggregation function using a Python UDF. It computes the number of words and characters in the values by key, and also concatenates the value strings. It works fine with the print sink but fails with the Kafka sink with the following error.
    Copy code
    pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.output' doesn't support consuming update changes which is produced by node PythonGroupAggregate(groupBy=[k], select=[k, *org.apache.flink.table.functions.python.PythonAggregateFunction$82f60ecbb5d8eed484a7f66dd2ee2921*(k, v) AS TMP_0])
Below is the print sink output.
    Copy code
    # input messages
    a:a
    b:hello
    b:world
    b:hi
    
    # print
    1> +I[a, 1, 1,  a]
    1> +I[b, 2, 10,  hello world]
    1> -U[b, 2, 10,  hello world]
    1> +U[b, 3, 12,  hello world hi]
The script and full error messages are attached. Can you please guide me on how to fix it?
s5_03_aggregation.py, aggregation_error.txt
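One common way around that TableException is to give the job a sink that can consume update changes, for example the upsert-kafka connector instead of the plain kafka connector. A minimal sketch, assuming illustrative topic, broker, and column names that mirror the aggregation output (word count, character count, concatenated values):
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# upsert-kafka keys every record by the PRIMARY KEY, so the -U/+U rows emitted
# by the Python aggregate become upserts instead of rejected update changes.
# Topic, broker address, and column names here are assumptions for illustration;
# the upsert-kafka connector jar must be on the classpath.
t_env.execute_sql("""
    CREATE TABLE output (
        k STRING,
        word_count BIGINT,
        char_count BIGINT,
        concatenated STRING,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'output-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")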
  • c

    ConradJam

    02/28/2023, 6:00 AM
Hi Developers, I would like to ask a question about the JobManager getting OOMKilled in a SessionCluster. I run a SessionCluster that deploys multiple jobs on K8s, and the JobManager is occasionally OOMKilled by K8s (not a JVM kill), which confuses me very much. The memory size of the K8s request and limit I configured is exactly the same as the value of jobmanager.memory.process.size; I don't know if this causes K8s to kill it. I set jobmanager.memory.enable-jvm-direct-memory-limit=true according to the community guidelines [1], but it doesn't seem to help. Flink version: 1.16.1, jobmanager.memory.process.size: 1gb, K8s version: 1.24.x. [1] https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/deployment/memory/mem_trouble/#container-memory-exceeded
  • s

    Slackbot

    02/28/2023, 10:08 AM
    This message was deleted.
  • l

    Lucas Alcântara Freire

    02/28/2023, 11:50 AM
Hello all, I am using Flink with Kinesis and facing the issue below. Has anyone faced something similar, or does anyone have a clue what the reason could be? We use Flink 1.16 with Java 8. This causes our job to restart.
    Copy code
    2023-02-28 11:48:55,553 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] {} - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
    java.lang.InterruptedException: null
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_362]
    	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) ~[?:1.8.0_362]
    	at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_362]
    	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:875) ~[cdp-flink-jobs.jar:?]
    	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:760) ~[cdp-flink-jobs.jar:?]
    	at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392) ~[cdp-flink-jobs.jar:?]
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
    2023-02-28 11:48:55,553 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] {} - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 3 ...
  • n

    Nithin kharvi

    02/28/2023, 12:04 PM
Hi team, we are getting the below error while running a Flink job in application mode on a Linux VM with checkpointing enabled. The error suggests that the Flink job does not have access to the base directory, but what is the base directory it is trying to access?
2023-02-28 11:56:24,256 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
	at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.0.jar:1.16.0]
	... 4 more
  • a

    André Casimiro

    02/28/2023, 2:36 PM
Hi, I'd really appreciate it if anyone can help with this question, thanks! 🙂 What are the options to sync input streams in time? https://stackoverflow.com/questions/75591993/in-flink-is-there-a-way-to-restrict-processing-in-strict-event-time-ordering
  • a

    Amir Hossein Sharifzadeh

    02/28/2023, 5:43 PM
Hello again. We will have to use our own serialization/deserialization, and our Kafka consumer and its serialization have been implemented in Python. Are there any examples of customizing serialization in Python? There are some examples on the Flink website, but all are in Java and Scala. Thanks.
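For what it's worth, PyFlink's DeserializationSchema wrappers are backed by Java implementations, so custom byte-level decoding is usually done after the source, in ordinary Python code. A rough sketch, with an assumed topic, broker, group id, and JSON payload (and the Kafka connector jar on the classpath):
Copy code
import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# The built-in schema hands each record to Python as a plain string;
# the custom "deserialization" then happens in regular Python code.
consumer = FlinkKafkaConsumer(
    topics='input-topic',                                  # assumed topic name
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092',    # assumed broker
                'group.id': 'demo-group'})                 # assumed group id

stream = env.add_source(consumer)

# Custom decoding step: parse the JSON payload into Python dicts.
decoded = stream.map(lambda raw: json.loads(raw))
decoded.print()

env.execute('custom-python-deserialization-sketch')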
  • r

    Roman Bohdan

    02/28/2023, 6:16 PM
    Hello, can the dataflow graph be processed without the iterations method?
  • k

    Kevin Lam

    02/28/2023, 6:29 PM
👋 How does the Flink Kubernetes Operator determine if a job is healthy? From the docs:
* When HA is enabled the savepoint upgrade mode may fall back to the last-state behaviour in cases where the job is in an unhealthy state.
I'm wondering about the unhealthy state because I'm observing an issue where the operator won't take a new savepoint when savepointTriggerNonce is updated, or when a FlinkDeployment spec is changed and the upgradeMode is set to savepoint. Any help is appreciated!
  • j

    Jasmin Redzepovic

    02/28/2023, 6:31 PM
Hi 👋 , could someone please assist with this? I'm using Flink 1.15.2 and Flink Kubernetes Operator 1.2.0. After deploying a batch job, it completed successfully (job state: FINISHED), but the JobManager is still running. Is there a way to automatically clean up resources after batch job execution?
  • y

    Yaroslav Bezruchenko

    02/28/2023, 7:09 PM
Hello everyone, can someone please suggest how to configure Flink 1.15 so that it won't restart taskmanagers on each exception? The thing is that we have a Flink 1.14 app which does not restart taskmanagers on exceptions but keeps using the same taskmanagers. The configs of the two seem identical, but I'm probably missing something.
  • c

    Chloe He

    02/28/2023, 9:00 PM
Hi everyone! Where has the visualization tool for Flink's execution plan moved to? I tried https://flink.apache.org/visualizer/ but it is giving a 404 Not Found.
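Not sure where the page moved, but the visualizer works off the JSON execution plan that a job can print about itself, and that JSON is still available directly. A tiny PyFlink sketch:
Copy code
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Any small pipeline works; the plan reflects whatever transformations are defined.
env.from_collection([1, 2, 3]).map(lambda x: x * 2).print()

# JSON execution plan (the format the visualizer tooling renders).
print(env.get_execution_plan())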
  • s

    Sofya T. Irwin

    02/28/2023, 9:48 PM
Hi, I'm trying to run Flink in a Docker container and then use curl to access the REST API, and I'm seeing this error:
    Copy code
    from the jobmanager: 
    :29:59.172 [kafka-admin-client-thread | flink.sqlclient-enumerator-admin-client] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-admin-client-thread | flink.sqlclient-enumerator-admin-client':
    java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.message.ApiMessageType
    	at org.apache.kafka.common.protocol.ApiKeys.requestHeaderVersion(ApiKeys.java:341) ~[?:?]
    	at org.apache.kafka.clients.ClientRequest.makeHeader(ClientRequest.java:93) ~[?:?]
    ...
    	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1196) ~[?:?]
    	at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.message.AlterReplicaLogDirsRequestData$AlterReplicaLogDir
I'm using this API: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/#jars-jarid-run When I run the same job by logging into the container and executing it manually, it runs OK. I'm trying to understand why running it manually works but running it via the REST API does not.
  • j

    Jagan Nalla

    02/28/2023, 11:49 PM
Hello all, I'm reading a table from Postgres using the JDBC connector. There is a column with the Postgres data type uuid, and that fails with the error below.
---- error ----
class java.util.UUID cannot be cast to class java.lang.String (java.util.UUID and java.lang.String are in module java.base of loader 'bootstrap')
---- code ------
ddl = """
CREATE TABLE source_table(
    hostname string,
    uuid string
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://localhost:5432/mydatabase',
    'table-name' = 'users',
    'username' = 'postgres',
    'password' = 'postgres'
)
"""
t_env.execute_sql(ddl)
Note: t_env is created in streaming mode: t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
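One workaround sometimes used for the UUID mapping is to avoid handing Flink a java.util.UUID at all, for example by pointing the connector at a Postgres view that casts the column to text. A sketch under that assumption (the view name and its definition are hypothetical):
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Assumes a view created on the Postgres side, e.g.:
#   CREATE VIEW users_text AS SELECT hostname, uuid::text AS uuid FROM users;
# so the JDBC connector sees a varchar instead of a java.util.UUID.
t_env.execute_sql("""
    CREATE TABLE source_table (
        hostname STRING,
        uuid STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/mydatabase',
        'table-name' = 'users_text',
        'username' = 'postgres',
        'password' = 'postgres'
    )
""")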
  • x

    Xiaosheng Wu

    03/01/2023, 2:22 AM
Hello Flink community, I have a question about a strange issue I saw when I ran my Flink job with Async I/O operators. My job (parallelism == 1) tries to reach an external Flask endpoint and uses the result to query Keyspaces tables (separate steps). When I run some load tests (about 5 requests per second) against my Flink job, I can see about half of the requests fail due to timeout, but some succeed; and when I rerun it a second time, all of the HTTP requests fail due to timeout, even the Keyspaces lookups, which had not happened before. It almost looks like the thread pool is exhausted or the cluster is blocking any outbound traffic. The external endpoint should be able to handle at least some traffic. Any insights? Thanks!
  • a

    Abhilash S.R

    03/01/2023, 2:49 AM
Hello Flink community, I am getting this exception while using the Flink operator 1.4 with ZooKeeper HA, and the job goes down. If HA is disabled then I am not seeing the exception and the job runs fine. Could you please suggest what might be wrong here?
    Copy code
    org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(af00754d9d0dafaccdb1770d2e3f480e, LocalRpcInvocation(requestResourceOverview(Time))) because the fencing token af00754d9d0dafaccdb1770d2e3f480e did not match the expected fencing token 8b714cf5eb5af40851ea06f1df8d4124.
    	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
  • c

    chunilal kukreja

    03/01/2023, 6:50 AM
Hi Team, [Urgent]: with Flink 1.16.0 in application mode, I am getting the exception below too often:
    Copy code
    2023-03-01 06:44:12,773 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 6
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
    Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
We have deployed the Flink cluster on our K8s using Helm charts, and after every helm delete followed by helm install we get this error. What should be done? Please suggest.
  • a

    Ari Huttunen

    03/01/2023, 7:53 AM
    I created https://issues.apache.org/jira/browse/FLINK-31274
  • r

    Ravi Teja

    03/01/2023, 12:05 PM
Hello all, we are trying to unnest and join an array that is parsed from a string column (JSON). But we are not able to cast the varchar into an array using either CAST or JSON_VALUE with RETURNING. Example:
    Copy code
    --- "{production_reviews: [{"content": "some_content", "rating": 2},{"content": "some_content", "rating": 4}]}"
    
    select product_reviews from product_table;
Please suggest any way we can join this array and run an aggregate on the rating field.
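One possible approach is to parse the JSON into a typed array with a Python UDF and then expand it with CROSS JOIN UNNEST. A self-contained sketch with made-up table and column names (not necessarily the real schema):
Copy code
import json

from pyflink.common import Row
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

review_type = DataTypes.ARRAY(
    DataTypes.ROW([DataTypes.FIELD('content', DataTypes.STRING()),
                   DataTypes.FIELD('rating', DataTypes.INT())]))


@udf(result_type=review_type)
def parse_reviews(raw):
    # Turn the JSON string column into a typed ARRAY<ROW<content, rating>>.
    reviews = json.loads(raw).get('product_reviews', [])
    return [Row(content=r['content'], rating=r['rating']) for r in reviews]


t_env.create_temporary_function('parse_reviews', parse_reviews)

# Tiny in-memory stand-in for the real product table.
t = t_env.from_elements(
    [('p1', '{"product_reviews": [{"content": "ok", "rating": 2},'
             ' {"content": "good", "rating": 4}]}')],
    DataTypes.ROW([DataTypes.FIELD('product_id', DataTypes.STRING()),
                   DataTypes.FIELD('product_reviews', DataTypes.STRING())]))
t_env.create_temporary_view('product_table', t)

# Expand the parsed array into one row per review, then aggregate on rating.
t_env.execute_sql("""
    SELECT product_id, AVG(CAST(r.rating AS DOUBLE)) AS avg_rating
    FROM (
        SELECT product_id, parse_reviews(product_reviews) AS reviews
        FROM product_table
    ) AS p
    CROSS JOIN UNNEST(p.reviews) AS r (content, rating)
    GROUP BY product_id
""").print()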
  • s

    sharad mishra

    03/01/2023, 12:34 PM
Hello, I'm using the Flink DataStream (1.16) API to read from an inbound Kafka topic and write to another outbound Kafka topic. I've enabled the exactly-once delivery guarantee in my program. I got the following exception in my application, along with duplicate data in the Kafka sink. I'm curious to know what might have caused this exception along with the duplicate data, although the delivery guarantee is set to exactly-once?
    Copy code
    org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka dcn_audit_log_events_unique_stg-6@-1 with FlinkKafkaInternalProducer{transactionalId='unique-6-1222', inTransaction=true, closed=false} 
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
    Here is the snapshot of FlinkKafkaConsumer properties from taskmanager logs:
    Copy code
    acks = -1
    	batch.size = 16384
    	bootstrap.servers = [10.7.111.32:9092]
    	buffer.memory = 33554432
    	client.dns.lookup = use_all_dns_ips
    	client.id = producer-duplicate-13-402
    	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
    	enable.idempotence = true
    	interceptor.classes = []
    	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
    	max.in.flight.requests.per.connection = 5
    	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
    	metric.reporters = []
    	metrics.num.samples = 2
    	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
    	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
    	retries = 2147483647
	retry.backoff.ms = 100
    	sasl.client.callback.handler.class = null
    	sasl.jaas.config = null
    	sasl.kerberos.kinit.cmd = /usr/bin/kinit
    	sasl.kerberos.min.time.before.relogin = 60000
    	sasl.kerberos.service.name = null
    	sasl.kerberos.ticket.renew.jitter = 0.05
    	sasl.kerberos.ticket.renew.window.factor = 0.8
    	sasl.login.callback.handler.class = null
    	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
    	sasl.login.refresh.buffer.seconds = 300
    	sasl.login.refresh.min.period.seconds = 60
    	sasl.login.refresh.window.factor = 0.8
    	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
    	sasl.mechanism = GSSAPI
    	sasl.oauthbearer.clock.skew.seconds = 30
    	sasl.oauthbearer.expected.audience = null
    	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    	sasl.oauthbearer.jwks.endpoint.url = null
    	sasl.oauthbearer.scope.claim.name = scope
    	sasl.oauthbearer.sub.claim.name = sub
    	sasl.oauthbearer.token.endpoint.url = null
    	security.protocol = PLAINTEXT
    	security.providers = null
    	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
    	ssl.cipher.suites = null
    	ssl.enabled.protocols = [TLSv1.2]
    	ssl.endpoint.identification.algorithm = https
    	ssl.engine.factory.class = null
    	ssl.key.password = null
    	ssl.keymanager.algorithm = SunX509
    	ssl.keystore.certificate.chain = null
    	ssl.keystore.key = null
    	ssl.keystore.location = null
    	ssl.keystore.password = null
    	ssl.keystore.type = JKS
    	ssl.protocol = TLSv1.2
    	ssl.provider = null
    	ssl.secure.random.implementation = null
    	ssl.trustmanager.algorithm = PKIX
    	ssl.truststore.certificates = null
    	ssl.truststore.location = null
    	ssl.truststore.password = null
    	ssl.truststore.type = JKS
	transaction.timeout.ms = 600000
    	transactional.id = duplicate-13-402
    	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    also FlinkKafkaProducer properties snapshot:
    Copy code
    org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    	acks = -1
    	batch.size = 16384
    	bootstrap.servers = [10.7.111.32:9092]
    	buffer.memory = 33554432
    	client.dns.lookup = use_all_dns_ips
    	client.id = producer-duplicate-4-402
    	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
    	enable.idempotence = true
    	interceptor.classes = []
    	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 60000
    	max.in.flight.requests.per.connection = 5
    	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
    	metric.reporters = []
    	metrics.num.samples = 2
    	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
    	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
    	retries = 2147483647
	retry.backoff.ms = 100
    	sasl.client.callback.handler.class = null
    	sasl.jaas.config = null
    	sasl.kerberos.kinit.cmd = /usr/bin/kinit
    	sasl.kerberos.min.time.before.relogin = 60000
    	sasl.kerberos.service.name = null
    	sasl.kerberos.ticket.renew.jitter = 0.05
    	sasl.kerberos.ticket.renew.window.factor = 0.8
    	sasl.login.callback.handler.class = null
    	sasl.login.class = null
	sasl.login.connect.timeout.ms = null
	sasl.login.read.timeout.ms = null
    	sasl.login.refresh.buffer.seconds = 300
    	sasl.login.refresh.min.period.seconds = 60
    	sasl.login.refresh.window.factor = 0.8
    	sasl.login.refresh.window.jitter = 0.05
	sasl.login.retry.backoff.max.ms = 10000
	sasl.login.retry.backoff.ms = 100
    	sasl.mechanism = GSSAPI
    	sasl.oauthbearer.clock.skew.seconds = 30
    	sasl.oauthbearer.expected.audience = null
    	sasl.oauthbearer.expected.issuer = null
	sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
	sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    	sasl.oauthbearer.jwks.endpoint.url = null
    	sasl.oauthbearer.scope.claim.name = scope
    	sasl.oauthbearer.sub.claim.name = sub
    	sasl.oauthbearer.token.endpoint.url = null
    	security.protocol = PLAINTEXT
    	security.providers = null
    	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
    	ssl.cipher.suites = null
    	ssl.enabled.protocols = [TLSv1.2]
    	ssl.endpoint.identification.algorithm = https
    	ssl.engine.factory.class = null
    	ssl.key.password = null
    	ssl.keymanager.algorithm = SunX509
    	ssl.keystore.certificate.chain = null
    	ssl.keystore.key = null
    	ssl.keystore.location = null
    	ssl.keystore.password = null
    	ssl.keystore.type = JKS
    	ssl.protocol = TLSv1.2
    	ssl.provider = null
    	ssl.secure.random.implementation = null
    	ssl.trustmanager.algorithm = PKIX
    	ssl.truststore.certificates = null
    	ssl.truststore.location = null
    	ssl.truststore.password = null
    	ssl.truststore.type = JKS
	transaction.timeout.ms = 600000
    	transactional.id = duplicate-4-402
    	value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
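Not claiming this is the cause here, but InvalidProducerEpochException with exactly-once Kafka sinks is usually discussed together with the checkpoint interval, transaction.timeout.ms, and a stable transactional id prefix. A rough PyFlink sketch with made-up broker and topic names, just to show where those knobs live (requires the Kafka connector jar; not the poster's actual setup):
Copy code
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import (KafkaRecordSerializationSchema,
                                                 KafkaSink)

env = StreamExecutionEnvironment.get_execution_environment()
# Transactions are only committed when a checkpoint completes, so keep the
# checkpoint interval comfortably below the Kafka transaction timeout.
env.enable_checkpointing(60000)

sink = (KafkaSink.builder()
        .set_bootstrap_servers('localhost:9092')             # assumed broker
        .set_record_serializer(KafkaRecordSerializationSchema.builder()
                               .set_topic('outbound-topic')  # assumed topic
                               .set_value_serialization_schema(SimpleStringSchema())
                               .build())
        .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
        # Unique per job, so a restarted job does not fence another job's producers.
        .set_transactional_id_prefix('my-job-outbound')
        # Should cover the longest expected checkpoint/recovery time.
        .set_property('transaction.timeout.ms', '900000')
        .build())

env.from_collection(['a', 'b', 'c'], type_info=Types.STRING()).sink_to(sink)
env.execute('exactly-once-kafka-sink-sketch')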
  • r

    Raefaldhi Amartya Junior

    03/01/2023, 2:07 PM
I'm trying to use Flink on my local machine and I got:
Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
Any idea?
  • s

    Siddhesh Kalgaonkar

    03/01/2023, 2:55 PM
Hi #C03G7LJTS2G, I had a doubt about Flink CEP. I was going through this link and just had one question for now. Can we use SQL, by converting the DataStream into the Table API, to perform the same thing Flink CEP is designed for? Is there any limitation? I didn't see one, actually, but just wanted to relate the two. Also, as per the documentation, is this not designed for the Python API, or is it just missing from the documentation?
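On the SQL side of that comparison: a good chunk of what the CEP library covers can be expressed with MATCH_RECOGNIZE in Flink SQL, which is reachable from Python through the Table API. A small sketch with a made-up datagen table and pattern, just to show the shape:
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Illustrative source: a bounded random temperature stream with an event-time attribute.
t_env.execute_sql("""
    CREATE TABLE readings (
        device_id STRING,
        temperature DOUBLE,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100',
        'fields.device_id.length' = '1',
        'fields.temperature.min' = '60',
        'fields.temperature.max' = '100'
    )
""")

# Pattern detection in SQL: two consecutive hot readings per device.
t_env.execute_sql("""
    SELECT *
    FROM readings
    MATCH_RECOGNIZE (
        PARTITION BY device_id
        ORDER BY ts
        MEASURES A.ts AS first_hot, B.ts AS second_hot
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B)
        DEFINE
            A AS A.temperature > 80,
            B AS B.temperature > 80
    )
""").print()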
  • h

    Hunter Medney

    03/01/2023, 6:03 PM
Need some help understanding how to approach reprocessing historical data. Suppose we have S3 buckets A, B, and C with time series data. All 3 buckets are in Parquet format, partitioned by day, vary in volume, and have a common primary key and event time.
Goal: execute a job that joins data from each bucket by the primary key such that at a given moment (millisecond precision), the job sees only the latest data from each bucket based on event time (watermark). In essence, the job needs to time travel as if it only had the latest data from A, B, and C for each past moment. This would seem possible in Flink but here's where I'm stuck:
• According to the documentation, "When using a directory as the source path, there is no defined order of ingestion for the files inside the directory." which suggests events won't be processed in order in a partition dir. For example, "part-13.parquet" could be read before "part-5.parquet" in dir "dt=2022-10-01". This would seem to prevent a job from processing a bulk format stream in event time order.
• Will the different volumes of the S3 data create an issue? Suppose bucket A had 4x more data than bucket B - will the job still be able to look at the latest rows from A, B, and C for a given moment when bucket B is processed much quicker than A?
  • y

    Yaroslav Bezruchenko

    03/01/2023, 6:03 PM
Hello everyone, I will repeat my question: can someone please suggest how to configure Flink 1.15 so that it won't restart taskmanagers on each exception? The thing is that we have a Flink 1.14 app which does not restart taskmanagers on exceptions but keeps using the same taskmanagers. The configs of the two seem identical, but I'm probably missing something.
  • c

    chunilal kukreja

    03/01/2023, 7:21 PM
Hi Team, any recommendations/suggestions on how to trigger savepoints in a production system? I am using Flink 1.16.0 with a native Kubernetes deployment, and I would like to know the standard approach, or the one most used by folks in production, to take savepoints and restore a job from them. Thanks in advance!
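One pattern that works with a native Kubernetes deployment is to drive savepoints through the JobManager REST API (the same endpoint the CLI uses) and keep the returned path for the later restore. A small sketch with placeholder host, job id, and target directory, assuming the requests package:
Copy code
import time

import requests

REST = 'http://flink-jobmanager:8081'   # placeholder JobManager address
JOB_ID = '<job-id>'                     # placeholder job id

# Ask the JobManager to take a savepoint; the call returns a trigger id.
resp = requests.post(
    f'{REST}/jobs/{JOB_ID}/savepoints',
    json={'target-directory': 's3://my-bucket/savepoints',  # placeholder directory
          'cancel-job': False})
trigger_id = resp.json()['request-id']

# Poll the trigger until the savepoint completes, then record its location
# (this is the path to pass back when resubmitting/restoring the job).
while True:
    status = requests.get(f'{REST}/jobs/{JOB_ID}/savepoints/{trigger_id}').json()
    if status['status']['id'] == 'COMPLETED':
        print(status['operation']['location'])
        break
    time.sleep(2)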
  • h

    Herat Acharya

    03/01/2023, 7:35 PM
Hello, is there a way to run multiple async data stream operations within a single job and then combine them later? Any resources or blog posts would be helpful. Basically I want to do something like this:
Copy code
Master async operation -> based on the data from the master async operation, conditionally run one or more async operations -> combine these async operations and emit into a single sink
  • t

    Tony Wang

    03/01/2023, 9:27 PM
Is there a way for Flink watermarks to advance per record when reading from Kafka instead of at a fixed period? I was under the assumption that SET 'pipeline.auto-watermark-interval' = '0ms'; would do it, but instead it just generated NULL watermarks for everything. It seems related to this issue: https://issues.apache.org/jira/browse/FLINK-21931.
  • p

    Prithvi Dammalapati

    03/01/2023, 9:39 PM
I'm migrating a Flink workload from EMR to K8s and wondering if K8s has a resource allocation concept similar to Dominant Resource Fairness (DRF), which is available in other resource management solutions like YARN and Mesos. We explicitly specify the requests and limits in the K8s manifests, but that is limiting the performance of TaskManagers with heavy operators.
  • s

    Sergio Sainz

    03/01/2023, 11:40 PM
Hello 👋 We would like to use the fine-grained resource management feature at the Table API level. The documentation mentions that fine-grained resource management is only supported at the DataStream API level. We are experimenting with the Table API to DataStream API bridge, but we are seeing an extra slot being required when we convert from the Table API to the DataStream API. Do you have any ideas about this?
    Copy code
Table res = getTableEnvironment().sqlQuery(query);
StreamTableEnvironment stEnv = (StreamTableEnvironment) getTableEnvironment();

DataStream dataStream = stEnv.toDataStream(res);
dataStream.getTransformation().setSlotSharingGroup(ssgA);

dataStream.getExecutionEnvironment().registerSlotSharingGroup(ssgA);

res = stEnv.fromDataStream(dataStream);
  • m

    Mohit Aggarwal

    03/02/2023, 5:51 AM
Hi, I am using the Flink K8s operator 1.4 with Flink 1.17. I am using upgradeMode: last-state with checkpointing enabled, with checkpoints written to a GCS bucket. What I am observing is that when the job autoscales, it does not restore from the last checkpoint taken before autoscaling. Even the config map to which the checkpoints get stored is recreated after the job autoscales. Has anyone faced a similar issue?