# troubleshooting
Hi All! I have a problem I am not sure the best way to solve. I have a highly parallel, high-volume, non-keyed stream that is being fed into a keyed aggregate window. The problem is that records with the same key are processed close together and the volume will overwhelm a single parallel task in a keyed window. I’d like to add a non-keyed “Reducer” upstream of my keyed window to do some pre-aggregation to reduce the load on my window. Something like a session or tumbling window with a session length of a few seconds would be great, however it seems I cannot have non-keyed windows with parallelism greater than 1. Does anyone have any suggestions for such a problem?
The best idea I have right now is to use a keyed window but to append a numeric suffix to the key between 0 and the parallelism I set:
key_0
key_2
key_5
This will at least create some distribution to not overload a single parallel operator. But I’d rather use a non-keyed stream because I think it’ll be more efficient.
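For concreteness, here is roughly what the salted-key version would look like in the Scala DataStream API (the Event case class, the count field, and the 5-second tumbling windows are placeholders for illustration, not my real job):
```scala
import java.util.concurrent.ThreadLocalRandom
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical record type; the real schema will differ.
case class Event(key: String, count: Long)

def saltedPreAggregate(events: DataStream[Event], saltBuckets: Int): DataStream[Event] =
  events
    // Attach a random salt as data so the keyBy selector itself stays deterministic.
    .map(e => (ThreadLocalRandom.current().nextInt(saltBuckets), e))
    // Stage 1: pre-aggregate on (key, salt) so a single hot key spreads over several subtasks.
    .keyBy(t => s"${t._2.key}_${t._1}")
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((a, b) => (a._1, Event(a._2.key, a._2.count + b._2.count)))
    .map(_._2)
    // Stage 2: the final aggregation on the original key now sees far fewer records per window.
    .keyBy(_.key)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((a, b) => Event(a.key, a.count + b.count))
```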
The behavior you’re describing might be a misconfiguration of the Flink Kubernetes Operator or how it is set up to watch multiple namespaces. Check that the update to the Helm chart with the additional namespaces was successfully applied to the Flink Operator’s deployment:
kubectl describe deployment <flink-operator-deployment-name>
Also check the logs carefully with DEBUG-level logging enabled:
kubectl logs <operator-pod>
You mean the parallelism being 1? It’s also documented in the docs: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
In case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic will be performed by a single task, i.e. with parallelism of 1.
That might also be an issue. We need to confirm that the changes you are applying actually get applied to the environment.
Describing the deployment will show this. The operator pod logs may also shed more light on the problem.
Also, did you check the Flink operator’s service account and make sure the RoleBinding exists and is correct?
I’m not even at the point of dealing with deployments on EKS yet. This is the early design phase and testing is only in my IDE. When I try to set parallelism on a non-keyed stream I get the following, which seems to confirm that parallelism > 1 is illegal for non-keyed windows.
Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at org.apache.flink.api.common.operators.util.OperatorValidationUtils.validateParallelism(OperatorValidationUtils.java:35)
at org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.setParallelism(SingleOutputStreamOperator.java:139)
at org.apache.flink.streaming.api.scala.DataStream.setParallelism(DataStream.scala:120)
Maybe it’s not allowed because timers are not available on non-keyed operators as well.
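For reference, this is roughly the shape of code that triggers it (the types and the window assigner are just placeholders, not my real pipeline):
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L)

numbers
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .reduce(_ + _)
  // Non-keyed windows run as a single non-parallel task, so this throws
  // IllegalArgumentException: "The parallelism of non parallel operator must be 1."
  .setParallelism(4)
```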
Well, you’re right about the parallelism limitation.
Maybe look at pre-processing before the keyed state.
You could look at using timed (tumbling) or sliding windows before the keyed aggregation. That’s one approach.
That’s what I want to do, but I’m not able to because I can’t run with parallelism over 1.
or …
Instead of using a non-keyed reducer, you could implement a pre-processing step using keyed state. Before your main keyed aggregation, key the stream by a derived attribute that gives a reasonable distribution given your data. This could be something like hashing a timestamp to create a pseudo-bucket or using a part of the data that distributes the load well.
Do a local aggregation by using a map operation with local keyed state to perform lightweight aggregations locally within each parallel instance. The state could store intermediate aggregates per key within each parallel task.
After the local aggregation, use a rescale() or rebalance() operator to redistribute the data evenly across tasks. This effectively reshuffles the data.
Finally, apply your main keyed window aggregation. The data volume will already have been reduced by the pre-processing step.
I don’t think this will violate the parallelism restriction.
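Very roughly, something like this (the Event type, the flush threshold, and the operator itself are just a sketch of the idea, not a drop-in implementation; note the local buffer is plain JVM state and is not checkpointed):
```scala
import scala.collection.mutable
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Hypothetical record type for illustration.
case class Event(key: String, count: Long)

// Best-effort local pre-aggregation inside each parallel subtask.
// The buffer lives on the heap and is NOT checkpointed, so records held in it can be
// lost on failure; a real version would also flush on a timer, not only on size.
class LocalPreAggregator(maxBuffered: Int) extends RichFlatMapFunction[Event, Event] {
  private val buffer = mutable.Map.empty[String, Long]

  override def flatMap(in: Event, out: Collector[Event]): Unit = {
    buffer.update(in.key, buffer.getOrElse(in.key, 0L) + in.count)
    if (buffer.size >= maxBuffered) {
      buffer.foreach { case (k, c) => out.collect(Event(k, c)) }
      buffer.clear()
    }
  }
}

def pipeline(events: DataStream[Event]): DataStream[Event] =
  events
    .flatMap(new LocalPreAggregator(maxBuffered = 10000))
    // Spread the partially aggregated records evenly before the keyed stage.
    .rebalance
    .keyBy(_.key)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((a, b) => Event(a.key, a.count + b.count))
```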
Yeah, that’s basically the solve I posted in the first comment of this thread. Derive a key to create a distribution similar to the parallelism of the operator.
It should work but the implementation might need to be adjusted
Did you do a rescale() or rebalance() to distribute evenly across tasks?
The steps I outlined will not violate the parallelism requirement
My stream is well balanced. It uses a combination of rebalance and rescale in the appropriate places.
Yes, well, that error is just a hard no on setting parallelism above 1 for non-keyed operators.
Yeah… seems like a strange limitation.