Ryan van Huuksloot
09/25/2023, 7:07 PM
@SerialVersionUID(1L)
abstract class Base {
  val base: Any
}
Generic Wrapper:
@SerialVersionUID(1L)
@TypeInfo(classOf[BaseTypeInfoFactory])
case class AggBase[+T <: Base](
  key: String,
  base: T
) {
  // Required to be a POJO for TypeInfo Serialization
  def this() = this("", null.asInstanceOf[T])
}
class BaseTypeInfoFactory extends TypeInfoFactory[AggBase[_]] {
  override def createTypeInfo(
    t: Type,
    genericParameters: java.util.Map[String, TypeInformation[_]]
  ): TypeInformation[AggBase[_]] = {
    val baseType = genericParameters.get("T")
    baseType match {
      case basex if basex.getTypeClass == classOf[BaseX] =>
        val fields = Map(
          "key" -> Types.STRING,
          "base" -> Types.POJO(classOf[BaseX])
        )
        Types.POJO(classOf[AggBase[_]], fields)
      // Add more cases for other supported types
      case _ => throw new Exception("Insight type not supported")
    }
  }
}
Then the datastream would union a bunch of these types
val baseDataStreams: List[DataStream[AggBase[_ <: Base]]] = List(
  basexDataStream,
  baseyDataStream
)
This obviously doesn't work since DataStream isn't covariant. Any thoughts on the best alternative way to approach this? (I've tried to make the Scala DataStream type covariant, but that looks like a heavy lift.)
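To make the variance problem concrete, here is a minimal sketch in plain Scala. It assumes a hypothetical invariant `Stream[T]` class as a stand-in for Flink's `DataStream` (it is not Flink's API), and `BaseX` / `BaseY` are placeholder subclasses of `Base`. It shows why the list above is rejected, and one possible workaround: widen each element to `AggBase[Base]` with `map` before the union, which is legal because `AggBase` itself is covariant.

```scala
// Hypothetical stand-in for an invariant stream type; this is NOT Flink's DataStream.
final case class Stream[T](elems: List[T]) {
  def map[U](f: T => U): Stream[U] = Stream(elems.map(f))
  def union(others: Stream[T]*): Stream[T] =
    Stream(elems ++ others.flatMap(_.elems))
}

abstract class Base { val base: Any }
// Placeholder subclasses, assumed only for illustration.
final class BaseX extends Base { val base: Any = "x" }
final class BaseY extends Base { val base: Any = 42 }

// Covariant wrapper, as in the question (annotations omitted).
final case class AggBase[+T <: Base](key: String, base: T)

object VarianceSketch {
  val basexStream: Stream[AggBase[BaseX]] = Stream(List(AggBase("a", new BaseX)))
  val baseyStream: Stream[AggBase[BaseY]] = Stream(List(AggBase("b", new BaseY)))

  // Does not compile, because Stream is invariant in T:
  // val streams: List[Stream[AggBase[Base]]] = List(basexStream, baseyStream)

  // Widening each element compiles because AggBase is covariant in T.
  val widenedX: Stream[AggBase[Base]] = basexStream.map(x => (x: AggBase[Base]))
  val widenedY: Stream[AggBase[Base]] = baseyStream.map(y => (y: AggBase[Base]))

  // Both streams now share one element type and can be unioned.
  val unioned: Stream[AggBase[Base]] = widenedX.union(widenedY)
}
```

With a real `DataStream`, the same `map` step would additionally need a `TypeInformation[AggBase[Base]]` in scope, which is the part this thread is trying to solve; the sketch only covers the typing side.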
There are a lot of classes that inherit "Base", so Either or some equivalent isn't ideal if it can be avoided.
Alexey Novakov
09/26/2023, 11:43 AM
Ryan van Huuksloot
09/26/2023, 2:06 PM
deriveTypeInformation. The TypeMapping also looks like it could be useful!
I'll fiddle with it a bit today.
Alexey Novakov
09/26/2023, 2:28 PM
deriveTypeInformation function
Ryan van Huuksloot
09/26/2023, 2:33 PM
deriveTypeInformation. I did download the lib.
That ADT requires you to cast to Event - no? Which means you lose the generic parameter during type casting that I need.
The TypeInformation isn't the problem. I'm more curious about the TypeMapping. Have you ever used it before? It looks like you can cast to a "generic" type to stay invariant, but it holds the type that it should be.
Ryan van Huuksloot
09/26/2023, 2:34 PM
class BaseTypeInfoFactory extends TypeInfoFactory[AggBase[_]] {
  override def createTypeInfo(
    t: Type,
    genericParameters: java.util.Map[String, TypeInformation[_]]
  ): TypeInformation[AggBase[_]] = {
    val baseType = genericParameters.get("T")
    baseType match {
      case basex if basex.getTypeClass == classOf[BaseX] =>
        val fields = Map(
          "key" -> Types.STRING,
          "base" -> Types.POJO(classOf[BaseX])
        )
        Types.POJO(classOf[AggBase[_]], fields)
      // Add more cases for other supported types
      case _ => throw new Exception("Insight type not supported")
    }
  }
}
`val baseType = genericParameters.get("T")`
This returns nothing because it is abstracted away, so I can't use POJOs. If I use generic types then I can use covariance, but generic types are bad.
Ryan van Huuksloot
09/26/2023, 2:37 PM
Ryan van Huuksloot
09/26/2023, 2:56 PM
Alexey Novakov
09/26/2023, 3:00 PM
Ryan van Huuksloot
09/27/2023, 8:37 PM