troubleshooting

    Neha

    03/20/2023, 11:21 AM
Can anyone give me a reference to a good article explaining the difference between ingestion time vs. processing time vs. event time, with examples of when to use which?
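In case it helps while searching: the distinction shows up directly in the DataStream API. A minimal sketch (the Event POJO and its field names are hypothetical): event time is taken from the data via a WatermarkStrategy, processing time is the operator's wall clock, and ingestion time is effectively event time where the source assigns arrival timestamps.

    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class TimeSemanticsSketch {

        // Hypothetical event: eventTimestamp records when the event happened.
        public static class Event {
            public String key;
            public long eventTimestamp;
        }

        static void eventTime(DataStream<Event> events) {
            // Event time: timestamps come from the data, so results are
            // reproducible and out-of-order events can be handled via watermarks.
            events.assignTimestampsAndWatermarks(
                            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                    .withTimestampAssigner((e, ts) -> e.eventTimestamp))
                  .keyBy(e -> e.key)
                  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                  .sum("eventTimestamp");
        }

        static void processingTime(DataStream<Event> events) {
            // Processing time: windows fire on the machine's wall clock; lowest
            // latency, but results depend on when the data happens to arrive.
            events.keyBy(e -> e.key)
                  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                  .sum("eventTimestamp");
        }
        // Ingestion time: roughly, event time with timestamps assigned at the
        // source, e.g. WatermarkStrategy.forMonotonousTimestamps() on the source.
    }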

    Fredrik

    03/20/2023, 11:23 AM
👋 I’m a newbie trying to figure out how parallelism is supposed to work using the K8s operator. I’m working from the basic example. How is the parallelism setting in the job spec supposed to work? If I change the replica count on the taskManager or change taskmanager.numberOfTaskSlots: "2" but keep the job.parallelism setting as is, the parallelism of the job (according to the UI) changes. Changing job.parallelism does not seem to have an effect. I tried to read the documentation for the CRD but it did not clarify the issue. Am I misunderstanding something?
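For context, the relevant fields of the FlinkDeployment CR interact roughly like this; a hedged sketch with placeholder values, where available slots = taskManager replicas x numberOfTaskSlots and job.parallelism is what the job requests. One plausible explanation for the behavior above: in the operator's default native mode, TaskManagers are created on demand to satisfy job.parallelism, while taskManager.replicas is only honored in standalone mode, so which knob wins depends on the mode in use.

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-example                  # placeholder
    spec:
      mode: native                         # default; replicas applies in 'standalone'
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      taskManager:
        replicas: 2                        # standalone: 2 TMs x 2 slots = 4 slots
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 4                     # default parallelism requested for the job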

    Mohit Aggarwal

    03/20/2023, 2:00 PM
Hi, when I try to run a Flink job in HA mode with a GCS path as the HA directory (e.g. gs://flame-poc/ha), or when starting a job from checkpoints stored in GCS (the job is able to write checkpoints to GCS successfully), I get the following exception:
    2023-03-20 13:53:34,477 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Stopping DefaultJobGraphStore.
    2023-03-20 13:53:34,478 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    	at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2720) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
    	at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
    	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	... 4 more
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
    	at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
    	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	... 4 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
    	at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
    	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
Has anyone faced a similar issue?
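Two things may be worth checking here (assumptions, not confirmed fixes). First, the GCS filesystem JAR (flink-gs-fs-hadoop) must sit under plugins/gs-fs-hadoop/ rather than in lib/; filesystem factories loaded outside the plugin classloader are a common source of shaded-Hadoop ClassNotFoundExceptions. Second, the missing class is Hadoop's default hadoop.security.group.mapping implementation; overriding it with the pure-Java shell-based mapping via flink-conf.yaml is a sometimes-suggested workaround:

    # flink-conf.yaml: hedged sketch; the flink.hadoop.* prefix is forwarded into
    # the Hadoop configuration, replacing the JNI-backed default group mapping.
    flink.hadoop.hadoop.security.group.mapping: org.apache.hadoop.security.ShellBasedUnixGroupsMapping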

    Thijs van de Poll

    03/20/2023, 2:03 PM
Hi all, I am attempting to write to an Elasticsearch 8 cluster. Is it correct that a Flink connector for ES 8 is not available yet? And also that the ES 7 Flink connector is not compatible with Elasticsearch 8?

    Rafał Trójczak

    03/20/2023, 2:49 PM
    Hi, All! I have a question concerning the following info from Flink:
    15:05:22.357 [main] INFO  o.a.f.a.java.typeutils.TypeExtractor -- Field Person#hobbies will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
    I have the following class:
    public class Person {
    
        private String name;
        private List<String> hobbies;
    
        public Person() {
        }
    
        public Person(String name, List<String> hobbies) {
            this.name = name;
            this.hobbies = hobbies;
        }
    // getters and setters
    }
    and I prepared this `TypeInformation`:
    TypeInformation<Person> personTypeInformation = Types.POJO(Person.class, Map.of(
        "name", Types.STRING,
        "hobbies", Types.LIST(Types.STRING)));
I saw a few options that don't work for me:
• @TypeInfo(PersonSerializerFactory.class) - but I can't use this approach because the Person class is in a different module.
• the returns method, e.g.:
env.fromCollection(persons)
   .returns(personTypeInformation)
but this doesn't seem to remove the problem. How can I add this type information to the environment?
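One more option that might be worth trying (a sketch, not verified against this setup): fromCollection has an overload that accepts the TypeInformation directly, so the hint is applied when the source is created instead of being patched on afterwards with returns():

    // Hedged sketch: personList stands for the List<Person> from the question.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Person> stream = env.fromCollection(personList, personTypeInformation);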

    Siddhesh Kalgaonkar

    03/20/2023, 6:09 PM
Hi #troubleshooting, I know that PyFlink uses the Java APIs under the hood, so does that mean it uses a JVM for each process? Or does it use a PVM? I didn't find anything about this, so I wanted to clarify how it works internally. Can somebody enlighten me on this? Any links would also help. TIA

    Usman Ismail

    03/20/2023, 9:37 PM
Hi #troubleshooting, does anyone have experience using the OpenSearch Flink connector? I am trying to follow the guide here and I'm getting the following exception:
    Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=<http://localhost:9200>, response=HTTP/1.1 200 OK}
    	at org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
    	at org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
    	at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
    	at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
    ....
    Caused by: java.lang.NullPointerException
    	at java.base/java.util.Objects.requireNonNull(Objects.java:208)
    	at org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
    	at org.opensearch.action.index.IndexResponse.<init>(IndexResponse.java:67)
This seems to be a known issue for Elasticsearch (https://github.com/elastic/elasticsearch/issues/84173), but I don't have a clear path for OpenSearch.

    Simon Lawrence

    03/21/2023, 9:58 AM
Hi all, I am currently looking at Flink deployment, with high availability using ZooKeeper as the goal. The documentation provides examples for deploying a Flink cluster running in session mode; however, I am wondering if it is possible to deploy in application mode using ZooKeeper for high availability? If anyone could point me in the right direction, that would be amazing. Thank you!
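For reference, the ZooKeeper HA settings live in flink-conf.yaml and are independent of the deployment mode, so they can be combined with application mode as well. A minimal sketch (quorum hosts and storage path are placeholders):

    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181  # placeholders
    high-availability.storageDir: s3://my-bucket/flink/ha/             # placeholder path
    high-availability.cluster-id: /my-application                      # unique per application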

    Thijs van de Poll

    03/21/2023, 10:21 AM
Hi all, I am trying to write to an OpenSearch instance using the flink-connector-opensearch connector. However, it fails with the following error: java.lang.ClassNotFoundException: org.opensearch.common.Strings. I am unsure what is causing it, because I have been loading the .jar dependency into ${FLINK_HOME}/lib. Can anyone help me out? 🙂

    Thijs van de Poll

    03/21/2023, 11:46 AM
@Martijn Visser Is it correct that the DataStream API is compatible with OpenSearch 2.x and the Table API is not compatible with OpenSearch 2.x yet? I am currently using the Table API with OpenSearch 2.6, and I am getting the following error:
    Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=<http://host.docker.internal:9200>, response=HTTP/1.1 200 OK}
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:396)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:390)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    jobmanager     |        ... 1 more
    jobmanager     | Caused by: java.lang.NullPointerException
    jobmanager     |        at java.base/java.util.Objects.requireNonNull(Unknown Source)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse.<init>(UpdateResponse.java:86)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:193)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:181)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:172)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:208)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2075)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1836)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1940)
    jobmanager     |        ... 18 more

    Oscar Perez

    03/21/2023, 1:06 PM
Hi, we are using the test harness to test Flink event time. I wonder how to use the processElement method in the test harness: should we pass the event time as the second parameter?

    Oscar Perez

    03/21/2023, 1:07 PM
We are facing the problem that ctx.currentWatermark always returns Instant.MIN.toEpochMilli() even though we set processWatermark before and after calling processElement with values.

    Oscar Perez

    03/21/2023, 1:12 PM
Also, the onTimer method is never called, even though we initialize a timer in the processElement function. We suspect this has to do with the fact that ctx.currentWatermark is always Instant.MIN no matter what.

    Oscar Perez

    03/21/2023, 1:19 PM
As a matter of fact, when calling testHarness.processElement we can call it with a StreamRecord, like this: testHarness.processElement(new StreamRecord<>(event)). In this case, no processing time is set. I wonder what the use case for this is?
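A sketch of how event time is usually driven through the harness (MyFunction, Event, and Out are hypothetical stand-ins for the code under test): an element's timestamp travels with the StreamRecord (that is the second parameter), while currentWatermark() and event-time timers only move when processWatermark is called.

    // Hedged sketch of a harness-driven event-time test.
    KeyedProcessOperator<String, Event, Out> operator =
            new KeyedProcessOperator<>(new MyFunction());
    KeyedOneInputStreamOperatorTestHarness<String, Event, Out> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(operator, e -> e.key, Types.STRING);
    harness.open();

    // The timestamp rides on the StreamRecord (or use processElement(value, ts)).
    harness.processElement(new StreamRecord<>(event, 1000L));

    // Event-time timers fire only when the watermark passes them; without this
    // call the watermark stays at Long.MIN_VALUE, which surfaces as Instant.MIN.
    harness.processWatermark(2000L);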

    Ari Huttunen

    03/21/2023, 1:44 PM
    I made this function for calculating the median with the t-digest library written in Java. It doesn't work 😭.
    package fi.elisa.datalake.flink.flinktools.aggregation;
    
    import org.apache.flink.table.api.*;
    import org.apache.flink.table.functions.AggregateFunction;
    import org.apache.flink.table.annotation.DataTypeHint;
    import static org.apache.flink.table.api.Expressions.*;
    
    import com.tdunning.math.stats.MergingDigest;
    
    public class MergingDigestMedian extends AggregateFunction<Double, MergingDigest> {
    
      @Override
      public @DataTypeHint("RAW") MergingDigest createAccumulator() {
        return new MergingDigest(100); // 100 is a common value for normal uses.
      }
    
      @Override
      public Double getValue(@DataTypeHint("RAW") MergingDigest acc) {
        return acc.quantile(0.5);
      }
    
      public void accumulate(@DataTypeHint("RAW") MergingDigest acc, Double value) {
        acc.add(value);
      }
    
    }
I'm calling it in PyFlink code by defining it as a temporary system function.
    table_env.create_java_temporary_system_function("udf_median", "fi.elisa.datalake.flink.flinktools.aggregation.MergingDigestMedian")
    It fails. I'll put the logs in the comments.

    Raghunadh Nittala

    03/21/2023, 1:45 PM
Hi team, can a temporal join be achieved on temporary views? I'm creating 2 tables using the 'kafka' and 'upsert-kafka' connectors respectively, creating temporary views with some transformations on these tables, and finally joining both temporary views. But this is an equi-join and not a temporal join. When I tried to do a temporal join, the client threw this exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found
The table has a primary key, but when a view is created on top of it, there is no primary key.
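For what it's worth, a sketch of the shape the planner accepts (all table and column names invented): the versioned side needs a primary key plus an event-time attribute, and if it has to be a view, the documented route is a deduplication query, which is what lets the planner infer a key:

    -- Versioned table: upsert-kafka with an explicit primary key and watermark.
    CREATE TABLE rates (
      currency STRING,
      rate     DECIMAL(10, 2),
      ts       TIMESTAMP(3),
      WATERMARK FOR ts AS ts,
      PRIMARY KEY (currency) NOT ENFORCED
    ) WITH ('connector' = 'upsert-kafka' /* ... */);

    -- Or a versioned view via deduplication, which preserves currency as the key:
    CREATE TEMPORARY VIEW versioned_rates AS
    SELECT currency, rate, ts
    FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY ts DESC) AS rn
      FROM rates
    ) WHERE rn = 1;

    -- The temporal join itself uses FOR SYSTEM_TIME AS OF on the versioned side.
    SELECT o.order_id, o.amount * r.rate
    FROM orders AS o
    JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;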

    Virender Bhargav

    03/21/2023, 4:34 PM
Hey, I am trying to use the State Processor API to clean up an existing savepoint.
ExistingSavepoint savepoint = Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend());
DataSet<KeyedState> keyedState = savepoint.readKeyedState("uid1", new CoreProcessStateReaderFunction());
BootstrapTransformation<KeyedState> transformation = OperatorTransformation
                .bootstrapWith(keyedState)
                .keyBy(state -> state.id)
                .transform(new StateBootstrapper());
Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
        .withOperator("uid1", transformation)
        .removeOperator("uid1")
        .write(newSavePointPath);
The intent is to perform a "transformation" on the KeyedState and replace the old operator state with the modified one.
• CoreProcessStateReaderFunction: a KeyedStateReaderFunction to read the existing savepoint
• StateBootstrapper: a KeyedStateBootstrapFunction for state modification/cleanup
I end up with an empty new savepoint (it has only a _metadata folder and nothing else). Can someone help me with what I might be doing wrong?
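One thing that stands out (an observation, not a confirmed diagnosis): the documented modify-a-savepoint examples chain removeOperator before withOperator, dropping the old state for the uid first and then attaching the replacement. As written above, removeOperator("uid1") runs after withOperator("uid1", ...) and may be discarding the transformation that was just registered:

    // Sketch following the documented order: remove the old operator state,
    // then attach the bootstrapped replacement under the same uid.
    Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
            .removeOperator("uid1")
            .withOperator("uid1", transformation)
            .write(newSavePointPath);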

    Huaqiang Kang

    03/21/2023, 7:59 PM
Is it possible to set auth-no-challenge in Flink?

    Amir Hossein Sharifzadeh

    03/21/2023, 9:15 PM
I need help understanding the process function mechanism better and debugging my code. Here is the implementation that acquires the DataStreams; I will need to pass the results to a process function for further data analysis.
    static void processWorkflow(
            StreamTableEnvironment tableEnv,
            DataStream<DataFileChunk> rawDataStream,
            DataStream<DataFileChunk> bkgdDataStream,
            String jsonCalibrationData
            ) {
        tableEnv.createTemporaryView("EMPAD_RAW_TBL", rawDataStream);
        tableEnv.createTemporaryView("EMPAD_BKGD_TBL", bkgdDataStream);
    
        String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_BKGD_TBL.data as bkgd_enc_data " +
                "FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";
    
        Table resultTable =
                tableEnv.sqlQuery(data_query);
    
        DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
    and this is the implementation of my ProcessFunction:
    package org.varimat.process;
    
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    
    public class EMPADProcessor extends ProcessFunction<Row, Row> {
    
        @Override
        public void processElement(Row row, ProcessFunction<Row, Row>.Context context,
                                   Collector<Row> collector) throws Exception {
    
            // process row here...
            System.out.println(row.getField(0));
            String rawObject = String.valueOf(row.getField(1));
            String bkgdObject = String.valueOf(row.getField(2));
        }
    }
When I run my application, it never stops, and it prints duplicated data for row.getField(0) in the processElement method. Is there any suggestion? In other words, do I need to implement the EMPADProcessor class differently?
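One possible explanation for the endless duplicates (an assumption based on the snippets above): a regular SQL join produces an updating table, so toChangelogStream emits INSERT, UPDATE_BEFORE, and UPDATE_AFTER rows for the same key, and each of them reaches processElement separately. Checking the RowKind makes that visible; a sketch:

    // Hedged sketch (needs: import org.apache.flink.types.RowKind). React only to
    // inserts and the "after" image of updates, so retraction rows do not look
    // like duplicated data.
    @Override
    public void processElement(Row row, ProcessFunction<Row, Row>.Context context,
                               Collector<Row> collector) {
        if (row.getKind() == RowKind.INSERT || row.getKind() == RowKind.UPDATE_AFTER) {
            collector.collect(row);
        }
    }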

    Ben Thomson

    03/21/2023, 9:24 PM
Does anyone know the correct format for submitting program arguments via the dashboard?

    Neha

    03/22/2023, 4:02 AM
Hello, I need to understand which way is better to register a remote UDF jar: setting the classloader in the TableEnvironment, or using CREATE FUNCTION. Both of them work; I want to choose one based on some rationale. If one of them is better practice, please let me know.
URLClassLoader urlCl = URLClassLoader.newInstance(
        new URL[]{new URL(<path to my jar>)}, this.getClass().getClassLoader());
Thread.currentThread().setContextClassLoader(urlCl);
ClassLoader cl = Thread.currentThread().getContextClassLoader();
EnvironmentSettings envSetting = EnvironmentSettings.newInstance().withClassLoader(cl);
StreamTableEnvironment.create(exeEnv, envSetting);
vs.
CREATE FUNCTION ... USING JAR
Which one should I use? Any inclination towards one of these? Please explain the reason why one would be better.
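For reference, the declarative form looks like this (class name and jar location are placeholders; remote jars are supported from Flink 1.16). Since Flink then owns the classloading and the registration lives in the catalog rather than in a hand-built URLClassLoader, it is generally the less error-prone of the two:

    CREATE TEMPORARY SYSTEM FUNCTION my_udf
    AS 'com.example.MyUdf'                          -- placeholder class
    LANGUAGE JAVA
    USING JAR 's3://my-bucket/udfs/my-udf.jar';     -- placeholder remote jar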

    Thijs van de Poll

    03/22/2023, 6:36 AM
Hi all! I am trying to understand whether the use of windows is relevant for me. My goal is to sync a Postgres database (source) with Elasticsearch (sink) while making transformations on the data. I have the following conceptual problem:
• source A: CDC
• source B: CDC
• source C: CDC
• table D: D = select * from A left join B on a.pk=b.fk left join C on a.pk=c.fk
• table E: E = select <some aggregations> from D group by D.group_key
• table F: F = select *, <some expensive transformations> from E
• F gets inserted into Elasticsearch.
So what I notice is that because of the left joins, and some events arriving later than others, table F gets updated a couple of times for the same key, which is logical I think. However, since the transformations made to create F are expensive, it recomputes them over and over again. For me it would be better if there were some sort of timed buffer so that the expensive transformations do not need to be recalculated on every event updating E. I am trying to understand if windows can help me out here (see the sketch below), but I am unsure. Also, it is very important that the aggregations in E contain all of the group elements. Thanks!
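One knob that targets exactly this recompute-per-event pattern (hedged; whether it helps depends on the resulting plan): the planner's mini-batch options buffer incoming changelog records briefly and evaluate the aggregation once per batch instead of once per row:

    // Hedged sketch: enable mini-batch buffering on the table environment.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
    tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5 s"); // buffer up to 5s
    tEnv.getConfig().set("table.exec.mini-batch.size", "1000");         // or 1000 rows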

    chenlu

    03/22/2023, 9:00 AM
Hi team, I have a question I need to confirm: no matter what version of Flink, the HistoryServer on port 8082 will not show running jobs, right?

    Tal Sheldon

    03/22/2023, 10:47 AM
I currently have a (plain) Kafka consumer that consumes from a dynamic list of topics (every minute the list is recalculated; usually it's the same list). Can I do the same with Flink, and how? That is, have Flink consume from a dynamic list of Kafka topics, where every minute some method gets all the topics by some logic (topics can be added and removed in this logic, but usually the list stays the same).
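If the dynamic list can be expressed as a topic name pattern, KafkaSource covers this natively (a sketch; servers, regex, and interval are placeholders): it re-runs topic/partition discovery on a timer and starts reading newly matching topics without a restart. A fully arbitrary list that cannot be captured by a pattern would need a job restart or a custom source.

    // Hedged sketch: subscribe by pattern, rediscovering topics every minute.
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")                    // placeholder
            .setTopicPattern(Pattern.compile("events-.*"))            // placeholder regex
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.latest())
            .setProperty("partition.discovery.interval.ms", "60000")  // check every 60s
            .build();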

    Jalil Alchy

    03/22/2023, 12:59 PM
Hey everyone, I have a problem that feels off, but maybe I am doing this wrong. I want to create a KafkaSink, so I have a class that has this method:
    public KafkaSink<OutboxRecord> getKafkaSink() {
            KafkaRecordSerializationSchema<OutboxRecord> serializer = KafkaRecordSerializationSchema.<OutboxRecord>builder()
                    .setTopicSelector(x -> x.topic)
                    .setValueSerializationSchema(new KafkaOutboxRecordSerializer())
                    .build();
    
    Properties p = new Properties() {
        {
            put("transaction.timeout.ms", (Integer) 60000);
        }
    };
    
            return KafkaSink.<OutboxRecord>builder()
                    .setBootstrapServers("localhost:9092")
                    .setKafkaProducerConfig(p)
                    .setRecordSerializer(serializer)
                    .build();
        }
However, this method causes the application to throw a not-serializable error. If I make the method static, it gets better. Is there a better way to do this that I am missing?
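A likely culprit (an assumption from the snippet alone): the double-brace new Properties() { { ... } } creates an anonymous inner class that captures the enclosing instance, so Flink tries to serialize the whole outer class along with the sink. Making the method static "works" because there is no enclosing instance left to capture. A plain Properties object avoids the problem either way:

    // Hedged sketch: build the config without an anonymous inner class, so the
    // sink no longer drags the enclosing object into serialization.
    Properties p = new Properties();
    p.setProperty("transaction.timeout.ms", "60000");

    KafkaSink<OutboxRecord> sink = KafkaSink.<OutboxRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setKafkaProducerConfig(p)
            .setRecordSerializer(serializer)
            .build();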

    Amir Hossein Sharifzadeh

    03/22/2023, 5:17 PM
Hello folks. I need help creating a join on two streams, but I'm not sure what's the best way to do that. I have two corresponding tables, EMPAD_RAW_TBL and EMPAD_BKGD_TBL, where each table has an equal number of rows (64). Both tables have a chunk_i field with unique values (1..64). I am trying to create a join on both tables (streams) and I would expect my join query to give me 64 rows, but I see duplicated rows.
String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_RAW_TBL.n_total_chunks as n_total_chunks, " +
        "EMPAD_BKGD_TBL.data as bkgd_enc_data FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";
Table raw_table =
        tableEnv.sqlQuery(data_query);
DataStream<Row> raw_stream = tableEnv.toDataStream(raw_table);
raw_table contains 128 rows but I expect to have 64 rows. I don't know how to fix the issue here. Thank you.
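If either side carries more than one version of a row per chunk_i (easy to get with changelog-style sources), the join multiplies them, which would explain 128 instead of 64. The documented deduplication pattern pins each chunk_i to a single latest row before joining; a sketch (proc_time is an assumed processing-time attribute column on the table):

    -- Keep only the latest row per chunk_i on one side before the join.
    SELECT chunk_i, data
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY chunk_i ORDER BY proc_time DESC) AS rn
      FROM EMPAD_RAW_TBL
    )
    WHERE rn = 1;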

    Herat Acharya

    03/22/2023, 11:38 PM
We are deploying in Kubernetes natively using kubernetes-session.sh and specifying taskmanager.numberOfTaskSlots=8; these denote task slots per task manager, right? So how does Flink know how many task managers to create? Btw, our source is Kafka and our sink is a database... Kafka will constantly have events.
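For what it's worth, in native Kubernetes mode Flink requests TaskManager pods on demand: the slots needed come from the parallelism of the submitted job(s), and the TM count is that divided by slots per TM, rounded up. A worked example with hypothetical numbers:

    // Hypothetical numbers: one job with parallelism 20, 8 slots per TaskManager.
    int parallelism = 20;
    int slotsPerTaskManager = 8;  // taskmanager.numberOfTaskSlots
    int taskManagers = (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager; // = 3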

    Lee xu

    03/23/2023, 1:07 AM
Hello, how is the memory of a Python UDF function managed? How can it be configured? I am having this problem now; you can look at the uploaded log file. Can you tell from the log file why the Python server failed?
    error-log.txt

    Chen-Che Huang

    03/23/2023, 2:02 AM
Hello. I have a question about the restore mode of Flink. Flink 1.15 starts to provide three restore modes: CLAIM, NO_CLAIM, and LEGACY. Assume that my Flink application restores from a savepoint SVP-1 with restore mode CLAIM. As time goes by, my Flink application creates new savepoints SVP-2, SVP-3, and so on. From the docs, CLAIM mode may delete SVP-1 when Flink thinks SVP-1 is not needed for recovery anymore. How about SVP-2, SVP-3, and future savepoints? Will they also be deleted if Flink thinks they are no longer required? Thanks in advance for any reply.

Martijn Visser

03/23/2023, 4:51 AM
Savepoints are already self-contained, claim mode is for checkpoints

Chen-Che Huang

03/23/2023, 5:34 AM
Sorry, I'm not good at English. From the sentences below, I thought claim mode works for savepoints as well. I am curious about the difference between CLAIM and NO_CLAIM modes. My understanding is that CLAIM mode might delete the snapshot (either a savepoint or an externalized checkpoint) that was used to restore the Flink application, but NO_CLAIM mode won't, and that there is no difference between CLAIM and NO_CLAIM for the savepoints and checkpoints created after restoring. If I misunderstand the two modes, please let me know. Thanks.
"The Restore Mode determines who takes ownership of the files that make up savepoints or externalized checkpoints after they are restored. Snapshots, which are either checkpoints or savepoints in this context, can be owned either by a user or Flink itself." (doc)
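For completeness, the restore mode is chosen per restore on the command line (paths below are placeholders), and it only governs ownership of the snapshot being restored from; snapshots created afterwards are owned by Flink as usual:

    # Hedged sketch: restore from a savepoint and let Flink claim ownership of it.
    bin/flink run -s s3://my-bucket/savepoints/SVP-1 -restoreMode claim my-job.jar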