# troubleshooting
  • y

    Youssef Gamal

    06/29/2022, 7:23 PM
Can we use DataStream connectors (other than the default ones available in pyflink.datastream.connectors) in the Python API? For example, can we sink data to Cassandra through the Python API? I am struggling to figure out how to even import CassandraSink.
    m
    • 2
    • 7
  • j

    jason Wang

    06/30/2022, 4:02 AM
Hello everyone, I want to ask a question: is anyone using Flink for batch processing? Can you share some experience, or has anyone analyzed the differences between Spark and Flink for batch processing, in terms of capabilities, resource use, optimization, or other dimensions?
1) developing with the Table API / Flink SQL vs. Spark SQL
2) developing with the DataSet or DataStream API
Any advice and shared experience will be appreciated.
    👀 1
  • v

    Veeramani Moorthy

    06/30/2022, 6:58 AM
    Hi,
    m
    • 2
    • 2
  • v

    Veeramani Moorthy

    06/30/2022, 7:00 AM
I would like to write a Flink UDF in Java which takes an entire ROW as input, does some filtering based on Drools, and then returns a boolean value. Is it possible to do so? Any example? I can figure out the Drools part; I only need help with how to accept an entire row as input in my UDF.
    d
    a
    • 3
    • 2
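A minimal sketch of one way to do this: a ScalarFunction whose single argument is a whole ROW. The row schema (id BIGINT, name STRING) and the class name are only placeholders, and the Drools call is elided:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Hypothetical filter UDF: the entire row arrives as one ROW-typed argument.
public class DroolsRowFilter extends ScalarFunction {

    public Boolean eval(@DataTypeHint("ROW<id BIGINT, name STRING>") Row row) {
        // Hand the row (or a POJO built from its fields) to the Drools session here.
        Object id = row.getField(0);
        Object name = row.getField(1);
        return id != null && name != null; // placeholder predicate
    }
}

Registered via createTemporarySystemFunction, it can be called from SQL with an explicit row constructor, e.g. SELECT * FROM t WHERE DroolsRowFilter(ROW(id, name)).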
  • m

    Musy Tom

    06/30/2022, 7:43 AM
I would like to stop a job with a savepoint in the Flink web UI, but I can't do it...
    m
    • 2
    • 1
  • d

    Dylan Wylie

    06/30/2022, 10:50 AM
Hey folks, in PyFlink is it possible to write to both a Table API sink and a DataStream sink within a single job? Trying to find a way to intermingle statement sets and DataStream sinks. I see 1.14 added attachAsDataStream to statement sets, which I think would help, but it doesn't look like it is available in the Python API, and we're also stuck on an older version (1.13) with AWS KDA. Context is wanting to simultaneously:
• write to S3 in a bulk-encoded format
  ◦ DataStream connector: only row-encoded formats look to be supported in PyFlink's DataStream file sink ❌
  ◦ Table API connector: supports this just fine ✅
• write to JDBC using custom upserts to ensure exactly-once
  ◦ DataStream connector: allows this ✅
  ◦ Table API connector: builds the upsert itself so isn't able to add the extra conditions needed to make this exactly-once ❌
    d
    • 2
    • 2
  • s

    shishal singh

    06/30/2022, 2:30 PM
Hi folks, in my Flink job I am trying to use the elasticsearch7 connector. I could not find a way to communicate over SSL when using Elasticsearch7SinkBuilder. Am I missing something? There is also a deprecated class, ElasticsearchSink.Builder, which has a RestClientFactory I can use to set up an HTTPS connection, but since that class is deprecated I'm wondering if the same is possible with Elasticsearch7SinkBuilder. Also, since there is no connector for Elasticsearch 8 yet, is it possible to use either of the above clients with Elastic 8? Note: there used to be an Elastic connector compatibility matrix in the Flink docs, but I can't find it any more in the latest version of the docs.
    m
    h
    • 3
    • 16
  • d

    dario bonino

    06/30/2022, 3:17 PM
    Hi Everyone, we are consistently encountering this issue:
    org.apache.flink.util.FlinkRuntimeException: Error while adding value to RocksDB
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:80)
        at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:96)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:413)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.rocksdb.RocksDBException: block checksum mismatch: stored = 225416896, computed = 263486412  in /tmp/flink-io-5f30e001-0b1f-443f-93d3-6a3cb1cd68d2/job_9d5c118e8b6643dfa7ff188e099b55e1_op_WindowOperator_1e4c6222d1927931766f7327355009c0__27_90__uuid_1228482d-13e5-4253-bfa2-c19c74c591db/db/000021.sst offset 1541261 size 34277
        at org.rocksdb.RocksDB.put(Native Method)
        at org.rocksdb.RocksDB.put(RocksDB.java:955)
        at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:78)
        ... 15 more
in a pipeline running on Flink 1.14.3 (and also on Flink 1.14.4). The pipeline was running without issues until today. After re-running the pipeline without changes in the code (and, theoretically, in the running environment) we are consistently hitting this issue. Any idea on this? Could it be caused by a bug in state handling?
    a
    y
    • 3
    • 10
  • y

    Yannick - Co-Founder Flike

    06/30/2022, 4:29 PM
Hey there - we're a young startup building a real-time recommender system API. We ran into some scalability problems with our current infra and are now checking out Apache Druid and Apache Flink. Specifically, we get raw user events from Kafka and do different kinds of aggregations (e.g. transforming to a table containing all user-item interactions and aggregating the newest user information about all users, etc.). So the question is: which tool to use for this real-time data transformation and aggregation, Apache Druid or Apache Flink?
Constraints:
• Only three engineers to set up & maintain
• 100K rec/min
Open questions:
• Which one is faster to set up?
• Which one is easier to maintain?
    s
    k
    • 3
    • 5
  • a

    Alex Cruise

    06/30/2022, 6:00 PM
    I feel like we should have a recommendation about redirecting troubleshooting questions to the mailing list; the history monster will destroy a lot of knowledge
    m
    • 2
    • 1
  • k

    Krishna Chaithanya M A

    06/30/2022, 6:16 PM
Hi, we are trying to deploy our code, but we are facing a cluster entrypoint error. So we decided to deploy a simple wordcount jar, but I see that it is also failing with the same cluster entrypoint error. I want to know if there is anything I am missing in the configuration. Below is the error I am seeing:
2022-06-30 16:37:23,934 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='nwsc-stream-processor-dev01-cluster-config-map'}.
2022-06-30 16:37:23,934 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for supplychain-nwsc/nwsc-stream-processor-dev01-cluster-config-map, watching id:64757b16-1c58-44d7-a4ca-af4f499faf4a
2022-06-30 16:37:23,941 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Start SessionDispatcherLeaderProcess.
2022-06-30 16:37:24,012 INFO  org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Resource manager service is granted leadership with session id 62168afd-9ddb-4bdf-b359-3dc99314725e.
2022-06-30 16:37:24,012 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
2022-06-30 16:37:24,014 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2022-06-30 16:37:24,015 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:181) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:190) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:184) ~[flink-dist-1.15.0.jar:1.15.0]
    ... 4 more
2022-06-30 16:37:24,024 INFO  org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2022-06-30 16:37:24,216 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
2022-06-30 16:37:24,838 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2022-06-30 16:37:24,838 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
https://gitlab.com/tmobile/supply-chain-services/nwsc/flink/t-mobile-nwsc-stream-processor/-/blob/feature/wordcount/K8/templates/deployment.yaml
    a
    m
    • 3
    • 10
  • s

    Sucheth Shivakumar

    06/30/2022, 10:01 PM
Hi all, is there a way I can write to different files based on the event types in the input stream using FileSink?
    e
    d
    • 3
    • 5
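A minimal sketch of one way to do this with a custom BucketAssigner, so each event type gets its own bucket directory (and therefore its own part files). MyEvent and getType() are hypothetical stand-ins for the actual event class:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class EventTypeFileSink {

    // Hypothetical event type; replace with your own class.
    public static class MyEvent {
        public String type;
        public String payload;
        public String getType() { return type; }
        @Override public String toString() { return payload; }
    }

    public static FileSink<MyEvent> build(String baseDir) {
        return FileSink
            .<MyEvent>forRowFormat(new Path(baseDir), new SimpleStringEncoder<>("UTF-8"))
            .withBucketAssigner(new BucketAssigner<MyEvent, String>() {
                @Override
                public String getBucketId(MyEvent element, Context context) {
                    // One sub-directory per event type, e.g. <baseDir>/click/part-...
                    return element.getType();
                }

                @Override
                public SimpleVersionedSerializer<String> getSerializer() {
                    return SimpleVersionedStringSerializer.INSTANCE;
                }
            })
            .build();
    }
}

Each parallel subtask still writes its own part files within a bucket; rolling behaviour is configured separately (e.g. withRollingPolicy).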
  • t

    Trystan

    07/01/2022, 12:49 AM
Trying to use the Row-encoded Format with a FileSink using OnCheckpointRollingPolicy, and I ran into an exception during checkpointing thrown here. AFAIK there aren't many levers I can turn here, nor do I know why the parts fall outside these bounds. Is there a config I should look for - perhaps the MPU threshold is too low and it's creating 10k+ parts? What config would adjust that?
    k
    • 2
    • 1
  • e

    E Leonard

    07/01/2022, 8:45 AM
Hello folks, I'm wondering if this is normal in a Flink workflow. Say we have v1.0 of the app deployed in production. Then we make a couple of changes to the data flow and data structures in v1.1 of our app. When we deploy, we're under the impression that we need to clear Flink's state and start fresh. This is obviously not ideal for us. Did we miss any important steps/strategies here? I wonder if you have experience with this.
    m
    a
    • 3
    • 5
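Not a full answer, but the usual strategy is to take a savepoint of v1.0, give every stateful operator a stable uid so v1.1 can map the state back, and keep state schema changes backwards compatible, rather than clearing state on every deploy. A minimal sketch of the uid part, with hypothetical operator names:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "a")
            .uid("word-source")     // stable id for the source operator
            .keyBy(word -> word)
            .map(new CountingMapper())
            .uid("word-counter")    // keep this id identical in v1.1 so its state is restored
            .print();

        env.execute("uid-example");
    }

    // Keyed state survives a savepoint/restore as long as the operator uid is unchanged.
    public static class CountingMapper extends RichMapFunction<String, String> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public String map(String value) throws Exception {
            Long current = count.value();
            current = (current == null) ? 1L : current + 1;
            count.update(current);
            return value + " seen " + current + " times";
        }
    }
}

The new version is then submitted from the savepoint (e.g. flink run -s <savepointPath> ...); only genuinely incompatible state-schema changes should force starting fresh.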
  • h

    Hasan Masood

    07/01/2022, 10:59 AM
Thread: Minimizing latency for loading Pub/Sub events into Redshift. We're currently using a Firebase events source that flows into our data pipeline as follows: Mobile App -> Firebase -> Cloud Function -> PubSub -> Flink -> Firehose -> Redshift. We'd like to minimize the latency of this flow. Firehose has restrictions around a 1 MB buffer / 60 seconds for each batch loaded to Redshift, which increases our latency by more than a minute from start to finish. For any folks using the Redshift data warehouse and loading data into it in real time in conjunction with Flink for event processing, do you have suggestions on changes we can make to this data flow to minimize the end-to-end latency?
    m
    g
    +2
    • 5
    • 21
  • s

    shuaiqi xu

    07/01/2022, 11:47 AM
I am writing data to ClickHouse using a JDBC connection and want to specify the table name dynamically, but Flink's JDBC implementation uses placeholders, which put single quotes around the table name and cause the write to fail. Is there any way to avoid this?
    m
    • 2
    • 2
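One way around it, as a sketch (assuming the table name is known when the job is built rather than per record): splice the table name into the INSERT text yourself and keep JDBC placeholders only for the values. The table/column names and record type here are hypothetical:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class ClickHouseSinkFactory {

    // The table name is concatenated into the SQL text, so it is never bound (and quoted) as a value.
    public static SinkFunction<Tuple2<String, Long>> forTable(String tableName, String jdbcUrl) {
        String sql = "INSERT INTO " + tableName + " (name, cnt) VALUES (?, ?)";
        return JdbcSink.sink(
            sql,
            (statement, record) -> {
                statement.setString(1, record.f0);
                statement.setLong(2, record.f1);
            },
            JdbcExecutionOptions.builder().withBatchSize(500).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl(jdbcUrl)
                // driver class name depends on the ClickHouse JDBC driver you ship
                .withDriverName("com.clickhouse.jdbc.ClickHouseDriver")
                .build());
    }
}

If the table genuinely changes per record, a single JdbcSink won't cover it; splitting the stream and building one sink per table (or writing a custom RichSinkFunction) is the usual workaround.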
  • e

    Emile Alberts

    07/01/2022, 1:04 PM
Hi all, I'm having a bit of a weird issue. I'm using the Flink Delta connector to write to S3. I can successfully consume data from my Kafka topic, and the data is transformed and written successfully in Parquet format to S3, but I'm getting the following error when the Delta log file is created:
    java.io.IOException: Class class com.amazonaws.auth.WebIdentityTokenCredentialsProvider does not implement AWSCredentialsProvider
    	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:662) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:605) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:268) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3375) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
    	at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:222) ~[?:?]
    	at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:207) ~[?:?]
    	at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala) ~[?:?]
    	at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:136) ~[?:?]
    	at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.commit(DeltaGlobalCommitter.java:215) ~[?:?]
This only happens when it is running in Kubernetes; the Delta log file is successfully created when running Flink with its MiniCluster. Any ideas of what the issue may be, or can you point me in the correct direction? Thank you for the assistance in advance.
    ✅ 1
    a
    • 2
    • 5
  • g

    George Chen

    07/01/2022, 7:00 PM
Hi everyone, new to the community. I have a basic dev question: when a non-keyed operator/sink implements the CheckpointedFunction interface, is the snapshotState API invoked somehow in a synchronous fashion with the main API call (invoke/map/flatMap … etc.)? It is reasonable to infer from the BufferingSink example in the docs that it might be the case, but I hope there is some confirmation.
    ✅ 1
    a
    • 2
    • 13
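For reference, a sketch in the spirit of the docs' BufferingSink example: it relies on invoke() and the synchronous part of snapshotState() running on the same task thread, so the in-memory buffer needs no extra locking:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private final int threshold;
    private final List<String> buffer = new ArrayList<>();
    private transient ListState<String> checkpointedState;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            // flush the buffer to the external system here
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Called on the task thread as part of the checkpoint, not concurrently with invoke().
        checkpointedState.clear();
        checkpointedState.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("buffered-elements", Types.STRING));
        if (context.isRestored()) {
            for (String element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}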
  • s

    Sree

    07/02/2022, 1:49 PM
Hi everyone, I'm getting exceptions while connecting to the Hive metastore. I'm using the Flink Iceberg connector to load data into Iceberg tables on S3. The data flow is Kafka -> Flink -> Iceberg -> Hive metastore -> S3. The Flink cluster is running in application mode as a containerized deployment on top of Kubernetes. I have loaded htrace-core-4.1.0.jar into the classpath, which contains "org/apache/htrace/core/Tracer", but I am still getting the below error. I have tried both adding the dependency to the classpath (flink/lib) and building a fat jar. Can anyone suggest alternatives or a workaround for resolving this issue?
    m
    m
    k
    • 4
    • 8
  • s

    shuaiqi xu

    07/04/2022, 3:28 AM
I used standalone mode in my test environment, and it seemed that Flink was caching the classes being executed, so after I updated the JAR package and resubmitted it, the old code was still executed.
    ✅ 1
    a
    • 2
    • 11
  • z

    Zain Haider Nemati

    07/04/2022, 10:44 AM
Hi, we are trying to set up monitoring for the Flink k8s operator. We want to enable visibility of metrics in CloudWatch but were not able to find any resources online. Could you help a bit on that front?
    g
    • 2
    • 7
  • s

    Shqiprim Bunjaku

    07/04/2022, 12:09 PM
Has anyone used Flink with data lake formats such as Hudi or Iceberg? I started working on a proof of concept using Flink and Hudi, and there are a lot of issues in the Hudi connector that are either open or waiting to be released.
    m
    • 2
    • 1
  • n

    Nithin kharvi

    07/04/2022, 1:19 PM
Hi, we are facing the below error while deploying the Flink k8s operator on Azure Kubernetes Service, but the same deployment works on a second Azure k8s service. We are not able to figure out what could have gone wrong with the first k8s instance. Can anyone please help with this? https://github.com/apache/flink-kubernetes-operator/tree/main/helm/flink-kubernetes-operator Has anyone encountered this error?
Error: io.javaoperatorsdk.operator.OperatorException: Couldn't start source ControllerResourceEventSource
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.0.0.1/apis/flink.apache.org/v1beta1/flinkdeployments. Message: request to convert CR from an invalid group/version: flink.apache.org/v1alpha1. Received status: Status(apiVersion=v1, code=500, details=null, kind=Status, message=request to convert CR from an invalid group/version: flink.apache.org/v1alpha1, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=null, status=Failure, additionalProperties={}).
    g
    • 2
    • 17
  • n

    Nithin kharvi

    07/04/2022, 1:19 PM
    complete error stack
  • n

    Nithin kharvi

    07/04/2022, 1:20 PM
    flink-k8s-operator-error-logs.txt
    flink-k8s-operator-error-logs.txt
    🧵 1
    a
    • 2
    • 1
  • u

    仰望星空

    07/05/2022, 2:51 AM
Can the Flink k8s operator only submit jar-type jobs? Can xxx.sql or Python job types be supported? At present, more and more of our jobs are developed using Flink SQL, but I don't know how to submit them through the operator.
    n
    m
    g
    • 4
    • 20
  • m

    Mrinal Ekka

    07/05/2022, 6:05 AM
Hi all, in my Flink application I am trying to enrich an incoming stream of data from Kafka with the data present in a Hudi table. Case 1: reading the Hudi table as a batch in the Flink app. This works in the local IDE but fails when deployed to AWS. Error:
    org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    
    Caused by: java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug. org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    c
    r
    p
    • 4
    • 5
  • o

    Omar Izhar

    07/05/2022, 9:16 AM
    flink version = 1.15
    Hi Flink community, I am trying to create a Flink application that processes incoming records from a DataStream and performs an aggregated lookup on a table. E.g. an incoming DataStream record with a batch_id has to be joined with the minimum delivery_priority for the same batch_id in the lookup table. The following example denotes what I want to achieve:
    Incoming stream record: (TYPE: DATASTREAM)
    batch_id,    latitude,     longitude
    2134,         24.812,       67.0034
            
    Lookup table: (TYPE: TABLE)
    batch_id    delivery_priority      order_id
    2134              4                 197168
    2134              2                 197438
    2134              1                 201426
    
    Result stream record:
    batch_id,    delivery_priority    order_id    latitude,     longitude
    2134                1              201426      24.812       67.0034
How can I extract a single record from the table after applying a MIN(delivery_priority) aggregation and use its data to enrich the incoming stream record?
    k
    • 2
    • 2
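A sketch of one way to express this, assuming the incoming stream is registered as a table named incoming and the lookup data as batches (column names follow the example above): a Top-N/deduplication subquery keeps the row with the minimum delivery_priority per batch_id, and a regular join attaches it to the stream:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MinPriorityEnrichment {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());

        // `incoming` and `batches` are assumed to be registered already, e.g. via CREATE TABLE.
        tableEnv.executeSql(
            "SELECT i.batch_id, b.delivery_priority, b.order_id, i.latitude, i.longitude " +
            "FROM incoming AS i " +
            "JOIN ( " +
            "  SELECT batch_id, delivery_priority, order_id FROM ( " +
            "    SELECT *, ROW_NUMBER() OVER ( " +
            "      PARTITION BY batch_id ORDER BY delivery_priority ASC) AS rn " +
            "    FROM batches) " +
            "  WHERE rn = 1 " +
            ") AS b ON i.batch_id = b.batch_id").print();
    }
}

Note the join result is updating (the minimum per batch_id can change), so a downstream sink has to accept upserts/retractions.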
  • m

    Mrinal Ekka

    07/05/2022, 10:32 AM
    Hi, In my Flink application I am trying to read a hudi table from within a Map function with a new TableEnvironment. Is this the correct way? Should this be done as Async I/O?
DataStream<Events> sideOutputStreamCustomerEvent = stringInputStream.flatMap(new RichFlatMapFunction<String, Events>() {

    private TableEnvironment tableEnvironment;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .build();
        tableEnvironment = TableEnvironment.create(settings);
    }.......
The application is throwing the following error at "TableEnvironmentImpl.executeSql" while reading the Hudi table:
    java.lang.NullPointerException
    	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
    	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
    	at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
    	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:78)
    	at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59)
    	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
    	at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
    	at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
    	at org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108)
    	at org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
    m
    g
    • 3
    • 13
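If the lookup does move to Async I/O, the general shape looks like this sketch; EnrichmentClient and its lookup() method are hypothetical stand-ins for whatever client actually queries the storage behind the Hudi table:

import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichment {

    // Hypothetical non-blocking lookup client.
    public interface EnrichmentClient extends Serializable {
        CompletableFuture<String> lookup(String key);
    }

    public static class EnrichFunction extends RichAsyncFunction<String, String> {
        private final EnrichmentClient client;

        public EnrichFunction(EnrichmentClient client) {
            this.client = client;
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            client.lookup(key).whenComplete((value, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singletonList(key + "|" + value));
                }
            });
        }
    }

    public static DataStream<String> enrich(DataStream<String> input, EnrichmentClient client) {
        // 30s timeout, at most 100 in-flight lookups per subtask.
        return AsyncDataStream.unorderedWait(
            input, new EnrichFunction(client), 30, TimeUnit.SECONDS, 100);
    }
}

Creating a full TableEnvironment inside an operator's open() is not a typical pattern, which may be related to the planner NPE above.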
  • j

    Jonathan Thomm

    07/05/2022, 12:07 PM
Hi, when I try to connect Flink with Kafka in SQL (I tried the interactive SQL client but also a jar file) I get the error:
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    
    Available factory identifiers are:
    
    blackhole
    datagen
    filesystem
    print
I'm new to Flink. Any help would be great! My SQL query is:
    CREATE TABLE interactions(
      userId varchar,
      `properties.id` varchar,
      `properties.sessionId` varchar,
      `itemId` AS COALESCE(`properties.sessionId`, `properties.id`),
      `properties.hotSpotName` varchar,
  `properties.name` varchar,
  `hotSpotName` AS COALESCE(`properties.hotSpotName`, `properties.name`),
      `timestamp` TIMESTAMP_LTZ(3)
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'some-raw-events',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json',
      'properties.bootstrap.servers' = 'localhost:9092'
    );
    m
    • 2
    • 5