chunilal kukreja
08/01/2022, 12:07 PM

Pedro Cunha
08/01/2022, 3:12 PM
We're getting IndexOutOfBounds exceptions on the KryoSerializer, and it looks like one of the field names is being leaked and treated as a class name. Again, this happens just by adding a new String field. Any ideas?
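A note on a likely cause: state written with Flink's Kryo fallback serializer is not schema-evolvable, so adding a field to the class can leave a restore reading misaligned bytes, which would explain a field name being parsed as a class name. A minimal sketch, assuming the state type can be turned into a proper Flink POJO (the POJO serializer does support adding fields); the class name here is illustrative:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail at job-graph build time whenever a type would fall back to Kryo,
        // instead of silently serializing it in a non-evolvable format.
        env.getConfig().disableGenericTypes();
    }
}

Types that follow the POJO rules (public class, public no-arg constructor, public fields or getters/setters) then use the PojoSerializer, which tolerates added fields across savepoints.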
Jaya Ananthram
08/01/2022, 3:48 PM

Jaya Ananthram
08/01/2022, 6:01 PM
Sylvia Lin
08/01/2022, 6:57 PM
$ kubectl describe hpa -n dataeng-admin
Name: basic-hpa
Namespace: dataeng-admin
Labels: kustomize.toolkit.fluxcd.io/name=flux-system
        kustomize.toolkit.fluxcd.io/namespace=flux-system
Annotations: <none>
CreationTimestamp: Mon, 01 Aug 2022 11:02:50 -0700
Reference: FlinkDeployment/basic-example
Metrics: ( current / target )
resource cpu on pods (as a percentage of request): <unknown> / 30%
Min replicas: 1
Max replicas: 8
FlinkDeployment pods: 5 current / 0 desired
Conditions:
Type Status Reason Message
---- ------ ------ -------
AbleToScale True SucceededGetScale the HPA controller was able to get the target's current scale
ScalingActive False InvalidSelector the HPA target's scale is missing a selector
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning FailedComputeMetricsReplicas 44m (x12 over 47m) horizontal-pod-autoscaler selector is required
Warning SelectorRequired 2m14s (x181 over 47m) horizontal-pod-autoscaler selector is required
Did I miss anything here? The manifest is attached.

Isaac
08/01/2022, 7:18 PM

Hunter Medney
08/01/2022, 7:39 PM

Stephan Weinwurm
08/01/2022, 11:24 PM
Traceback (most recent call last):
File "/src/.venv/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/src/.venv/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
File "/src/worker/baseplate_asgi/asgi/baseplate_asgi_middleware.py", line 37, in __call__
await span_processor.execute()
File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 61, in execute
raise e
File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 57, in execute
await self.app(self.scope, self.receive, self.send)
File "/src/.venv/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
raise exc
File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
await self.app(scope, receive, sender)
File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
await route.handle(scope, receive, send)
File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
await self.app(scope, receive, send)
File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/src/worker/baseplate_statefun/server/asgi/make_statefun_handler.py", line 25, in statefun_handler
result = await handler.handle_async(request_body)
File "/src/.venv/lib/python3.9/site-packages/statefun/request_reply_v3.py", line 262, in handle_async
msg = Message(target_typename=sdk_address.typename, target_id=sdk_address.id,
File "/src/.venv/lib/python3.9/site-packages/statefun/messages.py", line 42, in __init__
raise ValueError("target_id can not be missing")
Any pointers would be greatly appreciated! Also, a quick explanation of what target_id is used for would be great - it sounds like it allows Flink to tie StateFun invocations to a specific instance of a function, but I don't fully understand how this is used in practice.
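For context: in StateFun, a function instance is identified by an Address, i.e. a (typename, id) pair, and target_id is the id half. StateFun uses it to route a message to, and scope persisted state for, one logical instance. A hedged sketch with the Java SDK (the typename and id values are made up):

import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.apache.flink.statefun.sdk.java.message.MessageBuilder;

public class AddressExample {
    public static Message build() {
        TypeName userFn = TypeName.typeNameFromString("com.example.fns/user");
        // All messages carrying id "user-42" reach the same logical instance,
        // which therefore sees the same persisted state.
        return MessageBuilder.forAddress(userFn, "user-42")
                .withValue("hello")
                .build();
    }
}

The ValueError above suggests the incoming request's target address arrived without an id, so the handler could not construct that Address.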
Jaya Ananthram
08/02/2022, 11:02 AM

laxmi narayan
08/02/2022, 11:05 AM

Roman Bohdan
08/02/2022, 12:17 PM
Logback 2022-08-02 15:14:25 WARN o.a.f.r.t.Task:1097 - - Sink: pipeline.errors (2/4)#0 (2bd6ddc871fb0b6bf0f90a2c723c51e7) switched from DEPLOYING to FAILED with failure cause: java.lang.UnsupportedOperationException: The configuration is unmodifiable; its contents cannot be changed.
at org.apache.flink.configuration.UnmodifiableConfiguration.error(UnmodifiableConfiguration.java:73)
at org.apache.flink.configuration.UnmodifiableConfiguration.setValueInternal(UnmodifiableConfiguration.java:63)
at org.apache.flink.configuration.Configuration.set(Configuration.java:730)
at org.apache.flink.runtime.state.CheckpointStorageLoader.load(CheckpointStorageLoader.java:177)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStorage(StreamTask.java:1505)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:389)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:359)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:332)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:324)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:314)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:75)
at jdk.internal.reflect.GeneratedConstructorAccessor6.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1582)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:740)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:829)
It should be related to:
env.setDefaultSavepointDirectory(configuration.get(ConfigOptions
    .key("state.savepoints.dir").stringType().noDefaultValue()));
But then, how can I configure the savepoints directory?
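The stack trace shows CheckpointStorageLoader trying to write into an UnmodifiableConfiguration at task startup, so mutating configuration from inside the job is off the table. A sketch of the usual alternative, passing the savepoint directory into the environment at creation time (the s3 path is a placeholder):

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointDirSetup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same key as "state.savepoints.dir", set before the environment is built.
        conf.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, "s3://my-bucket/savepoints");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}

Setting state.savepoints.dir in flink-conf.yaml achieves the same thing without touching code.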
Duc Anh Khu
08/02/2022, 12:42 PM
Is there a way to get logging or print output to the client stdout?
import sys
import logging

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
My current code as above doesn't seem to work. 🤔
Parthiban PR
08/02/2022, 1:20 PM

Ildar Almakaev
08/02/2022, 1:54 PM
The Schema Registry endpoint is served over HTTPS, and that's why the Schema Registry client v5.5.2 needs parameters for a truststore certificate.
Since Kinesis Data Analytics is serverless and deploys only .jar files, I did the following trick based on this AWS example: they suggest storing a certificate inside the jar and then copying it at runtime to a desired location, e.g. /tmp/certificate.jks. FYI, in the example this is done inside Flink operators like the FlinkKafkaConsumer and FlinkKafkaProducer classes.
Overall, I copy the SSL certificate for the Schema Registry client like in the example, but not in the context of Flink operators. I think that's the reason why I'm getting a FileNotFoundException here.
I think the problem is that I don't copy it to the TaskManager nodes, isn't it?
Or is there a way to copy the same file to all TaskManager nodes so that they can read it using FileInputStream(...)?
Additional info:
Flink version: 1.13.2
Java version: 11
Scala version: 2.12
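If the copy only happens in the client/JobManager code path, the file never exists on the TaskManagers. A hedged sketch of the same AWS-style trick done in an operator's open(), which runs on every TaskManager; the resource name and target path are assumptions:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TruststoreCopyingMapper extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // The truststore travels inside the job jar, so it is on the classpath
        // of every TaskManager; materialize it on local disk before use.
        Path target = Paths.get("/tmp/certificate.jks");
        try (InputStream in = getClass().getClassLoader()
                .getResourceAsStream("certificate.jks")) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    @Override
    public String map(String value) {
        return value;
    }
}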
Kevin L
08/02/2022, 2:28 PM
I'm having an issue with the 'kafka' connector. I am trying to run the following query:
-- register the Kafka topic as a table; the topic has already been created in my dev environment
CREATE TABLE clicks (
id STRING,
action STRING,
timestamp TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'kafka1:29092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
Then, to test that this works correctly, I try to preview the Kafka topic contents:
select * from clicks;
I am submitting both the CREATE TABLE and SELECT statements from the interactive shell, which I access by starting a shell in the TaskManager and running the provided sql_client.sh script.
The create table query submits without error, but when I submit the select statement, I get the following error:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getPrimaryKeyIndexes()[I
Any ideas on what the above error means and how to fix it? Thanks!
Additional Information:
I am using a custom flink docker image:
FROM flink:latest
RUN wget -P /opt/flink/lib https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.1/flink-sql-connector-kafka-1.15.1.jar
Dan Hill
08/02/2022, 9:21 PM
We have jobs that use FlinkKafkaConsumer with setStartFromEarliest.
One of our jobs behaves correctly. During the initial run, it starts from the earliest offset. After a savepoint and recovery, it uses the offsets in the checkpoints/savepoints.
However, one of our jobs does not behave correctly. Whenever it restores, it always starts from earliest. We don't know why, and I don't see warnings or errors in our logs.
Thoughts? Is this a bug with the older FlinkKafkaConsumer?
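For reference, the documented semantics (for both the legacy consumer and the newer KafkaSource) are that the configured starting offsets apply only when the job starts without restored state; offsets recovered from a checkpoint or savepoint take precedence. A sketch with the newer API (servers, topic, and group id are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class EarliestKafkaSource {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka1:29092")
                .setTopics("events")
                .setGroupId("my-group")
                // Used only on a fresh start; restored state wins on recovery.
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}

If a job always restarts from earliest, it is worth confirming the restore actually carries the source's state, e.g. that the source operator's uid did not change between the savepoint and the restore.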
Pedro Lera
08/03/2022, 2:26 PM
val resultTableJoin = tableEnv.sqlQuery(
  """
    |SELECT pt.*, poi.poi_id, poi.association_value, poi.owner_id
    |FROM packetsTable pt
    |LEFT OUTER JOIN PoisTable FOR SYSTEM_TIME AS OF pt.`ts_rowtime` AS poi
    |ON pt.pk = poi.pk OR IsInZone(poi.association_value, pt.lat, pt.lng)
    |""".stripMargin)
But I keep getting this exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty.
IsInZone is just a simple user-defined function that checks whether a location is inside another location (with a predefined radius).
I really appreciate any help.
Thank you all in advance 🙂
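The planner requires an equality predicate on the versioned table's primary key in a temporal join; with an OR disjunction there is no usable join key, hence the exception. A hedged sketch of the shape the planner accepts, reusing the thread's table and column names; note this ANDs the zone check onto the key, so to keep true OR semantics one would instead need two temporal joins whose results are UNIONed:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static Table join(TableEnvironment tableEnv) {
        // Additional predicates may only be ANDed onto the required equi-join key.
        return tableEnv.sqlQuery(
            "SELECT pt.*, poi.poi_id, poi.association_value, poi.owner_id "
            + "FROM packetsTable pt "
            + "LEFT OUTER JOIN PoisTable FOR SYSTEM_TIME AS OF pt.ts_rowtime AS poi "
            + "ON pt.pk = poi.pk AND IsInZone(poi.association_value, pt.lat, pt.lng)");
    }
}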
Don Li
08/03/2022, 2:48 PM
I have an s3 path in a DataStream<String> (after grabbing the value through the nested JSON), and I want to loop through all the files in that s3 path, then extract some data from each file. How would I go about doing that? I was thinking about using readTextFile or a FileSource, but that ends up being a DataStream<DataStream<String>>, which doesn't seem correct. How would I transform the initial DataStream<String> (or grab that s3 path) and have another DataStream<String> from either readTextFile or FileSource?
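A sketch of one common pattern (not the only option): treat each incoming s3 path as an element and expand it into lines inside a RichFlatMapFunction, so the result stays a flat DataStream<String>; nested sources like DataStream<DataStream<String>> are indeed not meaningful in Flink. Class and path handling here are illustrative:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;

public class ExpandS3Path extends RichFlatMapFunction<String, String> {
    @Override
    public void flatMap(String s3Path, Collector<String> out) throws Exception {
        Path base = new Path(s3Path);
        FileSystem fs = base.getFileSystem();
        for (FileStatus status : fs.listStatus(base)) { // one entry per file under the path
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    out.collect(line); // emit each text line downstream
                }
            }
        }
    }
}

Applied as s3Paths.flatMap(new ExpandS3Path()), this yields a flat DataStream<String> of lines; for very large listings, a custom source would scale better.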
Ali Zia
08/03/2022, 3:24 PM

Lee Wallen
08/03/2022, 3:55 PM

Dan Hill
08/03/2022, 4:30 PM
2022-08-03 09:15:31
java.lang.Exception: Job leader for job id 6178032ec3a9318aa06adf0e30ee291f lost leadership.
at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2228)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2226)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
I saw posts saying that TM->ZK connections have issues on AWS. They said timeouts can happen due to networking or TMs being unresponsive (e.g. GCing). I bumped up these settings:
high-availability.zookeeper.client.connection-timeout: 60000
high-availability.zookeeper.client.session-timeout: 240000
Dan Hill
08/03/2022, 4:30 PM

Xi Cheng
08/03/2022, 5:40 PM
The docs say "In order to use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface", but they did not specify when the getValue method will get invoked. Based on the example (https://nightlies.apache.org/flink/flink-docs-release-1.9/api/java/org/apache/flink/metrics/Gauge.html#getValue--), I assume that the supplied function is evaluated each time the RichMapFunction finishes the map function?
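For the record, getValue is not tied to map() at all: the metric system calls it when a reporter snapshots metrics, i.e. on the reporter's configured interval, so it should return quickly and just reflect current state. The registration pattern from the Flink docs, lightly adapted:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class MyMapper extends RichMapFunction<String, String> {
    private transient int valueToExpose;

    @Override
    public void open(Configuration config) {
        getRuntimeContext()
                .getMetricGroup()
                // getValue() is invoked by the metric reporter on its report
                // interval, not once per map() call.
                .gauge("MyGauge", (Gauge<Integer>) () -> valueToExpose);
    }

    @Override
    public String map(String value) {
        valueToExpose++;
        return value;
    }
}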
Adrian Chang
08/03/2022, 6:38 PM

Rakesh V
08/03/2022, 7:36 PM
Is anyone else unable to build with mvn clean package -DskipTests? I just checked out the master branch and I'm getting a build error in the flink-connector-hive component. I thought the master branch was stable.
Ivan M
08/03/2022, 7:53 PM
I define a timestamp column as ts TIMESTAMP(3) METADATA FROM 'timestamp', but when I try to use it in an over window like this: ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts ASC) AS row_num
I get an "OVER windows' ordering in stream mode must be defined on a time attribute." error.
On the other hand, when I define a timestamp field as proctime: proctime AS PROCTIME(), it works fine.
Could you clarify what the difference between these two timestamps is, and which one I can use in over windows? I thought the first one is also a time attribute.
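A plausible explanation: ts TIMESTAMP(3) METADATA FROM 'timestamp' is just an ordinary column; it only becomes an event-time attribute once a WATERMARK is declared on it, whereas PROCTIME() yields a time attribute by construction. A sketch (the connector options are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RowtimeTable {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.executeSql(
            "CREATE TABLE events ("
            + "  id STRING,"
            + "  ts TIMESTAMP(3) METADATA FROM 'timestamp',"
            // The watermark declaration is what turns ts into a rowtime
            // attribute usable in the ORDER BY of an OVER window.
            + "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
            + ") WITH ("
            + "  'connector' = 'kafka',"
            + "  'topic' = 'events',"
            + "  'properties.bootstrap.servers' = 'kafka1:29092',"
            + "  'format' = 'json'"
            + ")");
    }
}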
Sucheth Shivakumar
08/04/2022, 2:28 AM

Darin Lee
08/04/2022, 4:48 AM

Raghunadh Nittala
08/04/2022, 9:11 AM

Ivan M
08/04/2022, 10:18 AM
I have a query with several WITH clauses and a couple of joins.
This query reads data from Kafka-connected tables and produces new events.
Now I want to write these new events into a new Kafka topic.
I created a new table, but when I try to INSERT INTO it from the query above, I get this error:
Table sink <sink table> doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=..., select=..., leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
Could you help me understand what I'm doing wrong here?
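A regular (unwindowed) streaming join emits update/retract changes, and the plain 'kafka' connector is an append-only sink, which is exactly what this error says. One way out, sketched with placeholder names and formats, is an 'upsert-kafka' sink keyed by a primary key; an alternative is to make the join produce insert-only rows (e.g. an interval or temporal join):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSink {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tableEnv.executeSql(
            "CREATE TABLE enriched_events ("
            + "  id STRING,"
            + "  payload STRING,"
            // upsert-kafka requires a primary key; update changes become
            // upserts (and deletes become tombstones) on that key.
            + "  PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'upsert-kafka',"
            + "  'topic' = 'enriched-events',"
            + "  'properties.bootstrap.servers' = 'kafka1:29092',"
            + "  'key.format' = 'json',"
            + "  'value.format' = 'json'"
            + ")");
    }
}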