anusca
07/05/2022, 4:14 PM
py4j.protocol.Py4JJavaError: An error occurred while calling o84.executeInsert.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.MyTopic'.
Table options are:
'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'properties.group.id'='123212'
'topic'='MyTopic'
Does anyone know how I can solve this?
I'm using the following module versions:
numpy==1.19.5
apache-flink-libraries==1.13.6
apache-flink==1.13.6
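Aside: this ValidationException is commonly caused by the Kafka SQL connector JAR not being on the classpath; the apache-flink wheel does not bundle connectors. A minimal sketch of one way to register it from Python — the JAR path is illustrative, and the connector version must match Flink 1.13.6:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
# Hypothetical local path; download flink-sql-connector-kafka from Maven Central first.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///path/to/flink-sql-connector-kafka_2.11-1.13.6.jar")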
Roman Bohdan
07/05/2022, 7:51 PM
2022-07-05 19:21:21,796 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] [flink-akka.actor.default-dispatcher-17] Cannot find task to fail for execution d6683fb33067d87adecd0d476af6874e with exception:
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt d6683fb33067d87adecd0d476af6874e was not found.
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:450) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at jdk.internal.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
HAJI
07/06/2022, 12:49 AM
python -m pip install apache-flink==1.15.0
after which I got:
Collecting pyarrow<3.0.0,>=0.15.1
Using cached pyarrow-2.0.0.tar.gz (58.9 MB)
Installing build dependencies ... error
ERROR: Command errored out with exit status 1:
command: /usr/bin/python3 /usr/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-wd5gcv6i/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i https://pypi.org/simple -- 'cython >= 0.29' 'numpy==1.14.5; python_version<'"'"'3.7'"'"'' 'numpy==1.16.0; python_version>='"'"'3.7'"'"'' setuptools setuptools_scm wheel
cwd: None
Complete output (367 lines):
Ignoring numpy: markers 'python_version < "3.7"' don't match your environment
Collecting cython>=0.29
Using cached Cython-0.29.30-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl (1.8 MB)
....
ERROR: Command errored out with exit status 1: /usr/bin/python3 /usr/lib/python3.7/site-packages/pip install --ignore-installed --no-user
I used Python 3.7.10,
and I also tried AWS EMR with Python 3.4 / Flink 1.14 and with Python 3.6 / Flink 1.14,
but I still can't set up PyFlink.
Building wheels for collected packages: pyarrow
Building wheel for pyarrow (pyproject.toml) ... error
ERROR: Command errored out with exit status 1:
command: /usr/local/bin/python3 /home/hadoop/.local/lib/python3.6/site-packages/pip/_vendor/pep517/in_process/_in_process.py build_wheel /tmp/tmp2hpyiu5
cwd: /mnt/tmp/pip-install-vgb1drwj/pyarrow_82e244416a7f48bf88e94182cb4
...
error: command 'cmake' failed with exit status 1
----------------------------------------
ERROR: Failed building wheel for pyarrow
Failed to build pyarrow
ERROR: Could not build wheels for pyarrow
How can I fix this?
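Aside: the root failure here is pip building pyarrow from source — the cached pyarrow-2.0.0.tar.gz and the later 'cmake' error point that way — most likely because no prebuilt wheel matches the platform (the log shows aarch64). As suggestions rather than a verified fix: upgrading the build tooling first (python -m pip install --upgrade pip setuptools wheel) lets pip select newer manylinux wheels where they exist, and a source build additionally needs cmake and a C++ toolchain installed before retrying.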
laxmi narayan
07/06/2022, 8:34 AM
Rashmin Patel
07/06/2022, 8:47 AM
class FlinkDataflow[T: TypeTag](val stream: DataStream[T]) extends IDataflow[T] {
  // classOf[O] does not compile for a type parameter; a ClassTag
  // (scala.reflect.ClassTag) is needed to recover the runtime class.
  def map[O: TypeTag: ClassTag](f: T => O, id: String): FlinkDataflow[O] = {
    val typeInfo: TypeInformation[O] =
      TypeInformation.of(implicitly[ClassTag[O]].runtimeClass.asInstanceOf[Class[O]])
    val resultStream: DataStream[O] = stream.map(new MapFunction[T, O] {
      override def map(value: T): O = f.apply(value)
    }).returns(typeInfo).uid(id).name(id)
    new FlinkDataflow[O](resultStream)
  }
}
HAJI
07/06/2022, 8:49 AM
Shashank Mishra
07/06/2022, 4:35 PM
Jaya Ananthram
07/06/2022, 5:43 PM
Geoffrey Picron
07/06/2022, 5:57 PM
Adrian Chang
07/06/2022, 7:58 PM
GROUP BY HOP(tsMs, interval '5' MINUTE, interval '60' MINUTE)
The issue is that I never get the latest 5 minutes. For example,
the window fires at: 2022-07-06 15:45:04.694672
window start time: 2022-07-06 18:44:59.999000
window row time: 2022-07-06 19:39:59.999000
I was expecting a window row time of: 2022-07-06 19:44:59.999000
I suspect the reason is that I only receive a source event every 5 minutes, combined with how the watermarks are generated.
I tried with
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration() \
.set_string('parallelism.default', '1') \
.set_string('pipeline.time-characteristic', 'EventTime') \
.set_string('pipeline.auto-watermark-interval', '100 ms')
but it doesn't make any difference.
Could you confirm whether these parameters are valid for Python and the Table API? If so, could you provide an example of how to use them?
Thanks
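Aside: as far as I can tell, in the Table API event time is driven by a WATERMARK clause declared on the source table, not by the pipeline.time-characteristic option (which is deprecated). A minimal sketch in Python — the table name, columns, connector options, and the 5-second bound are all illustrative:

t_env.execute_sql("""
    CREATE TABLE events (
        payload STRING,
        tsMs TIMESTAMP(3),
        -- tsMs becomes the event-time column; the watermark trails the
        -- largest timestamp seen so far by 5 seconds.
        WATERMARK FOR tsMs AS tsMs - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

If the source only emits an event every 5 minutes, the last window cannot fire until a later event pushes the watermark past the window end, which would explain the missing most-recent window.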
Jeremy Ber
07/06/2022, 9:29 PM
timestamp: 2022-07-06T01:01:01
bucket should be
year=2022/month=07/day=06/hour=01
but it’s landing in
year=2022/month=07/day=06/hour=05
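Aside: the 4-hour offset between the record timestamp (hour 01) and the bucket (hour 05) looks like a local-time vs UTC mismatch — 01:01 US Eastern daylight time is 05:01 UTC — which would point at the sink's bucket assigner using a different time zone than the timestamp. That is an inference from the offset alone, not a confirmed diagnosis.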
Jaya Ananthram
07/06/2022, 9:43 PM
HAJI
07/07/2022, 1:44 AM
Owen Lee
07/07/2022, 7:55 AM
HAJI
07/07/2022, 8:24 AM
Shqiprim Bunjaku
07/07/2022, 8:34 AM
Nithin kharvi
07/07/2022, 8:51 AM
Aqib Mehmood
07/07/2022, 8:59 AM
streamExecEnv.fromElements(new JSONObject(DataStream<String>.toString())).addSink(
JdbcSink.sink(
"insert into <table> (<column 1>, <column 2>) values (?, ?)",
(statement, response) -> {
statement.setObject(1, response.get("<key 1>"));
statement.setObject(2, response.get("<key 2>"));
},
The DataStream<String>.toString() expression on the first line, even though the payload is in JSON form, does not behave like a plain string and thus cannot be converted into a JSONObject.
So we're facing difficulty sinking streaming data into JDBC using DataStream objects.
Is there a more standard method of sinking real-time data into JDBC that I'm missing?
Thank you in advance
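Aside: for reference, JdbcSink.sink in flink-connector-jdbc takes the SQL template, a statement builder, optional JdbcExecutionOptions, and JdbcConnectionOptions, and is the standard DataStream-to-JDBC path; the snippet above stops after the statement builder. Also, calling toString() on a DataStream returns a description of the stream rather than its records, so the usual pattern is to parse each JSON string inside a MapFunction (or map to a typed POJO) and hand the resulting stream to the sink.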
Zain Haider Nemati
07/07/2022, 11:15 AM
transformed_Stream.addSink(JdbcSink.sink(
"insert into table (a,b,c) values (?, ?, ?)",
(statement, response) -> {
statement.setString(1, response.get("a").toString());
statement.setString(2, response.get("b").toString());
statement.setString(3, response.get("c").toString());
},
));
Error:
Caused by: org.json.JSONException: JSONObject["a"] not found.
Roman Bohdan
07/07/2022, 12:44 PM
Slackbot
07/07/2022, 4:52 PM
Susan Perkins
07/07/2022, 6:25 PM
Adrian Chang
07/07/2022, 6:41 PM
table.exec.source.idle-timeout
and pipeline.auto-watermark-interval
but the watermark is not updated when there are no events.
I guess the purpose of table.exec.source.idle-timeout
is for when some partitions go idle, not for the entire topic. Am I right?
If I regularly produce "ping" events to the Kafka topic, the watermark is updated as I expect.
Is there any other solution for this scenario besides producing "ping" events?
Thanks
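Aside: that reading matches the documentation — table.exec.source.idle-timeout lets the watermark progress when some splits/partitions go idle while others stay active; if every partition is idle there is nothing to advance the watermark from, so it stalls. For reference, a sketch of setting it from Python; the 30 s value is illustrative:

t_env.get_config().get_configuration().set_string(
    'table.exec.source.idle-timeout', '30 s')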
Dan Hill
07/08/2022, 1:05 AM
/jobs/:jobid/exceptions
REST API but I don’t get useful information from the exceptions API.
I get the job status using
clusterClient.getJobStatus(jobId).get
The job status is FAILED.
When I call the exceptions REST API, I get:
{
  "root-exception": null,
  "timestamp": null,
  "all-exceptions": [],
  "truncated": false,
  "exceptionHistory": {
    "entries": [],
    "truncated": false
  }
}
The getJobStatus call returns a stacktrace indicating that the checkpoint timed out. This happens very quickly, even though I explicitly set the timeout to a longer period.
Is this a known issue with the exceptions API?
salvalcantara
07/08/2022, 4:11 AM
setKafkaSubscriber
public on the kafka source builder (see https://issues.apache.org/jira/browse/FLINK-24660). So, I guess using a custom subscriber would be a way to go about it. However, I'm wondering if it could be achieved in a more reactive way, e.g., by using some sort of control signal for the source so that new topics are specified on demand (e.g., by publishing the set of topics of interest to a control topic). That is something I've been thinking about lately, what are your thoughts? Is this something doable? Nonsensical?
Gaurav Miglani
07/08/2022, 6:59 AM
Gaurav Miglani
07/08/2022, 9:26 AM
NAME                                      TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
service/flink-operator-webhook-service   ClusterIP   172.20.171.206   <none>        443/TCP    25h
service/test-job-rest                    ClusterIP   172.20.112.225   <none>        8081/TCP   45s
Levani Kokhreidze
07/08/2022, 9:39 AM
IncrementalRemoteKeyedStateHandle
), whereas if I restart task managers, local recovery is triggered.
Wondering if this is a known limitation or whether there's some additional config to tweak?
Setup:
• HA setup with ZooKeeper and S3 remote storage.
• JobManager runs as a StatefulSet with a PersistentVolume. Both process.jobmanager.working-dir and jobmanager.resource-id are correctly configured.
• TaskManagers run as StatefulSets with PersistentVolumes. Both process.taskmanager.working-dir and taskmanager.resource-id are correctly configured.
Gaurav Miglani
07/08/2022, 1:44 PM
2022-07-08 10:53:36,253 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - MiniBatchAssigner[2] -> Calc[3] -> (LocalWindowAggregate[4], LocalWindowAggregate[12], LocalWindowAggregate[21], LocalWindowAggregate[30]) (33/40) (4904d8c6fa01f6c1d9856a829351795e) switched from RUNNING to FAILED on user-concurrency-streamverse-taskmanager-1-1 @ ip-10-171-54-88.ec2.internal (dataPort=41191).
java.lang.NullPointerException: null
at org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners$AbstractSliceAssigner.assignSliceEnd(SliceAssigners.java:558) ~[flink-table-runtime-1.15.1.jar:1.15.1]
at org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator.processElement(LocalSlicingWindowAggOperator.java:114) ~[flink-table-runtime-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:40) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:28) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
at StreamExecCalc$56.processElement_split2(Unknown Source) ~[?:?]
at StreamExecCalc$56.processElement(Unknown Source) ~[?:?]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.table.runtime.operators.wmassigners.RowTimeMiniBatchAssginerOperator.processElement(RowTimeMiniBatchAssginerOperator.java:74) ~[flink-table-runtime-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
at java.lang.Thread.run(Unknown Source) ~[?:?]
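Aside: an NPE at AbstractSliceAssigner.assignSliceEnd is commonly reported when the event-time (rowtime) column of an incoming row is null, since the slice assigner reads that timestamp to place the record in a window. Checking upstream for null values in the watermark column would be a reasonable first step — an inference from the stack trace, not a confirmed diagnosis.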
Adrian Chang
07/08/2022, 8:22 PM
WatermarkGenerator
in the Python code of Flink 1.15. Is it possible to have an equivalent of a periodic WatermarkGenerator in Python? I want the watermark to advance even when there are no events for a while.
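Aside: as far as I can tell, PyFlink 1.15 does not let you implement a custom WatermarkGenerator in Python; only the built-in strategies are exposed. The closest built-in is with_idleness, which marks the source idle after a timeout so it stops holding the combined watermark back (it does not actively advance the watermark). A minimal sketch, assuming a PyFlink build that exposes with_idleness and records whose first field is an epoch-millisecond timestamp:

from pyflink.common.time import Duration
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy

class FirstFieldTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp) -> int:
        # Assumes the first field of each record is an epoch-millisecond timestamp.
        return value[0]

watermark_strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(5))
    .with_timestamp_assigner(FirstFieldTimestampAssigner())
    # After 60 s without events the source is marked idle, so downstream
    # operators no longer wait for its watermark.
    .with_idleness(Duration.of_seconds(60))
)

Actively advancing the watermark on wall-clock time with no events at all would still require a generator defined on the Java side.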