Hello everyone, I am trying to run unit tests usin...
# troubleshooting
s
Hello everyone, I am trying to run unit tests using apache flink and scalatest module. I am declaring a StreamExecutionEnvironment and I am using that reference in all of my UTs. I have written 16 UTs. What happens is first 8 out of them pass while the next 8 fails. Tests which were failing earlier are passing if I shift them to 1-8 position and those passing start to fail if I keep them at 9-16 position. So in short there is no logic issue here. I am getting following error -
Copy code
java.lang.RuntimeException: Failed to fetch next result
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
  at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
  at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
  at scala.collection.TraversableOnce$<http://class.to|class.to>(TraversableOnce.scala:310)
  at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$<http://1.to|1.to>(CloseableIterator.scala:35)
  ...
  Cause: java.io.IOException: Failed to fetch job execution result
  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
  at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
  at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
  ...
  Cause: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
  at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
  ...
  Cause: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
  at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
  at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
  at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
  at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
  at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
  ...
  Cause: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
  at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
  at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
  at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  ...
  Cause: com.esotericsoftware.kryo.KryoException: Unable to find class: sun.reflect.GeneratedConstructorAccessor88
Serialization trace:
delegate (sun.reflect.DelegatingConstructorAccessorImpl)
constructorAccessor (java.lang.reflect.Constructor)
_constructor (com.fasterxml.jackson.databind.introspect.AnnotatedConstructor)
_fromLongCreator (com.fasterxml.jackson.databind.deser.std.StdValueInstantiator)
_valueInstantiator (com.fasterxml.jackson.databind.deser.BeanDeserializer)
_rootDeserializers (com.fasterxml.jackson.databind.ObjectMapper)
objectMapper (com.jayway.jsonpath.spi.mapper.JacksonMappingProvider)
mappingProvider (com.jayway.jsonpath.Configuration)
configuration (com.jayway.jsonpath.internal.JsonContext)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
  at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
  ...
  Cause: java.lang.ClassNotFoundException: sun.reflect.GeneratedConstructorAccessor88
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
  at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
  at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:348)
  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
I am using object to initialize StreamExecutionEnvironment. The code goes like following
Copy code
def getStreamExecutionEnvironment: StreamExecutionEnvironment = {
  val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val unmodifiableCollection = Class.forName("java.util.Collections$UnmodifiableCollection")
  streamExecutionEnvironment.getConfig.addDefaultKryoSerializer(unmodifiableCollection, classOf[UnmodifiableCollectionsSerializer])

  streamExecutionEnvironment
}
I suspect this to be a memory issue. Could someone please help??