Perry Huang
07/02/2022, 10:42 AMVivi Huang
07/02/2022, 3:42 PMChengxuan Wang
07/05/2022, 7:59 PMyidan zhao
07/12/2022, 2:22 AM
# (1) Start Kubernetes session
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
# (2) Submit example job
$ ./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
./examples/streaming/TopSpeedWindowing.jar
# (3) Stop Kubernetes session by deleting cluster deployment
$ kubectl delete deployment/my-first-flink-cluster
我参考步骤1创建了集群,然后运行第2个步骤,显示如下:
./bin/flink run \
> --target kubernetes-session \
> -Dkubernetes.cluster-id=my-first-flink-cluster \
> ./examples/streaming/TopSpeedWindowing.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/work/flink-1.15.0/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/work/hadoop-client-2.7.5/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See <http://www.slf4j.org/codes.html#multiple_bindings> for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
2022-07-12 10:23:23,021 WARN org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
2022-07-12 10:23:23,027 INFO org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Retrieve flink cluster my-first-flink-cluster successfully, JobManager Web Interface: <http://my-first-flink-cluster-rest.test:8081>
2022-07-12 10:23:23,044 WARN org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Please note that Flink client operations(e.g. cancel, list, stop, savepoint, etc.) won't work from outside the Kubernetes cluster since 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'CarTopSpeedWindowingExample'.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'CarTopSpeedWindowingExample'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2108)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:188)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:119)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1969)
at org.apache.flink.streaming.examples.windowing.TopSpeedWindowing.main(TopSpeedWindowing.java:154)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 11 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:434)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:399)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:476)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:109)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.setFailure(DefaultChannelPromise.java:89)
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:214)
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:1008)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:395)
... 33 more
Caused by: java.util.concurrent.CompletionException: java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name or service not known
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 31 more
Caused by: java.net.UnknownHostException: my-first-flink-cluster-rest.test: Name or service not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
at java.net.InetAddress.getAllByName(InetAddress.java:1193)
at java.net.InetAddress.getAllByName(InetAddress.java:1127)
at java.net.InetAddress.getByName(InetAddress.java:1077)
at org.apache.flink.shaded.netty4.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
at org.apache.flink.shaded.netty4.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
at java.security.AccessController.doPrivileged(Native Method)
at org.apache.flink.shaded.netty4.io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
at org.apache.flink.shaded.netty4.io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
at org.apache.flink.shaded.netty4.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
at org.apache.flink.shaded.netty4.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
at org.apache.flink.shaded.netty4.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
at org.apache.flink.shaded.netty4.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
at org.apache.flink.shaded.netty4.io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
at org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
... 20 more
如上,我的flink集群创建在test这个namespace下,貌似如上命令会找到对应的地址为 http://my-first-flink-cluster-rest.test:8081 ,但这个DNS貌似本地访问不到(我执行的机器是k8s的机器)。
吃喵酱的花椒
07/12/2022, 9:08 AMTian Yang
07/21/2022, 7:40 AMZhenyu Xing
08/03/2022, 8:02 AMQinghui Xu
08/04/2022, 5:14 PMZhenyu Xing
08/09/2022, 8:29 AMKyle Meow
08/16/2022, 7:55 AMpingzhong
08/18/2022, 1:15 PMEric Liu
01/13/2023, 4:31 AM
blob.fetch.num-concurrent
从50增加到200还是不行.. (Flink1.15.3)
2023-01-09 06:26:28,780 ERROR org.apache.flink.runtime.blob.BlobServerConnection [] - Error while executing BLOB connection.
java.io.IOException: Unknown operation 71
at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:116) [flink-dist-1.15.3.jar:1.15.3]
肖文浩
02/16/2023, 5:54 AMTing Yin
02/21/2023, 2:43 AMTing Yin
02/21/2023, 2:45 AMZhiyu Tian
02/23/2023, 4:22 AMZhiyu Tian
03/09/2023, 2:50 AMBruce Lee (Unicorn)
03/16/2023, 9:10 AMBruce Lee (Unicorn)
04/04/2023, 10:03 AMBruce Lee (Unicorn)
04/07/2023, 1:44 AM刘路
05/10/2023, 6:29 AMakira
06/01/2023, 6:46 AMStan Hsu
-- [chat export timestamp: 07/13/2023, 2:54 AM — was fused onto the INSERT
--  line by the export tool and has been moved into this comment so the
--  statement parses]
--
-- Flink SQL job: label every order-state transition with the status it
-- transitioned FROM, and sink the result into `tidb_order_labels`.
-- Pipeline: TiCDC change-log of orders (via Kafka) -> processing-time
-- temporal lookup of the full state history in TiDB -> self-join to derive
-- each state's immediate predecessor -> ROW_NUMBER dedup -> sink.
INSERT INTO tidb_order_labels
WITH kafka_ticdc_orders_log_parsed AS (
    -- Raw CDC records of the orders log, captured from TiDB by TiCDC.
    SELECT
        *
    FROM
        kafka_ticdc_orders_log
),
orders_all_state AS (
    -- For each incoming order_id, fetch the full state history from the TiDB
    -- table `tidb_orders_log` (its PK is order_id + updated_at), using a
    -- processing-time temporal lookup join.  One incoming record for order_A
    -- with three historical states fans out to three rows:
    -- (order_A, T1), (order_A, T2), (order_A, T3).
    SELECT
        orders_log.*,
        kafka_ticdc_orders_log_parsed.proc_time
    FROM
        kafka_ticdc_orders_log_parsed
    JOIN
        tidb_orders_log FOR SYSTEM_TIME AS OF kafka_ticdc_orders_log_parsed.proc_time AS orders_log
    ON
        kafka_ticdc_orders_log_parsed.order_id = orders_log.order_id
),
orders_enrich_previous_state AS (
    SELECT
        order_id,
        seller_id,
        updated_at,
        status,
        previous_status
    FROM (
        -- From the history fetched above, derive each state's previous status
        -- by self-joining on strictly earlier updated_at, which can yield
        -- multiple predecessors per state, e.g.:
        --   order_id, updated_at, status,    previous_status
        --   order_A,  T1,         active,    null
        --   order_A,  T2,         cancelled, active
        --   order_A,  T3,         removed,   active
        --   order_A,  T3,         removed,   cancelled
        -- ROW_NUMBER keeps only the most recent predecessor per
        -- (order_id, updated_at), deduplicating to:
        --   order_A,  T1,         active,    null
        --   order_A,  T2,         cancelled, active
        --   order_A,  T3,         removed,   cancelled
        SELECT
            t1.*,
            -- NOTE(review): previous_total_cents, previous_total_cost and
            -- previous_edited_at are computed here but never projected by the
            -- enclosing CTE; confirm whether they are still needed.
            t2.total_cents AS previous_total_cents,
            0 AS previous_total_cost,
            t2.status AS previous_status,
            t2.edited_at AS previous_edited_at,
            ROW_NUMBER() OVER (
                PARTITION BY
                    t1.order_id, t1.updated_at
                ORDER BY
                    t2.updated_at DESC
            ) AS rn
        FROM
            orders_all_state AS t1
        -- LEFT JOIN keeps the first state of an order (no predecessor):
        -- its previous_status comes out NULL, matching the T1 row above.
        LEFT JOIN
            orders_all_state AS t2
        ON
            t1.order_id = t2.order_id
            AND t1.updated_at > t2.updated_at
    )
    WHERE
        rn = 1
)
SELECT
    order_id,
    seller_id AS merchant_id,
    updated_at,
    status,
    previous_status,
    CURRENT_TIMESTAMP AS _processed_at
FROM
    orders_enrich_previous_state
WHERE
    order_id IS NOT NULL
    AND updated_at IS NOT NULL
;
Mike Zhang
08/02/2023, 2:24 AMStephen Lii
08/16/2023, 11:08 AMStephen Lii
08/16/2023, 11:08 AMhan liu
09/11/2023, 2:57 PMZhenjiu Tian
10/30/2023, 8:53 AMXI XI
11/03/2023, 2:13 AMZhenjiu Tian
01/02/2024, 3:57 AM