# troubleshooting
Hi, I have some code that works in Flink 1.16, but when I upgrade to Flink 1.17 it stops working. The code is very simple: create a FileSource that reads a CSV file, convert it to a DataStream, convert that to a Table, and collect it. If I skip the last conversion and collect the DataStream directly, it works fine, so there is some issue here that only happens in 1.17. This is the exception I get:
java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
I will post the code in the thread so that it doesn't take up space.
@Test
void testCsvFileReading() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    Path csvDomainsFilePath = new Path("path/to/file.tsv");
    CsvMapper mapper = new CsvMapper();
    CsvSchema schema = mapper
            .schemaFor(CsvDomainRecord.class)
            .withoutQuoteChar()
            .withSkipFirstDataRow(true)
            .withLineSeparator("\n")
            .withColumnSeparator('\t');

    CsvReaderFormat<CsvDomainRecord> csvFormat = CsvReaderFormat
            .forSchema(schema, TypeInformation.of(CsvDomainRecord.class));

    FileSource<CsvDomainRecord> source = FileSource
            .forRecordStreamFormat(csvFormat, csvDomainsFilePath)
            .build();

    DataStreamSource<CsvDomainRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-domains-file");

    Table table = tableEnv.fromDataStream(stream);
    tableEnv.toDataStream(table)
            .addSink(new CollectSinkRow());

    env.execute();
}

private static class CollectSinkRow implements SinkFunction<Row> {
    private static final long serialVersionUID = 1367758814864574150L;
    public static final List<Row> values = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(Row value, SinkFunction.Context context) {
        values.add(value);
    }
}
Stacktrace:
java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
        at java.base/java.util.AbstractList.subListRangeCheck(AbstractList.java:509)
        at java.base/java.util.AbstractList.subList(AbstractList.java:497)
        at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.safeArgList(CacheGeneratorUtil.java:213)
        at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.cacheKeyBlock(CacheGeneratorUtil.java:205)
        at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil.cachedMethod(CacheGeneratorUtil.java:68)
        at org.apache.calcite.rel.metadata.janino.RelMetadataHandlerGeneratorUtil.generateHandler(RelMetadataHandlerGeneratorUtil.java:121)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:138)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
        at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
        at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
        at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
        at org.apache.calcite.rel.metadata.RelMetadataQuery.collations(RelMetadataQuery.java:604)
        at org.apache.calcite.rel.metadata.RelMdCollation.project(RelMdCollation.java:291)
        at org.apache.calcite.rel.logical.LogicalProject.lambda$create$0(LogicalProject.java:125)
        at org.apache.calcite.plan.RelTraitSet.replaceIfs(RelTraitSet.java:244)
        at org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:124)
        at org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:114)
        at org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:178)
        at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:2191)
        at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1970)
        at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:165)
        at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
        at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76)
        at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
        at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
        at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
        at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:76)
        at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:289)
        at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:253)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:226)
So I was able to pinpoint the exact cause of this error. It happens when I have the JaCoCo prepare-agent execution in my pom:
<execution>
    <id>prepare-agent</id>
    <goals>
        <goal>prepare-agent</goal>
    </goals>
</execution>
With that in place I get the error; if I remove the JaCoCo plugin and run again, it works. This looks to me like a bug in the interaction between Flink and JaCoCo.
Is there anything I can try to fix this, or is it really a bug for which I should create a ticket?
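One workaround that is sometimes suggested for clashes between JaCoCo's on-the-fly instrumentation and Calcite's Janino code generation (which is where the stacktrace above fails) is to keep JaCoCo away from the Calcite classes entirely, so the metadata-handler generator never sees JaCoCo's synthetic members. I can't confirm this is the root cause here, so treat the exclusion pattern below as an assumption to experiment with, not a verified fix:

```xml
<plugin>
    <groupId>org.jacoco</groupId>
    <artifactId>jacoco-maven-plugin</artifactId>
    <configuration>
        <excludes>
            <!-- Assumption: excluding Calcite from instrumentation avoids the
                 Janino codegen failure; adjust the pattern if it doesn't -->
            <exclude>org.apache.calcite.*</exclude>
        </excludes>
    </configuration>
    <executions>
        <execution>
            <id>prepare-agent</id>
            <goals>
                <goal>prepare-agent</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

As a quick sanity check you can also run the test once with `mvn test -Djacoco.skip=true`; if the error disappears, that confirms the instrumentation is the trigger before you start tuning exclusion patterns.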