ioannis ntantis
08/06/2024, 10:32 AM2024-08-06 13:24:42
java.lang.Exception: Could not perform checkpoint 5 for operator Source: source_table[64] -> analyticstest[65]: Writer -> analyticstest[65]: Committer (1/1)#4.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1254)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$15(StreamTask.java:1201)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: table analyticstest.sinkfromkafka stream load error: [ANALYSIS_ERROR]TStatus: errCode = 3, detailMessage = tablet 11081 alive replica num 0 < load required replica num 1, alive backends: []
0# doris::Status doris:Status:createtrue(doris::TStatus const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
1# doris:StreamLoadAction:_process_put(doris::HttpRequest*, std::shared_ptrdoris::StreamLoadContext) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:449
2# doris:StreamLoadAction:_on_header(doris::HttpRequest*, std::shared_ptrdoris::StreamLoadContext) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:701
3# doris:StreamLoadAction:on_header(doris::HttpRequest*) at /home/zcp/repo_center/doris_release/doris/be/src/common/status.h:345
4# doris:EvHttpServer:on_header(evhttp_request*) at /home/zcp/repo_center/doris_release/doris/be/src/http/ev_http_server.cpp:255
5# ?
6# bufferevent_run_readcb_
7# ?
8# ?
9# ?
10# ?
11# std::_Function_handlervoid (), doris::EvHttpServer::start()::$_0::_M_invoke(std::_Any_data const&) at /var/local/ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/ext/atomicity.h:98
12# doris:ThreadPool:dispatch_thread() at /home/zcp/repo_center/doris_release/doris/be/src/util/threadpool.cpp:0
13# doris:🧵:supervise_thread(void*) at /var/local/ldb_toolchain/bin/../usr/include/pthread.h:562
14# ?
15# __clone
, see more in null
at org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:274)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.emitCommittables(SinkWriterOperator.java:198)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:168)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:323)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1242)
... 14 more
Any help would be much appreciated. Below is the java code for the job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(50000);
// Set up the Flink table environment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String create_source_table = "CREATE TABLE source_table (\n" +
" field1 STRING,\n" +
" field2 STRING\n" + // Removed the comma here
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'input-topic',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092',\n" +
" 'properties.group.id' = 'my-group1',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'csv'\n" +
")";
tableEnv.executeSql(create_source_table);
String create_sink_table = "CREATE TABLE analyticstest (\n" +
" field1 STRING,\n" +
" field2 STRING\n" + // Removed the comma here
") WITH (\n" +
" 'connector' = 'doris',\n" +
" 'fenodes' = 'doris:8030',\n" +
" 'table.identifier' = 'analyticstest.sinkfromkafka',\n" +
" 'username' = 'root' ,\n" +
" 'password' = ''\n" +
")";
tableEnv.executeSql(create_sink_table);
String insertQuery = "INSERT INTO analyticstest " +
"SELECT field1, field2 " +
"FROM source_table " ;
tableEnv.executeSql(insertQuery);
Please note that when I try to read from doris and write to doris it works just fineD. Draco O'Brien
08/06/2024, 5:25 PMD. Draco O'Brien
08/06/2024, 5:26 PMD. Draco O'Brien
08/06/2024, 5:27 PMioannis ntantis
08/08/2024, 8:09 AMD. Draco O'Brien
08/08/2024, 8:27 AMioannis ntantis
08/08/2024, 8:46 AMioannis ntantis
08/08/2024, 8:46 AMversion: "3"
name: analyticspoc
services:
docker-doris-fe-01:
image: apache-doris:2.1.5-fe
container_name: doris-fe-01
hostname: "fe1"
environment:
- FE_SERVERS=fe1:172.20.80.2:9010
- FE_ID=1
ports:
- 8030:8030
- 9030:9030
volumes:
- /data/fe1/doris-meta:/opt/apache-doris/fe/doris-meta
- /data/fe1/log:/opt/apache-doris/fe/log
networks:
kafka-net:
ipv4_address: 172.20.80.2
docker-doris-be-01:
image: apache-doris:2.1.5-be
container_name: doris-be-01
hostname: "be1"
depends_on:
- docker-doris-fe-01
environment:
- FE_SERVERS=fe1:172.20.80.2:9010
- BE_ADDR=172.20.80.5:9050
ports:
- 8040:8040
- 9060:9060
volumes:
- /data/be1/storage:/opt/apache-doris/be/storage
- /data/be1/script:/docker-entrypoint-initdb.d
- /data/be1/log:/opt/apache-doris/be/log
networks:
kafka-net:
ipv4_address: 172.20.80.5
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
kafka-net:
ipv4_address: 172.20.80.6
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: <INSIDE://172.20.80.7:9092>,<OUTSIDE://172.20.80.7:9093>
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: <INSIDE://172.20.80.7:9092>,<OUTSIDE://172.20.80.7:9093>
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: 172.20.80.6:2181
KAFKA_CREATE_TOPICS: "baeldung:1:1"
networks:
kafka-net:
ipv4_address: 172.20.80.7
taskmanager:
image: my-flink
container_name: taskmanager
hostname: taskmanager
depends_on:
- jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: 172.20.80.9
taskmanager.numberOfTaskSlots: 6
networks:
kafka-net:
ipv4_address: 172.20.80.8
jobmanager:
image: my-flink
container_name: jobmanager
hostname: jobmanager
ports:
- "8082:8082"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: 172.20.80.9
rest.address: 172.20.80.9
rest.port: 8082
networks:
kafka-net:
ipv4_address: 172.20.80.9
networks:
kafka-net:
# driver: bridge
ipam:
config:
- subnet: 172.20.80.0/24
ioannis ntantis
08/08/2024, 8:47 AMioannis ntantis
08/08/2024, 8:47 AMioannis ntantis
08/08/2024, 9:02 AMif [[ "$(swapon -s | wc -l)" -gt 1 ]]; then
echo "Please disable swap memory before installation."
ioannis ntantis
08/08/2024, 9:51 AM