# troubleshooting
l
Hi All, we are using version 0.7.1 with GCS as the deep store. In a perf test, we are seeing the controller become the bottleneck in the segment upload path to GCS. We came across the following doc to overcome the controller bottleneck: https://docs.pinot.apache.org/operators/operating-pinot/decoupling-controller-from-the-data-path I have a basic question here: • How is split commit related to peer download? As per my understanding, commit is in the ingestion path and needs an upload to the deep store (GCS). And from the peer download feature, I understand that segments are downloaded from other peer servers in case of deep store (GCS) unavailability. Though I have gone through this code path and confirmed the implementation matches the documentation, I don't understand how and why these two (upload while committing and peer download) are coupled.
k
Is the controller a bottleneck in the real-time ingestion path or the batch ingestion path?
l
For us, the controller is a bottleneck only in the real-time ingestion path. We don't have a batch ingestion path, but we use managed offline flows.
Pinot server thread dumps look like this:
java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(java.base@11.0.11/Native Method)
	at java.net.SocketInputStream.socketRead(java.base@11.0.11/Unknown Source)
	at java.net.SocketInputStream.read(java.base@11.0.11/Unknown Source)
	at java.net.SocketInputStream.read(java.base@11.0.11/Unknown Source)
	at shaded.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
	at shaded.org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
	at shaded.org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)
	at shaded.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
	at shaded.org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
	at shaded.org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
	at shaded.org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
	at shaded.org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:157)
	at shaded.org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
	at shaded.org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
	at shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
	at shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
	at shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
	at shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
	at org.apache.pinot.common.utils.FileUploadDownloadClient.sendRequest(FileUploadDownloadClient.java:383)
	at org.apache.pinot.common.utils.FileUploadDownloadClient.uploadSegmentMetadataFiles(FileUploadDownloadClient.java:508)
	at org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler.sendCommitEndWithMetadataFiles(ServerSegmentCompletionProtocolHandler.java:231)
	at org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler.segmentCommitEndWithMetadata(ServerSegmentCompletionProtocolHandler.java:138)
	at org.apache.pinot.core.data.manager.realtime.SplitSegmentCommitter.commit(SplitSegmentCommitter.java:66)
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.commit(LLRealtimeSegmentDataManager.java:878)
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.commitSegment(LLRealtimeSegmentDataManager.java:848)
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:615)
	at java.lang.Thread.run(java.base@11.0.11/Unknown Source)

"RMI TCP Connection(idle)" #83020 daemon prio=5 os_prio=0 cpu=3998.34ms elapsed=520.00s tid=0x00007f571c58b800 nid=0x1618c waiting on condition  [0x00007f5581501000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
	- parking to wait for  <0x0000000681a08ad8> (a java.util.concurrent.SynchronousQueue$TransferStack)
	at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.11/Unknown Source)
	at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(java.base@11.0.11/Unknown Source)
	at java.util.concurrent.SynchronousQueue$TransferStack.transfer(java.base@11.0.11/Unknown Source)
	at java.util.concurrent.SynchronousQueue.poll(java.base@11.0.11/Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.11/Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/Unknown Source)
	at java.lang.Thread.run(java.base@11.0.11/Unknown Source)
The Pinot controller stack looks like this:
"grizzly-http-server-29" #154 prio=5 os_prio=0 cpu=1273160.39ms elapsed=93474.47s tid=0x00007f651d4bf000 nid=0xe0 runnable  [0x00007f6314fd3000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(java.base@11.0.11/Native Method)
	at java.net.SocketInputStream.socketRead(java.base@11.0.11/Unknown Source)
	at java.net.SocketInputStream.read(java.base@11.0.11/Unknown Source)
	at java.net.SocketInputStream.read(java.base@11.0.11/Unknown Source)
	at sun.security.ssl.SSLSocketInputRecord.read(java.base@11.0.11/Unknown Source)
	at sun.security.ssl.SSLSocketInputRecord.readHeader(java.base@11.0.11/Unknown Source)
	at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(java.base@11.0.11/Unknown Source)
	at sun.security.ssl.SSLSocketImpl.readApplicationRecord(java.base@11.0.11/Unknown Source)
	at sun.security.ssl.SSLSocketImpl$AppInputStream.read(java.base@11.0.11/Unknown Source)
	at java.io.BufferedInputStream.fill(java.base@11.0.11/Unknown Source)
	at java.io.BufferedInputStream.read1(java.base@11.0.11/Unknown Source)
	at java.io.BufferedInputStream.read(java.base@11.0.11/Unknown Source)
	- locked <0x00000007ffcddf98> (a java.io.BufferedInputStream)
	at sun.net.www.http.HttpClient.parseHTTPHeader(java.base@11.0.11/Unknown Source)
	at sun.net.www.http.HttpClient.parseHTTP(java.base@11.0.11/Unknown Source)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(java.base@11.0.11/Unknown Source)
	- locked <0x00000007ffe82d30> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream(java.base@11.0.11/Unknown Source)
	- locked <0x00000007ffe82d30> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
	at java.net.HttpURLConnection.getResponseCode(java.base@11.0.11/Unknown Source)
	at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(java.base@11.0.11/Unknown Source)
	at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
	at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:144)
	at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
	at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:996)
	at com.google.cloud.storage.spi.v1.HttpStorageRpc.write(HttpStorageRpc.java:753)
	at com.google.cloud.storage.BlobWriteChannel$1.run(BlobWriteChannel.java:60)
	at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.11/Unknown Source)
	at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105)
	at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
	at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
	at com.google.cloud.storage.BlobWriteChannel.flushBuffer(BlobWriteChannel.java:53)
	at com.google.cloud.BaseWriteChannel.flush(BaseWriteChannel.java:112)
	at com.google.cloud.BaseWriteChannel.write(BaseWriteChannel.java:139)
	at org.apache.pinot.plugin.filesystem.GcsPinotFS.copyFromLocalFile(GcsPinotFS.java:353)
	at org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers.segmentUpload(LLCSegmentCompletionHandlers.java:367)
	at jdk.internal.reflect.GeneratedMethodAccessor181.invoke(Unknown Source)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.11/Unknown Source)
	at java.lang.reflect.Method.invoke(java.base@11.0.11/Unknown Source)
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$$Lambda$380/0x00000008405c8040.invoke(Unknown Source)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
	at org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpContainer.service(GrizzlyHttpContainer.java:353)
	at org.glassfish.grizzly.http.server.HttpHandler$1.run(HttpHandler.java:200)
	at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:569)
	at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:549)
	at java.lang.Thread.run(java.base@11.0.11/Unknown Source)
k
How many controllers do you have?
How many segments are getting generated per second, and what is their size?
This should become a bottleneck only at a really large scale.
l
We have 2 controllers, each with 32 handlers (grizzly-http-server-*).
This is a perf test environment, so the number of segments is high.
s
How many partitions are in your stream? How often does each partition make a segment? How big are the segments? Have you tested the write latency of GCS? How long does it take to write a segment? What does your perf test do? Are you testing query performance or segment completion performance?
l
• Number of controllers: 2
• Number of request handler threads per controller: 32 (grizzly-http-server-*)
• Number of Kafka partitions: 24
• Number of segments: 8 segments per minute (1 segment per partition every 3 minutes)
• Segment size before compression (Pinot server on-disk size): 700 MB to 1 GB
• Segment size after compression (in deep store): 50 MB
Above are the numbers for one problematic table with a high number of segments.
Have you tested the write latency of gcs?
Nope. I can give it a try, but as per my observations so far, that's not the issue.
How long does it take to write a segment?
Is there a Pinot metric for this, e.g. average segment upload time?
What does your perf test do? Are you testing query performance or segment completion performance?
We are testing our product, not Pinot specifically. Our queries are infrequent and mostly over the last hour's time range.
@Kishore G @Subbu Subramaniam: Any inputs on this please?
k
It seems more like a setup issue. We need to see where it's timing out.
Check the network I/O on the controller if you have graphs; you are probably saturating it.
Or check the I/O to GCS, in case you are throttled on the write rate to GCS.
8 GB per minute is about 1 Gbps on average.
Most likely your network is 100 Mbps and you are saturating it.
Note that 8 GB refers to bytes, while the network's 1 Gbps is bits.
l
@Kishore G: This is in Google Cloud. I haven't really seen network throttling on any node in our cluster. In fact, Pinot-to-GCS network I/O is very low compared to other nodes in the cluster.
Also, how did you arrive at 8 GB per minute?
Pinot writes tarred/compressed segments to the deep store, so each segment transferred to the deep store is just 50 MB.
• Number of Kafka partitions: 24
• Number of segments: 8 segments per minute (1 segment per partition every 3 minutes × 24 partitions)
• Segment size before compression (Pinot server on-disk size): 700 MB to 1 GB
• Segment size after compression (in deep store): 50 MB
8 segments per minute * 50 MB per segment = 400 MB per minute ≈ 6.7 MBps ≈ 53 Mbps
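The arithmetic above can be sketched as a quick back-of-envelope calculation (the segment rate and size come from this thread; this is just the byte-to-bit conversion, not Pinot code):

```java
// Back-of-envelope deep-store bandwidth estimate from the numbers in this thread.
public class BandwidthEstimate {
    // Total MB written to deep store per minute.
    static double mbPerMinute(int segmentsPerMinute, int segmentSizeMb) {
        return segmentsPerMinute * (double) segmentSizeMb;
    }

    // Convert MB/minute to megabits/second (1 byte = 8 bits).
    static double megabitsPerSecond(double mbPerMinute) {
        return mbPerMinute / 60.0 * 8;
    }

    public static void main(String[] args) {
        // 24 partitions flushing every 3 minutes => 8 segments/minute; ~50 MB compressed each.
        double perMin = mbPerMinute(8, 50);
        System.out.printf("%.0f MB/min -> %.0f Mbps%n", perMin, megabitsPerSecond(perMin));
    }
}
```

At roughly 53 Mbps this is nowhere near saturating even a 1 Gbps link, which supports the observation that the network is not the bottleneck.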
[image: bytes sent from the controller]
This is the bytes sent from the controller, which I believe is very low. Also, as per my observations, there is no throttling at all (GCS/network).
Had gone through the codebase, the stack traces, and the documentation, and figured out that the peer download policy further reduces overhead on the controller.
• Is my understanding correct?
• Is this peer download feature stable and recommended for use in production setups?
k
Yes, peer download will solve it, but it's advanced and should not be needed at this scale.
I took 8 segments of 1 GB each per minute.
l
700 MB to 1 GB is the size before compression, but what Pinot stores in the deep store is the compressed one, which is 50 MB to 70 MB.
s
@Laxman Ch regarding your original question about split commit and deep store, hopefully this blog will answer it: https://eng.uber.com/pinot-real-time-ingestion/
From your stacks, it appears that the server is trying to upload data to the controller, and the controller is waiting on that data. First, you can bypass the controller by uploading to the deep store directly, as the blog describes. Second, you may want to check the controller-server bandwidth. Is it too low? It should transfer 50 MB within a few seconds (or maybe tens of seconds) on a 100 Mb network. You can check the logs on the server side to see if there is an indication of how long it takes; just grep for any segment name in the server logs.
How many servers do you have?
l
How many servers do you have?
12 servers
Thanks for the above info @Subbu Subramaniam.
controller-server bandwidth. Is it too low?
Controllers and servers are running in the same subnet (Google Cloud/K8s). I don't see any throttling in the network. The thread dumps on the controller clearly indicate it is waiting on GCS, as posted in this thread.
Had gone through the code of the split segment committer (aka the peer download policy) a few days ago. The following code in org.apache.pinot.core.data.manager.realtime.SegmentCommitterFactory#createSegmentCommitter put me off trying it: the upload timeout is hard-coded to 10 seconds in this policy. I am not sure why we assume 10 seconds is sufficient for a segment of any size to be uploaded to any deep store.
We may need to make it configurable.
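A minimal sketch of making such a timeout configurable could look like the following. This is hypothetical, not actual Pinot code: the property key, class name, and default are assumptions; the real fix would go through Pinot's config classes.

```java
import java.util.Properties;

// Hypothetical sketch: read an upload timeout from config, falling back to the
// hard-coded 10s default mentioned in this thread. Key name is an assumption.
public class UploadTimeoutConfig {
    static final String TIMEOUT_KEY = "segment.upload.timeout.ms"; // hypothetical key
    static final long DEFAULT_TIMEOUT_MS = 10_000L;                // current hard-coded 10s

    static long uploadTimeoutMs(Properties config) {
        String value = config.getProperty(TIMEOUT_KEY);
        return value == null ? DEFAULT_TIMEOUT_MS : Long.parseLong(value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        System.out.println(uploadTimeoutMs(config)); // default when unset
        config.setProperty(TIMEOUT_KEY, "60000");
        System.out.println(uploadTimeoutMs(config)); // operator-tuned value
    }
}
```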
s
Yes, if it is hard-coded, we need to make it configurable.
But do you think the file upload will take more than 10s? Have you tried it?
l
That really depends on several factors: segment size, deep store throughput and latencies, etc.
I didn't try the peer download policy, but we are seeing high latencies and low throughput from the controller to GCS in our setup, so I am guessing it happens on the Pinot server as well.
s
Another way to check this is to log in to the controller, try copying a file to/from GCS, and see how long it takes.
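To put numbers on that copy test, a small timing harness like the one below could be used. This is a generic sketch: it times a local file copy via java.nio; on the controller you would replace the copy call with your actual GCS upload (e.g. via gsutil or the GCS client) while keeping the same timing logic.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Sketch: measure how long a file copy takes and report effective throughput.
public class CopyTimer {
    // Copy src to dest and return elapsed wall-clock time in milliseconds.
    static long timeCopyMillis(Path src, Path dest) throws Exception {
        long start = System.nanoTime();
        Files.copy(src, dest, StandardCopyOption.REPLACE_EXISTING);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("usage: CopyTimer <src> <dest>");
            return;
        }
        Path src = Paths.get(args[0]);
        Path dest = Paths.get(args[1]);
        long ms = Math.max(1, timeCopyMillis(src, dest)); // avoid divide-by-zero on tiny files
        double megabits = Files.size(src) / 1024.0 / 1024.0 * 8;
        System.out.printf("copied in %d ms (~%.1f Mbps)%n", ms, megabits / (ms / 1000.0));
    }
}
```

If a ~50 MB segment takes much longer than a few seconds here, that points at GCS latency/throughput rather than Pinot itself.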