# troubleshooting
Hi everyone. I am new to PyFlink. I have written a PyFlink Kafka consumer, but I am noticing a delay in consuming the Kafka messages; a non-Flink Kafka consumer reading the same topic shows no such delay. Here is the sample output I get when I run the PyFlink Kafka consumer:
```
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Consumed message: GOOGL,691.27,8432 produced at 2024-07-20 20:01:44.447856
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Time of consumption: 2024-07-20 20:01:45.089603
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Latency: 641.747 ms
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Consumed message: MSFT,1106.14,9041 produced at 2024-07-20 20:01:44.447898
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Time of consumption: 2024-07-20 20:01:45.089821
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Latency: 641.923 ms
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Consumed message: AMZN,1179.2,9676 produced at 2024-07-20 20:01:44.447916
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Time of consumption: 2024-07-20 20:01:45.090019
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Latency: 642.103 ms
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Consumed message: TSLA,673.61,1953 produced at 2024-07-20 20:01:44.447933
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Time of consumption: 2024-07-20 20:01:45.090204
2024-07-20 20:01:45,094 INFO  /Users/stanlythomas/.pyenv/versions/3.9.10/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Latency: 642.2710000000001 ms
```
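For reference, the non-Flink consumer I used as a baseline looks roughly like this (a minimal sketch assuming the kafka-python package; the topic and message format match the job below). With this one I see no noticeable delay:

```python
# Baseline (non-Flink) consumer -- a minimal sketch assuming kafka-python.
# Messages embed their producer-side wall-clock time, e.g.
# "GOOGL,691.27,8432 produced at 2024-07-20 20:01:44.447856"
from datetime import datetime
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'ticks',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: v.decode('utf-8'),
)

for record in consumer:
    message = record.value
    produced_time = datetime.strptime(message.split('produced at ')[-1],
                                      '%Y-%m-%d %H:%M:%S.%f')
    latency_ms = (datetime.now() - produced_time).total_seconds() * 1000
    print(f"Latency: {latency_ms:.3f} ms")
```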
I am seeing a delay of more than 500 ms between the Kafka producer and the consumer, and I was expecting it to be much closer to real time. Any idea why this is happening? Below is my PyFlink Kafka consumer job:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common import Types
from pyflink.datastream import MapFunction
from datetime import datetime

# class AddTimestampMapFunction(MapFunction):
#     def map(self, value):
#         current_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
#         return f"{current_timestamp}: {value}"
def process_message(message):
    produced_time = datetime.strptime(message.split('produced at ')[-1], '%Y-%m-%d %H:%M:%S.%f')
    consumed_time = datetime.now()
    latency = (consumed_time - produced_time).total_seconds() * 1000  # in milliseconds
    print(f"Consumed message: {message}")
    print(f"Time of consumption: {consumed_time}")
    print(f"Latency: {latency} ms")

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(2)
    env.get_config().set_auto_watermark_interval(100)  # Set watermark interval to 100 ms
    
    kafka_props = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'test-group',
        'auto.offset.reset': 'earliest',
        'fetch.max.wait.ms': '10',  # Lower value to fetch messages more frequently
        'fetch.min.bytes': '1',
        'max.poll.records': '1',  # Process messages as soon as they arrive
        'session.timeout.ms': '30000',  # Increase session timeout for stability
        'heartbeat.interval.ms': '3000'  # Increase heartbeat interval for stability
    }
    
    kafka_source = FlinkKafkaConsumer(
        topics='ticks',
        deserialization_schema=SimpleStringSchema(),
        properties=kafka_props
    )
    
    kafka_source.set_start_from_earliest()
    
    stream = env.add_source(kafka_source)
    
    # processed_stream = stream.map(AddTimestampMapFunction(), output_type=Types.STRING()).set_parallelism(2)
    
    # processed_stream.print()
    stream.map(process_message)
    
    env.execute('Flink Kafka Consumer Job')

if __name__ == "__main__":
    main()
```
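One thing I came across while searching, but have not verified: PyFlink runs Python functions in a separate Beam-based worker process (which is why the log lines above come from beam_sdk_worker_main.py), and records are handed to that worker in bundles. The bundle thresholds (python.fn-execution.bundle.size, default 100000 records, and python.fn-execution.bundle.time, default 1000 ms) look like they could account for a delay in this range. Would lowering them, roughly as in this sketch, be the right approach? (This assumes PyFlink 1.15+, where get_execution_environment accepts a Configuration.)

```python
# Sketch (unverified): lower PyFlink's Python-UDF bundling thresholds so
# records are flushed to the Python worker sooner. A bundle is emitted when
# either threshold is hit, so small values trade throughput for latency.
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

config = Configuration()
config.set_integer("python.fn-execution.bundle.size", 1000)  # default 100000
config.set_integer("python.fn-execution.bundle.time", 10)    # default 1000 ms

env = StreamExecutionEnvironment.get_execution_environment(config)
```

If that is right, I assume the trade-off is throughput, since small bundles mean more round trips between the JVM and the Python worker.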