# troubleshooting
  • j

    Justin

    09/14/2022, 7:49 PM
    Hi. I am trying to register a Postgres table in Flink SQL but I am encountering the following error:
    org.apache.flink.table.api.TableException: Could not execute CreateTable in path postgres.wilbur.`nfdc.countries`
    Full stack trace is attached. DDL:
    CREATE TABLE postgres.wilbur.`nfdc.countries` ( country_code STRING, country_name STRING, numeric_country_code INT, batch_date TIMESTAMP(6) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:postgresql://localhost:5432/wilbur', 'table-name' = 'postgres.wilbur.`nfdc.countries`' );
    I created a catalog and I am able to query the table without any issues (e.g., SELECT * FROM postgres.wilbur.`nfdc.countries` LIMIT 3;) – however, it fails each time I try to register the table. Any help would be appreciated! Thanks, Justin
    d
    t
    m
    • 4
    • 8
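    For Justin's error above: the stack trace is not shown, so this is an assumption, but a common cause is that the JDBC/Postgres catalog is read-only for DDL, so CREATE TABLE cannot be executed against the postgres catalog path even though reads through it work. A minimal Java sketch that registers the table in the default in-memory catalog instead, reusing the connection details from the question (the 'table-name' value is illustrative):
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RegisterJdbcTable {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Register the table in the (writable) default catalog; the Postgres
            // table is still read through the JDBC connector options.
            tEnv.executeSql(
                    "CREATE TABLE countries (" +
                    "  country_code STRING," +
                    "  country_name STRING," +
                    "  numeric_country_code INT," +
                    "  batch_date TIMESTAMP(6)" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:postgresql://localhost:5432/wilbur'," +
                    "  'table-name' = 'nfdc.countries'" +
                    ")");

            tEnv.executeSql("SELECT * FROM countries LIMIT 3").print();
        }
    }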
  • s

    Satya

    09/15/2022, 6:46 AM
    Hi All, I am using FileSource.forRecordStreamFormat for reading Avro files in S3 with a build config of .monitorContinuously(Duration.ofMillis(1000)). To create StreamFormat I have used Avro4s as below
    Copy code
    object AvroStreamFormat extends SimpleStreamFormat[Send] {
        override def createReader(config: Configuration, stream: FSDataInputStream): StreamFormat.Reader[Send] = {
          val schema = AvroSchema[Send]
          val reader: AvroInputStream[Send] = AvroInputStream.data[Send].from(stream).build(schema)
          new StreamFormat.Reader[Send] {
            override def read(): Send =
              reader.iterator.next()
    
            override def close(): Unit = reader.close()
          }
        }
    
        override def getProducedType: TypeInformation[Send] = TypeInformation.of(classOf[Send])
      }
    With the above I created a source as below:
    Copy code
    val source = FileSource.forRecordStreamFormat(
          AvroStreamFormat,
          new Path(s3Path))
          .monitorContinuously(Duration.ofMillis(1000))
          .build()
    And then a datastream like below:
    Copy code
    val recordStream: DataStream[Send] = env.fromSource(
          source,
          WatermarkStrategy.noWatermarks(),
          "email-send"
        )
    When I run the app I am getting java.util.NoSuchElementException. Full stacktrace in comment. Questions I have: 1. Can the FileSource API on S3 recursively find Avro files (and any new Avro files created on S3), or do I have to give the S3 URI where the Avro file lives? 2. Is avro4s not compatible with the FileSource API?
    m
    • 2
    • 7
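    On the Avro/FileSource question above: FileSource's default enumerator scans the given path recursively, and with monitorContinuously it also picks up files that appear later, so pointing at the parent directory should be enough. The NoSuchElementException is most likely the reader calling next() on an exhausted iterator; StreamFormat.Reader.read() is expected to return null at end of input rather than throw. A minimal Java sketch of that contract (a line-based format purely for illustration, not the avro4s reader from the question):
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
    import org.apache.flink.connector.file.src.reader.StreamFormat;
    import org.apache.flink.core.fs.FSDataInputStream;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class LineStreamFormat extends SimpleStreamFormat<String> {

        @Override
        public StreamFormat.Reader<String> createReader(Configuration config, FSDataInputStream stream) throws IOException {
            BufferedReader reader =
                    new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
            return new StreamFormat.Reader<String>() {
                @Override
                public String read() throws IOException {
                    // Returning null signals end of input to the FileSource;
                    // throwing NoSuchElementException here fails the task instead.
                    return reader.readLine();
                }

                @Override
                public void close() throws IOException {
                    reader.close();
                }
            };
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }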
  • k

    Kwangin Jung

    09/15/2022, 9:51 AM
    Hello, I'm trying to sink out gzip files to S3, referring to this thread: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1658243067031639
    Copy code
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    // ...
    
    ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    
    StreamingFileSink<String> streamingFileSink = StreamingFileSink
                    .forRowFormat(
                            new Path("<s3://output-bucket/>"),
                            new Encoder<String>() {
                                @Override
                                public void encode(String record, OutputStream stream)
                                        throws IOException {
    
                                    GzipParameters params = new GzipParameters();
                                    params.setCompressionLevel(Deflater.BEST_COMPRESSION);
                                    GzipCompressorOutputStream out = new GzipCompressorOutputStream(stream, params);
                                    OBJECT_MAPPER.writeValue(out, record.getBytes("UTF-8"));
                                    out.finish();
                                }
    
                            }
                    )
                    .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd_HH-mm"))
                    .build();
    but this just shows the following error
    Copy code
    "locationInformation": "org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1079)",
        "logger": "org.apache.flink.runtime.taskmanager.Task",
        "message": "Sink: Trip process sink (1/1)#0 (022614337997d7b7702590819c0d931a) switched from RUNNING to FAILED with failure cause: java.io.IOException: Stream closed.\n\tat org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:73)\n\tat org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream.write(RefCountedFileWithStream.java:53)\n\tat org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)\n\tat org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:200)\n\tat org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:168)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:62)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:262)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:227)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)\n\tat org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)\n\tat org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)\n\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n",
    Is anyone familiar with this kind of issue? 🙏
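    A likely culprit for the question above (an assumption, since only the sink code is shown): Jackson's ObjectMapper.writeValue(OutputStream, ...) closes its target stream by default (JsonGenerator.Feature.AUTO_CLOSE_TARGET), which here closes the gzip stream and, with it, the part-file stream the sink still needs, hence the "Stream closed" on the next write. A sketch of an encoder that serializes and compresses into a byte buffer first, so the sink-managed stream is never wrapped or closed (each record becomes its own gzip member, which gzip/zcat can still decompress):
    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
    import org.apache.commons.compress.compressors.gzip.GzipParameters;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.Deflater;

    public class GzipJsonEncoder implements Encoder<String> {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        @Override
        public void encode(String record, OutputStream stream) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            GzipParameters params = new GzipParameters();
            params.setCompressionLevel(Deflater.BEST_COMPRESSION);
            try (GzipCompressorOutputStream gzip = new GzipCompressorOutputStream(buffer, params)) {
                OBJECT_MAPPER.writeValue(gzip, record);
            }
            // Only plain bytes touch the sink's stream, so it stays open for the next record.
            stream.write(buffer.toByteArray());
        }
    }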
  • h

    Hari Krishna Poturi

    09/15/2022, 11:09 AM
    Hello, does Flink support MDC logging?
    ✅ 2
    c
    • 2
    • 4
  • m

    Matyas Orhidi

    09/15/2022, 12:03 PM
    Hi folks, is there a way to tell if a savepoint is a NATIVE or a CANONICAL apart from how it was triggered?
    c
    • 2
    • 4
  • s

    Satya

    09/15/2022, 1:12 PM
    Hello Team, we are having a weird issue here with the FileSource API. The API works fine when we have two date partitions in a directory, but it throws a timeout exception when there are more than 2 date partitions. For example, with the given S3 URI:
    Copy code
    <s3://export-staging/xport/dataexport.stg-07.S3.integration.asdh43kwdhasjrw23423/event_type=users.messages.email.Send/>
    let’s say we have 2 date partitions as below:
    Copy code
    date=2022-09-12-10/
    date=2022-09-12-11/
    With these 2 partitions the Flink FileSource API reads the Avro files fine, but when it gets another partition (example below)
    Copy code
    date=2022-09-12-10/
    date=2022-09-12-11/
    date=2022-09-12-12/
    It throws the below error:
    Copy code
    Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'data-export'.
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
    	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
    	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
    	at com.lightricks.sigma.topologies.EntryPoint$.delayedEndpoint$com$lightricks$sigma$topologies$EntryPoint$1(EntryPoint.scala:74)
    	at com.lightricks.sigma.topologies.EntryPoint$delayedInit$body.apply(EntryPoint.scala:31)
    	at scala.Function0.apply$mcV$sp(Function0.scala:39)
    	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    	at scala.App.$anonfun$main$1$adapted(App.scala:80)
    	at scala.collection.immutable.List.foreach(List.scala:392)
    	at scala.App.main(App.scala:80)
    	at scala.App.main$(App.scala:78)
    	at com.lightricks.sigma.topologies.EntryPoint$.main(EntryPoint.scala:31)
    	at com.lightricks.sigma.topologies.EntryPoint.main(EntryPoint.scala)
    Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
    	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
    	at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$submitJob$2(PerJobMiniClusterFactory.java:83)
    	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
    	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    	at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
    Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
    	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    	at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:89)
    	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
    	... 9 more
    Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
    	at com.sun.proxy.$Proxy13.requestJobStatus(Unknown Source)
    	at org.apache.flink.runtime.minicluster.MiniCluster.lambda$getJobStatus$6(MiniCluster.java:704)
    	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    	at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:751)
    	at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:703)
    	at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:86)
    	... 10 more
    Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[<akka://flink/user/rpc/dispatcher_2#248541682]]> after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
    	at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
    	at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
    	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
    	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:874)
    	at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
    	at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
    	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:872)
    	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
    	at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
    	at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
    	at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
    	at java.lang.Thread.run(Thread.java:748)
    Is it a bug with Flink?
    s
    • 2
    • 3
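    For the FileSource timeout above: the trace shows the local MiniCluster's RPC ask timing out (10000 ms) while the job is still initializing, and enumerating more S3 partitions simply takes longer than that. If that is what is happening, it is less a Flink bug than a too-small timeout for a local run; a sketch of raising the relevant timeouts, values illustrative:
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LocalRunWithLongerTimeouts {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Give slow job initialization (e.g. S3 split enumeration) more headroom
            // than the default 10 s ask timeout seen in the stack trace.
            conf.setString("akka.ask.timeout", "60 s");
            conf.setString("client.timeout", "120 s");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment(conf);
            // ... build the FileSource pipeline as before, then:
            // env.execute("data-export");
        }
    }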
  • j

    Jirawech Siwawut

    09/15/2022, 4:30 PM
    Hi. I wonder how Flink could assign a watermark to a complex data format like this
    Copy code
    {
        "name" : "david",
        "events" : [
            {
                "timestamp" : "2022-08-01 00:00:00",
                "id" : "1"
            },
            {
                "timestamp" : "2022-08-01 00:00:01",
                "id" : "2"
            }
        ]
    }
    or
    {
        "name" : "david",
        "events" : {
            "timestamp" : "2022-08-01 00:00:00"
        }
    }
    I would like to create a watermark on the timestamp column.
    d
    • 2
    • 4
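    For the watermark question above: Flink SQL can only declare a watermark on a top-level column, but that column may be a computed column that reaches into a nested field, which covers the second shape directly. For the array shape there is no single per-record timestamp, so you would typically unnest the events upstream or pick a representative value first. A sketch for the non-array shape (connector options are placeholders):
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class NestedFieldWatermark {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Computed column 'ts' lifts the nested timestamp to the top level,
            // and the watermark is declared on that computed column.
            tEnv.executeSql(
                    "CREATE TABLE user_events (" +
                    "  name STRING," +
                    "  events ROW<`timestamp` STRING>," +
                    "  ts AS TO_TIMESTAMP(events.`timestamp`)," +
                    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                    ") WITH (" +
                    "  'connector' = '...'" +
                    ")");
        }
    }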
  • d

    Darin Amos

    09/15/2022, 4:41 PM
    Hi Team! (Moving this question from #C03JKTFFX0S) I am wondering if there are any suggestions when using Global windows with an unbounded keyset. Our window should close when it sees all the elements it expects to see (based on a count), but errors do happen. How can we ensure an application error can’t cause runaway state (and window pane) growth? I had thoughts of setting a processing time timer of an appropriate length (say 6-24h) and deleting the timer when we trigger.
    s
    • 2
    • 2
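    For the global-window question above, the processing-time "safety valve" idea can live entirely in a custom trigger: count elements, fire and purge when the expected count arrives, and purge anyway if a long processing-time timer fires first, so an application error cannot leave panes behind forever. A minimal sketch (expectedCount and ttlMillis are assumptions; deleting the timer on early completion is left out for brevity):
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

    public class CountOrTimeoutTrigger extends Trigger<Object, GlobalWindow> {

        private final long expectedCount;
        private final long ttlMillis;

        private final ReducingStateDescriptor<Long> countDesc =
                new ReducingStateDescriptor<>("count", (ReduceFunction<Long>) Long::sum, LongSerializer.INSTANCE);

        public CountOrTimeoutTrigger(long expectedCount, long ttlMillis) {
            this.expectedCount = expectedCount;
            this.ttlMillis = ttlMillis;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
            ReducingState<Long> count = ctx.getPartitionedState(countDesc);
            if (count.get() == null) {
                // First element for this key: arm the cleanup timer.
                ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + ttlMillis);
            }
            count.add(1L);
            if (count.get() >= expectedCount) {
                count.clear();
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            // Safety valve: drop the pane if it never completed.
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ctx.getPartitionedState(countDesc).clear();
        }
    }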
  • r

    Rishabh Kedia

    09/15/2022, 5:39 PM
    Hello, we are using RocksDB as the backend for checkpointing and storing the checkpoints in GCS. After a few mins of the job running, the first and all subsequent checkpoints fail with the error:
    Copy code
    asynchronous part of checkpoint 1 could not be completed.
    java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
    	at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
    	at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
    	at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128) [flink-dist_2.12-1.13.6.jar:1.13.6]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    	at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: java.lang.OutOfMemoryError: Java heap space
    	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:579) ~[gcs-connector-latest-hadoop2.jar:?]
    	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:380) ~[gcs-connector-latest-hadoop2.jar:?]
    	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:308) ~[gcs-connector-latest-hadoop2.jar:?]
    	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:528) ~[gcs-connector-latest-hadoop2.jar:?]
    	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455) ~[gcs-connector-latest-hadoop2.jar:?]
    	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565) ~[gcs-connector-latest-hadoop2.jar:?]
    	at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:85) ~[gcs-connector-latest-hadoop2.jar:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    	... 3 more
    Size of first checkpoint - 150 MB. Size of second checkpoint - 350 MB. … The task managers have enough memory (4 GB) assigned. As you can see from the image, no memory limits are being reached. Any idea why we are still seeing OOM errors during checkpointing?
    j
    • 2
    • 1
  • j

    Jeesmon Jacob

    09/15/2022, 5:54 PM
    Hi team, snakeyaml v1.30 included in the Flink operator is reporting CVE-2022-25857 with a High CVSS score. It is pulled into the operator as a transitive dependency.
    Copy code
    [INFO] +- io.fabric8:kubernetes-client:jar:5.12.2:compile
    [INFO] |  +- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.13.2:compile
    [INFO] |  |  \- org.yaml:snakeyaml:jar:1.30:compile
    Anyone looking at fixing it in v1.1 or in upcoming v1.2? Thanks.
    • 1
    • 2
  • a

    Aeden Jameson

    09/15/2022, 6:26 PM
    I'm running Flink on EKS and I'm wondering what the metric Status.JVM.CPU.Load actually reflects when there are multiple CPUs per container. I'm observing that Status.JVM.CPU.Load is a lot higher than, say, the EKS container metrics. For example, when compared to something like this:
    Copy code
    100 * sum by (pod) (rate(container_cpu_usage_seconds_total{pod=~".*$flinkdeployment.*", container="taskmanager"}[$__rate_interval])/ on (pod)
    kube_pod_container_resource_requests{pod=~".*$flinkdeployment.*", resource="cpu"})
  • m

    Michael LeGore

    09/15/2022, 9:14 PM
    I have a question about HybridSource/Batch/Streaming: are there plans to allow HybridSource to run in BATCH mode for one source, then start in Streaming mode for the final, unbounded source? The “Looking Into the Future” section of this blog post indicates that's something that might be possible in the future
    k
    • 2
    • 1
  • m

    Michael LeGore

    09/16/2022, 12:25 AM
    I am also seeing an error when dealing with a HybridSource that uses a protobuf type - I’m getting a serialization error, even though the proto type is registered with Kryo.
    c
    • 2
    • 4
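    For the protobuf serialization error above: Kryo cannot handle generated protobuf classes out of the box, so the usual recommendation is to register chill-protobuf's ProtobufSerializer for each message type (this needs the com.twitter:chill-protobuf dependency on the classpath). A sketch, using com.google.protobuf.Timestamp as a stand-in for the actual message class:
    import com.twitter.chill.protobuf.ProtobufSerializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ProtobufKryoSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Register the generated protobuf class with a Kryo serializer that
            // understands protobuf; com.google.protobuf.Timestamp stands in for
            // the real message type used by the HybridSource.
            env.getConfig().registerTypeWithKryoSerializer(
                    com.google.protobuf.Timestamp.class, ProtobufSerializer.class);
        }
    }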
  • h

    Huo

    09/16/2022, 12:42 AM
    Hi all, we are running into a weird exception with Flink 1.12. The app keeps failing and restarting; it does nothing but read events from a Kafka topic and convert them to case class objects, and the error message doesn’t show any info related to our code. The app has been up and running for 1 year and it’s the 1st time we have seen this exception; we have verified there is no schema change in the source topics. Can anyone help look into it? Thanks!
    Copy code
    Source: KafkaTableSource(event_ts, upload_time, playback_mode, channel_id, account_id, device_id, roku_content_id, profile_type, provider_id, playback_duration, proctime) -> SourceConversion(table=[default_catalog.default_database.ux_channel_events_progress, source: [KafkaTableSource(event_ts, upload_time, playback_mode, channel_id, account_id, device_id, roku_content_id, profile_type, provider_id, playback_duration, proctime)]], fields=[event_ts, upload_time, playback_mode, channel_id, account_id, device_id, roku_content_id, profile_type, provider_id, playback_duration, proctime]) -> Calc(select=[account_id, device_id, roku_content_id, channel_id, profile_type, provider_id, 30 AS streaming_seconds, event_ts], where=[((playback_mode SEARCH Sarg[_UTF-16LE'auto', _UTF-16LE'user']:CHAR(4) CHARACTER SET "UTF-16LE") AND (channel_id SEARCH Sarg[_UTF-16LE'151908':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (account_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (roku_content_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (device_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (profile_type SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (provider_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (() <= 1.0:DECIMAL(2, 1)) AND account_id IS NOT NULL)]) -> SinkConversionToStreamEvent (24/128)
    2022-09-14 22:36:47
    java.lang.IllegalArgumentException
    	at sun.reflect.GeneratedConstructorAccessor39.newInstance(Unknown Source)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvokeraw(JavaMirrors.scala:336)
    	at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:339)
    	at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:355)
    	at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$$anonfun$lookupConstructor$2.apply(ScalaCaseClassSerializer.scala:114)
    	at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$$anonfun$lookupConstructor$2.apply(ScalaCaseClassSerializer.scala:113)
    	at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.createInstance(ScalaCaseClassSerializer.scala:49)
    	at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.createInstance(ScalaCaseClassSerializer.scala:39)
    	at org.apache.flink.table.data.util.DataFormatConverters$CaseClassConverter.toExternalImpl(DataFormatConverters.java:1498)
    	at org.apache.flink.table.data.util.DataFormatConverters$CaseClassConverter.toExternalImpl(DataFormatConverters.java:1470)
    	at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:417)
    	at SinkConversion$63.processElement(Unknown Source)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    	at StreamExecCalc$61.processElement(Unknown Source)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    	at SourceConversion$11.processElement(Unknown Source)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
    	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
    	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
    • 1
    • 2
  • k

    Kwangin Jung

    09/16/2022, 2:00 AM
    Hello, when I use StreamingFileSink,
    Copy code
    OutputFileConfig outputConfig = OutputFileConfig
                    .builder()
                    .withPartPrefix("myfile")
                    .withPartSuffix(".txt")
                    .build();
    
    StreamingFileSink fileSinker = StreamingFileSink
                    .forBulkFormat(
                            new Path(bucket),
                            new SimpleStringEncoder(),
                     )
                    .withOutputFileConfig(outputConfig)
                    .build();
    
    dataStream
                    .map(DataObject::toString)
                    .addSink(fileSinker);
    Is there a way to define the output file name by using data inside DataObject?
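    On the file-name question above: with StreamingFileSink the part-file names come from OutputFileConfig plus an automatic subtask/counter suffix, not from the data, but the directory a record lands in can be derived from the record via a custom BucketAssigner, which is the usual way to get data-dependent output paths. A sketch (DataObject#getCategory() is a hypothetical accessor for whatever field should drive the path):
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    public class DataDrivenBucketAssigner implements BucketAssigner<DataObject, String> {

        @Override
        public String getBucketId(DataObject element, BucketAssigner.Context context) {
            // Records end up under <base-path>/category=<value>/myfile-....txt
            return "category=" + element.getCategory();
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
    It would be plugged in via .withBucketAssigner(new DataDrivenBucketAssigner()) on the sink builder, alongside the OutputFileConfig shown above.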
  • k

    Kwangin Jung

    09/16/2022, 3:50 AM
    Sorry, one more... This is not troubleshooting, but does someone know about the difference between FileSink and StreamingFileSink, and the history of these two? Looks like FileSink is the more improved one (it also seems to support both batch and streaming), but I just want more detail 🙏
    m
    d
    • 3
    • 2
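    Short version of the FileSink vs StreamingFileSink question above: FileSink is the newer sink built on the unified sink API (FLIP-143). It offers the same forRowFormat/forBulkFormat builders, works in both STREAMING and BATCH execution mode, and is the one the documentation recommends for new jobs, while StreamingFileSink predates it and only supports streaming. A minimal FileSink sketch (path illustrative):
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FileSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            FileSink<String> sink = FileSink
                    .forRowFormat(new Path("s3://output-bucket/data"), new SimpleStringEncoder<String>("UTF-8"))
                    .build();

            env.fromElements("a", "b", "c")
                    .sinkTo(sink); // unified Sink API: same code path for streaming and batch

            env.execute("filesink-example");
        }
    }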
  • s

    Sumit Nekar

    09/16/2022, 7:27 AM
    Hello, Does Flink operator’s FlinkSessionJob support podTemplate?
    g
    • 2
    • 11
  • d

    Don Li

    09/16/2022, 2:37 PM
    Hello, does anyone have recommendations on how to scale a Flink job in AWS EMR? I have a job manager and 4 task managers as m5.xlarge instances, but it's running a little slow. I'm looking to use an m5.2xlarge but am unsure what parameters to adjust while doing so. I know I'll need to adjust the number of slots and the task manager memory, but I'm unsure exactly what values to use.
    r
    • 2
    • 1
  • j

    Jirawech Siwawut

    09/16/2022, 3:36 PM
    Hello. Has anyone here tried the Filesystem Sink with auto compaction on ViewFs (HDFS)? I found that I can write uncompacted files fine but the job seems to fail at the compaction stage. Here is the error. Note that if I disable compaction everything works fine. viewfs path:
    <viewfs://path/mytable>
    Copy code
    2022-09-16 22:34:29
    java.io.IOException: ViewFs: Cannot initialize: Empty Mount table in config for <viewfs://path/>
    	at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:337)
    	at org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:169)
    	at org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:169)
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
  • j

    James Timotiwu

    09/16/2022, 4:53 PM
    Hey folks, I use the KafkaSource connector to consume messages from Kafka. My job consumes from a large list of topics, and recently I have removed some from that list. However, all messages from those topics are somehow still clearly being processed by the job. Anybody have any clues as to what might be happening?
    p
    • 2
    • 1
  • e

    Erwin Cabral

    09/16/2022, 6:47 PM
    I am not sure if I missed something in the Flink metrics documentation or just need a different approach. I am using the Flink Kubernetes Operator in my Flink deployment. I am trying to display Flink Job and Taskmanager metrics in Grafana using Prometheus as data source. I have configured prometheus to scrape at port 9249. However, I have 2 job managers and 3 taskmanagers. Since there is a dynamic nature to these pods (in case of restarts or failure or even increasing the taskManagers beyond 3), does it mean I have to manually assign ports to each and configure it to Prometheus scrape config? Thanks in advance.
    c
    s
    • 3
    • 4
  • i

    Isaac Pohl-Zaretsky

    09/16/2022, 7:13 PM
    👋 Howdy! How many flink jobs is too many? We're dealing with a use-case where job definitions are frequently changing, and we're thinking about how we can scale it to many (more targeted) jobs. Would it present problems if we have >1,000 separate jobs (many of which are typically idle)? What constraints exist for flink jobs?
    d
    k
    d
    • 4
    • 3
  • r

    Rashmin Patel

    09/17/2022, 4:03 AM
    Hey folks, is there any way to configure taskmanager memory from within application code?
    p
    • 2
    • 3
  • j

    Jirawech Siwawut

    09/18/2022, 5:31 PM
    Hello. Has anyone here tried the Filesystem Sink with auto compaction on ViewFs (HDFS)? I found that I can write uncompacted files fine but the job seems to fail at the compaction stage. Here is the error. Note that if I disable compaction everything works fine. viewfs path:
    <viewfs://path/mytable>
    Copy code
    2022-09-16 22:34:29
    java.io.IOException: ViewFs: Cannot initialize: Empty Mount table in config for <viewfs://path/>
    	at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:337)
    	at org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:169)
    	at org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:169)
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
    m
    • 2
    • 4
  • s

    Slackbot

    09/19/2022, 10:47 AM
    This message was deleted.
    m
    c
    • 3
    • 7
  • a

    Adesh Dsilva

    09/19/2022, 1:31 PM
    Hi, I was wondering about differences in performance when setting parallelism. Consider an example where you are getting input at 10 events per second and we have these operators: A -> B -> C -> D. Important: these can be chained so they can process in a single task. Now, assume A, B and D can process at 10 events per second and C can process 5 events per second. To get 10 per second for the entire stream I can either do A (1) -> B (1) -> C (2) -> D (1) or just set a system-level parallelism of 2. Since the input is still at 10 and all of them are chained anyway, both approaches will still use 2 tasks (similar resources). So, is there any benefit in giving individual parallelism? And a follow-up question: will reactive mode in Flink actually do this automatically for me?
    👍 1
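    One nuance for the parallelism question above: operators with different parallelism cannot be chained into the same task, so A (1) -> B (1) -> C (2) -> D (1) introduces network shuffles around C, whereas a uniform parallelism of 2 keeps the whole chain in one task per slot. A sketch of the per-operator variant, assuming simple map operators:
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PerOperatorParallelism {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1); // A, B and D stay at parallelism 1

            env.fromSequence(0, 1_000)
                    .map(x -> x, Types.LONG).name("A")
                    .map(x -> x, Types.LONG).name("B")
                    .map(x -> x, Types.LONG).name("C").setParallelism(2) // only the slow operator is widened
                    .map(x -> x, Types.LONG).name("D")
                    .print();

            env.execute("per-operator-parallelism");
        }
    }
    As for the follow-up: reactive mode rescales the whole job uniformly with the available resources; it does not pick individual parallelism per operator.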
  • m

    Martijn Visser

    09/19/2022, 2:08 PM
    set the channel topic: Asking for help on using Apache Flink! (Do not mention specific people.)
  • h

    Henrik Feldt

    09/19/2022, 2:43 PM
    I was wondering how you should link separate Flink jobs together ("best practices")? Do you keep multiple Flink Application deployments active and synchronise via Kafka queues? Or is everything in your Flink jobs effectively side-chains and a huge workflow? Or do you use some other method that allows you to keep EventTime, data, and intermediate schemas correctly in sync between different Flink workflows (DAGs)?
  • s

    Sergey Postument

    09/19/2022, 3:20 PM
    Are there users running Kubernetes Native with the logback logger? I want to use logback and configure the log level via an env variable.
    h
    • 2
    • 3
  • s

    Sylvia Lin

    09/19/2022, 6:09 PM
    Hi folks, is there a way we can enable detailed checkpoint logging? We only see the below log from the Flink job, but we are not sure what the root cause of the checkpoint failure is.
    Copy code
    2022-09-18 01:30:24,154 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14733 (type=CHECKPOINT) @ 1663464623841 for job 18447894cc13c2736d1753cedc9a7bb4.
    2022-09-18 01:30:24,538 INFO  com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream  [] - close closed:false <s3://instacart-emr/flink-streaming/event-router/checkpoints/18447894cc13c2736d1753cedc9a7bb4/chk-14733/_metadata>
    2022-09-18 01:30:24,688 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 14733 for job 18447894cc13c2736d1753cedc9a7bb4 (424894 bytes in 781 ms).
    2022-09-18 01:31:24,108 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14734 (type=CHECKPOINT) @ 1663464683841 for job 18447894cc13c2736d1753cedc9a7bb4.
    2022-09-18 01:34:24,108 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 14734 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
    2022-09-18 01:34:24,489 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14735 (type=CHECKPOINT) @ 1663464864113 for job 18447894cc13c2736d1753cedc9a7bb4.
    2022-09-18 01:37:24,489 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 14735 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
    2022-09-18 01:37:24,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14736 (type=CHECKPOINT) @ 1663465044489 for job 18447894cc13c2736d1753cedc9a7bb4.
    2022-09-18 01:40:24,815 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 14736 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
    2022-09-18 01:40:25,089 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14737 (type=CHECKPOINT) @ 1663465224815 for job 18447894cc13c2736d1753cedc9a7bb4.
    2022-09-18 01:43:25,089 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 14737 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
    2022-09-18 01:43:25,380 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14738 (type=CHECKPOINT) @ 1663465405089 for job 18447894cc13c2736d1753cedc9a7bb4.
    s
    h
    • 3
    • 6
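    On the checkpoint question above: "expired before completing" means the checkpoint hit its timeout (three minutes between trigger and expiry in this log) rather than failing with an exception, so the usual suspects are backpressure or slow async state uploads. The per-task breakdown in the web UI's checkpoint tab, or DEBUG logging for org.apache.flink.runtime.checkpoint, shows where the time goes. A sketch of the related knobs (values illustrative):
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuning {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.enableCheckpointing(60_000); // ~1 minute interval, as in the log
            CheckpointConfig cc = env.getCheckpointConfig();
            cc.setCheckpointTimeout(10 * 60_000);        // raise beyond the 3-minute expiry seen above
            cc.setTolerableCheckpointFailureNumber(3);   // don't fail the job on a few expired checkpoints
            cc.enableUnalignedCheckpoints();             // helps when expiry is caused by backpressure
        }
    }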