# troubleshooting
  • u

    饶俊

    12/24/2022, 3:22 PM
    DataStream API: how do I write a custom cumulate window assigner?
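    There is no built-in cumulate window in the DataStream API, so this usually means writing your own WindowAssigner. Below is a minimal, untested sketch that assigns each element to all windows [start, start + step), [start, start + 2*step), ... up to the full size, similar to SQL's CUMULATE (class and variable names are illustrative only):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
    import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    // Sketch of a cumulate-style event-time window assigner: every element is put
    // into all windows that share the same start and grow by `step` until `maxSize`.
    public class CumulateEventTimeWindows extends WindowAssigner<Object, TimeWindow> {

        private final long maxSize; // total window size in ms
        private final long step;    // cumulation step in ms, assumed to divide maxSize

        public CumulateEventTimeWindows(long maxSize, long step) {
            this.maxSize = maxSize;
            this.step = step;
        }

        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            long start = TimeWindow.getWindowStartWithOffset(timestamp, 0, maxSize);
            List<TimeWindow> windows = new ArrayList<>();
            for (long end = start + step; end <= start + maxSize; end += step) {
                if (end > timestamp) { // only windows that still cover this element
                    windows.add(new TimeWindow(start, end));
                }
            }
            return windows;
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }

        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }

        @Override
        public boolean isEventTime() {
            return true;
        }
    }

    It would be used like any other assigner, e.g. stream.keyBy(...).window(new CumulateEventTimeWindows(60_000, 10_000)).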
  • s

    Sudhan Madhavan

    12/25/2022, 7:18 AM
    Hi all, hope all of you are doing well! Right now I am facing some difficulty scraping JM and TM metrics using the Prometheus reporter. To collect the metrics, I have exposed a metric reporter port on both the JM and the TM. But since the Flink Kubernetes operator creates a Service object only for the JM, Prometheus is unable to scrape metrics from the TM pods using a ServiceMonitor object. Can you suggest any way, apart from creating a separate Service object for the TM, to scrape its metrics? Note: PodMonitor is restricted in my ecosystem. Flink version: 1.14.2
    g
    • 2
    • 4
  • a

    Ashutosh Joshi

    12/26/2022, 11:19 AM
    Hi all, I am trying to apply a custom transformation on a DataStream<Row> inside a map function. To use that transformation I first need to know the name and type of each field wrapped inside the Row. Below is the information I get from datastream.getType(), but how do I extract the row field information, i.e. names and types?
    Copy code
    ROW<`topic` STRING, `partition` INT, `offset` BIGINT, `kafka_timestamp` TIMESTAMP(3), `source_name` STRING, `source_id` INT, `event_type` STRING, `version` INT, `source_timestamp` BIGINT, `event_timestamp` TIMESTAMP(3), `payload` ARRAY<MAP<STRING, STRING>>> NOT NULL(org.apache.flink.types.Row, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
    Can anyone please help?
    s
    • 2
    • 3
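    For what it's worth, when a stream's type information is a plain RowTypeInfo the names and types can be read from it directly; here is a minimal sketch. (The ExternalSerializer in the output above suggests the type info may actually be the Table API's ExternalTypeInfo, in which case the same information would have to be pulled from its DataType instead.)

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.types.Row;

    // Hypothetical helper: prints field names and types if the stream's
    // TypeInformation is a RowTypeInfo.
    static void printRowFields(TypeInformation<Row> typeInfo) {
        if (typeInfo instanceof RowTypeInfo) {
            RowTypeInfo rowType = (RowTypeInfo) typeInfo;
            String[] names = rowType.getFieldNames();
            for (int i = 0; i < rowType.getArity(); i++) {
                System.out.println(names[i] + " -> " + rowType.getTypeAt(i));
            }
        } else {
            System.out.println("Unexpected type info: " + typeInfo.getClass().getName());
        }
    }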
  • u

    饶俊

    12/24/2022, 3:19 PM
    Can similar functionality be achieved in Flink 1.13?
    m
    • 2
    • 1
  • n

    Nithin Kumar Vokkarla

    12/26/2022, 2:43 PM
    Hi all, I am trying to deploy a Flink app using a FlinkDeployment, but the JobManager deployment goes missing after some time and the operator logs show the following. Can anyone help me solve this issue?
    Copy code
    2022-12-26 14:32:49,188 i.j.o.p.e.ReconciliationDispatcher [DEBUG][flink-operator/<application-name>] Reconciling resource <application-name> with version: 154257956 with execution scope: ExecutionScope{ resource id: ResourceID{name='<application-name>', namespace='flink-operator'}, version: 154257956}	
    2022-12-26 14:32:49,188 i.j.o.p.e.ReconciliationDispatcher [DEBUG][flink-operator/<application-name>] Handling dispatch for resource <application-name>
    2022-12-26 14:32:49,188 i.j.o.p.e.EventProcessor       [DEBUG][flink-operator/<application-name>] Executing events for custom resource. Scope: ExecutionScope{ resource id: ResourceID{name='<application-name>', namespace='flink-operator'}, version: 154257956}
    2022-12-26 14:32:49,187 i.j.o.p.e.s.i.ManagedInformerEventSource [DEBUG][flink-operator/<application-name>] Resource not found in temporal cache reading it from informer cache, for Resource ID: ResourceID{name='<application-name>', namespace='flink-operator'}	
    2022-12-26 14:32:49,187 i.j.o.p.e.EventProcessor       [DEBUG][flink-operator/<application-name>] Marking event received for: ResourceID{name='<application-name>', namespace='flink-operator'}
    2022-12-26 14:32:49,187 i.j.o.p.e.EventProcessor       [DEBUG] Received event: Event{relatedCustomResource=ResourceID{name='<application-name>', namespace='flink-operator'}}
    2022-12-26 14:32:49,187 i.j.o.p.e.s.t.TimerEventSource [DEBUG] Producing event for custom resource id: ResourceID{name='<application-name>', namespace='flink-operator'}
    2022-12-26 14:32:34,187 i.j.o.p.e.EventProcessor       [DEBUG][flink-operator/<application-name>] ReScheduling event for resource: ResourceID{name='<application-name>', namespace='flink-operator'} with delay: 15000
    2022-12-26 14:32:34,187 i.j.o.p.e.EventProcessor       [DEBUG][flink-operator/<application-name>] Cleanup for successful execution for resource: <application-name>
    2022-12-26 14:32:34,187 i.j.o.p.e.EventProcessor       [DEBUG][flink-operator/<application-name>] Event processing finished. Scope: ExecutionScope{ resource id: ResourceID{name='<application-name>', namespace='flink-operator'}, version: 154257956}, PostExecutionControl: PostExecutionControl{onlyFinalizerHandled=false, updatedCustomResource=null, runtimeException=null}
    2022-12-26 14:32:34,187 o.a.f.k.o.u.StatusRecorder     [DEBUG][flink-operator/<application-name>] No status change.
    2022-12-26 14:32:34,187 o.a.f.k.o.c.FlinkDeploymentController [INFO ][flink-operator/<application-name>] End of reconciliation
    2022-12-26 14:32:34,187 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink-operator/<application-name>] Resource fully reconciled, nothing to do...
    2022-12-26 14:32:34,187 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [WARN ][flink-operator/<application-name>] Could not recover lost deployment without HA enabled	
    2022-12-26 14:32:34,186 o.a.f.k.o.u.StatusRecorder     [DEBUG][flink-operator/<application-name>] No status change.
    2022-12-26 14:32:34,186 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink-operator/<application-name>] Observing JobManager deployment. Previous status: MISSING
    2022-12-26 14:32:34,186 o.a.f.k.o.c.FlinkDeploymentController [INFO ][flink-operator/<application-name>] Starting reconciliation
    g
    • 2
    • 4
  • s

    Slackbot

    12/27/2022, 4:07 AM
    This message was deleted.
    d
    • 2
    • 1
  • s

    Shira Bodenstein

    12/27/2022, 6:34 AM
    Hi everyone, I have a streaming application with an Elasticsearch sink. I upgraded Flink from 1.11 to 1.16 and also moved from ES 7 to OpenSearch 2.0, and now I'm facing some deprecation issues; I hope you can help me. In the previous version I created an ElasticsearchSink and added a failure handler, which protected the sink from failing on certain exceptions.
    Copy code
    final ActionRequestFailureHandler failureHandler = (action, failure, restStatusCode, indexer) -> {
                if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                    indexer.add(action);
                } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                    log.warn("Got malformed document , action {}", action);
                    // malformed document; simply drop elasticsearchSinkFunction without failing sink
                } else if (failure instanceof IOException && failure.getCause() instanceof NullPointerException && failure.getMessage().contains("Unable to parse response body")) {
                    //issue with ES 7 and opensearch - that does not send type - while response is waiting for it
                    //at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127) -- this.type = Objects.requireNonNull(type);
                    log.debug("known issue format the response for ES 7.5.1 and DB OS (opensearch) :{}", failure.getMessage());
                } else {
                    // for all other failures, log and don't fail the sink
                    log.error("Got error while trying to perform ES action {}", action, failure);
                }
            };
    		
    
     final ElasticsearchSink.Builder<T> builder = new ElasticsearchSink.Builder<>(transportNodes, elasticsearchSinkFunction);
    In the new version the ActionRequestFailureHandler class is deprecated, and after investigating I can't find any way to handle failures; the sink fails on every failure. Is there anything I missed? Thanks in advance!
    m
    • 2
    • 4
  • s

    Suriya Krishna Mariappan

    12/27/2022, 3:49 PM
    We have been trying out the Kubernetes operator for the past week. Everything was working fine until we enabled metrics pushing via the StatsD metrics reporter. We write the pipeline with Beam and use the Flink runner. Once we enabled the metrics, the pods started restarting because of an exception saying the following class cannot be found.
    Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.metrics.ShortIdMap
    But the dependency for this is packaged as part of the job fat JAR, and yet we get this error saying the class was not found. • So, is Flink not able to find the classes from the fat JAR? • Should we put the dependency in a path where Flink can find it? If so, how do we make sure some other dependency is not missed like this? This is the full stack trace; any help would be greatly appreciated.
    Copy code
    2022-12-27 15:16:59,450 WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry    [] - Failed to serialize accumulators for task.
    java.lang.NoClassDefFoundError: org/apache/beam/runners/core/metrics/ShortIdMap
    	at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
    	at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
    	at java.lang.Class.getDeclaredMethod(Unknown Source) ~[?:?]
    	at java.io.ObjectStreamClass.getPrivateMethod(Unknown Source) ~[?:?]
    	at java.io.ObjectStreamClass$2.run(Unknown Source) ~[?:?]
    	at java.io.ObjectStreamClass$2.run(Unknown Source) ~[?:?]
    	at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    	at java.io.ObjectStreamClass.<init>(Unknown Source) ~[?:?]
    	at java.io.ObjectStreamClass$Caches$1.computeValue(Unknown Source) ~[?:?]
    	at java.io.ObjectStreamClass$Caches$1.computeValue(Unknown Source) ~[?:?]
    	at java.io.ClassCache$1.computeValue(Unknown Source) ~[?:?]
    	at java.io.ClassCache$1.computeValue(Unknown Source) ~[?:?]
    	at java.lang.ClassValue.getFromHashMap(Unknown Source) ~[?:?]
    	at java.lang.ClassValue.getFromBackup(Unknown Source) ~[?:?]
    	at java.lang.ClassValue.get(Unknown Source) ~[?:?]
    	at java.io.ClassCache.get(Unknown Source) ~[?:?]
    	at java.io.ObjectStreamClass.lookup(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeSerialData(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeObject(Unknown Source) ~[?:?]
    	at java.util.concurrent.ConcurrentHashMap.writeObject(Unknown Source) ~[?:?]
    	at jdk.internal.reflect.GeneratedMethodAccessor93.invoke(Unknown Source) ~[?:?]
    	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    	at java.io.ObjectStreamClass.invokeWriteObject(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeSerialData(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:?]
    	at java.io.ObjectOutputStream.writeObject(Unknown Source) ~[?:?]
    	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:51) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:54) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1883) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:181) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:2289) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.6.jar:1.13.6]
    Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.metrics.ShortIdMap
    	at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    	at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    	... 62 more
    @Arun
  • l

    Liubov

    12/27/2022, 5:09 PM
    Hello all. Could you please help me understand whether ProtobufSerializer can solve the class loading issue that happens when ListState in Flink is used with TTL (this one), or whether it also uses Kryo and the only way to solve the issue is to make the classes POJOs? Also, I think I saw an option of implementing a fixed-length custom serializer; is there any good example of that?
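    For context, Flink falls back to Kryo only for types it cannot treat as POJOs. A minimal sketch of a state class that satisfies the POJO rules (field names here are hypothetical) looks like this:

    // Minimal Flink-friendly POJO sketch: public class, public no-arg constructor,
    // and public (or getter/setter) fields let Flink use its PojoSerializer for
    // state instead of falling back to Kryo.
    public class SessionRecord {
        public String sessionId;
        public long lastSeen;

        public SessionRecord() {} // required no-arg constructor

        public SessionRecord(String sessionId, long lastSeen) {
            this.sessionId = sessionId;
            this.lastSeen = lastSeen;
        }
    }

    A fully custom fixed-length serializer would mean implementing Flink's TypeSerializer (and its serializer snapshot) for the class, which is considerably more work than making the class a POJO.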
  • m

    Mehul Batra

    12/28/2022, 11:10 AM
    I see the cookbook recipe is inaccessible; can anyone please help me access it? https://github.com/immerok/recipes/tree/main/enrichment-join-with-buffering
    j
    • 2
    • 3
  • a

    Ari Huttunen

    12/28/2022, 1:34 PM
    How should I actually develop PyFlink SQL code? As far as I know, everything is working fine: I have a stream coming from Kafka, I have table(s) defined in PyFlink code, and I can get output to the console when I run the PyFlink code in ptpython, like this
    Copy code
    table_env.execute_sql("""
        CREATE TABLE print_aggregated_data (
            EVENT_DAY STRING,
            EVENT_HOUR STRING,
            ..other fields..
        ) WITH (
            'connector' = 'print'
        )
    """)
    
    table_env.execute_sql("""
        INSERT INTO print_aggregated_data
            SELECT 
                *
            FROM aggregated_data
            LIMIT 10
    """).wait(10000) # 10 seconds = 10000 milliseconds
    If I copy and paste all the code into the ptpython console, it displays 10 lines of selected data, and after 10 seconds it fails with
    java.util.concurrent.TimeoutException
    . This works, but it feels a bit awkward, so maybe I'm missing something. I'd like a more interactive way of seeing the data that is being gathered.
  • a

    Ashutosh Joshi

    12/28/2022, 3:34 PM
    Hi everyone. I am trying to parse a nested field in a row of a data stream through a RichMapFunction<Row, Row>. The input and output of this function are of type Row. The nested column in a row can have any number of fields.
    Copy code
    DataStream<Row> outStream = stream.map(new ParsePayload(functionMap, typeInformation))
            .uid("ParseNestedColumn");

    private static class ParsePayload extends RichMapFunction<Row, Row> implements Serializable {
        @Override
        public Row map(Row row) throws Exception {
            <business logic>
            …….
            return resultRow;
        }
    }
    The issue is that I can only determine the row's type information after evaluating the map function (i.e. after the output row has been created), because the fields of the nested column are not fixed. I have tried both Types.ROW_NAMED() and the ResultTypeQueryable interface, but both require the type information before the map function is evaluated, so this way I can't supply type information to the stream. Can anyone please help with an example solution? P.S. I do not want to enable generic types for my job.
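    One direction that may help, sketched under the assumption that the output field names and types can be computed on the client before the job graph is built (for example from configuration or a schema registry): derive them up front and pass them via returns(Types.ROW_NAMED(...)), which avoids the generic-type fallback. The field names below are purely hypothetical, and this only works if the schema is known before submission.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    // Hypothetical output schema, derived before job submission (e.g. from config).
    String[] fieldNames = {"source_id", "payload_key", "payload_value"};
    TypeInformation<?>[] fieldTypes = {Types.INT, Types.STRING, Types.STRING};

    DataStream<Row> outStream = stream
            .map(new ParsePayload(functionMap, typeInformation))
            .returns(Types.ROW_NAMED(fieldNames, fieldTypes)) // explicit type info for the Row output
            .uid("ParseNestedColumn");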
  • g

    Gerald Schmidt

    12/28/2022, 4:24 PM
    I have a Kafka question (Flink on Kubernetes + MSK). I have installed
    kafka-clients-2.6.0.jar
    ,
    flink-sql-connector-kafka-1.16.0.jar
    and
    aws-msk-iam-auth-1.1.5-all.jar
    in /opt/flink/lib. I have also set up and tested IAM access for the service account. The creation of table
    mytable
    from a topic using
    sql-client.sh
    succeeds, but then
    SELECT * FROM mytable;
    fails after ~2m with:
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
    Am I missing
    client.properties
    ? I'd like to specify:
    Copy code
    security.protocol=SASL_SSL
    sasl.mechanism=AWS_MSK_IAM
    sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    If that's a sensible next step, where on my jobmanager pod should the file go?
    s
    а
    • 3
    • 4
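    One thing that may be worth trying (a sketch, not a verified MSK setup): with the Kafka SQL connector, client settings are normally passed per table through 'properties.*' options in the CREATE TABLE DDL rather than through a client.properties file on the pod. The topic, brokers, columns, and format below are placeholders.

    // Sketch only: the 'properties.*' options are forwarded to the underlying
    // Kafka consumer/admin client by the Kafka SQL connector.
    tableEnv.executeSql(
        "CREATE TABLE mytable (\n" +
        "  -- ... columns ...\n" +
        "  `payload` STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'my-topic',\n" +
        "  'properties.bootstrap.servers' = '<msk-bootstrap-brokers>',\n" +
        "  'properties.security.protocol' = 'SASL_SSL',\n" +
        "  'properties.sasl.mechanism' = 'AWS_MSK_IAM',\n" +
        "  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',\n" +
        "  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',\n" +
        "  'format' = 'json'\n" +
        ")");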
  • j

    Jonathan Weaver

    12/29/2022, 1:22 AM
    I've included the Hadoop dependencies + Apache Iceberg in the Docker containers of our Flink cluster, but I am having issues with the flink-metrics-dropwizard module: com.codahale.metrics is included among those additional dependencies, which results in the following error
    Copy code
    java.lang.LinkageError: loader constraint violation: when resolving method 'void org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper.<init>(com.codahale.metrics.Histogram)' the class loader org.apache.flink.util.ChildFirstClassLoader @61697817 of the current class, com/maxar/di/fids/catalog/fids_sink/FidsSinkWriter, and the class loader 'app' for the method's defining class, org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper, have different Class objects for the type com/codahale/metrics/Histogram used in the signature (com.maxar.di.fids.catalog.fids_sink.FidsSinkWriter is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @61697817, parent loader 'app'; org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper is in unnamed module of loader 'app')
    This forces me to set the Dropwizard metrics dependency to provided in the POM, but that prevents me from running the jar, say with the sql-client locally without the Hadoop dependencies. Does anyone have any pointers on resolving this conflict?
    g
    • 2
    • 1
  • d

    Doğaç Eldenk

    12/29/2022, 5:57 PM
    I am trying to move a SQL database to DynamoDB, but instead of directly dumping data into Dynamo I would like to backfill through APIs provided by a microservice. The data (transaction logs) comes from a Kinesis stream, and I am trying to forward it to gRPC endpoints. We need to be able to write thousands of requests per second. I am trying to use streaming RPC to forward the data to the service, but I have had no luck (using Kotlin).
    1. When I use a RichSinkFunction, the endpoints time out unless I spin up a coroutine that ends each stream after 5-10 seconds.
    2. When I use ProcessingWindow functions, a processing window handles thousands of requests at once and I am not sure how to retry failed requests without retrying the whole window (which can create an absurd spike on the service and make it fail again).
    3. Flow in Kotlin is tricky to work with, because the RPC streams are bidirectional and the responses come back asynchronously and are very hard to capture.
    Any suggestions on how I can use bidirectional streams to sink data with an at-least-once guarantee?
    m
    • 2
    • 11
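    One simpler alternative worth considering, sketched in Java (the same structure applies in Kotlin): give up the bidirectional stream and send each record as a unary call with bounded per-record retries. GrpcClient and MyEvent below are hypothetical placeholders for your own stub wrapper and record type, not real APIs.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Sketch only: each record is sent individually and retried with a bounded
    // backoff; if retries are exhausted the task fails and Flink replays from the
    // last checkpoint, which gives at-least-once delivery.
    public class RetryingGrpcSink extends RichSinkFunction<MyEvent> {

        private static final int MAX_RETRIES = 5;
        private transient GrpcClient client; // hypothetical client wrapper

        @Override
        public void open(Configuration parameters) {
            client = GrpcClient.connect("orders-service:443"); // hypothetical factory method
        }

        @Override
        public void invoke(MyEvent value, Context context) throws Exception {
            int attempt = 0;
            while (true) {
                try {
                    client.send(value); // hypothetical blocking unary call
                    return;
                } catch (Exception e) {
                    if (++attempt > MAX_RETRIES) {
                        throw e; // fail the task so Flink restarts from the last checkpoint
                    }
                    Thread.sleep(Math.min(1_000L * attempt, 10_000L)); // simple linear backoff
                }
            }
        }

        @Override
        public void close() throws Exception {
            if (client != null) {
                client.close();
            }
        }
    }

    Per-record unary calls trade some throughput against a streaming RPC, but they make retries and at-least-once reasoning much simpler than retrying a whole window.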
  • l

    Lee xu

    12/30/2022, 3:33 AM
    Hello: the zip file passed via the python.files parameter does not have execute permission, so a permission error is reported during program execution.
    d
    • 2
    • 8
  • l

    Lucas Alcântara Freire

    12/30/2022, 10:23 AM
    Hello all, I am new to Flink and I built a job that has Kinesis as the source and should do real-time event consumption, meaning that when an event arrives we should process it and save it without waiting. Currently the setup polls 100 events from Kinesis and processes them, which brought me to an issue. The idea of the job is to count the number of orders a customer has placed: if we have order A for customer X we save the order count as 1 in our DB, and if another order is placed for customer X we increment it to 2. To know the previous order count we fetch the customer from our DB before doing the count in one of the Flink steps, but this brings a problem. Because we poll Kinesis events in batches, orders A and B for customer X can arrive in the same batch; the lookup of the customer's order count then returns zero for both, so both orders end up stored with an order count of 1, since A and B are in the pipeline at the same time. Any idea how I can solve this?
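    One common way to avoid this read-after-write race is to keep the count in Flink keyed state rather than re-reading it from the database; a minimal sketch (Order and getCustomerId() are hypothetical names for the event type):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Keep the running count per customer in keyed state, so two orders for the
    // same customer in one poll are still counted 1, 2, ... in arrival order.
    DataStream<Tuple2<String, Long>> counts = orders
        .keyBy(order -> order.getCustomerId())
        .process(new KeyedProcessFunction<String, Order, Tuple2<String, Long>>() {
            private transient ValueState<Long> count;

            @Override
            public void open(Configuration parameters) {
                count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("order-count", Long.class));
            }

            @Override
            public void processElement(Order order, Context ctx, Collector<Tuple2<String, Long>> out)
                    throws Exception {
                long current = (count.value() == null) ? 0L : count.value();
                current += 1L;
                count.update(current);
                out.collect(Tuple2.of(ctx.getCurrentKey(), current)); // write this result to the DB sink
            }
        });

    The database then only receives the already-computed count, so the DB read no longer sits in the hot path and cannot go stale within a batch.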
  • j

    Jirawech Siwawut

    12/30/2022, 10:40 AM
    Hi all, I have a quick question. Is it possible to set the parallelism of a specific operator while using Flink SQL? I would like to set the parallelism of the Compact operator to some fixed number, because my job fails if I restore the Flink job from a savepoint while also changing the parallelism of the job.
    m
    • 2
    • 1
  • e

    Eric Laguer

    12/30/2022, 4:03 PM
    kubernetes
  • e

    Eric Laguer

    12/30/2022, 7:17 PM
    Hi, I'm trying to configure Logback as the logging system for Flink. I'm using the Flink Kubernetes Operator [tag: release-1.1], and for some reason if I configure the FlinkDeployment CRD with only Logback it doesn't emit logs; see below for a snippet. If I exec into the pod I see the following process with its arguments, and I can also see that the ConfigMap is mounted correctly at /opt/flink/conf/logback-console.xml.
    Copy code
    flink          1       0 74 19:21 ?        00:00:13 /opt/java/openjdk/bin/java -XX:+UseG1GC -Xmx697932173 -Xms697932173 -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/flink--kubernetes-taskmanager-0-basic-session-deployment-only-example-taskmanager-1-1.log -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/flink-cep-1.15.3.jar:/opt/flink/lib/flink-connector-files-1.15.3.jar:/opt/flink/lib/flink-csv-1.15.3.jar:/opt/flink/lib/flink-json-1.15.3.jar:/opt/flink/lib/flink-scala_2.12-1.15.3.jar:/opt/flink/lib/flink-shaded-zookeeper-3.5.9.jar:/opt/flink/lib/flink-table-api-java-uber-1.15.3.jar:/opt/flink/lib/flink-table-planner-loader-1.15.3.jar:/opt/flink/lib/flink-table-runtime-1.15.3.jar:/opt/flink/lib/log4j-1.2-api-2.17.1.jar:/opt/flink/lib/log4j-api-2.17.1.jar:/opt/flink/lib/log4j-core-2.17.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.17.1.jar:/opt/flink/lib/flink-dist-1.15.3.jar::: org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner --configDir /opt/flink/conf -Djobmanager.memory.jvm-overhead.min=214748368b -Dtaskmanager.resource-id=basic-session-deployment-only-example-taskmanager-1-1 -Djobmanager.memory.off-heap.size=134217728b -Dweb.tmpdir=/tmp/flink-web-98d0f80f-0660-47b6-ba8a-466e9bde4723 -Djobmanager.memory.jvm-metaspace.size=268435456b -Djobmanager.memory.heap.size=1530082096b -Djobmanager.memory.jvm-overhead.max=214748368b -D taskmanager.memory.network.min=166429984b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=214748368b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=166429984b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=665719939b -D taskmanager.memory.task.heap.size=563714445b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=214748368b
    Copy code
    apiVersion: <http://flink.apache.org/v1beta1|flink.apache.org/v1beta1>
    kind: FlinkDeployment
    metadata:
    .......
      logConfiguration:
        "logback-console.xml": |
          <configuration>
            <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
            </encoder>
            </appender>
    
            <appender name="rolling" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${log.file}</file>
            <append>false</append>
    
            <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>${log.file}.%i</fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>10</maxIndex>
            </rollingPolicy>
    
            <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>100MB</maxFileSize>
            </triggeringPolicy>
    
            <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
            </encoder>
            </appender>
    
            <!-- This affects logging for both user code and Flink -->
            <root level="INFO">
            <appender-ref ref="console"/>
            <appender-ref ref="rolling"/>
            </root>
    
            <!-- Uncomment this if you want to only change Flink's logging -->
            <!--<logger name="org.apache.flink" level="INFO"/>-->
    
            <!-- The following lines keep the log level of common libraries/connectors on
            log level INFO. The root logger does not override this. You have to manually
            change the log levels here. -->
            <logger name="akka" level="INFO"/>
            <logger name="org.apache.kafka" level="INFO"/>
            <logger name="org.apache.hadoop" level="INFO"/>
            <logger name="org.apache.zookeeper" level="INFO"/>
    
            <!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
            <logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR"/>
          </configuration>
    g
    • 2
    • 9
  • j

    Jonathan Weaver

    12/30/2022, 8:07 PM
    If I have a lookup join condition that can easily be short-circuit evaluated to false, why does Flink still perform the lookup query?
    Copy code
    LEFT OUTER JOIN sos FOR SYSTEM_TIME AS OF cdc.proc_time soss ON ios.demand_identifier IS NOT NULL AND ios.order_routing LIKE 'StandingOrder' AND soss.ofs_identifier = ios.demand_identifier
    No matter what value order_routing is set to, it still performs the lookup.
  • u

    申凯(lordk)

    12/31/2022, 12:23 AM
    I want to use WindowDeduplicate to deduplicate records from a stream table:
    Copy code
    TableResult result = tEnv.executeSql("select * from " +
        "(select ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue, ROW_NUMBER() OVER (" +
            "PARTITION BY window_start, window_end, userName ORDER BY eventTime DESC" +
            ") as row_num from " +
        "TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))" +
         ") where row_num <= 1");
    
    result.print();
    but no result is printed to the console. When I try:
    Copy code
    TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))");
    result.print();
    I can see the result printed to the console each time a checkpoint completes. I don't know what mistake I've made.
  • r

    Rafael Jeon

    12/31/2022, 12:06 PM
    I'm trying to syncro
  • r

    Rafael Jeon

    12/31/2022, 12:08 PM
    Hi, I am using the Flink CDC connector to synchronize data from MySQL to StarRocks. When I use the SQL client and stop and resume in the middle, it seems to read the data from the source table from the beginning again. This is also the case when I put a WHERE clause in the INSERT INTO SELECT statement. Are additional settings necessary in pipeline mode to resume synchronizing the data from the middle? (Flink version 1.15.2)
  • g

    Gaurav Miglani

    01/02/2023, 4:10 AM
    We are running Flink 1.16 on K8s using the Flink Kubernetes operator. The documentation mentions the numRestarts metric, but I'm unable to find it via the JM URL. Am I missing some configuration to enable it? It was present previously on Flink 1.13 🤔
    s
    • 2
    • 4
  • a

    Ari Huttunen

    01/02/2023, 11:06 AM
    What could be the problem here? The example is from the docs. This is a cluster of three instances.
    Copy code
    ./sql-client.sh 
    
                                       ▒▓██▓██▒
                                   ▓████▒▒█▓▒▓███▓▒
                                ▓███▓░░        ▒▒▒▓██▒  ▒
                              ░██▒   ▒▒▓▓█▓▓▒░      ▒████
                              ██▒         ░▒▓███▒    ▒█▒█▒
                                ░▓█            ███   ▓░▒██
                                  ▓█       ▒▒▒▒▒▓██▓░▒░▓▓█
                                █░ █   ▒▒░       ███▓▓█ ▒█▒▒▒
                                ████░   ▒▓█▓      ██▒▒▒ ▓███▒
                             ░▒█▓▓██       ▓█▒    ▓█▒▓██▓ ░█░
                       ▓░▒▓████▒ ██         ▒█    █▓░▒█▒░▒█▒
                      ███▓░██▓  ▓█           █   █▓ ▒▓█▓▓█▒
                    ░██▓  ░█░            █  █▒ ▒█████▓▒ ██▓░▒
                   ███░ ░ █░          ▓ ░█ █████▒░░    ░█░▓  ▓░
                  ██▓█ ▒▒▓▒          ▓███████▓░       ▒█▒ ▒▓ ▓██▓
               ▒██▓ ▓█ █▓█       ░▒█████▓▓▒░         ██▒▒  █ ▒  ▓█▒
               ▓█▓  ▓█ ██▓ ░▓▓▓▓▓▓▓▒              ▒██▓           ░█▒
               ▓█    █ ▓███▓▒░              ░▓▓▓███▓          ░▒░ ▓█
               ██▓    ██▒    ░▒▓▓███▓▓▓▓▓██████▓▒            ▓███  █
              ▓███▒ ███   ░▓▓▒░░   ░▓████▓░                  ░▒▓▒  █▓
              █▓▒▒▓▓██  ░▒▒░░░▒▒▒▒▓██▓░                            █▓
              ██ ▓░▒█   ▓▓▓▓▒░░  ▒█▓       ▒▓▓██▓    ▓▒          ▒▒▓
              ▓█▓ ▓▒█  █▓░  ░▒▓▓██▒            ░▓█▒   ▒▒▒░▒▒▓█████▒
               ██░ ▓█▒█▒  ▒▓▓▒  ▓█                █░      ░░░░   ░█▒
               ▓█   ▒█▓   ░     █░                ▒█              █▓
                █▓   ██         █░                 ▓▓        ▒█▓▓▓▒█░
                 █▓ ░▓██░       ▓▒                  ▓█▓▒░░░▒▓█░    ▒█
                  ██   ▓█▓░      ▒                    ░▒█▒██▒      ▓▓
                   ▓█▒   ▒█▓▒░                         ▒▒ █▒█▓▒▒░░▒██
                    ░██▒    ▒▓▓▒                     ▓██▓▒█▒ ░▓▓▓▓▒█▓
                      ░▓██▒                          ▓░  ▒█▓█  ░░▒▒▒
                          ▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓  ▓░▒█░
              
        ______ _ _       _       _____  ____  _         _____ _ _            _  BETA   
       |  ____| (_)     | |     / ____|/ __ \| |       / ____| (_)          | |  
       | |__  | |_ _ __ | | __ | (___ | |  | | |      | |    | |_  ___ _ __ | |_ 
       |  __| | | | '_ \| |/ /  \___ \| |  | | |      | |    | | |/ _ \ '_ \| __|
       | |    | | | | | |   <   ____) | |__| | |____  | |____| | |  __/ | | | |_ 
       |_|    |_|_|_| |_|_|\_\ |_____/ \___\_\______|  \_____|_|_|\___|_| |_|\__|
              
            Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
    
    Command history file path: /home/user/.flink-sql-history
    
    Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
    [INFO] Session property has been set.
    
    Flink SQL> SET 'execution.runtime-mode' = 'batch';
    [INFO] Session property has been set.
    
    Flink SQL> SELECT
    >   name,
    >   COUNT(*) AS cnt
    > FROM
    >   (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
    > GROUP BY name;
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink_1.16_rc1/lib/flink-dist-1.16.0.jar) to field java.lang.Class.ANNOTATION
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    [ERROR] Could not execute SQL statement. Reason:
    java.net.ConnectException: Connection refused
    
    Flink SQL>
    g
    m
    • 3
    • 15
  • s

    Sami Badawi

    01/02/2023, 6:24 PM
    My PyFlink JDBC call:
    Copy code
    def setup_catalog(t_env: TableEnvironment) -> None:
        jar_filenames = ['flink-connector-jdbc-1.16.0.jar', 'postgresql-9.4.1212.jar']
        jar_path = make_jar_paths(jar_filenames)
    
        print("JAR PATH: ", jar_path)
        t_env.get_config().set("pipeline.jars", jar_path)
    
        name = "public"
        default_database = "postgres"
        username = "postgres"
        password = "postgres"
        base_url = "localhost"
    
        print(f"{name}, {default_database}, {username}, {password}, {base_url}")
        catalog = JdbcCatalog(name, default_database, username, password, base_url)
        t_env.register_catalog("my_catalog", catalog)
        t_env.use_catalog("my_catalog")
    generates this error:
    Copy code
    python pystreaming/event_files_jdbc.py
    Running event_files_jdbc.py
    JAR PATH:  file:///Users/sami/code/pystreaming/jars/flink-connector-jdbc-1.16.0.jar;file:///Users/sami/code/pystreaming/jars/postgresql-9.4.1212.jar
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker (file:/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/opt/flink-python-1.16.0.jar) to method java.net.URLClassLoader.addURL(java.net.URL)
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    public, postgres, postgres, postgres, localhost
    Traceback (most recent call last):
      File "/Users/sami/code/pystreaming/pystreaming/event_files_jdbc.py", line 96, in <module>
        setup_catalog(t_env)
      File "/Users/sami/code/pystreaming/pystreaming/event_files_jdbc.py", line 37, in setup_catalog
        catalog = JdbcCatalog(name, default_database, username, password, base_url)
      File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/table/catalog.py", line 1192, in __init__
        j_jdbc_catalog = gateway.jvm.org.apache.flink.connector.jdbc.catalog.JdbcCatalog(
      File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/java_gateway.py", line 1585, in __call__
        return_value = get_return_value(
      File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
        return f(*a, **kw)
      File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.connector.jdbc.catalog.JdbcCatalog.
    : java.lang.IllegalArgumentException
    	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
    	at org.apache.flink.connector.jdbc.catalog.JdbcCatalogUtils.validateJdbcUrl(JdbcCatalogUtils.java:37)
    	at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.<init>(AbstractJdbcCatalog.java:110)
    	at org.apache.flink.connector.jdbc.catalog.JdbcCatalog.<init>(JdbcCatalog.java:76)
    	at org.apache.flink.connector.jdbc.catalog.JdbcCatalog.<init>(JdbcCatalog.java:50)
    	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
    	at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    	at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    d
    • 2
    • 2
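    A hedged observation: the base_url argument is expected to be a JDBC URL of the form jdbc:postgresql://host:port (without the database name), so a bare "localhost" fails the validateJdbcUrl check seen in the trace. A Java-side sketch of the equivalent constructor call:

    import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;

    // Sketch only: base_url should be a JDBC URL, not a bare host name; the
    // database name goes into the defaultDatabase argument instead.
    JdbcCatalog catalog = new JdbcCatalog(
            "my_catalog",                        // catalog name
            "postgres",                          // default database
            "postgres",                          // username
            "postgres",                          // password
            "jdbc:postgresql://localhost:5432"); // base URL (assumed format)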
  • l

    Lauri Suurväli

    01/02/2023, 6:49 PM
    When submitting a Flink job from the CLI in application mode (e.g. “flink run-application …“), the command will return after the job has been submitted. This behaves similarly to having the --detached flag when submitting the job in session mode (e.g. “flink run --detached”). Is it somehow possible to submit the job in application mode, but have the command wait for the job to finish before it returns? (This would mimic the behaviour of submitting the job in session mode without the detached flag.) I am using YARN for deployment if this makes any difference.
    ✅ 1
    m
    • 2
    • 1
  • u

    申凯(lordk)

    01/03/2023, 1:11 AM
    I want to use WindowDeduplicate to deduplicate records from a stream table:
    Copy code
    TableResult result = tEnv.executeSql("select * from " +
        "(select ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue, ROW_NUMBER() OVER (" +
            "PARTITION BY window_start, window_end, userName ORDER BY eventTime DESC" +
            ") as row_num from " +
        "TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))" +
         ") where row_num <= 1");
    
    result.print();
    but no result is printed to the console. When I try:
    Copy code
    TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))");
    result.print();
    I can see the result printed to the console each time a checkpoint completes. I don't know what mistake I've made.
    g
    • 2
    • 5
  • u

    申凯(lordk)

    01/03/2023, 9:09 AM
    https://issues.apache.org/jira/browse/FLINK-30546