:wave: Hello All, I have a "fun" Scala problem th...
# troubleshooting
r
👋 Hello All, I have a "fun" Scala problem that maybe the community can help with. Essentially, I am trying to have a DataStream support covariant types so that I can union traits. Example:
@SerialVersionUID(1L)
abstract class Base {
  val base: Any
}
Generic Wrapper:
@SerialVersionUID(1L)
@TypeInfo(classOf[BaseTypeInfoFactory])
case class AggBase[+T <: Base](
  key: String,
  base: T
) {
  // Required to be a POJO for TypeInfo Serialization
  def this() = this("", null.asInstanceOf[T])
}

class BaseTypeInfoFactory extends TypeInfoFactory[AggBase[_]] {
  override def createTypeInfo(
    t: Type,
    genericParameters: java.util.Map[String, TypeInformation[_]]
  ): TypeInformation[AggBase[_]] = {
    val baseType = genericParameters.get("T")
    baseType match {
      case basex if basex.getTypeClass == classOf[BaseX] =>
        val fields = Map(
          "key" -> Types.STRING,
          "base" -> Types.POJO(classOf[BaseX])
        )
        Types.POJO(classOf[AggBase[_]], fields)
      // Add more cases for other supported types
      case _ => throw new Exception("Insight type not supported")
    }
  }
}
Then the DataStream would union a bunch of these types:
val baseDataStreams: List[DataStream[AggBase[_ <: Base]]] = List(
  basexDataStream,
  baseyDataStream
)
This obviously doesn't work since DataStream isn't covariant. Any thoughts on the best alternative way to approach this? (I've tried to make the Scala DataStream type covariant, but that looks like a heavy lift.) There are a lot of classes that inherit from "Base", so Either or some equivalent isn't ideal if it can be avoided.
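For context, the invariance problem and one common way around it (widening each stream's element type before the union) can be sketched in plain Scala. This is only an illustration under assumptions: `InvStream` is a hypothetical stand-in for an invariant container such as the Scala DataStream, and the fields of `BaseX` and `BaseY` are made up. In real Flink code the widening `map` would still need TypeInformation for the widened element type, which is the hard part discussed in this thread.

```scala
// Hypothetical stand-in for an invariant container (NOT the Flink DataStream class).
final case class InvStream[T](elems: List[T]) {
  def map[R](f: T => R): InvStream[R] = InvStream(elems.map(f))
  def union(others: InvStream[T]*): InvStream[T] =
    InvStream(elems ++ others.flatMap(_.elems))
}

abstract class Base { val base: Any }
// Illustrative subclasses; their fields are assumptions.
final case class BaseX(base: Int) extends Base
final case class BaseY(base: String) extends Base

// Covariant wrapper, as in the original question (without the Flink annotations).
final case class AggBase[+T <: Base](key: String, base: T)

object WidenBeforeUnion {
  // Widen the element type explicitly. This is legal because AggBase is covariant in T,
  // even though InvStream itself is invariant.
  def widen[T <: Base](s: InvStream[AggBase[T]]): InvStream[AggBase[Base]] =
    s.map(a => (a: AggBase[Base]))

  val xs: InvStream[AggBase[BaseX]] = InvStream(List(AggBase("x", BaseX(1))))
  val ys: InvStream[AggBase[BaseY]] = InvStream(List(AggBase("y", BaseY("one"))))

  // val bad: List[InvStream[AggBase[Base]]] = List(xs, ys) // does not compile: InvStream is invariant

  // After widening, both streams share one element type and can be unioned.
  val unioned: InvStream[AggBase[Base]] = widen(xs).union(widen(ys))
}
```

The trade-off is that the static type parameter is gone after widening, so downstream code has to pattern match on `base` to get the concrete subtype back.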
a
Hi @Ryan van Huuksloot. The last time I used covariant types was a long time ago. Is what you want to achieve a common type for the elements in the DataStream? Perhaps an ADT approach like the one here would help: https://github.com/flink-extended/flink-scala-api#flink-adt ?
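As a rough illustration of what an ADT approach means here, the sketch below models the elements as a sealed trait with final case classes, in plain Scala only. The names `Event`, `XEvent`, and `YEvent` and their fields are hypothetical, and the library-specific part (deriving TypeInformation for the sealed parent) is deliberately left out.

```scala
// Hypothetical event hierarchy; names and fields are illustrative only.
sealed trait Event { def key: String }
final case class XEvent(key: String, count: Int) extends Event
final case class YEvent(key: String, label: String) extends Event

object AdtSketch {
  // Every element is statically typed as the sealed parent, so a single
  // collection (or stream) type covers all subtypes.
  val events: List[Event] = List(XEvent("a", 1), YEvent("b", "one"))

  // The concrete subtype is recovered by pattern matching at the use site.
  def describe(e: Event): String = e match {
    case XEvent(k, c) => s"$k has count $c"
    case YEvent(k, l) => s"$k has label $l"
  }
}
```

Because the trait is sealed, the compiler can warn when a `match` misses a subtype, which is the main benefit over an open class hierarchy.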
r
👋 Hi Alexey, I'm not convinced the ADT would work, but I would need to try it with deriveTypeInformation. The TypeMapping also looks like it could be useful! I'll fiddle with it a bit today.
a
Yeah, I am not sure if that can really help to achieve the desired logic. Please note that this is an additional library, which comes with the deriveTypeInformation function.
r
No dice with deriveTypeInformation. I did download the lib. That ADT requires you to cast to Event, no? That means you lose the generic parameter during type casting, which I need. The TypeInformation isn't the problem. I am more curious about the TypeMapping. Have you ever used it before? It looks like you can cast to a "generic" type to stay invariant, but it holds the type that it should be.
^ More information TLDR:
val baseType = genericParameters.get("T")
This returns nothing because the type parameter is abstracted away, so I can't use POJOs. If I fall back to Flink generic types, then I can use covariant types, but generic types are something I'd rather avoid.
My current workaround is to use a generic type only for the specific field in the Base class that has a dynamic type. At least then the whole class is not generic.
With POJO TypeInfoFactories on each of the final case classes, that looks like it worked! It nukes the logs though (they don't show up anymore). Any reason for that? 🤔
a
No, I have not used TypeMapping before
👌 1
r
Update: Logs - I needed to exclude slf4j. Flink-ADT didn't really work because you still can't have a DataStream that has a Base class with multiple final case classes as the individual types. It is only really nice if you want to generate a bunch of TypeInformation for single streams.
👍 1