Hi team, Can someone assist with sink and source ...
# troubleshooting
l
Hi team, can someone assist with sink and source from Kinesis streams on 1.19 using the flink-connector-aws libs? I am trying to use flink-connector-aws-kinesis-streams with the latest version, 4.3.0-1.19. The examples in the test folder of the public repo appear to be for 1.18, and when I write the same code locally using 1.19 I get a compile error when attempting to sink my DataStream: "Cannot resolve method 'sinkTo(KinesisStreamsSink<String>)'", with suggestions to cast the argument to get it working with the sinkTo methods.
d
I noticed that some of the dependencies do have updates, for example: https://mvnrepository.com/artifact/org.apache.flink/flink-table-common/1.19.1
I think you might need to apply those updates for it to work.
Make sure that all dependencies are set explicitly for Flink 1.19 so you get the version you need. If you are using Maven, it would look something like:
<dependencies>
    <!-- Flink Core Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId> <!-- no Scala suffix since Flink 1.15 -->
        <version>1.19.1</version> <!-- Ensure this matches Flink 1.19 -->
    </dependency>

    <!-- Flink AWS Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-aws-kinesis-streams</artifactId>
        <version>4.3.0-1.19</version>
    </dependency>

    <!-- ensure other Flink-related dependencies are on version 1.19 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.19.1</version>
    </dependency>
    <!-- and so on for other Flink dependencies -->

    <!-- AWS SDK dependency, choose a compatible version -->
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>kinesis</artifactId>
        <version>2.26.24</version> <!-- Or the version compatible with your Flink connector -->
    </dependency>

    <!-- Other dependencies like testcontainers, equalsverifier, etc., adjust versions as needed -->
</dependencies>
Try setting each dependency in your pom or Gradle file.
Let us know if that works! If not, there are some other options.
l
Hi @D. Draco O'Brien, thanks for the response. I have tried doing what you suggested and put it into a separate, new, simplified repo, but I still get the build error. Is casting it to the Sink type the correct behaviour?
fromGen.sinkTo((Sink<String>) kdsSink);
I believe it was adding this dependency which has fixed my issue 🙂
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.19.1</version>
</dependency>
d
ok, so it’s working now?
l
Encountering another problem now with the upgrade: I get an error saying the "return type of function ' Kinesis Record Stream' could not be determined automatically". I believe this is coming from my DeserializationSchema, where I have a getProducedType() method. The issue is that I have a type which is a List<T>. Currently I just have getProducedType return:
return TypeInformation.of(new TypeHint<>() {});
However that doesn’t seem to work now with the new Flink version.
d
You may need to give it more information about the type. Replace
return TypeInformation.of(new TypeHint<>() {});
with something showing the actual type if possible, e.g.:
return new ListTypeInfo<>(TypeInformation.of(MyRecord.class));
if the type it should resolve to is MyRecord.class.
With simple types you typically don't need to do this, but with more complex types some hint might be needed.
l
OK thanks, I'll try the ListTypeInfo!
I got it working by splitting the DataStream into two, but this would be better
d
This assumes that MyRecord is a POJO or another type that Flink can understand, i.e. it has a public no-arg constructor, getters/setters, etc.
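For reference, a minimal sketch of the class shape described above. MyRecord is the name used in the thread, but its fields are invented here, and hasPojoShape is a hypothetical local helper that only checks two of the conditions (public class, public no-arg constructor). It is not Flink's own type analysis, which also inspects the fields and their getters/setters.

```java
import java.lang.reflect.Modifier;

class PojoShapeDemo {
    // Hypothetical record type: the fields are made up for illustration.
    public static class MyRecord {
        private String id;
        private long timestamp;

        // Public no-arg constructor, so the object can be instantiated reflectively.
        public MyRecord() {}

        // Bean-style getter/setter pair for every private field.
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    // Rough local check (an assumption, not a Flink API):
    // the class is public and exposes a public no-arg constructor.
    static boolean hasPojoShape(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPojoShape(MyRecord.class));
        System.out.println(hasPojoShape(Integer.class));
    }
}
```

Integer fails the check because it has no no-arg constructor, which is the kind of class that would need explicit type information instead.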
If MyRecord is itself generic, then you might instead have to use TypeInformation or TypeExtractor to extract the type info at runtime.
If it is complex or nested, you may need to define a TypeInfo for the class.
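To illustrate why a List<T> is harder to describe than a List<MyRecord>, here is a plain-Java sketch with no Flink dependency. Capture is a hypothetical stand-in for an anonymous-subclass type hint; it is not Flink's TypeHint and makes no claim about how Flink resolves types internally. It only shows what Java reflection can recover: a concrete element type survives in the anonymous subclass, while a type variable stays an unresolved T.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-in for a type-hint class; NOT Flink's TypeHint.
abstract class Capture<T> {
    // Reads the type argument recorded for the anonymous subclass.
    Type captured() {
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

class TypeCaptureDemo {
    // Returns the element type E of a captured List<E>.
    private static Type elementOf(Type listType) {
        return ((ParameterizedType) listType).getActualTypeArguments()[0];
    }

    // Element type written out explicitly: reflection sees String.class.
    static Type concreteElement() {
        return elementOf(new Capture<List<String>>() {}.captured());
    }

    // Element type is the method's type variable: reflection only sees "T".
    static <T> Type genericElement() {
        return elementOf(new Capture<List<T>>() {}.captured());
    }

    public static void main(String[] args) {
        System.out.println(concreteElement());
        System.out.println(TypeCaptureDemo.<String>genericElement());
    }
}
```

Even when the caller supplies String for T, genericElement() still yields a type variable, so the element type has to be passed in some other way, for example as a Class or TypeInformation argument.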
l
hey @D. Draco O'Brien, encountered another issue and asked in a new thread