# troubleshooting
i
Hi All, I am trying to read messages from Kafka and write them to Doris. I am using Docker Compose for the setup. When the SQL job tries to write, I get this exception:
2024-08-06 13:24:42
java.lang.Exception: Could not perform checkpoint 5 for operator Source: source_table[64] -> analyticstest[65]: Writer -> analyticstest[65]: Committer (1/1)#4.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1254)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$15(StreamTask.java:1201)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: table analyticstest.sinkfromkafka stream load error: [ANALYSIS_ERROR]TStatus: errCode = 3, detailMessage = tablet 11081 alive replica num 0 < load required replica num 1, alive backends: []
0# doris::Status::create<true>(doris::TStatus const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
1# doris::StreamLoadAction::_process_put(doris::HttpRequest*, std::shared_ptr<doris::StreamLoadContext>) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449
2# doris::StreamLoadAction::_on_header(doris::HttpRequest*, std::shared_ptr<doris::StreamLoadContext>) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
3# doris::StreamLoadAction::on_header(doris::HttpRequest*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:345
4# doris::EvHttpServer::on_header(evhttp_request*) at /home/zcp/repo_center/doris_release/doris/be/src/http/ev_http_server.cpp:255
5# ?
6# bufferevent_run_readcb_
7# ?
8# ?
9# ?
10# ?
11# std::_Function_handler<void (), doris::EvHttpServer::start()::$_0>::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/atomicity.h:98
12# doris::ThreadPool::dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
13# doris::Thread::supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
14# ?
15# __clone
, see more in null
    at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:274)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:323)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1242)
    ... 14 more
Any help would be much appreciated. Below is the Java code for the job:
Copy code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(50000);

// Set up the Flink table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String create_source_table = "CREATE TABLE source_table (\n" +
        "    field1 STRING,\n" +
        "    field2 STRING\n" +
        ") WITH (\n" +
        "    'connector' = 'kafka',\n" +
        "    'topic' = 'input-topic',\n" +
        "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
        "    'properties.group.id' = 'my-group1',\n" +
        "    'scan.startup.mode' = 'earliest-offset',\n" +
        "    'format' = 'csv'\n" +
        ")";
tableEnv.executeSql(create_source_table);

String create_sink_table = "CREATE TABLE analyticstest (\n" +
        "    field1 STRING,\n" +
        "    field2 STRING\n" +
        ") WITH (\n" +
        "    'connector' = 'doris',\n" +
        "    'fenodes' = 'doris:8030',\n" +
        "    'table.identifier' = 'analyticstest.sinkfromkafka',\n" +
        "    'username' = 'root' ,\n" +
        "    'password' = ''\n" +
        ")";
tableEnv.executeSql(create_sink_table);

String insertQuery = "INSERT INTO analyticstest " +
        "SELECT field1, field2 " +
        "FROM source_table " ;

tableEnv.executeSql(insertQuery);
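For context, the Doris-side table has to exist before the sink can write to it. Its DDL isn't shown in the thread, so the following is only a sketch with assumed column types, key model, and bucketing; the one detail that matters for the error above is that replication_num must not exceed the number of live BEs:

```sql
-- Hypothetical DDL for the sink table; column types and keys are assumptions.
CREATE DATABASE IF NOT EXISTS analyticstest;
CREATE TABLE IF NOT EXISTS analyticstest.sinkfromkafka (
    field1 VARCHAR(255),
    field2 VARCHAR(255)
)
DUPLICATE KEY(field1)
DISTRIBUTED BY HASH(field1) BUCKETS 1
-- On a single-BE Docker setup, 1 is the only replica count Doris can satisfy.
PROPERTIES ("replication_num" = "1");
```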
Please note that when I read from Doris and write back to Doris, it works just fine.
d
The error indicates that the Doris table you're writing to (analyticstest.sinkfromkafka) has a problem with its tablets: there are no alive replicas for tablet 11081 to handle the write, and the FE sees no alive backends at all ("alive backends: []"). To resolve this:
1. Ensure all Doris backend nodes are up and running properly. You can use Doris's built-in commands or the web UI to check this.
2. Use Doris's admin show commands to inspect the tablet's replication status. For instance, run SHOW TABLET 11081 to get details about the problem tablet.
3. Fix any unavailable or missing replicas, adding new nodes if needed. Failures can also be caused by network issues.
4. Rebalance the tablets if needed so they are evenly distributed across healthy nodes. You can use the ALTER SYSTEM BALANCE command.
5. Check the configuration and run again. Also be sure to turn on Doris logging/debugging.
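For the backend and replica checks, the statements can be run over the MySQL protocol against the FE (port 9030 in a default setup). A sketch, assuming default ports and the table name from the error message:

```sql
SHOW BACKENDS;                                           -- Alive should be true for every BE
SHOW TABLET 11081;                                       -- locates the tablet's database/table/partition
ADMIN SHOW REPLICA STATUS FROM analyticstest.sinkfromkafka;  -- per-replica health for the sink table
```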
Let us know how it goes and what the results of these checks are.
Also, I did not see the Flink and Doris versions, but I might have missed them.
i
@D. Draco O'Brien thank you very much for the response. In the end I abandoned this effort and created a new Docker Compose file where I built the Doris images manually. It all works as expected; I can paste the Docker Compose file here for anyone who needs it.
d
Ok, that would be great! Thanks! I think many will want to consider using Doris with Flink.
i
here it is:
Copy code
version: "3"
name: analyticspoc
services:
    docker-doris-fe-01:
        image: apache-doris:2.1.5-fe
        container_name: doris-fe-01
        hostname: "fe1"
        environment:
           - FE_SERVERS=fe1:172.20.80.2:9010
           - FE_ID=1
        ports:
           - 8030:8030
           - 9030:9030
        volumes:
           - /data/fe1/doris-meta:/opt/apache-doris/fe/doris-meta
           - /data/fe1/log:/opt/apache-doris/fe/log
        networks:
           kafka-net:
              ipv4_address: 172.20.80.2
    docker-doris-be-01:
        image: apache-doris:2.1.5-be
        container_name: doris-be-01
        hostname: "be1"
        depends_on:
           - docker-doris-fe-01
        environment:
           - FE_SERVERS=fe1:172.20.80.2:9010
           - BE_ADDR=172.20.80.5:9050
        ports:
           - 8040:8040
           - 9060:9060
        volumes:
           - /data/be1/storage:/opt/apache-doris/be/storage
           - /data/be1/script:/docker-entrypoint-initdb.d
           - /data/be1/log:/opt/apache-doris/be/log
        networks:
           kafka-net:
              ipv4_address: 172.20.80.5
    zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        networks:
          kafka-net:
             ipv4_address: 172.20.80.6
    kafka:
        image: wurstmeister/kafka
        container_name: kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INSIDE://172.20.80.7:9092,OUTSIDE://172.20.80.7:9093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
          KAFKA_LISTENERS: INSIDE://172.20.80.7:9092,OUTSIDE://172.20.80.7:9093
          KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
          KAFKA_ZOOKEEPER_CONNECT: 172.20.80.6:2181
          KAFKA_CREATE_TOPICS: "baeldung:1:1"
        networks:
          kafka-net:
             ipv4_address: 172.20.80.7
    taskmanager:
      image: my-flink
      container_name: taskmanager
      hostname: taskmanager
      depends_on:
        - jobmanager
      command: taskmanager
      scale: 1
      environment:
        - |
          FLINK_PROPERTIES=
          jobmanager.rpc.address: 172.20.80.9
          taskmanager.numberOfTaskSlots: 6
      networks:
          kafka-net:
             ipv4_address: 172.20.80.8
    jobmanager:
      image: my-flink
      container_name: jobmanager
      hostname: jobmanager
      ports:
        - "8082:8082"
      command: jobmanager
      environment:
        - |
          FLINK_PROPERTIES=
          jobmanager.rpc.address: 172.20.80.9
          rest.address: 172.20.80.9
          rest.port: 8082
      networks:
          kafka-net:
             ipv4_address: 172.20.80.9
networks:
  kafka-net:
#    driver: bridge
    ipam:
      config:
        - subnet: 172.20.80.0/24
I used this link to build the images.
The only tricky part: to avoid the disable-swap notification, before building the Docker images we need to open the file /apache-doris-2.1.5-bin-x64/be/bin/start_be.sh from the extracted archive and comment out the exit 1 line just under the lines
Copy code
if [[ "$(swapon -s | wc -l)" -gt 1 ]]; then
        echo "Please disable swap memory before installation."
        exit 1    # comment this line out so the check does not abort startup
fi
Just make sure that you put your own internal IP in the Docker Compose file; the addresses you see above are just placeholders.
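Once the stack is up, it's worth confirming from the FE that the BE actually registered before submitting the Flink job, since the original failure's "alive backends: []" means no BE was visible. Assuming the ports from the compose file above (e.g. connect with mysql -h 127.0.0.1 -P 9030 -u root):

```sql
SHOW FRONTENDS;  -- the FE itself should report Alive = true
SHOW BACKENDS;   -- be1 should appear with Alive = true before any load is attempted
```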