# troubleshooting
  • Rafael Jeon

    01/27/2023, 9:46 AM
    Hi, can I use flink-sql on native k8s?
  • Kosta Sovaridis

    01/27/2023, 10:07 AM
    Hi, I am looking at the DataStream connectors and I saw there is an Avro connector; unfortunately, I am using Confluent Avro schemas. Am I forced to use the Table API to fetch/write the data? I already have POJOs and schemas for all the models, and I was hoping I could just pass the POJO to the DataStream API instead of having to define a table for each POJO in the Table API.
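    For reference, a minimal sketch of reading Confluent-registry Avro directly in the DataStream API (the topic, group, schema-registry URL, and MyModel are placeholders, not from this thread), assuming the model classes are Avro-generated SpecificRecord types and flink-avro-confluent-registry is on the classpath; plain hand-written POJOs would still need a mapping step after deserialization.
    // A minimal sketch, not a drop-in solution. MyModel is a placeholder for an Avro-generated SpecificRecord class.
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ConfluentAvroSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<MyModel> source = KafkaSource.<MyModel>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("my-topic")
                    .setGroupId("my-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // Deserializes Confluent-framed Avro records directly into the generated class.
                    .setValueOnlyDeserializer(
                            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                    MyModel.class, "http://schema-registry:8081"))
                    .build();

            DataStream<MyModel> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "confluent-avro-source");
            stream.print();
            env.execute("confluent-avro-sketch");
        }
    }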
  • Leon Xu

    01/27/2023, 3:38 PM
    We are trying to upgrade Flink from 1.12.7 to 1.16.0, but we ran into the following issue: after the upgrade, when we submit the job through application mode, it fails with the exception below. It looks like it failed to upload the temporary Flink conf file to S3. In Flink 1.12.7 we didn't have this issue. I am wondering if we can get some help here.
    org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster
    	at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:478) ~[flink-yarn-1.16.0.jar!/:1.16.0]
            ......
    Caused by: org.apache.hadoop.fs.PathIOException: `Cannot get relative path for URI:file:///tmp/application_1674531932229_0030-flink-conf.yaml587547081521530798.tmp': Input/output error
    	at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.getFinalPath(CopyFromLocalOperation.java:360) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    	at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.uploadSourceFromFS(CopyFromLocalOperation.java:222) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    	at org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.execute(CopyFromLocalOperation.java:169) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$copyFromLocalFile$25(S3AFileSystem.java:3920) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[hadoop-common-3.3.3.jar!/:?]
    	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) ~[hadoop-common-3.3.3.jar!/:?]
    	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    	at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    	at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFromLocalFile(S3AFileSystem.java:3913) ~[flink-s3-fs-hadoop-1.16.0.jar!/:1.16.0]
    	at org.apache.flink.yarn.YarnApplicationFileUploader.copyToRemoteApplicationDir(YarnApplicationFileUploader.java:397) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    	at org.apache.flink.yarn.YarnApplicationFileUploader.uploadLocalFileToRemote(YarnApplicationFileUploader.java:202) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    	at org.apache.flink.yarn.YarnApplicationFileUploader.registerSingleLocalResource(YarnApplicationFileUploader.java:181) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    	at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1047) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    	at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:623) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    	at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:471) ~[flink-yarn-1.16.0.jar!/:1.16.0]
    	... 35 more
  • Darin Amos

    01/27/2023, 3:39 PM
    Hi All! I've seen a few anecdotes suggesting that using the ContinuousFileReaderOperator is discouraged in favour of the new FileSource. I'm wondering if there is any risk of the ContinuousFileReaderOperator being deprecated in future versions. This is a critical operator for my team; we use it as a source in the middle of our pipeline because the true data source depends on other upstream events (user actions).
  • Bobby Richard

    01/27/2023, 3:39 PM
    With exactly-once enabled for my Kafka producers, I am seeing extremely long startup times in my jobs. The sink task is stuck in Initializing for 30-60 minutes, and I see hundreds of log messages like this:
    ProducerId set to 969204 with epoch 8
    Invoking InitProducerId for the first time in order
    Discovered transaction coordinator
    ProducerId set to 970010 with epoch 8
    Invoking InitProducerId for the first time in order
    Why are so many producers being created, and what determines the number of producers? I would expect the number of producers to equal the parallelism (8 in this case), but that doesn't seem to be the case?
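    For context, a minimal sketch of how an exactly-once Kafka sink is typically configured on recent Flink versions (broker, topic, and prefix names are placeholders, not from this thread). With EXACTLY_ONCE the sink manages pools of transactional producers and, on restart, may cycle through many transactional IDs to abort lingering transactions, which can produce far more ProducerId log lines than the parallelism alone would suggest.
    // A minimal sketch of an exactly-once KafkaSink (Flink 1.15+ style API; names are placeholders).
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class ExactlyOnceKafkaSinkSketch {
        public static KafkaSink<String> build() {
            return KafkaSink.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("output-topic")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    // Exactly-once requires a transactional-id prefix that is unique per job.
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("my-job-tx")
                    // Must be <= the broker's transaction.max.timeout.ms.
                    .setProperty("transaction.timeout.ms", "900000")
                    .build();
        }
    }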
  • Arran

    01/27/2023, 3:53 PM
    Hi all, I'm trying to make a simple proof of concept ahead of a larger migration of some existing basic data-streaming jobs into Flink from another framework we're moving away from. The PoC is a streaming environment with two KafkaSources that sink through the JDBC connector to a PostgreSQL database, using Flink 1.16. I'm able to run the job without issue, and continuously, within my IDE (IntelliJ on Windows). However, when submitting it to a standalone Flink deployment (running on a Linux EC2 instance), every 29 minutes and 50-something seconds it cancels and restarts, like clockwork. The regularity suggests a misconfiguration, but I've been through everything I can think of and am now a little lost about where to look. Any thoughts?
  • Ans Fida

    01/27/2023, 8:02 PM
    Question about Flink checkpointing: if Flink checkpoints while streaming data from S3, (1) does it save the files' state (filenames, timestamps, etc.) in the checkpoints, and (2) can this state be read manually to extract some of the checkpoint metadata if needed?
  • Adesh Dsilva

    01/27/2023, 9:31 PM
    Hi, I was wondering how I can optimize the distribution of topic partitions across my Kafka consumers. I have 5 Kafka topics with 4, 4, 4, 2, and 2 partitions respectively, which comes to 16 partitions. I started Flink with a parallelism of 8 and 2 slots per TM. Attached is a screenshot where you can see one subtask is idle. How can I ensure all my subtasks are assigned 2 partitions each so that all of them are consuming messages? (I want to avoid any rebalance or shuffle if possible.)
  • Sumit Nekar

    01/28/2023, 10:22 AM
    Hello, I am currently using Flink 1.13.6 and deploying jobs in application mode using the Flink Kubernetes operator (default native mode). I would like to assign different requests and limits for the TM pods. How can I specify this in FlinkDeployment? It seems kubernetes.jobmanager.cpu.limit-factor was only added in Flink >= 1.15.x; I am on 1.13.6 and tried the following options, but they didn't work.
    taskManager:
        resource:
          memory: "2000m"
          cpu: "1"
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
            name: task-manager-pod-template
          spec:
            containers:
            - name: flink-main-container
              resources:
                requests:
                  memory: "2000m"
                  cpu: "1"
                limits:
                  memory: "3000m"
                  cpu: "2"
    The resource section overrides the resources section inside the containers section. I also tried removing resource altogether (below), and that set the CPU requests and limits equal to the number of task slots; it completely ignores the resources section inside the containers section.
    taskManager:
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
            name: task-manager-pod-template
          spec:
            containers:
            - name: flink-main-container
              resources:
                requests:
                  memory: "2000m"
                  cpu: "1"
                limits:
                  memory: "3000m"
                  cpu: "2"
    Are there any options in Flink 1.13.6, or does flink-kubernetes-operator provide any such option?
  • P Singh

    01/29/2023, 12:30 PM
    Hey there, I had Beam in production which was running without any issues, though for the past couple of months the Beam pipeline has been going down without any traceback, which is weird too. Anyway, I have decided to run the Beam pipelines on top of a Flink cluster and followed this blog: https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html. I have set up a Flink cluster on Kubernetes locally and pulled flink:1.14.6 and apache/beam_python3.10_sdk:2.43.0. When I run python basic.py (the Beam pipeline code) it doesn't submit the job to Flink (I don't see any job on the Flink UI). I searched on Stack Overflow and got to know it's a version issue; the thing is that nobody knows which latest versions are supported by both. Can someone help me here? Any help would be appreciated!
  • Kosta Sovaridis

    01/29/2023, 12:57 PM
    Hello there, I have a question about connected streams. I have a data stream of events (A) which needs additional mappings from a stream of events (B). However, stream A might need an event from B which has not been produced yet (the inverse is also true) and might only be produced in x hours, days, or weeks. They both have a shared key I can use to match the corresponding events. My solution is to connect stream B to stream A and then use a RichCoFlatMapFunction to hold the events of type A or B until I have one of each to produce the mapping. Does this solution ensure that all events, no matter how far apart, will indeed be processed? (I did not set any windowing.)
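    For reference, a minimal sketch of that buffer-until-match pattern on a keyed connected stream (EventA, EventB, and Joined are placeholder types); because keyed state is not windowed, a buffered element stays in state until its counterpart arrives (or until a TTL, if one is configured), however far apart the events are.
    // A minimal sketch of the described buffer-until-match pattern (EventA/EventB/Joined are placeholders).
    // Must run on a keyed, connected stream, e.g.: a.connect(b).keyBy(EventA::key, EventB::key).flatMap(new BufferUntilMatch()).
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public class BufferUntilMatch extends RichCoFlatMapFunction<EventA, EventB, Joined> {
        private transient ValueState<EventA> pendingA;
        private transient ValueState<EventB> pendingB;

        @Override
        public void open(Configuration parameters) {
            pendingA = getRuntimeContext().getState(new ValueStateDescriptor<>("pendingA", EventA.class));
            pendingB = getRuntimeContext().getState(new ValueStateDescriptor<>("pendingB", EventB.class));
        }

        @Override
        public void flatMap1(EventA a, Collector<Joined> out) throws Exception {
            EventB b = pendingB.value();
            if (b != null) {                 // counterpart already buffered: emit and clear
                out.collect(new Joined(a, b));
                pendingB.clear();
            } else {
                pendingA.update(a);          // otherwise keep A until B shows up
            }
        }

        @Override
        public void flatMap2(EventB b, Collector<Joined> out) throws Exception {
            EventA a = pendingA.value();
            if (a != null) {
                out.collect(new Joined(a, b));
                pendingA.clear();
            } else {
                pendingB.update(b);
            }
        }
    }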
  • Wai Chee Yau

    01/29/2023, 1:12 PM
    Newbie question: I have a DataStream<MyPojo> and I'd like to convert it to a DataStream<Row> or DataStream<RowData> in order to write out to Iceberg. Is there a convenience method in the Flink API to do this automatically? MyPojo in my case can be hundreds of different classes. Thanks
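    A minimal sketch of one way to do this through the Table API bridge (MyPojo is a placeholder): fromDataStream derives a schema from the POJO fields, and toDataStream without a target class gives back a DataStream<Row>. Whether this scales nicely to hundreds of classes is a separate question.
    // A minimal sketch (MyPojo is a placeholder), assuming flink-table-api-java-bridge is on the classpath.
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class PojoToRowSketch {
        public static DataStream<Row> toRows(StreamExecutionEnvironment env, DataStream<MyPojo> pojos) {
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // The schema is derived from the POJO's fields.
            Table table = tableEnv.fromDataStream(pojos);
            // Without an explicit target class, toDataStream produces Row.
            return tableEnv.toDataStream(table);
        }
    }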
  • Srivatsav Gorti

    01/29/2023, 1:18 PM
    Hello Flink community, I am working on a Flink job to consume events from Kafka as the source and write the data stream events to an S3 bucket (S3 as sink, basically). Here's the sample code for it. I have the S3 credentials configured in flink-conf.yaml. The checkpointing works well and I can see the metadata files being written to S3, but the events from the data stream are not being written to the S3 bucket, and there's no error in the sink while the job is running. I am unable to figure out what's going wrong! Any help on this please! Thanks
    public void run() throws Exception {
            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            environment.setStateBackend(new FsStateBackend("s3://dev/checkpoint"));
            environment.enableCheckpointing(50000L);
            environment.getConfig().setGlobalJobParameters(parameters);
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "temp");
    
            DataStream<ObjectNode> source = environment
                    .addSource(new FlinkKafkaConsumer<>("test", new JSONKeyValueDeserializationSchema(false), properties))
                    .rebalance();
    
            SingleOutputStreamOperator<Tuple4<String, String, Integer, Long>> workflow = source
                    .map(new RecordMapper())
                    .keyBy(0, 1)
                    .timeWindow(Time.minutes(2))
                    .aggregate(new TimeStampAggregateFunction());
    
            //S3 sink
            StreamingFileSink<Tuple4<String, String, Integer, Long>> sink = StreamingFileSink
                    .forRowFormat(new Path("s3://dev/sink-test"),
                            (Encoder<Tuple4<String, String, Integer, Long>>) (element, stream) -> {
                                PrintStream out = new PrintStream(stream);
                                out.println(element.f1);
                            })
                    .withBucketAssigner(new BucketAssigner())
                    .build();
    
            workflow.addSink(sink);
    
            environment.execute();
        }
  • Sumit Aich

    01/30/2023, 12:14 PM
    Hi team, I was trying to upgrade the Flink K8s Operator from version 1.1.0 to 1.3.1. I followed the operator upgrade doc, but I'm seeing the following error while upgrading the Helm chart:
    Error: UPGRADE FAILED: rendered manifests contain a resource that already exists. Unable to continue with update: ServiceAccount "flink" in namespace "flink-operator" exists and cannot be imported into the current release: invalid ownership metadata; label validation error: missing key "app.kubernetes.io/managed-by": must be set to "Helm"; annotation validation error: missing key "meta.helm.sh/release-name": must be set to "flink-operator"; annotation validation error: missing key "meta.helm.sh/release-namespace": must be set to "flink-operator"
    Can you help check this? Or is this happening because the operator Helm chart is not backward compatible?
  • Mohit Aggarwal

    01/30/2023, 12:30 PM
    Hi team, I am following the instructions mentioned here to run the Flink K8s operator on Google Kubernetes Engine. After running the following command, the cert-manager pods fail to start.
    kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
    I checked the pod logs and can see this error:
    exec /app/cmd/cainjector/cainjector: exec format error
    Has anyone faced a similar issue?
  • Sumit Aich

    01/30/2023, 1:24 PM
    Hi Team, is the Helm chart for the Flink K8s Operator backward compatible?
  • chunilal kukreja

    01/30/2023, 1:55 PM
    Hi team, while using async retry strategies with the async I/O operator, is there a way to know the retry count in the asyncInvoke() API?
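    For context, a hedged sketch of how a retry strategy is attached in the Flink 1.15+/1.16 async I/O API (the async function and all numbers are placeholders); as far as I know, no attempt counter is passed into asyncInvoke() itself, so a count would have to be tracked inside the function if it's needed.
    // A hedged sketch of the async retry API (MyAsyncLookup is a placeholder AsyncFunction<String, String>).
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;

    public class AsyncRetrySketch {
        public static DataStream<String> withRetry(DataStream<String> input) {
            return AsyncDataStream.unorderedWaitWithRetry(
                    input,
                    new MyAsyncLookup(),          // placeholder async function
                    10_000, TimeUnit.MILLISECONDS,
                    100,                          // max in-flight requests
                    // up to 3 attempts with a fixed 100 ms delay, retrying on any exception
                    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(3, 100L)
                            .ifException(ex -> true)
                            .build());
        }
    }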
  • Guruguha Marur Sreenivasa

    01/30/2023, 6:28 PM
    Hi all, we're seeing that Flink consumers are one of the main reasons for a spike in our AWS data transfer costs. We'd like to use a rack-aware Flink consumer client so we can reduce these costs. Is that currently possible in Flink? I see this ticket: https://issues.apache.org/jira/browse/FLINK-29398 and it's still in the Open state. There is a FLIP page for it as well. Could someone please clarify? Thank you! 🙂
  • Eric Xiao

    01/30/2023, 10:28 PM
    We have a simple test pipeline that looks like this. I've been observing the storage space taken by RocksDB (kubernetes.ephemeral_storage.usage), and when the pipeline has been running for over a day every TM has about 20 GB of storage used. I'm trying to understand whether:
    • this is "normal" and the pipeline just has a large amount of state,
    • one of our operators is keeping state indefinitely, or
    • our RocksDB is not clearing out old state properly.
    I have noticed that the flatmap operator's state has only been increasing; we only have 1 hour of data because we just enabled the RocksDB metric (state.backend.rocksdb.metrics.estimate-num-keys: true). I saw in this guide how to turn on some other RocksDB metrics, so we are waiting for those as well. I was wondering if anyone else has encountered this situation before?
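    An aside, hedged and possibly unrelated to the cause here: if it turns out an operator really is keeping keyed state forever, one option is state TTL on the state descriptor, which lets RocksDB drop expired entries during compaction. A minimal sketch, with placeholder names and durations:
    // A minimal sketch of state TTL on a keyed state descriptor (names and durations are placeholders).
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class TtlStateSketch {
        public static ValueStateDescriptor<String> descriptorWithTtl() {
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    // Lets RocksDB drop expired entries during compaction.
                    .cleanupInRocksdbCompactFilter(1000)
                    .build();

            ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("my-state", String.class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }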
  • Sumit Nekar

    01/31/2023, 3:26 AM
    Hello, I am getting the following error in the Job Manager, and the job gets restarted every time we hit it. I am using Flink 1.13.6 (with the Flink K8s operator) and cannot upgrade the version immediately. Strangely, the Job Manager and Task Manager pods never restart, but the job itself restarts and checkpoints also fail. Any suggestions on this?
    java.net.SocketTimeoutException: Read timed out
    	at java.net.SocketInputStream.socketRead0(Native Method) ~[?:?]
    	at java.net.SocketInputStream.socketRead(Unknown Source) ~[?:?]
    	at java.net.SocketInputStream.read(Unknown Source) ~[?:?]
    	at java.net.SocketInputStream.read(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLSocketInputRecord.read(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLSocketInputRecord.readHeader(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLSocketInputRecord.decode(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLTransport.decode(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLSocketImpl.decode(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLSocketImpl.startHandshake(Unknown Source) ~[?:?]
    	at sun.security.ssl.SSLSocketImpl.startHandshake(Unknown Source) ~[?:?]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:319) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:283) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.RealConnection.connect(RealConnection.java:168) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:257) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor.intercept(BackwardsCompatibilityInterceptor.java:134) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createHttpClient$3(HttpClientUtils.java:109) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:254) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:200) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    	at java.lang.Thread.
  • Dheeraj Panangat

    01/31/2023, 4:18 AM
    Hi team, what is the ideal way to pass configuration to Flink using native Kubernetes? For example, I am connecting to a database, and the URL, username, and password have to come from configuration. These values are also environment specific (dev, qa, prod). What is the ideal way to handle this? And how do I read the configuration inside the job (a Java example)? Appreciate any help on this. Thanks.
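    A hedged sketch of one common pattern (the key names and defaults are placeholders): inject environment-specific values as program arguments or environment variables (e.g. from a ConfigMap/Secret referenced in the pod template) and read them inside the job like this.
    // A minimal sketch of reading environment-specific settings inside the job (keys/defaults are placeholders).
    import org.apache.flink.api.java.utils.ParameterTool;

    public class JobConfigSketch {
        public static void main(String[] args) {
            // Option 1: values passed as program arguments, e.g. --db.url jdbc:postgresql://... --db.user app
            ParameterTool params = ParameterTool.fromArgs(args);
            String dbUrl = params.get("db.url", "jdbc:postgresql://localhost:5432/dev");

            // Option 2: values injected as environment variables (e.g. from a Secret in the pod template).
            String dbUser = System.getenv().getOrDefault("DB_USER", "app");
            String dbPassword = System.getenv().getOrDefault("DB_PASSWORD", "");

            System.out.printf("url=%s user=%s password=%s%n", dbUrl, dbUser, dbPassword.isEmpty() ? "<unset>" : "***");
        }
    }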
  • Abhinav sharma

    01/31/2023, 9:17 AM
    Is there a way to store flink v1.14 or v1.15 results into OpenSearch for it to be used later?
  • soudipta das

    01/31/2023, 9:19 AM
    Hi team, is it possible to convert a group window to an over window using the Table API? I am trying to use a session window on the event-time attribute, which creates a group window; can this be converted to an over window? I am on version 1.16. Here is a sample of the snippet I am trying:
    GroupWindowedTable groupWindowedTable = baseTable
            .window(Session.withGap(lit(10).minutes())
                    .on($("received_timestamp")).as("window"));

    WindowGroupedTable windowGroupedTable = groupWindowedTable.groupBy($("window"), $("account_id"));

    TablePipeline tablePipeline = windowGroupedTable.select(
            $("account_id"), $("play_device_upload_time").lastValue().over($("account_id")));
    From what the log says, it seems like it's not possible? But I wanted to check whether there is something wrong with the way I have chained the calls. Sample log:
    Caused by: org.apache.flink.table.api.ValidationException: Could not resolve over call.
    	at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.lambda$visit$0(OverWindowResolverRule.java:75) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at java.util.Optional.orElseThrow(Unknown Source) ~[?:?]
    	at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.visit(OverWindowResolverRule.java:73) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule$ExpressionResolverVisitor.visit(OverWindowResolverRule.java:57) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:97) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule.lambda$apply$0(OverWindowResolverRule.java:53) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
    	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source) ~[?:?]
    	at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
    	at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
    	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) ~[?:?]
    	at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
    	at java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
    	at org.apache.flink.table.expressions.resolver.rules.OverWindowResolverRule.apply(OverWindowResolverRule.java:54) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:247) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at java.util.function.Function.lambda$andThen$1(Unknown Source) ~[?:?]
    	at java.util.function.Function.lambda$andThen$1(Unknown Source) ~[?:?]
    	at java.util.function.Function.lambda$andThen$1(Unknown Source) ~[?:?]
    	at java.util.function.Function.lambda$andThen$1(Unknown Source) ~[?:?]
    	at java.util.function.Function.lambda$andThen$1(Unknown Source) ~[?:?]
    	at org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:210) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:199) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:174) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:638) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
    	at *******************.*****.execute(Test.java:195) ~[?:?]
    	at ****************************.******.main(Test.java:23) ~[?:?]
    	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    Any pointers or suggestions much appreciated, thanks in advance
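    For comparison, a hedged sketch of how an over window is normally declared in the Table API (the column names are taken from the snippet above, the rest is assumed): the .over(...) call in select has to reference an over-window alias defined via .window(Over...), not a plain column, which appears to be what the resolver is complaining about.
    // A hedged sketch of a Table API over window (assumes "received_timestamp" is a time attribute).
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;

    import org.apache.flink.table.api.Over;
    import org.apache.flink.table.api.Table;

    public class OverWindowSketch {
        public static Table lastValuePerAccount(Table baseTable) {
            return baseTable
                    .window(Over.partitionBy($("account_id"))
                            .orderBy($("received_timestamp"))
                            .preceding(UNBOUNDED_RANGE)
                            .as("w"))
                    .select($("account_id"),
                            // over(...) must reference the window alias defined above.
                            $("play_device_upload_time").lastValue().over($("w")));
        }
    }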
  • Yang LI

    01/31/2023, 9:30 AM
    Hello guys, I have seen a new release of FRocksDB which brings some good performance improvements (https://apache-flink.slack.com/archives/C03FYR328H4/p1675084023554609). Does somebody know how I can use this RocksDB version with our current Flink 1.16 job?
  • Reme Ajayi

    01/31/2023, 11:39 AM
    How can I properly use map (or other operators) on generic and specific records with the Java DataStream API? I have tried to use map on generic records but I get:
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    	... 14 more
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    	... 22 more
    My transformation code:
    DataStream<GenericRecord> mappedStream = historyLedgerStream.map(new MapFunction<GenericRecord, GenericRecord>() {
    			@Override
    			public GenericRecord map(GenericRecord value) throws Exception {
    				// Compare by value; Avro generic records usually hold Utf8, not String, so == never matches.
    				if ("cad".equals(String.valueOf(value.get("currency")))) {
    					value.put("currency", "Canadian Dollar");
    				}
    				return value;
    			}
    		});
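    A hedged aside, possibly unrelated to the exact trace above: plain GenericRecord often cannot be passed between chained operators without an explicit Avro type information, since the record's schema is not known to Flink's serializers. A minimal sketch of supplying it, where the schema argument stands in for the real writer schema:
    // A minimal sketch of telling Flink how to serialize GenericRecord between operators.
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class GenericRecordMapSketch {
        public static DataStream<GenericRecord> withExplicitType(DataStream<GenericRecord> input, Schema schema) {
            return input
                    .map(record -> record)                           // place the real transformation here
                    .returns(new GenericRecordAvroTypeInfo(schema)); // avoids falling back to Kryo
        }
    }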
  • Nitin Agrawal

    01/31/2023, 2:13 PM
    Hi everyone, for one of our SQL Flink jobs we set the source to a filesystem (EFS). On the day the job went live, i.e. 25 Jan 2023, data was continuously read from EFS and the required events were sunk to Kinesis successfully. But since the 26th, no data has been read from EFS by the Flink job even though EFS has new files (confirmed by looking at the checkpoints, which contain only files from 25 Jan). The job state is RUNNING and in the logs we are not seeing any failures or error messages. We'd appreciate input from the community on what the probable reason for this issue might be.
  • Nitin Agrawal

    01/31/2023, 2:14 PM
    WITH (
      'auto-compaction' = 'true',
      'connector' = 'filesystem',
      'format' = 'json',
      'path' = '/flink-data-service/',
      'source.monitor-interval' = '30s'
    )
  • PRASHANT GUPTA

    01/31/2023, 3:00 PM
    Hi everyone, I am using the Flink JDBC connector to sink data to a Postgres DB. I want an alerting mechanism in case my Flink job gets disconnected from the Postgres DB, but I am not able to find any JDBC-connector-related metrics that I could use to raise an alert. Are there any metrics available to check the Postgres connection status, and if not, how can I implement a custom metric of type gauge for this? Context: source = Kafka, sink = Postgres DB, Flink 1.16, deployed with the Flink Kubernetes operator.
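    A minimal sketch of registering a custom gauge from a rich sink's open() (the connection URL, credentials, and the health check itself are placeholders); as far as I know the stock JDBC sink does not expose a connection-status metric, so a wrapper like this is one way to get an alertable signal.
    // A minimal sketch of a custom gauge for connection health (names and the check are placeholders).
    import java.sql.Connection;
    import java.sql.DriverManager;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class PostgresHealthSink extends RichSinkFunction<String> {
        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/dev", "app", "secret");
            // The gauge is sampled by the metric reporter; 1 = connected, 0 = not connected.
            getRuntimeContext().getMetricGroup().gauge("postgresConnected", new Gauge<Integer>() {
                @Override
                public Integer getValue() {
                    try {
                        return connection != null && connection.isValid(1) ? 1 : 0;
                    } catch (Exception e) {
                        return 0;
                    }
                }
            });
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // Real write logic would go here; this sketch only illustrates the gauge.
        }
    }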
  • Piotr Pawlaczek

    01/31/2023, 3:30 PM
    Hi, is there an unnest function counterpart in Flink SQL? For example, let's say I have the following CSV file:
    Date,USD,GBP
    2022-08-01,1.01,1.30
    In postgres I can run:
    SELECT date,
       unnest(array['USD','GBP']) as currency,
       unnest(array[USD,GBP]) as rate
    so I get:
    date      |currency|rate
    ----------+--------+----
    2022-08-01|USD     |1.01
    2022-08-01|GBP     |1.30
    In other words: is there a way to convert columns into rows?
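    Flink SQL does have UNNEST (usually written as CROSS JOIN UNNEST over an array column). For the specific columns-to-rows case above, here is a hedged sketch using a plain UNION ALL instead, wrapped in the Java Table API; the rates table and its Date/USD/GBP columns are assumed to be registered already (e.g. via a filesystem/CSV connector), so this is an illustration rather than a complete program.
    // A hedged sketch: turning the USD/GBP columns into rows with UNION ALL.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class ColumnsToRowsSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Assumes a table named "rates" with columns Date, USD, GBP has been registered beforehand.
            Table result = tableEnv.sqlQuery(
                    "SELECT `Date`, 'USD' AS currency, USD AS rate FROM rates "
                            + "UNION ALL "
                            + "SELECT `Date`, 'GBP' AS currency, GBP AS rate FROM rates");

            result.execute().print();
        }
    }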