Elon

02/09/2021, 9:44 PM
We had an issue where the Confluent schema registry had downtime, and realtime ingestion failed with "Read from kafka failed" but did not recover until we restarted the servers. Is this a known issue? Or is there something else we could have done? The realtime table was on the default tenant and I issued a rebalance, but that did not help (old data was there but no consuming segments).

Xiang Fu

02/09/2021, 9:50 PM
I feel we should have a cache for the schema?

Elon

02/09/2021, 9:50 PM
here's the stack trace we saw:

Xiang Fu

02/09/2021, 9:50 PM
for the Confluent schema client

Elon

02/09/2021, 9:52 PM
2021/02/09 20:45:06.889 ERROR [ServerSegmentCompletionProtocolHandler] [oas_integration_operation_event__3__331__20210209T2044Z] Could not send request http://pinot-us-central1-controller-1.pinot-us-central1-controller-headless.pinot.svc.cluster.local:9000/segmentStoppedConsuming?name=oas_integration_operation_event__3__331__20210209T2044Z&offset=9259054&instance=Server_pinot-us-central1-server-0.pinot-us-central1-server-headless.pinot.svc.cluster.local_8098&reason=org.apache.kafka.common.errors.SerializationException&streamPartitionMsgOffset=9259054
java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method) ~[?:?]
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:115) ~[?:?]
        at java.net.SocketInputStream.read(SocketInputStream.java:168) ~[?:?]
        at java.net.SocketInputStream.read(SocketInputStream.java:140) ~[?:?]
        at shaded.org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) ~[pinot-all-0.6.0-jar-with-dependencies.jar:0.6.0-7115670e5c2b152812a09bd01

Xiang Fu

02/09/2021, 9:53 PM
This seems to be a Pinot controller issue, not a schema registry issue?

Elon

02/09/2021, 9:56 PM
Oh, sorry, wrong log, let me see if I can still get it
I lost the log message, but have you ever seen an error reading from Kafka (e.g. Kafka is down), and then realtime ingestion stops until the servers are restarted?

Xiang Fu

02/09/2021, 10:00 PM
I see.
Did we configure this MaxCachedSchemas?

Elon

02/09/2021, 10:01 PM
It defaults to 1000.
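For reference, that setting appears to map to the cache capacity of Confluent's CachedSchemaRegistryClient. A minimal sketch of constructing the client with that capacity (the registry URL is a placeholder, and this is not necessarily how Pinot wires it internally):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryClientSketch {
  public static void main(String[] args) {
    // 1000 is the cache capacity being discussed; schemas already fetched by id
    // remain usable from the cache even if the registry itself goes down.
    SchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://schema-registry:8081", 1000);
  }
}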

Xiang Fu

02/09/2021, 10:01 PM
ok

Elon

02/09/2021, 10:01 PM
I think this was more that the Kafka consumer failed because the schema registry was down (for ~1 hour), and then ingestion stopped. I did not see any errors in the external view or ideal state, and also did not see consuming segments until I restarted the servers.

Xiang Fu

02/09/2021, 10:01 PM
@Neha Pawar ^^
I saw that this client is inside the Avro deserializer.

Elon

02/09/2021, 10:02 PM
Yep

Xiang Fu

02/09/2021, 10:02 PM
So if there is no message consumed from Kafka, then this logic shouldn't be triggered.
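To make that concrete: the registry lookup lives inside per-message Avro deserialization, so if the fetch returns no records there is no call to the registry at all. A rough sketch using the standard Confluent deserializer (the wiring and class names here are assumptions; Pinot's own decoder may differ):

import java.util.Map;

import org.apache.avro.generic.GenericRecord;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;

public class AvroDecodeSketch {
  private final KafkaAvroDeserializer _deserializer = new KafkaAvroDeserializer();

  public AvroDecodeSketch(String registryUrl) {
    // "schema.registry.url" is the standard Confluent client config key.
    _deserializer.configure(Map.of("schema.registry.url", registryUrl), /* isKey */ false);
  }

  public GenericRecord decode(String topic, byte[] payload) {
    // The schema id embedded in each payload is resolved against the registry
    // here (with caching); with the registry down, an uncached id surfaces as
    // org.apache.kafka.common.errors.SerializationException.
    return (GenericRecord) _deserializer.deserialize(topic, payload);
  }
}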

Neha Pawar

02/09/2021, 10:07 PM
I think the behavior is by design. The consumer was likely seeing exceptions when trying to consume from Kafka with the schema registry down.
while (!_shouldStop && !endCriteriaReached()) {
      // Consume for the next readTime ms, or we get to final offset, whichever happens earlier,
      // Update _currentOffset upon return from this method
      MessageBatch messageBatch;
      try {
        messageBatch = _partitionLevelConsumer
            .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
        consecutiveErrorCount = 0;
      } catch (TimeoutException e) {
        handleTransientStreamErrors(e);
        continue;
      } catch (TransientConsumerException e) {
        handleTransientStreamErrors(e);
        continue;
      } catch (PermanentConsumerException e) {
        segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e);
        throw e;
      } catch (Exception e) {
        // Unknown exception from stream. Treat as a transient exception.
        // One such exception seen so far is java.net.SocketTimeoutException
        handleTransientStreamErrors(e);
        continue;
      }
If there is a permanent exception, or more than 5 consecutive transient exceptions, we stop consuming by marking the consuming segment offline.
An operator would then need to either reset the partition or restart the server.
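Roughly, the policy described above could be sketched like this (illustrative only; the threshold constant, method names, and back-off are assumptions, not the actual Pinot implementation):

// Illustrative sketch of the error-handling policy, not the real Pinot code.
public class TransientErrorPolicySketch {
  private static final int MAX_CONSECUTIVE_TRANSIENT_ERRORS = 5; // assumed threshold
  private int _consecutiveErrorCount = 0;

  // Called from the consume loop's catch blocks for transient stream errors.
  void handleTransientStreamError(Exception e) throws Exception {
    _consecutiveErrorCount++;
    if (_consecutiveErrorCount > MAX_CONSECUTIVE_TRANSIENT_ERRORS) {
      // Too many consecutive failures: give up. The consuming segment is then
      // marked as no longer consuming, and an operator has to reset the
      // partition or restart the server for ingestion to resume.
      throw e;
    }
    // Otherwise back off briefly and let the loop retry fetchMessages().
    Thread.sleep(1000L);
  }

  // A successful fetch resets the counter (matching "consecutiveErrorCount = 0" above).
  void onSuccessfulFetch() {
    _consecutiveErrorCount = 0;
  }
}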

Elon

02/09/2021, 10:08 PM
Thanks :) This helps.
Is there a metric we can use to detect when there's a delay between the Kafka topic and the Pinot realtime table?

Neha Pawar

02/09/2021, 10:13 PM
That metric is much requested but much missing 🙈 I recall it has something to do with Kafka not exposing the “latest” offset in an easy way.
But I remember that at LinkedIn we used to use
ServerGauge.LLC_PARTITION_CONSUMING
This flag turns to 0 whenever the partition is not consuming.
We used to have alerts that fired if this metric stayed at 0 for > 15 minutes on any partition.
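That alert amounts to tracking, per partition, how long the LLC_PARTITION_CONSUMING gauge has read 0 and firing once it exceeds 15 minutes. A minimal sketch of the evaluation logic (how the gauge values are scraped, and the partition key format, are left open here):

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class ConsumingAlertSketch {
  private static final Duration THRESHOLD = Duration.ofMinutes(15);

  // Last time each partition's LLC_PARTITION_CONSUMING gauge was observed at 1.
  private final Map<String, Instant> _lastConsumingAt = new HashMap<>();

  // Feed in each scraped sample; returns true when the partition has been
  // non-consuming for longer than the threshold.
  public boolean shouldAlert(String partitionKey, long gaugeValue, Instant now) {
    if (gaugeValue != 0) {
      _lastConsumingAt.put(partitionKey, now);
      return false;
    }
    // Start the clock the first time the partition is seen at 0.
    Instant lastUp = _lastConsumingAt.computeIfAbsent(partitionKey, k -> now);
    return Duration.between(lastUp, now).compareTo(THRESHOLD) > 0;
  }
}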

Elon

02/09/2021, 10:16 PM
Thanks! I'll check that one out.
@Neha Pawar that gauge works for us! The cluster usually has a steady rate of ingestion, so we alert on a sudden decrease, and so far it's working as expected.

Neha Pawar

02/11/2021, 8:09 PM
awesome