# troubleshoot
r
Hi all, while trying to restore our indices from the DB to a fresh OpenSearch cluster, some messages could not be processed due to:
```
2023-02-27 15:06:41.598 ERROR 1 --- [ool-10-thread-1] c.l.m.dao.producer.KafkaHealthChecker    : Failed to emit MCL for entity urn:li:dataHubExecutionRequest:Snowflake-2023_02_20-09_43_34

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1633361 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
```
I've already increased the allowed message size for the topic (`max.message.bytes`) and for the Kafka cluster (`replica.fetch.max.bytes`). However, I cannot find any config parameter to adjust the producer's `max.request.size` (i.e., for datahub-upgrade). Same on the consumer side: how do I increase `max.partition.fetch.bytes` for the MCL consumer? Any help here?
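Before changing anything else, it can help to confirm what a topic currently allows by describing its config; a minimal sketch, assuming `kafka-configs.sh` is on the path and `$BROKER` points at the cluster (older 2.x-era tooling takes `--zookeeper` instead of `--bootstrap-server`):

```bash
# Show any max.message.bytes override currently set on one of the DataHub topics.
kafka-configs.sh --bootstrap-server "$BROKER" \
                 --describe --entity-type topics \
                 --entity-name MetadataChangeLog_Versioned_v1
```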
a
Hi @rich-pager-68736, could you share your deployment method? Depending on that you’ll have to change recipes to increase allocation
r
Sure, DataHub is deployed on EKS, based on your datahub-helm charts, with some manual adjustments though. The infra is all on AWS (OpenSearch, Aurora PostgreSQL, MSK), deployed via Terraform.
a
I think you should be able to change this manually in the kafka-container recipe
r
Indeed, thanks. I had not found the right config property when I first looked through the documentation. For anyone else stumbling across this issue, just set the environment variable accordingly:
```yaml
- name: SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES
  value: "5242880"
```
or
```yaml
- name: SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE
  value: "5242880"
```
h
I implemented this, but now I got a new error:
```
2023-09-26 19:21:49,860 [qtp350068407-182] INFO  c.l.m.r.entity.AspectResource:186 - INGEST PROPOSAL proposal: {aspectName=dataHubExecutionRequestResult, entityKeyAspect={contentType=application/json, value=ByteString(length=46,bytes=7b226964...6362227d)}, entityType=dataHubExecutionRequest, aspect={contentType=application/json, value=ByteString(length=1184594,bytes=7b227374...3337387d)}, changeType=UPSERT}
2023-09-26 19:21:50,046 [kafka-producer-network-thread | producer-1] ERROR c.l.m.d.producer.KafkaHealthChecker:34 - Failed to emit MCL for entity urn:li:dataHubExecutionRequest:30e0824e-6c9f-4035-8fb8-b5c3b2250acb
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
2023-09-26 19:21:50,047 [qtp350068407-182] ERROR c.l.m.entity.EntityServiceImpl:678 - Failed to produce MCLs: [UpsertBatchItem{urn=urn:li:dataHubExecutionRequest:30e0824e-6c9f-4035-8fb8-b5c3b2250acb, aspectName='dataHubExecutionRequestResult', systemMetadata={lastObserved=1695756109880, lastRunId=no-run-id-provided, runId=no-run-id-provided}, aspect={report=~~~~ Execution Summary - RUN_INGEST ~~~~
Execution finished with errors.
```
the request included a message larger than the max message size the server will accept??
a
I'm in the same situation! I added SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES and SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE to the values.yaml, but I'm still receiving the same error when ingesting from Snowflake. Since my Kafka is running on AWS MSK, do I need to modify its configuration? gms:
```
org.apache.kafka.common.errors.RecordTooLargeException: The message is 14145815 bytes when serialized which is larger than 5242880, which is the value of the max.request.size configuration.
```
datahub-acryl
```
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]  'failures': [{'error': 'Unable to emit metadata to DataHub GMS',
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                'info': {'message': "HTTPConnectionPool(host='datahub-datahub-gms', port=8080): Max retries exceeded with url: "
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                                    "/aspects?action=ingestProposal (Caused by ResponseError('too many 500 error responses'))",
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                         'id': 'urn:li:dataset:(urn:li:dataPlatform:snowflake,abc - DEV.a01.def.def,DEV)'}},
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]               {'error': 'Unable to emit metadata to DataHub GMS: Cannot parse request entity',
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                'info': {'exceptionClass': 'com.linkedin.restli.server.RestLiServiceException',
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                         'message': 'Cannot parse request entity',
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                         'status': 400,
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                         'id': 'urn:li:dataset:(urn:li:dataPlatform:snowflake,abc - DEV.a01.a02.a03,DEV)'}},
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]               {'error': 'Unable to emit metadata to DataHub GMS',
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                'info': {'message': "HTTPConnectionPool(host='datahub-datahub-gms', port=8080): Max retries exceeded with url: "
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                                    "/aspects?action=ingestProposal (Caused by ResponseError('too many 500 error responses'))",
[a27c81d4-bdcd-4f12-8437-9ed998489ab7 logs]                         'id': 'urn:li:dataset:(urn:li:dataPlatform:snowflake,abc - DEV.a01.a02.a03_incremental,DEV)'}},
```
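Since the broker enforces its own limit, on MSK the server-side change goes through an MSK configuration rather than editing server.properties on the brokers directly. A hedged sketch, assuming placeholder ARNs, names, and a 16 MiB limit (none of these values come from this thread):

```bash
# Hypothetical MSK change: raise the broker-level message limits.
# All ARNs, names, versions, and sizes below are placeholders.
cat > msk-large-messages.properties <<'EOF'
message.max.bytes=16777216
replica.fetch.max.bytes=16777216
EOF

# Register the properties as a new MSK configuration...
aws kafka create-configuration \
    --name datahub-large-messages \
    --server-properties fileb://msk-large-messages.properties

# ...then attach it to the cluster, using the ARN/revision returned above.
aws kafka update-cluster-configuration \
    --cluster-arn arn:aws:kafka:REGION:ACCOUNT:cluster/NAME/UUID \
    --configuration-info Arn=arn:aws:kafka:REGION:ACCOUNT:configuration/datahub-large-messages/UUID,Revision=1 \
    --current-version CLUSTER_VERSION
```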
h
Yep, I'm also seeing this come up on some other threads I'm on.
b
Hi, under which deployment (datahub-gms?) in the values.yaml file should I add the env variables SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES and SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE?
i
Well, using the community Helm chart, I solved the problem as follows:
• I added the `maxMessageBytes: "5000000"` parameter to prerequisites-kafka, as prescribed in the chart.
• Then I added the variables `SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES` and `SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE` with the same value in the GMS components, the RestoreIndicesJob, and the consumers (if you are using the standalone consumers). As the chart does not have these variables by default, I included them directly in the `deployment.yaml` files, which can be found in `charts/datahub/subcharts/*/templates/deployment.yaml` (link to the repository); see the sketch below.
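A minimal sketch of what those entries might look like in one of the subchart `deployment.yaml` files (a hypothetical excerpt; the exact container spec and indentation depend on the chart version):

```yaml
# Hypothetical excerpt from charts/datahub/subcharts/datahub-gms/templates/deployment.yaml,
# added under the container's env: list; the value mirrors maxMessageBytes above.
- name: SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES
  value: "5000000"
- name: SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE
  value: "5000000"
```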
l
Hello @adorable-sugar-76640, I am seeing the same issue in our environment while ingesting from Snowflake. I have updated the Kafka message size and also enabled compression; please let us know if you were able to figure out the fix. @astonishing-answer-96712 @gray-shoe-75895 @brainy-tent-14503, can you please suggest if there is anything we need to update to compress the message size on the producer side? Error message:
```
[e1af695d-dcec-4cbc-bd71-e27024d367c7 logs] {'error': 'Unable to emit metadata to DataHub GMS: java.lang.RuntimeException: java.util.concurrent.ExecutionException: '
[e1af695d-dcec-4cbc-bd71-e27024d367c7 logs] 'org.apache.kafka.common.errors.RecordTooLargeException: The message is 14375642 bytes when serialized which is larger than '
```
The Kafka environment variables for the GMS app:
```yaml
- name: KAFKA_BOOTSTRAP_SERVER
  value: prerequisites-kafka:9092
- name: KAFKA_PRODUCER_COMPRESSION_TYPE
  value: snappy
- name: KAFKA_PRODUCER_MAX_REQUEST_SIZE
  value: "3145728"
- name: KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES
  value: "3145728"
- name: KAFKA_SCHEMAREGISTRY_URL
  value: http://prerequisites-cp-schema-registry:8081
- name: SCHEMA_REGISTRY_TYPE
  value: KAFKA
- name: SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES
  value: "3145728"
- name: SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE
  value: "3145728"
- name: SPRING_KAFKA_PRODUCER_PROPERTIES_COMPRESSION_TYPE
  value: snappy
```
r
@lively-dress-27742 Hello! I have experienced the same issue on datahub 0.11.0. Before updating to 0.12.0, I had to set:
```yaml
datahub-gms:
  enabled: true
  ...
  extraEnvs:
    - name: KAFKA_LISTENER_CONCURRENCY
      value: "3"
    - name: SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE
      value: "15728640"
    - name: SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES
      value: "15728640"

datahub-mae-consumer:
  ...
  extraEnvs:
    - name: KAFKA_LISTENER_CONCURRENCY
      value: "3"
    - name: SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE
      value: "15728640"
    - name: SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES
      value: "15728640"

datahub-mce-consumer:
  ...
  extraEnvs:
    - name: KAFKA_LISTENER_CONCURRENCY
      value: "3"
    - name: SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE
      value: "15728640"
    - name: SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES
      value: "15728640"

kafkaSetupJob:
  ...
  extraEnvs:
    - name: SPRING_KAFKA_PROPERTIES_MAX_MESSAGE_BYTES
      value: "15728640"
    - name: KAFKA_LISTENER_CONCURRENCY
      value: "3"
    - name: SPRING_KAFKA_PRODUCER_PROPERTIES_MAX_REQUEST_SIZE
      value: "15728640"
    - name: SPRING_KAFKA_CONSUMER_PROPERTIES_MAX_PARTITION_FETCH_BYTES
      value: "15728640"
```
in my `values.yaml` (I was going for 15 MB). However, my topics were already created, so I had to change `max.message.bytes=15728640` manually (I use an old version of Kafka, 2.3.1). You can do this by connecting to one of your brokers and changing the value for each DataHub topic (calling `kafka-configs.sh`). In my case (this depends on your Kafka version, so it's better to check the docs) I did something like:
```bash
kafka-configs.sh --zookeeper $ZOOKEEPER_ENDPOINT \
                 --alter --entity-type topics \
                 --entity-name MetadataChangeProposal_v1 \
                 --add-config max.message.bytes=15728640
```
(and then the same for `FailedMetadataChangeProposal_v1`, `MetadataChangeLog_Versioned_v1`, `MetadataChangeLog_Timeseries_v1`, `PlatformEvent_v1`, and `DataHubUsageEvent_v1`; a loop sketch follows below).
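A minimal loop sketch covering those remaining topics, under the same assumptions as the command above (`$ZOOKEEPER_ENDPOINT` set, Kafka 2.3.1-era flags; newer brokers take `--bootstrap-server` instead of `--zookeeper`):

```bash
# Apply the same max.message.bytes override to each remaining DataHub topic.
for topic in FailedMetadataChangeProposal_v1 \
             MetadataChangeLog_Versioned_v1 \
             MetadataChangeLog_Timeseries_v1 \
             PlatformEvent_v1 \
             DataHubUsageEvent_v1; do
  kafka-configs.sh --zookeeper "$ZOOKEEPER_ENDPOINT" \
                   --alter --entity-type topics \
                   --entity-name "$topic" \
                   --add-config max.message.bytes=15728640
done
```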
Hope that helps! If you are using DataHub 0.12.0, you don't need to set these variables manually in your values.yaml. In my case, I could replace the environment vars with:
```yaml
global:
  ...
  kafka:
    ...
    maxMessageBytes: "15728640"
    producer:
      compressionType: snappy
      maxRequestSize: "15728640"
    consumer:
      maxPartitionFetchBytes: "15728640"
    partitions: 3
    replicationFactor: 3
```
b
The latest Helm charts use v0.12.1, along with configuration to support larger message sizes.