# troubleshooting
t
i’m curious if there is a “cleanest” way to spin up an HTTP server in each TM. creating a custom operator that opens and closes the server seems to hook into the lifecycle of the job closely, which is good. creating a static singleton seems to work and survive across restarts but the pattern doesn’t inspire confidence. is there a third option? (context: we have custom prometheus metrics in library code. we want to expose those metrics. we’ve managed to get away with jacking them into flink’s prometheus registry so long as we avoid dynamic classloading and exclude job_id from scope variables… but with the k8s operator this no longer appears reliable. we’d like to separate our “custom” metrics from flink’s metrics and have a clean break)
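A rough, untested sketch of the “custom operator that opens and closes” option, assuming the prometheus simpleclient HTTPServer module; the pass-through RichMapFunction, class name, and port 9404 are illustrative, not from the thread:
```java
// A pass-through function whose only purpose is to tie an HTTP metrics endpoint to the
// job lifecycle: start the Prometheus simpleclient HTTPServer in open(), stop it in close().
import io.prometheus.client.exporter.HTTPServer;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class PrometheusServerMapper<T> extends RichMapFunction<T, T> {

    // One server per TM JVM; several subtasks may run in the same TM, so guard the port.
    private static HTTPServer server;

    @Override
    public void open(Configuration parameters) throws Exception {
        synchronized (PrometheusServerMapper.class) {
            if (server == null) {
                // Serves CollectorRegistry.defaultRegistry, i.e. whatever the library code registered.
                server = new HTTPServer(9404, true);
            }
        }
    }

    @Override
    public void close() {
        synchronized (PrometheusServerMapper.class) {
            // Simplified: the first subtask to close stops the server for the whole TM;
            // a real version would probably reference-count open subtasks.
            if (server != null) {
                server.stop();
                server = null;
            }
        }
    }

    @Override
    public T map(T value) {
        return value; // identity; the function exists only for its lifecycle hooks
    }
}
```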
m
Why wouldn’t you just implement a custom metric? For example, as explained at https://docs.immerok.cloud/docs/how-to-guides/development/measuring-latency/ ?
t
we do in some cases! but it has two major limitations (or i’m not adept enough to see my way around them 😉)
1. we cannot use custom labels. grouping doesn’t add labels, it appends group key/values to the metric name itself, not labels. this makes rollups and querying near impossible
2. we often have library metrics that are pure prometheus metrics. say a service library that tracks reqs - it’s simply a static Counter that hooks into the prom defaultRegistry. due to the `FlinkUserCodeClassLoader` we end up in a second registry
one way around it seems to be
```
metrics.reporter.prom.scope.variables.excludes: job_id
classloader.parent-first-patterns.additional: io.prometheus
```
but that doesn’t always work and feels increasingly hacky
if there is a better solution i’d love to know! or if you have a better way to hook our secondary server into the job lifecycle that’d be great too
m
Have you checked Jira if an item already exists for these limitations? I think that’s the problem that should be addressed.
t
i have not found anything in jira. it seems being able to use flink metrics with labels is a very common question (in this slack and mailing list). as far as hooking non-flink, user-defined prom metrics into the same registry, i’m not at all sure how that can be done short of how we’re attempting it due to the classloading behavior (and even that, sometimes, fails completely and runs into a metric collision)
c
grouping doesn’t add labels, it appends group key/values to the metric itself, not labels
Can you elaborate on this? At least from Flink's perspective we are not attaching labels to a singular measurement but the time series itself. Afaict in the prometheus client it's not even possible to attach a label to a single measurement.
If you can share some code as to how you use the prometheus client to setup labels and/or some links to the prometheus docs on that topic it would be highly appreciated.
t
sure! for your flink metrics question, when i do
```
flinkCounterOne = register.addGroup("key1").addGroup("value1").counter("counter_one");
```
this is what i get when scraping flink’s prom reporter:
```
flink_taskmanager_job_task_operator_key1_value1_counter_one
```
(i’m excluding some labels it attaches by default, like job_id, operator_name, etc) what i want is this:
```
flink_taskmanager_job_task_operator_counter_one{key1="value1"}
```
c
try `addGroup("key1", "value1")`
t
will test that again in a minute! i thought i had done so but it’s very possible i had a typo and they wound up as a single key
```
flink_taskmanager_job_task_operator_key1_counter_one{job_id="b6626816beb11a6539b3b7dc06f57cdf",key1="value1"...}
```
so that’s still not quite it - `key1` is added to the metric name itself. but this is closer! i definitely did have a typo when originally testing the key/value addGroup (which gave me something unexpected) - that’s a very subtle difference in what the two versions of that method do
secondarily, having to preregister values is very difficult. label values are often determined at runtime - so there might be some workaround to cache these in a map and add them on the fly, but it still doesn’t get around the fact that now i have 3 metrics instead of 1 metric with 3 label keys. it still doesn’t solve registering custom prom metrics with the collector, either, for cases like a shared lib that has its own internal metrics
c
`key1` is added to the metric name itself.
We could add an option to change this behavior such that the key is not added to the metric name.
t
that’d be great! imo that (and dynamic label values) are critical for using this properly with prometheus
c
What is preventing you from creating a new metric group on the fly at runtime with whatever label values you like?
it still doesn’t solve registering custom prom metrics with the collector, either, for cases like a shared lib that has its own internal metrics
This is ultimately something that you'll need to solve by extending the reporter I think. There is no good story in Flink for generic cluster lifecycle hooks (== stuff outside of jobs) that'd allow you to register things like this.
t
What is preventing you from creating a new metric group on the fly at runtime with whatever label values you like?
because once the metric is registered it’s bound with key/value pairs already. what we need is essentially to give it the keys ahead of time and values at runtime. example “raw” prometheus usage declaration:
```
public static final Counter counterOne = Counter.build()
          .name("counter_one_total")
          .help("Count of events")
          .labelNames("schema", "source")
          .register();
```
and usage:
```
counterOne.labels(event.schema, event.source).inc();
```
(this is an example with two labels - schema and source - with the values populated at runtime from event props)
This is ultimately something that you’ll need to solve by extending the reporter I think. There is no good story in Flink for generic cluster lifecycle hooks (== stuff outside of jobs) that’d allow you to register things like this.
this is helpful, thank you! i’m still not sure precisely how to tie code loaded by the `FlinkUserCodeClassLoader` to something controlled outside of that scope. prom seems to expect a global, jvm-wide singleton registry, which is why i was attempting to spin up a server “inside” the job such that the user prom metrics and a job-local http server wound up in the same class loading “scheme” (i’m probably not using the correct words for this…)
c
because once the metric is registered it’s bound with key/value pairs already. what we need is essentially to give it the keys ahead of time and values at runtime.
The current way to achieve this is creating separate groups/counters for each label value. It's not great admittedly (in particular since you need to cache all the groups!), but it should be at least possible.
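A minimal sketch of that caching workaround, shown for a single label key to keep it short; the class and method names are illustrative, not from the thread:
```java
// Cache one Flink counter per observed label value so each (value, counter) pair is
// registered only once, then pick the right one at runtime.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

public final class LabeledCounters {

    private final MetricGroup root;
    private final String labelKey;
    private final String counterName;
    private final Map<String, Counter> byValue = new ConcurrentHashMap<>();

    public LabeledCounters(MetricGroup root, String labelKey, String counterName) {
        this.root = root;
        this.labelKey = labelKey;
        this.counterName = counterName;
    }

    // e.g. counters.inc(event.schema) from the processing code
    public void inc(String labelValue) {
        byValue
            .computeIfAbsent(labelValue, v -> root.addGroup(labelKey, v).counter(counterName))
            .inc();
    }
}
```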
i’m still not sure precisely how to tie code loaded by the `FlinkUserCodeClassLoader` to something controlled outside of that scope.
The simplest way to achieve this should be adding the metrics library (aka, the prometheus client and maybe some utils around that) into the lib/ directory of the Flink distribution; then you should be able to call that from the job code without any classloading problems, so long as you make sure that the job jar doesn't bundle the prometheus client (or you added these classes to the parent-first classloading patterns).
t
it feels like we’re very nearly there, then! we did add
```
classloader.parent-first-patterns.additional: io.prometheus
```
that fixes it most of the time - once in a blue moon we have a duplicate registration issue when the job restarts 😞 there’s also one particular job which doesn’t start at all (immediate duplicate metrics error from prom). one thing we did not do, though, was add prom directly to flink’s lib/ dir - we are copying `flink-metrics-prometheus` to the lib/ dir. maybe that’s wrong…
c
we are copying `flink-metrics-prometheus` to the lib/ dir. maybe that’s wrong…
That should be fine-ish. I would advise against it though personally because you aren't in control of the prometheus client library version; wouldn't want your code to break because we updated the client.
once in a blue moon we have an duplicate registration issue when the job restarts
Do you ensure that you unregister all metrics when a job fails/restarts, e.g., in the close() method of user-defined functions?
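A hedged sketch of that unregister-on-close idea; the mapper and metric name are made up, it covers a counter owned by the function itself rather than the static, library-owned metrics discussed above, and with several subtasks sharing one registry per TM you would still need a per-JVM guard like in the HTTP server sketch earlier:
```java
// Create the collector in open() and remove it from the default registry in close(),
// so a restarted task does not hit a "collector already registered" collision.
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class UnregisteringMapper extends RichMapFunction<String, String> {

    private transient Counter requests;

    @Override
    public void open(Configuration parameters) {
        requests = Counter.build()
                .name("requests_total")
                .help("Requests seen by this task")
                .register(); // registers on CollectorRegistry.defaultRegistry
    }

    @Override
    public void close() {
        if (requests != null) {
            CollectorRegistry.defaultRegistry.unregister(requests);
        }
    }

    @Override
    public String map(String value) {
        requests.inc();
        return value;
    }
}
```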
t
i’ll try just using the prom-client jar and leaving `flink-metrics-prometheus` in the plugins dir! we don’t unregister all metrics right now. flink is an unusual case (for us) where the jvm lifecycle isn’t tied to the metric lifecycle. typically the jvm just crashes and that’s that. also, the user job jar is presumably just loaded once, and started N times, so metrics shouldn’t re-register, right? the classloader has already loaded those statically-defined metrics once
i want to look into clearing metrics but the same issue arises - we don’t have a lifecycle hook to unregister. and if we did, i suspect we’d be clearing flink’s registered metrics and that might be a problem, too (unless it re-registers when incrementing if not present?). definitely an option to test, though
edit: i do see that the prom reporter itself clears the metrics registry on `close`, but that’s likely on flink runtime exit, right? not job exit
c
just loaded once, and started N times so metrics shouldn’t re-register, right
2 things:
• Since you said that they are statically defined it should work, although you do leak some memory if you don't clean them up, because the user classloader cannot be cleaned up if the prom registry has a reference to an object that comes from user code.
• There are some scenarios where job classes are unloaded from a TM and later reloaded. So long as a taskmanager has a slot for the job the classes will stay, meaning that if the job just restarts this doesn't happen. If the job is rescaled then this might no longer hold; a scale-down could leave 1 TM not being involved in a job, it cleans up the job classloader, and then later the job is running on it again.
we don’t have a lifecycle hook to unregister
see `RuntimeContext#registerUserCodeClassLoaderReleaseHookIfAbsent`
i suspect we’d be clearing flink’s registered metrics
So this depends. If io.prometheus is in the parent-first classloading patterns then closing the registry does indeed affect Flink metrics, because the reporter will be backed by the same registry. If you don't add it to the parent-first pattern and adjust the packaging instead (prom client in lib/ and not in user-jar) then they should be separate registries. You can unregister individual metrics though.
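A minimal sketch combining the release hook mentioned above with the registry caveat from this reply; the hook name is arbitrary, and clear() is only appropriate when the job owns the default registry:
```java
// Register a user-code-classloader release hook once per task; when the classloader is
// about to be released, drop the collectors that user/library code put on the default registry.
import io.prometheus.client.CollectorRegistry;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ReleaseHookMapper<T> extends RichMapFunction<T, T> {

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().registerUserCodeClassLoaderReleaseHookIfAbsent(
                "clear-prometheus-default-registry",
                // Caveat from above: if io.prometheus is parent-first, this is the same
                // registry the Flink reporter uses, so clear() would drop Flink's metrics too;
                // in that case unregister individual collectors instead.
                CollectorRegistry.defaultRegistry::clear);
    }

    @Override
    public T map(T value) {
        return value;
    }
}
```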
edit: i do see that the prom reporter itself clears the metrics registry on `close`, but that’s likely on flink runtime exit, right? not job exit
This is indeed the exit from the runtime, but it also removes individual metrics in `notifyOfRemovedMetric()`.
t
thank you for all your help! seriously appreciated. i’m trying to repro our situation where we end up with a fatal collision on restart and i just can’t consistently cause it. i thought your second bullet point could do it, but with the k8s operator there seems to be no real way to have extra TMs (was thinking i could kill pods selectively in order to cause it to reload on a “spare” TM). so far no leads
hope i’m not bothering you with this follow up, but i wonder if there is another possible scenario to cause the job classes to be unloaded and reloaded. is there any world in which time could play a role? like a reference got cleaned up and so the usercode classloader may have to reload it? i had a job running fine for 3h. killed a TM, got a collision - so presumably the job classes were reloaded. did a clean deploy, killed a TM within a few minutes, no collision - so the job classes weren’t reloaded.
c
Well, yes and no. At its core, if a TM was used for a job then slots were allocated on that TM for the job, and the user classloader will remain open for as long as a slot is allocated. If that slot is unused for a period of time it may be released, causing the user classloader to be closed and garbage collected.
t
interesting - both TMs were in use. but maybe it’s possible that during the time it took for the replacement TM to recover, the first went unallocated, got GCed while waiting, then had to reload once the replacement became active..?
c
I'd suggest registering a class loader release hook to log when it is being unloaded, along with all metrics that your job is registering. This should give you a clear picture as to what is going on. Oh, depending on how your static metric registration code works it could also be that this very class gets GC'd, and the next time the job restarts and accesses the class it re-registers the metrics again
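A rough sketch of that diagnostic suggestion: a release hook that logs the unload and dumps whatever is still in the default Prometheus registry; the class, hook name, and logger are illustrative:
```java
// Log when the user-code classloader is released, plus whatever is still sitting in the
// default Prometheus registry at that moment.
import io.prometheus.client.Collector.MetricFamilySamples;
import io.prometheus.client.CollectorRegistry;
import java.util.Collections;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClassLoaderReleaseLogger<T> extends RichMapFunction<T, T> {

    private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderReleaseLogger.class);

    @Override
    public void open(Configuration parameters) {
        final int subtask = getRuntimeContext().getIndexOfThisSubtask();
        getRuntimeContext().registerUserCodeClassLoaderReleaseHookIfAbsent(
                "log-classloader-release",
                () -> {
                    LOG.info("User-code classloader (seen from subtask {}) is being released", subtask);
                    for (MetricFamilySamples family
                            : Collections.list(CollectorRegistry.defaultRegistry.metricFamilySamples())) {
                        LOG.info("Still registered at release time: {}", family.name);
                    }
                });
    }

    @Override
    public T map(T value) {
        return value;
    }
}
```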
t
thanks, i’ll try that!
okay one last update. we just found this: https://issues.apache.org/jira/browse/FLINK-30020, effective in 1.17, which almost certainly makes all this pain a moot point - the `defaultRegistry` (which our libs all use) is no longer used by the prom reporter - so our backdoor hook into flink/prom’s registry will also be disconnected. time to find another solution! it’s probably for the best; this has always felt a bit hacky 🙂 thanks again for all your help, it’s really helped clarify some of the order of operations and interplay between flink components that i was conceptually missing!