# troubleshooting
  • Or Keren

    05/25/2023, 7:33 AM
    Anyone? 😊
  • Sumit Nekar

    05/25/2023, 10:55 AM
    Hi, I am experimenting with the Flink operator's feature for restarting unhealthy job deployments, using the following configs:
    # Restart of unhealthy job deployments by flink-kubernetes-operator
    # <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.3/docs/custom-resource/job-management/#restart-of-unhealthy-job-deployments>
    kubernetes.operator.cluster.health-check.enabled: true
    kubernetes.operator.cluster.health-check.restarts.threshold: 2
    kubernetes.operator.cluster.health-check.restarts.window: 15 min
    kubernetes.operator.job.restart.failed: true
    As per the above configs, if the job has restarted more than 2 times, the Flink operator redeploys the job from the last available state. That means in application mode, a new JM pod comes up after the running job has restarted 2 times within 15 minutes. I need some clarification on this:
    1. In case the job is not able to recover at all, will the Flink operator continue restarting forever? Is there any threshold count after which the operator gives up?
    2. How is the restart-strategy configured at the JM level honoured by the Flink operator in this case?
    3. The Flink operator refers to the "flink_jobmanager_job_numRestarts" metric to decide if the restart-count threshold is breached, but every time the operator redeploys the job, a new JM comes up and flink_jobmanager_job_numRestarts starts from 0 again, so the operator keeps redeploying after every 2 restarts. In this case, when will the job be marked as failed?
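    For reference, a minimal sketch of the job-level restart strategy that these health checks interact with, as it would appear in flink-conf.yaml (values are illustrative, not taken from this thread):
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s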
  • Oleksandr Nitavskyi

    05/25/2023, 12:15 PM
    šŸ‘‹ We have encountered a memory leak related to ClassLoaders in Apache Flink: the ChildFirstClassLoader is not properly garbage collected when a job is restarted. A heap dump has shown that Log4j starts a configuration watch thread, which then holds a strong reference to the ChildFirstClassLoader via an AccessControlContext. Since the thread is never stopped, the ChildFirstClassLoader is never cleaned up. All the stack traces we have point to HadoopFileSystem, which is using the PluginClassLoader; see details in 🧵. Removing the monitorInterval introduced in FLINK-20510 helps to mitigate the issue. I wonder if this is a known issue and whether Flink should provide a workaround:
    1. Disable monitorInterval by default in the log configuration, so users who do not tune Flink don't hit the memory leak.
    2. Add some mechanism on unload that properly stops such Log4j threads.
    cc @Nicolas Fraison
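    For context, the setting in question is the file-watch interval in Flink's shipped conf/log4j.properties; a sketch of what disabling it looks like (Log4j treats 0, or an absent value, as "no monitoring"):
    # conf/log4j.properties (excerpt)
    # Periodic reconfiguration check added by FLINK-20510; the watch thread it starts
    # is what pins the ChildFirstClassLoader in the leak described above.
    monitorInterval=30
    # Removing this line (or setting it to 0) disables the watch thread.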
  • slowratatoskr

    05/25/2023, 3:21 PM
    https://apache-flink.slack.com/archives/C03G7LJTS2G/p1684500950364639?thread_ts=1684500950.364639&cid=C03G7LJTS2G
  • Carlos Santos

    05/25/2023, 5:43 PM
    Hi everyone, I'm looking for some tips with the Async I/O retry strategy; the documentation isn't clear and there aren't many examples I can find about it on the internet. I want to do some work asynchronously and retry 3 times at most; if it fails again on the 3rd attempt, just ignore the exception and move on without restarting the job. Does anyone have an example of how to do this, and is it even possible? It feels like the RichAsyncFunction is missing an onError method.
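    A minimal sketch of how this is commonly wired up with the retry support added in Flink 1.15+ (AsyncDataStream.unorderedWaitWithRetry plus a retry strategy). MyAsyncLookup, callExternalService and the timeout/capacity values are illustrative assumptions, not code from this thread; completing with an empty collection on failure is what lets the job move on instead of failing over:
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
    import org.apache.flink.streaming.util.retryable.RetryPredicates;

    public class AsyncRetryExample {

        // Hypothetical async function: on failure it completes with an empty collection
        // instead of calling completeExceptionally(), so the job does not restart.
        public static class MyAsyncLookup extends RichAsyncFunction<String, String> {
            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                CompletableFuture
                        .supplyAsync(() -> callExternalService(input))
                        .whenComplete((result, error) -> {
                            if (error != null) {
                                resultFuture.complete(Collections.emptyList()); // swallow the error
                            } else {
                                resultFuture.complete(Collections.singletonList(result));
                            }
                        });
            }

            private String callExternalService(String input) {
                return input.toUpperCase(); // placeholder for the real async call
            }
        }

        public static DataStream<String> withRetries(DataStream<String> input) {
            // Retry up to 3 times with a fixed 100 ms delay whenever an attempt
            // produced an empty result (i.e. the call above failed and was swallowed).
            var retryStrategy =
                    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(3, 100L)
                            .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
                            .build();

            return AsyncDataStream.unorderedWaitWithRetry(
                    input, new MyAsyncLookup(), 10_000, TimeUnit.MILLISECONDS, 100, retryStrategy);
        }
    }
    After the 3rd failed attempt the (still empty) result is simply emitted as nothing, which matches the "ignore and move on" behaviour being asked about.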
  • Jeremy Ber

    05/25/2023, 9:29 PM
    Hi there, trying to use the State Processor API with a toy problem: I have a DataStream app that will emit results if the keyed process function state is true, but I've purposely set it all to false. I run this app for some time to gather checkpoints, then I read a checkpoint into my State Processor API app. Within that app, I want to read the state and modify some elements to true, so when I restore the state, some keys will emit results. However, after modifying the state and reading it back in, there appear to be no elements in my written checkpoint. The examples of reading / modifying and writing state seem disconnected. Is there any end-to-end example of reading state, modifying it, then writing it back out that I can refer to? Or am I using this incorrectly?
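    A rough end-to-end sketch of the read → modify → write cycle, assuming the Flink 1.16+ State Processor API. The uid, paths, the KeyedFlag type and the reader/bootstrapper functions are placeholders (not from this thread), and the exact method variants differ a bit between versions, so treat this as the shape rather than a definitive implementation:
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.SavepointReader;
    import org.apache.flink.state.api.SavepointWriter;
    import org.apache.flink.state.api.StateBootstrapTransformation;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ModifyStateExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 1) Read the existing keyed state of the operator with uid "keyed-flags".
            SavepointReader reader =
                    SavepointReader.read(env, "file:///checkpoints/chk-42", new HashMapStateBackend());
            DataStream<KeyedFlag> flags =
                    reader.readKeyedState("keyed-flags", new FlagReaderFunction()); // placeholder KeyedStateReaderFunction

            // 2) Modify the entries.
            DataStream<KeyedFlag> updated = flags.map(f -> { f.enabled = true; return f; });

            // 3) Bootstrap new state and write a NEW savepoint, replacing the old state
            //    under the SAME uid so the restored job picks it up.
            StateBootstrapTransformation<KeyedFlag> transformation =
                    OperatorTransformation.bootstrapWith(updated)
                            .keyBy(f -> f.key)
                            .transform(new FlagBootstrapper()); // placeholder KeyedStateBootstrapFunction

            SavepointWriter.fromExistingSavepoint("file:///checkpoints/chk-42")
                    .removeOperator("keyed-flags")
                    .withOperator("keyed-flags", transformation)
                    .write("file:///savepoints/modified");

            env.execute("modify-state");
        }
    }
    The original streaming job then has to be started from the newly written path (e.g. via -s), with the process function carrying the same uid; reading the old checkpoint back in will of course still show the old values.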
  • Amir Hossein Sharifzadeh

    05/26/2023, 3:38 AM
    What techniques are there to make the process faster (setParallelism, EmbeddedRocksDBStateBackend, ...)?
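    For illustration, this is how the two knobs mentioned are typically set (the parallelism value is arbitrary, and the RocksDB backend needs the flink-statebackend-rocksdb dependency):
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TuningSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(8);                                      // more parallel subtasks
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // RocksDB with incremental checkpoints
        }
    }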
  • Sudhan Madhavan

    05/26/2023, 6:52 AM
    Hi šŸ‘‹, what's the default bucketing value used in the state latency metrics? In the docs its type is given as Histogram, but nothing is mentioned about its bucketing.
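    For reference, these are the related options as they would go in flink-conf.yaml (values shown are the documented defaults, apart from enabling the feature); as far as I understand, the metric is a sliding-window histogram over the last history-size samples rather than one with fixed bucket boundaries:
    state.backend.latency-track.keyed-state-enabled: true
    state.backend.latency-track.sample-interval: 100   # measure every 100th state access
    state.backend.latency-track.history-size: 128      # number of samples backing the histogram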
  • Jan Kevin Dick

    05/26/2023, 9:25 AM
    Hello everyone, I have a short question regarding the Flink JDBC connector, which we use together with an Oracle database. Sometimes when an error happens while writing data to the database and Flink retries the write and also fails during the retries, the sessions do not seem to be closed by Flink. We took a look at the code of the JDBC connector (the attached snippet, taken from Github.com/flink-connector-jdbc): in the close method the sessions are not closed if an exception happens during the call to flush(). Is this correct? Or should the connectionProvider.closeConnection() call be put into a finally block?
    Unbenannt.java
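    A minimal sketch of the finally-based variant being proposed; this is illustrative rather than the actual connector source, and the closed flag and LOG field are assumed to exist as in the original class:
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            try {
                flush(); // may still throw if the buffered records cannot be written
            } finally {
                // Always release the JDBC session, even when flush() failed, so that
                // repeated write failures cannot leak Oracle sessions.
                try {
                    connectionProvider.closeConnection();
                } catch (Exception e) {
                    LOG.warn("Closing the JDBC connection failed.", e);
                }
            }
        }
    }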
  • ē”°ę˜Žåˆš

    05/26/2023, 10:37 AM
    If the key space of a keyBy operator is unbounded, will the memory usage grow forever?
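    Keyed state is held per distinct key, so an unbounded key space does keep growing unless the state is cleared; a common mitigation is state TTL, sketched here (the one-day TTL and the state name are illustrative):
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class TtlSketch {
        // Descriptor for per-key state that expires one day after the last write;
        // expired entries are not returned and are removed in the background.
        public static ValueStateDescriptor<Long> lastSeenDescriptor() {
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();

            ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("last-seen", Long.class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }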
  • Slackbot

    05/26/2023, 10:53 AM
    This message was deleted.
  • ē”°ę˜Žåˆš

    05/26/2023, 11:21 AM
    I tried to attach HSDB to the JobManager process ID, but got an exception: hsdb> attach 1178948 caused by: sun.jvm.hotspot.debugger.DebuggerException: cannot open binary file
  • ē”°ę˜Žåˆš

    05/26/2023, 11:36 AM
    Running jhisto in hsdb against the TaskManager process is slow; how can I solve it? Thanks.
  • Oscar Perez

    05/26/2023, 1:29 PM
    hei! we are testing the upsert-kafka connector in order to create temporary tables, but we have one question: when defining the columns, how can we map the column names to certain fields? We are reading from a Kafka topic containing protobuf, and we would like, for instance, a certain column named "userId" to be mapped to a value from the protobuf called "value.event.customer.userId". How can we do this mapping? We could not see any example of this. Thanks!
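    A hedged sketch of the usual technique: declare the nested structure as ROW columns mirroring the protobuf message, then expose the flattened name as a computed column. Shown with the plain kafka connector and invented topic/message names to keep it short; with upsert-kafka the same column mapping applies, but a PRIMARY KEY plus key/value formats are needed:
    CREATE TEMPORARY TABLE customer_events (
      event ROW<customer ROW<userId STRING>>,   -- physical columns mirror the protobuf nesting
      userId AS event.customer.userId           -- computed column exposing the nested field
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'customer-events',
      'properties.bootstrap.servers' = 'kafka-rt:9093',
      'format' = 'protobuf',
      'protobuf.message-class-name' = 'com.example.CustomerEvent',
      'scan.startup.mode' = 'earliest-offset'
    );
    The leading "value." refers to the Kafka record value, whose fields become the table's top-level physical columns.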
  • Oscar Perez

    05/26/2023, 1:58 PM
    Also, we were wondering if there is any other way of creating a table using upsert-kafka other than a CREATE TABLE statement with the upsert-kafka connector. For normal Kafka topics we can create the stream with env.fromSource and then do tableEnv.fromDataStream(). Is there an equivalent way of creating a Table instance via the API, without having to use the connector? Thanks!
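    A minimal sketch of the API route for upsert semantics, using fromChangelogStream with an upsert ChangelogMode; the stream contents, column names and key are illustrative:
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class UpsertFromDataStream {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Illustrative upsert changelog keyed by userId (INSERT + UPDATE_AFTER events).
            DataStream<Row> changelog = env
                    .fromElements(
                            Row.ofKind(RowKind.INSERT, "user-1", 100L),
                            Row.ofKind(RowKind.UPDATE_AFTER, "user-1", 250L))
                    .returns(Types.ROW_NAMED(new String[] {"userId", "amount"}, Types.STRING, Types.LONG));

            // Interpret the stream as an upsert table without any connector.
            Table payments = tEnv.fromChangelogStream(
                    changelog,
                    Schema.newBuilder()
                            .column("userId", "STRING NOT NULL")
                            .column("amount", "BIGINT")
                            .primaryKey("userId")
                            .build(),
                    ChangelogMode.upsert());

            tEnv.createTemporaryView("payments", payments);
        }
    }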
  • Pritam Agarwala

    05/26/2023, 4:59 PM
    Hi Team, if Flink's checkpointing tolerable-failure limit is exceeded, will it go through the restart strategy, or will it override it and simply fail the job?
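    For reference, the setting in question as it would appear in flink-conf.yaml (the value is illustrative); as far as I know, exceeding the limit triggers a regular job failover, which is then subject to whatever restart-strategy the job has configured:
    execution.checkpointing.tolerable-failed-checkpoints: 3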
  • Vitor Leal

    05/26/2023, 5:16 PM
    I'm trying to create a docker-compose service,
    flink-sqlclient
    , which sends SQL queries to Apache Flink (init script):
      flink-jobmanager:
        build:
          dockerfile: ./apache-flink/Dockerfile
        ports:
          - '8081:8081'
        command: jobmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: flink-jobmanager
      flink-taskmanager:
        build:
          dockerfile: ./apache-flink/Dockerfile
        depends_on:
          - flink-jobmanager
        command: taskmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: flink-jobmanager
            taskmanager.numberOfTaskSlots: 2
      flink-sqlclient:
        build:
          dockerfile: ./apache-flink/Dockerfile
        command: bin/sql-client.sh -f init.sql
        depends_on:
          - flink-taskmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: flink-jobmanager
            rest.address: flink-jobmanager
    it says "Execute statement succeed" for each statement in
    init.sql
    but nothing is actually created? Also if I change
    Copy code
    jobmanager.rpc.address: flink-jobmanager
            rest.address: flink-jobmanager
    to gibberish:
    Copy code
    jobmanager.rpc.address: adsasd
            rest.address: asdasdf
    it still works somehow
  • ē”°ę˜Žåˆš

    05/27/2023, 1:09 AM
    I dumped a TaskManager heap; why do I see 27,671 org.rocksdb.Checkpoint objects? Thanks.
  • ē”°ę˜Žåˆš

    05/27/2023, 4:21 AM
    Many of my Flink entity objects are referenced by Flink StreamRecords, but the StreamRecord objects are not referenced by any other object; I see this in VisualVM. My objects are from 30 days ago and should have been garbage collected back then. Why can I still see them now? Thanks!
  • Or Keren

    05/27/2023, 7:45 AM
    Still need help with that one šŸ™šŸ¼
  • Mohan M

    05/29/2023, 4:42 AM
    1) There are 3 tables: a master table (employee) and two child tables (employeeDept, employeestatus).
    2) Data is stored in master Kafka topics (here the Kafka topics are used as a DB more than for streaming; the data of all employees is stored in these topics), so we have 3 master Kafka topics.
    3) There are 3 Change Data Capture (CDC) Kafka topics.
    4) There is 1 Kafka sink topic.
    Requirement: when there is a change in the data, a CDC Kafka topic (point 3) triggers a message to Flink, and Flink needs to send the consolidated information of all data for that particular row to the Kafka sink topic (point 4).
    Employee table: Name, ID, Address, DeptId, statusID columns
    employeeDept: DeptId, DeptName columns
    employeestatus: StatusID, StatusName columns
    Example: John (Employee table) had an address change; the employee CDC Kafka topic triggers (point 3), Flink listens to the CDC Kafka topic, and the Flink program should be able to get the data from all three tables (employee, employeeDept, employeestatus) that belongs to John and send his consolidated data (Name, ID, Address, DeptId, statusID, DeptName, StatusName) with the updated address to the Kafka sink topic (point 4). What is the best approach for this use case?
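    One common shape for this in Flink SQL, sketched roughly and under assumptions: the master topics are modelled as upsert-kafka tables keyed by their IDs, the topic/server/sink names are invented, and the CDC trigger is simplified to the employee table's own changelog:
    CREATE TABLE employee (
      ID STRING, Name STRING, Address STRING, DeptId STRING, StatusID STRING,
      PRIMARY KEY (ID) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka', 'topic' = 'employee-master',
      'properties.bootstrap.servers' = 'kafka:9092',
      'key.format' = 'json', 'value.format' = 'json'
    );
    -- employeeDept (DeptId, DeptName) and employeestatus (StatusID, StatusName) are
    -- defined the same way, keyed by DeptId / StatusID, and employee_consolidated
    -- is a sink table over the Kafka sink topic.

    INSERT INTO employee_consolidated
    SELECT e.Name, e.ID, e.Address, e.DeptId, e.StatusID, d.DeptName, s.StatusName
    FROM employee e
    JOIN employeeDept d ON e.DeptId = d.DeptId
    JOIN employeestatus s ON e.StatusID = s.StatusID;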
  • Suparn Lele

    05/29/2023, 6:51 AM
    Hello everyone, I am trying to run unit tests using Apache Flink and the ScalaTest module. I am declaring a StreamExecutionEnvironment and using that reference in all of my UTs. I have written 16 UTs. What happens is that the first 8 of them pass while the next 8 fail. Tests that were failing earlier pass if I shift them to positions 1-8, and passing ones start to fail if I move them to positions 9-16. So in short there is no logic issue here. I am getting the following error:
    java.lang.RuntimeException: Failed to fetch next result
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
      at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.to(CloseableIterator.scala:35)
      ...
      Cause: java.io.IOException: Failed to fetch job execution result
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
      at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
      ...
      Cause: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
      at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
      at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
      ...
      Cause: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
      at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
      at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
      at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
      at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
      at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
      at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
      at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
      ...
      Cause: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
      at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
      at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
      at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
      at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
      at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
      at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      ...
      Cause: com.esotericsoftware.kryo.KryoException: Unable to find class: sun.reflect.GeneratedConstructorAccessor88
    Serialization trace:
    delegate (sun.reflect.DelegatingConstructorAccessorImpl)
    constructorAccessor (java.lang.reflect.Constructor)
    _constructor (com.fasterxml.jackson.databind.introspect.AnnotatedConstructor)
    _fromLongCreator (com.fasterxml.jackson.databind.deser.std.StdValueInstantiator)
    _valueInstantiator (com.fasterxml.jackson.databind.deser.BeanDeserializer)
    _rootDeserializers (com.fasterxml.jackson.databind.ObjectMapper)
    objectMapper (com.jayway.jsonpath.spi.mapper.JacksonMappingProvider)
    mappingProvider (com.jayway.jsonpath.Configuration)
    configuration (com.jayway.jsonpath.internal.JsonContext)
      at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
      at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
      at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
      at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
      at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
      at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
      at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
      at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
      ...
      Cause: java.lang.ClassNotFoundException: sun.reflect.GeneratedConstructorAccessor88
      at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
      at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:348)
      at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    I am using an object to initialize the StreamExecutionEnvironment. The code goes like the following:
    def getStreamExecutionEnvironment: StreamExecutionEnvironment = {
      val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
      // Kryo cannot handle java.util.Collections$UnmodifiableCollection out of the box,
      // so register a dedicated serializer for it.
      val unmodifiableCollection = Class.forName("java.util.Collections$UnmodifiableCollection")
      streamExecutionEnvironment.getConfig.addDefaultKryoSerializer(unmodifiableCollection, classOf[UnmodifiableCollectionsSerializer])

      streamExecutionEnvironment
    }
    I suspect this to be a memory issue. Could someone please help??
  • Keyur Makwana

    05/29/2023, 8:55 AM
    Hey, I want to consume a data stream from Redpanda. What is the best way to consume the data? (FYI: I have tried via a socket but I am unable to get the data for a particular topic.)
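    Since Redpanda speaks the Kafka protocol, the usual route is Flink's Kafka connector; a minimal sketch (broker address, topic and group id are made up):
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RedpandaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("redpanda:9092")
                    .setTopics("my-topic")
                    .setGroupId("flink-consumer")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            DataStream<String> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "redpanda-source");
            stream.print();
            env.execute("redpanda-consumer");
        }
    }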
  • Sumit Lakra

    05/29/2023, 8:59 AM
    Hi team. I wish to use fine-grained resource management in our Flink cluster (not running on Kubernetes or YARN), with 2 TaskManagers. Until now, I had set the number of slots per TaskManager to 2 via "taskmanager.numberOfTaskSlots: 2" in the Flink conf. What I am unable to figure out from the documentation is: if we enable fine-grained resource management via the "cluster.fine-grained-resource-management.enabled: true" option, what happens to the statically defined number of slots? Flink starts up with both of these configurations set, so does the statically defined number of slots not need to be removed/ignored? Even when it is commented out, it seems to take its default value of 1 and the TM starts with 1 slot. This confuses me because https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/finegrained_resource/#resource-allocation-strategy says that "The TaskManager will be launched with total resources but no predefined slots", but how is this achieved?
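    For illustration, fine-grained requests are expressed per slot sharing group in the job; a sketch under assumptions (the group name, values and map function are invented). As far as I understand, with the feature enabled slots are then carved dynamically out of the TaskManager's total resources to match such requests, and numberOfTaskSlots mainly determines the default slot size for operators without an explicit profile:
    import org.apache.flink.api.common.operators.SlotSharingGroup;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class FineGrainedSketch {
        public static DataStream<String> withResources(DataStream<String> stream) {
            SlotSharingGroup enrichment = SlotSharingGroup.newBuilder("enrichment")
                    .setCpuCores(1.0)
                    .setTaskHeapMemoryMB(256)
                    .build();

            return stream
                    .map(String::toUpperCase)        // placeholder operator
                    .slotSharingGroup(enrichment);   // request fine-grained resources for this group
        }
    }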
  • Oscar Perez

    05/29/2023, 9:52 AM
    hei team! we are facing an issue when using the Flink SQL client with the Kafka connector. This is my statement:
    CREATE TEMPORARY TABLE IF NOT EXISTS payments (
        userId STRING,
        paymentId STRING,
        paymentStatus STRING,
        direction STRING,
        amount BIGINT,
        eventTime TIMESTAMP
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'payments',
        'format.type' = 'json',
        'connector.properties.bootstrap.servers' = 'kafka-rt:9093',
        'connector.properties.group.id' = 'ethanol-test'
    );
    and I made sure I have flink-sql-connector-kafka in the SQL client docker image. However, when running the command I face the following issue:
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
    The following factories have been considered:
    org.apache.flink.table.sources.CsvBatchTableSourceFactory
    org.apache.flink.table.sources.CsvAppendTableSourceFactory
    What could be the issue when the Kafka connector lib is in the classpath? Regards, Oscar
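    The 'connector.type' / 'format.type' keys are the older, descriptor-style option set, which goes through a different factory lookup; with flink-sql-connector-kafka on the classpath, the current option style would look roughly like this (worth comparing against, with everything else unchanged):
    CREATE TEMPORARY TABLE IF NOT EXISTS payments (
        userId STRING,
        paymentId STRING,
        paymentStatus STRING,
        direction STRING,
        amount BIGINT,
        eventTime TIMESTAMP
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'payments',
        'properties.bootstrap.servers' = 'kafka-rt:9093',
        'properties.group.id' = 'ethanol-test',
        'format' = 'json'
    );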
  • Ari Huttunen

    05/29/2023, 12:32 PM
    I have trouble understanding where to put different Flink configurations. Examples of this are s3.connection-timeout, the hashmap state backend configuration, and execution.checkpointing.interval. There are these places I could put them in my case: • the flink-conf.yaml file • StreamExecutionEnvironment • StreamTableEnvironment • the filesystem SQL connector settings • (the Kafka connector settings). Usually when the manual explains a setting, it doesn't say where the setting should be placed. It could also have different syntax in different places. Any chance of getting the manual to be clearer on this?
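    For what it's worth, a sketch of the programmatic places (the keys are the same strings the docs list for flink-conf.yaml; the values are examples). Cluster/filesystem options such as s3.connection-timeout generally have to live in flink-conf.yaml, since the filesystems are initialized from the cluster configuration rather than per job:
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class ConfigPlacementSketch {
        public static void main(String[] args) {
            // Per-job settings can be passed into the environment...
            Configuration conf = new Configuration();
            conf.setString("execution.checkpointing.interval", "30 s");
            conf.setString("state.backend", "hashmap");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

            // ...and table/SQL options go onto the TableEnvironment's config.
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.source.idle-timeout", "5 s");
        }
    }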
  • Oscar Perez

    05/29/2023, 5:01 PM
    Hei, we are trying to use flink-protobuf support with the kafka connector. When trying to define it in the code like this:
    CREATE TEMPORARY TABLE users_tnc
    (
        userId STRING,
        country STRING,
        acceptedTime TIMESTAMP(3),
        eventMetadata row(eventId STRING, eventTime TIMESTAMP(3)),
        PRIMARY KEY (userId) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'usertnc.v1beta1',
      'properties.bootstrap.servers' = 'localhost:9094',
      'properties.group.id' = 'ethanol-bmp',
      'key.format' = 'raw',
      'value.format' = 'raw',
      'format' = 'protobuf',
      'protobuf.message-class-name' = 'com.test.UserTnCChangedEvent',
      'protobuf.ignore-parse-errors' = 'true'
    );
    we are facing the following exception: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'protobuf' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath. We are using Flink 1.16 and made sure that the flink-protobuf library is on the classpath (packaged as part of the uber jar). Thanks!
  • Yaroslav Bezruchenko

    05/29/2023, 8:47 PM
    Hey, I have a Flink job on Java 11. After updating Flink from 1.15.2 to 1.16.2 I receive tons of these:
    will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema
    On 1.15 it was totally fine and used the POJO serializer. Is there something I'm missing here? Why isn't Flink using POJO serialization anymore?
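    In case it helps narrow it down, these are the conditions a class has to satisfy for Flink's POJO serializer; if any of them (or any field's type) falls short, the type drops back to Kryo with exactly that GenericType warning. The class below is only an illustration:
    public class PaymentEvent {              // public and non-abstract

        public String userId;                // fields public, or private with getters/setters
        public long amount;

        public PaymentEvent() {}             // public no-argument constructor

        public PaymentEvent(String userId, long amount) {
            this.userId = userId;
            this.amount = amount;
        }
    }
    One way to surface the offending type is env.getConfig().disableGenericTypes(), which makes the job fail where it would otherwise silently fall back to Kryo.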
  • Oscar Perez

    05/30/2023, 8:08 AM
    Hei, I am trying to create a table with the Kafka connector like this:
    CREATE TEMPORARY TABLE IF NOT EXISTS devices
    (
        deviceId STRING PRIMARY KEY NOT ENFORCED,
        userId STRING,
        status STRING,
        pairedOn TIMESTAMP,
        eventTime TIMESTAMP
     )WITH (
          'connector' = 'kafka',
          'topic' = 'devices',
          'format' = 'json',
          'properties.bootstrap.servers' = 'kafka-rt:9093',
          'properties.group.id' = 'ethanol-test',
          'scan.startup.mode' = 'earliest-offset'
          );
    but when running the select query I get the following error: org.apache.flink.table.api.ValidationException: The Kafka table 'default_catalog.default_database.devices' with 'json' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
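    A hedged sketch of the same table on the upsert-kafka connector, which does accept a PRIMARY KEY; key.format/value.format replace 'format', and it always reads from the earliest offset (so no scan.startup.mode):
    CREATE TEMPORARY TABLE IF NOT EXISTS devices
    (
        deviceId STRING,
        userId STRING,
        status STRING,
        pairedOn TIMESTAMP,
        eventTime TIMESTAMP,
        PRIMARY KEY (deviceId) NOT ENFORCED
     )WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'devices',
          'properties.bootstrap.servers' = 'kafka-rt:9093',
          'key.format' = 'raw',
          'value.format' = 'json'
          );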
  • Eli Golin

    05/30/2023, 8:23 AM
    I have defined the following sink:
    object ParquetSink {
    
     def parquetFileSink[A <: Message: ClassTag](
       assigner: A => String,
       config: Config
     )(implicit lc: LoggingConfigs): FileSink[A] = {
      val bucketAssigner = new BucketAssigner[A, String] {
       override def getBucketId(element: A, context: BucketAssigner.Context): String = {
        val path = assigner(element)
        logger.info(LogMessage(-1, s"Writing file to ${config.getString(baseDirKey)}/$path", "NA"))
        path
       }
    
       override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
      }
    
      def builder(outFile: OutputFile): ParquetWriter[A] =
       new ParquetProtoWriters.ParquetProtoWriterBuilder(
        outFile,
        implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
       ).withCompressionCodec(config.getCompression(compressionKey)).build()
    
      val parquetBuilder: ParquetBuilder[A] = path => builder(path)
      FileSink
       .forBulkFormat(
        new Path(s"wasbs://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.<http://blob.core.windows.net|blob.core.windows.net>"),
        new ParquetWriterFactory[A](parquetBuilder)
       )
       .withBucketAssigner(bucketAssigner)
       .withOutputFileConfig(
        OutputFileConfig
         .builder()
         .withPartSuffix(".parquet")
         .build()
       )
       .build()
     }
    }
    After deploying the job I get the following exception:
    Caused by: java.lang.UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS
      at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.checkSupportedFSSchemes(AzureBlobRecoverableWriter.java:44) ~[?:?]
      at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[?:?]
      at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.<init>(AzureBlobRecoverableWriter.java:37) ~[?:?]
      at org.apache.flink.fs.azurefs.AzureBlobFileSystem.createRecoverableWriter(AzureBlobFileSystem.java:44) ~[?:?]
      at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
    The question is whether I can make any changes to this code/configs to make it work with the wasbs protocol and not abfs? Tnx everyone.