# troubleshooting
  • n

    Nitin Agrawal

    12/09/2022, 10:29 AM
Hi All, As of now we have one Kinesis Data Stream (KDS). Multiple clients publish different event types to the same KDS, and for Flink the data source is that KDS. We do per-job deployment, i.e. each job runs in a separate environment, which also means that for multiple Flink jobs the data source is the same KDS. Each job is generally interested in only one or two event types, and some jobs are interested in overlapping events, e.g. Job 1 is interested in event 1 and event 2 while Job 2 is interested in event 1 and event 3. Clients currently publish more than 200 event types to the KDS, so each job ends up reading 200+ kinds of events even though it is interested in only two, and we have 30+ jobs deployed. Looking at this closely, most of the requests made to the KDS by Flink are redundant. We are aiming for an architecture in which each job reads only the events it is interested in rather than the whole KDS, while staying near real time.
• We were thinking of dumping the events from KDS to S3, partitioned as eventType / YY / MM / DD, and using that S3 location as the source for the Flink jobs (delivery via KDS -> Firehose -> S3). The drawback is that this becomes more batch processing than stream processing.
• We are also exploring always writing a particular event type to a specific shard of the KDS, so that each Flink job can be mapped to read only those shards. This keeps the streaming structure and avoids the redundant reads from KDS by the Flink jobs.
Need help from the community to understand if anyone has solved this problem in the past, either with a similar approach or a different one.
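For illustration, a minimal PyFlink-style sketch of the pattern described above, where every job consumes the full stream and then filters down to the event types it cares about; the source stream, the JSON payload shape, and the `eventType` field are assumptions, not the poster's actual schema:
```python
import json

# Event types this particular job cares about (assumed names).
INTERESTING_TYPES = {"event1", "event2"}

# kinesis_source_stream is assumed to be a DataStream of raw JSON strings built
# from a Kinesis source; every job still reads all 200+ event types from the
# stream and only then throws most of them away, which is the redundancy
# described above.
events = kinesis_source_stream.map(json.loads)
relevant = events.filter(lambda e: e.get("eventType") in INTERESTING_TYPES)
```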
    m
    • 2
    • 7
  • m

    Momir Beljic

    12/09/2022, 11:48 AM
Hi, how would it be possible to convert a Table API stream to pandas? I am using this code:
    Copy code
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

input_table_name = "input_process_table"
output_table_name = "output_process_table"
create_source = create_source_table(input_table_name, "process_events", "eu-central-1", "LATEST")
t_env.execute_sql(create_source)

table = t_env.from_path(input_table_name)
print('Table')
table.execute().print()

pdf = table.limit(1000).to_pandas()
    Thanks!
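A hedged sketch of an alternative path, in case `to_pandas()` on the streaming table does not behave as expected: bound the table with `limit()`, collect the rows on the client, and build the DataFrame by hand (the column layout is whatever the source table defines):
```python
import pandas as pd

# limit() bounds the otherwise unbounded stream; execute().collect() then runs
# the job and streams the rows back to the client as Row objects.
with table.limit(1000).execute().collect() as results:
    pdf = pd.DataFrame([row.as_dict() for row in results])

print(pdf.head())
```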
    x
    • 2
    • 1
  • p

    Patrick Lucas

    12/09/2022, 2:49 PM
    Hellooo everyone. I hit an unpleasant surprise today when trying to use the ES connector and GS filesystem in the same job. It kind of looks like a relocation-gone-wrong in flink-gs-fs-hadoop: basically, the Elasticsearch client uses the Apache commons HTTP client, but this HTTP client also gets shaded into flink-gs-fs-hadoop, but not relocated. The error I'm seeing comes from a particular class (HttpAuthenticator) that is included with flink-gs-fs-hadoop without having been relocated (so at
    org.apache.http.impl.auth.HttpAuthenticator
), yet having been rewritten to reference a relocated class (
org.apache.flink.fs.shaded.hadoop3.org.apache.commons.logging.Log
    ). When the Elasticsearch client code wants to instantiate an
    HttpAuthenticator
    , it hits this rewritten version, and it fails because it's trying to pass an un-relocated
    org.apache.commons.logging.Log
    . My guess is that
    HttpAuthenticator
    (and presumably anything else being bundled from
    org.apache.httpcomponents
    ?) should be relocated within
    flink-gs-fs-hadoop
    . I'm tempted to call this a bug, but I'd be surprised if no one else had run into this before, as using GCS and ES at the same time doesn't seem that rare. Could someone give me a sanity check that there isn't something obvious I'm doing wrong? Minimal reproducing case: https://github.com/patricklucas/flink-gs-es-problem Stacktrace in thread.
    c
    • 2
    • 11
  • a

    André Casimiro

    12/09/2022, 7:06 PM
Hi guys, I feel there's an easier/better way to do a windowed rolling average than building my own aggregator. I found the
    AverageAccumulator
but got wondering if there's a standard way to just plug it into some other generic aggregator. Do I now have to write the same logic for other operations like max/min/sum? Any suggestions?
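A minimal PyFlink sketch of the usual pattern, assuming a keyed stream of (key, value) tuples; the stream name, field positions, and window size are illustrative. The same `AggregateFunction` shape is reused for avg/min/max/sum by swapping only the accumulator logic:
```python
from pyflink.common import Time
from pyflink.datastream.functions import AggregateFunction
from pyflink.datastream.window import TumblingEventTimeWindows

class AverageAggregate(AggregateFunction):
    def create_accumulator(self):
        return (0.0, 0)  # (running sum, count)

    def add(self, value, accumulator):
        # value is assumed to be a (key, number) tuple
        return (accumulator[0] + value[1], accumulator[1] + 1)

    def get_result(self, accumulator):
        return accumulator[0] / accumulator[1] if accumulator[1] else 0.0

    def merge(self, acc_a, acc_b):
        return (acc_a[0] + acc_b[0], acc_a[1] + acc_b[1])

# Usage sketch on an already keyed stream:
# averaged = (keyed_stream
#             .window(TumblingEventTimeWindows.of(Time.minutes(1)))
#             .aggregate(AverageAggregate()))
```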
    d
    • 2
    • 3
  • e

    Emmanuel Leroy

    12/09/2022, 9:42 PM
when using FileSink, the rolling policy is usually OnCheckpointRollingPolicy, which is based on processing time I guess. Is there any way to have a RollingPolicy based on event time? Looking at the code for RollingPolicy, I don't see a way to get access to the watermark. I see there is a way to trigger on an event, but there I don't see how to access the current time (I could look at event time vs current time, but I need the current time). Is there any way to do this? The goal is to manage rolling of files in recovery mode, where I'd be fetching from Kafka to catch up, and therefore I'd want to roll the files based on event time rather than wait for the next checkpoint.
    d
    • 2
    • 1
  • j

    Jin S

    12/09/2022, 11:08 PM
    About the Kerberos Delegation Token Framework changes on Flink 1.17, the proxy user scenario is not addressed yet. Would future discussions involve loading delegation tokens from the
    HADOOP_TOKEN_FILE_LOCATION
    (given that the superuser has put DTs in that location)? Thanks :)
    m
    • 2
    • 2
  • s

    Sumit Nekar

    12/10/2022, 5:42 AM
Hello Team, I was reading about the async sink: https://flink.apache.org/2022/05/06/async-sink-base.html. Can this be used with Kafka as a sink to solve some of the issues we see when the throughput is high?
    m
    h
    • 3
    • 5
  • l

    Lee xu

    12/10/2022, 11:09 AM
Can python pemja not be installed on Windows?
Fobuild\temp.win-amd64-3.9\Release\src/main/c/pemja/core\PythonInterpreter.obj PythonInterpreter.c
D:\Program Files\Java\jdk1.8.0_281\include\jni.h(45): fatal error C1083: Cannot open include file: 'jni_md.h': No such file or directory
error: command 'C:\\Program Files (x86)\\Microsoft Visual Studio\\2019\\BuildTools\\VC\\Tools\\MSVC\\14.29.30133\\bin\\HostX86\\x64\\cl.exe' failed with exit code 2
[end of output]
note: This error originates from a subprocess, and is likely not a problem with pip.
ERROR: Failed building wheel for pemja
Failed to build pemja
ERROR: Could not build wheels for pemja, which is required to install pyproject.toml-based projects
    x
    • 2
    • 1
  • t

    Thiruvenkadesh Someswaran

    12/10/2022, 6:24 PM
I seem to be messing up on how I load my plugin, any help 🙏
    UnsupportedFileSystemSchemeException
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-session-deployment-only-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      tolerations:
        - key: "kafka"
          operator: "Equal"
          value: "true"
          effect: "NoSchedule"
      containers:
        # Do not change the main container name
        - name: flink-main-container
          envFrom:
            - configMapRef:
                name: application-properties-flink
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.15.3.jar
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    my Flink Session job
kubectl get flinksessionjobs.flink.apache.org -n flink alerting -o yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  creationTimestamp: "2022-12-10T17:26:29Z"
  finalizers:
    - flinksessionjobs.flink.apache.org/finalizer
  generation: 2
  name: alerting
  namespace: flink
  resourceVersion: "2383389"
  uid: 6a2b29f5-4002-4977-8204-52c1acaa3a66
spec:
  deploymentName: basic-session-deployment-only-example
  job:
    args: []
    jarURI: s3://MYS3-JARbucket/myjar.jar
    parallelism: 4
    state: running
    upgradeMode: stateless
status:
  error: 'org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
    find a file system implementation for scheme ''s3''. The scheme is directly supported
    by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto.
    Please ensure that each plugin resides within its own subfolder within the plugins
    directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
    for more information. If you want to use a Hadoop file system for that scheme,
    please add the scheme to the configuration fs.allowed-fallback-filesystems. For
    a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.'
  • k

    kingsathurthi

    12/11/2022, 7:46 AM
I'm setting up Kubernetes standalone HA but unable to get the job manager running. What am I missing? Please help.
- The 'host' parameter of 'jobmanager.sh' has been deprecated. Please use -D key: 'jobmanager
2022-12-11 07:37:39,054 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Starting StandaloneSessionClusterEntrypoint.
2022-12-11 07:37:39,089 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Install default filesystem.
2022-12-11 07:37:39,093 INFO org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2022-12-11 07:37:39,139 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Install security context.
2022-12-11 07:37:39,150 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2022-12-11 07:37:39,155 INFO org.apache.flink.runtime.security.modules.JaasModule - Jaas file will be created as /tmp/jaas-3448220580025284093.conf.
2022-12-11 07:37:39,163 INFO org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2022-12-11 07:37:39,165 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Initializing cluster services.
2022-12-11 07:37:39,173 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Using working directory: WorkingDirectory(/tmp/js_fcdb578a7424085b45f144475881118a).
2022-12-11 07:37:39,552 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils - Trying to start actor system, external address 192.168.27.109:6123, bind address 0.0.0.0:6123.
2022-12-11 07:37:40,564 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
2022-12-11 07:37:40,612 INFO akka.remote.RemoteActorRefProvider - Akka Cluster not in use - enabling unsafe features anyway because 'akka.remote.use-unsafe-remote-features-outside-cluster' has been enabled.
2022-12-11 07:37:40,613 INFO akka.remote.Remoting [] - Starting remoting
2022-12-11 07:37:40,822 INFO akka.remote.Remoting - Remoting started; listening on addresses [akka.tcp://flink@192.168.27.109:6123]
2022-12-11 07:37:40,978 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils [] - Actor system started at akka.tcp://flink@192.168.27.109:6123
2022-12-11 07:37:41,438 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics
org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServi
    m
    s
    • 3
    • 4
  • k

    kingsathurthi

    12/11/2022, 5:25 PM
Always getting this permission denied error when starting the deployment. What is the reason behind this? I'm using the K8s HA deployment and ConfigMap YAML from the official documentation.
    Copy code
kingsathurthi@N-5CG2124DKR:~/flink/doc-standalone$ k logs flink-jobmanager-7d4447c69d-7tfc6
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Permission denied
sed: couldn't open temporary file /opt/flink/conf/sedeqcoZa: Read-only file system
/docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Permission denied
/docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system

Starting Job Manager
Starting standalonesession as a console application on host flink-jobmanager-7d4447c69d-7tfc6.

2022-12-11 17:27:15,459 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
2022-12-11 17:27:15,461 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
2022-12-11 17:27:15,461 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
    s
    • 2
    • 7
  • l

    Liubov

    12/12/2022, 10:24 AM
Howdy all. We have an issue with OOM: Metaspace on Flink 1.14.4, and I can reproduce it with a pretty simple app reading from a file source and writing to a Kafka sink. We have a k8s setup, and I can emulate it with job parallelism 2 by killing one task pod and making the job manager restart the job. Metaspace on the other task manager pod grows with every restart. The original issue we had was similar, except the task was dying because of a Kafka exception and we had a Kinesis source; still, restarting the job led to OOM. Is this something known? I saw reports of OOMs for Flink 1.10, but they are closed and said to be fixed.
    s
    • 2
    • 2
  • j

    Jirawech Siwawut

    12/12/2022, 10:37 AM
    Hello. I really need help on this issue if anyone could help. I really appreciate it https://apache-flink.slack.com/archives/C03G7LJTS2G/p1670118109892989
    r
    m
    • 3
    • 3
  • c

    Conor McGovern

    12/12/2022, 12:12 PM
    Hi, we are observing checkpoint failures in our job when writing to s3 (flink 1.13.6) with the following error attached:
    Copy code
    Caused by: com.amazonaws.ResetException: The request to the service failed with a retryable reason, but resetting the request input stream has failed. See exception.getExtraInfo or debug-level logging for the original failure that caused this retry.;  If the request involves an input stream, the maximum stream buffer size can be configured via request.getRequestClientOptions().setReadLimit(int)
    Caused by:
    Copy code
    Caused by: java.io.IOException: Resetting to invalid mark
    (Full stacktrace in thread). Anybody know of a workaround/fix for this issue?
    • 1
    • 1
  • a

    Amir Halatzi

    12/12/2022, 1:18 PM
Hey all! I'd like to add a geolocation service to our Flink environment (i.e. given an IP, it returns a struct with geolocation data), and I'm wondering what is the correct way to implement it?
• Should it be a Scalar UDF? I haven't seen an explicit async implementation, so I'm not sure about that.
• Should it be a DynamicTableSource? It seems a bit of an overkill, since it will be used in a streaming scenario (i.e. one call per event).
• Something else?
Any input would be much appreciated!
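For reference, a hedged sketch of the scalar-UDF option in PyFlink. Note that a plain scalar UDF is synchronous, so a per-event remote lookup would block unless the data is local (e.g. a GeoIP database file). `lookup_geo` is a hypothetical placeholder, not a real library call:
```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def geolocate(ip: str):
    # Replace with a real lookup, e.g. against a local GeoIP database.
    country, city = lookup_geo(ip)  # hypothetical helper
    return {"country": country, "city": city}

# Usage sketch:
# t_env.create_temporary_function("geolocate", geolocate)
# t_env.sql_query("SELECT ip, geolocate(ip) AS geo FROM events")
```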
    s
    • 2
    • 3
  • t

    Tudor Plugaru

    12/12/2022, 1:48 PM
    Hey, I am trying to set up metrics exporting to Datadog and I have the following configuration used:
    Copy code
metrics.reporters: dghttp
metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
metrics.reporter.dghttp.apikey: redacted
metrics.reporter.dghttp.dataCenter: US
metrics.reporter.dghttp.useLogicalIdentifier: true
metrics.reporter.dghttp.tags: cluster_name:redacted, kube_deployment:redacted
metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
    But from what I see, I can’t rewrite the scopes of the metrics. They arrive in Datadog with the
    taskmanager
    and
    jobmanager
    prefix and not with the
    flink
prefix as required by the Datadog docs in order to not be counted as custom metrics. Looking at the code I tried using
    tm-job
    instead of
    tm.job
but still, the issue remains. Can anyone advise please 🙏
    c
    m
    • 3
    • 19
  • j

    Joris Basiglio

    12/12/2022, 3:14 PM
Hey, is it possible for Flink to perform a graceful shutdown by creating a final checkpoint (rather than a savepoint)?
    d
    • 2
    • 2
  • f

    Felix Angell

    12/12/2022, 4:09 PM
hey all, we're trying to diagnose one of our flink applications which keeps crashing after a few days. one of the logs correlating with the uptime is this:
    Copy code
    apache.flink.streaming.connectors.kinesis.model.SequenceNumber cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType
    we're using flink 1.13.2 on AWS KDA. it looks as though this is a known problem and would degrade performance. is it worth investigating this further or would the effect be negligible?
    s
    h
    • 3
    • 25
  • s

    Sitara Wishal

    12/12/2022, 6:07 PM
Hi all! I am trying to use one input data stream to transform more than one table, hence different output streams, using AWS KDA Studio. I am running into a RuntimeException after just a minute. Has anyone faced this issue before, or can someone suggest how to increase the retry attempts for data reading?
    h
    • 2
    • 3
  • r

    Raf

    12/12/2022, 7:36 PM
    👋 Hi Folks, I'm using DBeaver to query a SQL Gateway with a HiveServer2 Endpoint. When issuing a query like
    SELECT * FROM table
    is it supposed to show the query results as they come? The query completes successfully because it's async (table.dml-sync=false), but for some reason, the results only show up if I stop the sql-gateway process.
  • e

    Emmanuel Leroy

    12/12/2022, 11:32 PM
    is the operator 1.3.rc1 image published somewhere or do I need to build it from source?
    g
    • 2
    • 4
  • s

    Suparn Lele

    12/13/2022, 4:06 AM
Hi, I am trying to write a batch job which takes 1-hour chunks of data from a raw table, aggregates them, and stores them in the sink database. I want the following flow: 1. load data from 12.00 - 13.00 from raw table A 2. aggregate the data 3. store the data 4. go back to step 1 and repeat for the interval 13.00 - 14.00, and so on. I want this to run sequentially, as in only once the 12.00 - 13.00 data has been stored would we go on to 13.00 - 14.00. I think the problem is that I run my program like the following:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
intervals.foreach(interval => /* build the whole pipeline for this interval */)
streamExecutionEnv.execute()
If I create the execution environment just once and use a single execute statement, then Flink won't run this sequentially. I think it would do this simultaneously, because there would be one execute at the end once all loops are done. But I want it to run sequentially only. How can I achieve this?
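A minimal sketch of the sequential variant (written in PyFlink for brevity, though the idea is the same in Scala): build and execute a separate job per interval inside the loop, so each execute() blocks until that interval's pipeline has finished before the next one starts. `build_pipeline` and `intervals` are hypothetical names standing in for the per-interval source -> aggregate -> sink wiring:
```python
from pyflink.datastream import StreamExecutionEnvironment

for interval in intervals:
    env = StreamExecutionEnvironment.get_execution_environment()
    build_pipeline(env, interval)           # hypothetical: source -> aggregate -> sink
    env.execute(f"aggregation {interval}")  # blocks until this interval's job is done
```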
    m
    • 2
    • 6
  • k

    Kenny Lu

    12/13/2022, 4:41 AM
Hi all, has anyone run into issues with web.submit.enable set to false? We wanted to disable web submit, but even jobs sent via the pipeline are failing?
    Copy code
    # disable UI submit jobs
    web.submit.enable: "false"
  • p

    Prathit Malik

    12/13/2022, 10:25 AM
Hi All, I want to ingest data from a Kinesis data stream into S3 (using the file sink). My use case is to consume the CDC stream of DynamoDB and ingest it into the data lake. Can someone please suggest the standard and preferred Flink execution mode (streaming or batch) for such use cases? In streaming mode there is a rolling policy on checkpoint, which makes the ingestion less error prone, and checkpointing helps keep track of exactly where to read from. But in batch execution mode, how is the start and end of a batch decided for Kinesis data streams? Thanks
    m
    • 2
    • 8
  • s

    Slackbot

    12/13/2022, 11:14 AM
    This message was deleted.
    ✅ 1
    c
    • 2
    • 3
  • k

    kingsathurthi

    12/13/2022, 12:04 PM
When starting the jobmanager, I am getting a base directory error. Where do I need to ensure the base directory exists?
    2022-12-13 11:54:25,468 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Stopping DefaultJobGraphStore.
    2022-12-13 11:54:25,471 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.0.jar:1.16.0]
... 4 more
2022-12-13 11:54:25,484 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics: Cluster entrypoint has been closed externally..
  • a

    Abdelhakim Bendjabeur

    12/13/2022, 1:44 PM
Hello 👋 I am experimenting with joining two CDC Kafka topics using the Table API and I noticed a weird behaviour:
    Copy code
    CREATE TABLE all_data (
        `id` INT,
        `accountId` INT,
        ...,
         PRIMARY KEY (id, accountId) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'property-version' = 'universal',
      'properties.bootstrap.servers' = 'host.docker.internal:29092',
      'value.format' = 'json',
      'key.format' = 'json',
      'topic' = 'some-topic'
    );
    
    INSERT INTO all_data
    WITH table_a_dedup AS (
    SELECT  *
    FROM (
      SELECT  *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY `timestamp` DESC) rn
      FROM table_a
    ) s
    WHERE rn = 1), 
    table_b_dedup AS (
      SELECT  *
      FROM (
        SELECT  *,
          ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn
        FROM table_b
      ) s
    WHERE rn = 1)
SELECT
    table_b_dedup.id,
    table_b_dedup.accountId,
    table_b_dedup.`timestamp` as tableBTimestamp,
    table_a_dedup.`timestamp` as tableATimestamp,
    table_a_dedup.createdDatetime as tableACreatedDatetime,
    table_a_dedup.deletedDatetime as tableADeletedDatetime
FROM table_b_dedup
LEFT JOIN table_a_dedup
ON table_b_dedup.ticketId = table_a_dedup.id;
    whenever an update is applied on
    table_a
    , an extra log appears in the topic
    'some-topic'
with all of its columns set to null. I can't find an explanation for it; has anyone experienced this before? Any help is appreciated.
    • 1
    • 1
  • m

    Matt Czyz

    12/13/2022, 4:06 PM
    Hello, I am developing an application using
    StreamTableEnvironment
    and having problems running it with Yarn on top of EMR - works fine if run from IDE or without any code dependent on
    StreamTableEnvironment
    . The application was created using gradle quickstart script without any additional packaged deps:
    Copy code
bash -c "$(curl https://flink.apache.org/q/gradle-quickstart.sh)" -- 1.15.1 _2.12
To narrow the issue down, the code simply implements the example as-is from: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/data_stream_api/#converting-between-datastream-and-table When executing with Yarn on EMR I am getting the following error:
    Copy code
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not instantiate the executor. Make sure a planner module is on the classpath
      at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
      at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
      at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
      at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
      at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
      at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
      at java.base/java.security.AccessController.doPrivileged(Native Method)
      at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
      at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
    The application executes fine if I replace
    flink-table-planner-loader-1.15.1.jar
    with
    flink-table-planner_2.12-1.15.1.jar
    under
    lib/
    directory, but if I understand correctly
    flink-table-planner-loader
    already contains the planner so it should work fine without replacing it with the legacy dependency?
    m
    • 2
    • 8
  • f

    Felix Angell

    12/13/2022, 4:33 PM
we have a subtask that is consistently at zero across all of our flink apps consuming from the same kinesis stream. the kinesis stream has messages on all shards, so we figured that each subtask should be non-zero in what it's processing. how does the indexing/partitioning algorithm work for deciding which messages are processed by which subtask?
    h
    • 2
    • 3
  • a

    Adrian Chang

    12/13/2022, 9:28 PM
    Hello, I am using the Group Window Aggregation
    GROUP BY TUMBLE
with a user-defined aggregate function. Can I expect the values sent to the function to be in the order they were received? I know that SQL does not guarantee order when applying an aggregation function. Does Flink SQL behave the same way, or does it respect the order of the events? Thanks
    m
    • 2
    • 3