# troubleshooting
a
Hi all, when developing a Flink app that reads from Kafka and writes into the filesystem, everything works fine. But when I add this Maven dependency:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.12.7</version>
</dependency>
```
the program gets this error:

```
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4856)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
    at java.base/java.util.Collections$SingletonList.forEach(Collections.java:4856)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:72)
    at org.apache.flink.table.operations.FilterQueryOperation.accept(FilterQueryOperation.java:68)
    at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:217)
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:164)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675)
    at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565)
    at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549)
    at org.orangeFlinkDS.TableAPIExample.main(TableAPIExample.java:59)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
    ... 8 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kafka'
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:385)
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:372)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
    ... 56 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
```
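For context on the root cause: the last `Caused by` says the `kafka` factory cannot be discovered at runtime (only `blackhole`, `datagen`, and `filesystem` are available), which usually means the Kafka table connector jar is not on the runtime classpath. A minimal sketch of the dependency that provides the `kafka` identifier for Flink 1.12.x / Scala 2.11 (the exact artifact and version should match the rest of the setup):

```xml
<!-- Sketch: the Table/SQL Kafka connector that registers the 'kafka'
     factory identifier. It must end up on the runtime classpath,
     e.g. inside the fat jar (default scope, not 'provided'). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.7</version>
</dependency>
```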
d
Hey Aly, just wanted to get a bit more information on this. It looks like it might be a classpath issue. Are you building a 'fat' jar? Also, could you run `mvn dependency:tree` and check for any duplicate dependencies? If you are building a fat jar, you may want to consider the Maven Shade Plugin, which can help resolve conflicting dependencies, and a fat jar is a way to at least get everything on the classpath for sure. Exclusions might be necessary for any duplicates you find as well.
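For reference, a minimal Shade Plugin sketch (the plugin version below is a placeholder, not from this thread). Flink discovers connector factories through `META-INF/services` files, and a fat-jar build that lets one jar's service file overwrite another's is a common way for the `kafka` factory entry to disappear; the `ServicesResourceTransformer` merges those files instead:

```xml
<!-- Sketch of a maven-shade-plugin setup for a Flink fat jar. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Merges META-INF/services files rather than letting one
                         jar's copy overwrite another's; without this, the
                         'kafka' DynamicTableFactory entry can be lost. -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```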
a
Yes, I create a fat jar
And it works very well
After I add the parquet dependency
d
Ok
a
I got the error
d
OK, check the dependency tree for any duplicates
If you want to try the Shade plugin, that might help as well
a
I used the Shade plugin when creating the fat jar before, but the problem is the same
d
OK, well there might be a duplicate somewhere among the jars that is using a different version of Parquet.
I do see a similar issue, https://github.com/apache/hudi/issues/6297, which I think was noticed when adding that version of parquet
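If `mvn dependency:tree` does show two Parquet versions, one of them can be kept off the classpath with a Maven exclusion. A sketch; the `groupId`/`artifactId` to exclude are illustrative and depend on what the tree actually shows:

```xml
<!-- Sketch: excluding a transitive Parquet artifact so only one version
     remains on the classpath. The exclusion below is an example, not a
     confirmed conflict from this thread. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet_2.11</artifactId>
    <version>1.12.7</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```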
a
If I change to PyFlink, will the problem remain? Because I think it's dependencies and errors in Java; if I go to Python, I will get away from Java and these errors
@D. Draco O'Brien
d
Are you using the Table API or the SQL API? I have heard the Table API is more stable in PyFlink
a
I intend to use the Table API; I will give it a try
d
Yes, that seems to be what works best with PyFlink
Flink 1.19 and Java 11 work best with PyFlink and the Table API