Aqib Mehmood
06/13/2022, 8:24 AM
// Read the resume timestamp from a .properties file and hand it to the Kinesis consumer
Reader ip = new FileReader(<.properties file location>);
Properties prop = new Properties();
prop.load(ip);
String timeStart = prop.getProperty("timeStamp");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timeStart);
• Secondly, we're exploring the checkpointing option provided by Flink to checkpoint progress at regular intervals. The issue with this approach is that we're running Flink in application mode and thus have a separate JobManager for each job. Wouldn't the checkpointing data be lost if a job were to fail, bringing down the JM that stores the checkpointing data? How can we use this checkpointing data to start another Kinesis job in case of failure?
Is there another way of ensuring stateful resumption of flink-kinesis jobs that we're missing out on? Would love to know your thoughts.
TIA
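For reference, a minimal sketch of pointing checkpoints at durable storage so they outlive any single JobManager (the S3 bucket and paths are illustrative assumptions):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s
// Checkpoint data is written to the configured storage, not held inside the
// JobManager process, so it survives the JM going down with the job.
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints");

A replacement job can then be started from the last retained checkpoint directory (<checkpoint-dir>/<job-id>/chk-<n>) with flink run -s <that-path>.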
Veeramani Moorthy
06/13/2022, 10:15 AM
Márk Bartos
06/13/2022, 1:00 PM
Ildar Almakaev
06/13/2022, 2:56 PM
state.backend=hashmap
?
John Gerassimou
06/13/2022, 4:59 PM
Erik Wickstrom
06/14/2022, 4:05 AM
Bill Rao
06/14/2022, 7:41 AM
An error (Could not find a free permitted port on the machine.) is thrown as follows:
Caused by: java.lang.RuntimeException: Could not find a free permitted port on the machine.
at org.apache.flink.util.NetUtils.getAvailablePort(NetUtils.java:177) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.client.python.PythonEnvUtils.lambda$startGatewayServer$3(PythonEnvUtils.java:365) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
2022-06-14 07:25:28,086 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2022-06-14 07:25:28,089 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2022-06-14 07:25:28,092 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2022-06-14 07:25:28,572 WARN akka.actor.CoordinatedShutdown [] - Could not addJvmShutdownHook, due to: Shutdown in progress
2022-06-14 07:25:28,573 WARN akka.actor.CoordinatedShutdown [] - Could not addJvmShutdownHook, due to: Shutdown in progress
Could anyone offer some clues? I reviewed the source code and found nothing useful. There is an IOException silenced in NetUtils, but otherwise I cannot see why an error like this is relevant when starting PyFlink. I have been suspecting privilege issues, but a minimal PyFlink example (word_count.py) runs with no abnormality.
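For context, the probe that fails here boils down to asking the OS for an ephemeral port; a simplified sketch of the idea (not Flink's exact code):

import java.io.IOException;
import java.net.ServerSocket;

class FreePortProbe {
    // Bind to port 0 and let the OS pick an ephemeral free port.
    // If the bind itself throws (e.g. sandboxing or missing privileges),
    // the caller concludes no free port is available -- matching the error above.
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }
}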
Liubov
06/14/2022, 10:00 AM
Tom Xiao
06/14/2022, 11:08 AM
data. I want to extract the platform field and cast it into an array:
{
  "id": 1,
  "name": "Bitcoin",
  "symbol": "BTC",
  "slug": "bitcoin",
  "cmc_rank": 1,
  "num_market_pairs": 9237,
  "circulating_supply": 18984362,
  "total_supply": 18984362,
  "max_supply": 21000000,
  "last_updated": "2022-03-15T23:00:00.000Z",
  "date_added": "2013-04-28T00:00:00.000Z",
  "tags": [
    "mineable",
    "pow",
    "sha-256",
    "store-of-value",
    "state-channel",
    "coinbase-ventures-portfolio",
    "three-arrows-capital-portfolio",
    "polychain-capital-portfolio",
    "binance-labs-portfolio",
    "blockchain-capital-portfolio",
    "boostvc-portfolio",
    "cms-holdings-portfolio",
    "dcg-portfolio",
    "dragonfly-capital-portfolio",
    "electric-capital-portfolio",
    "fabric-ventures-portfolio",
    "framework-ventures-portfolio",
    "galaxy-digital-portfolio",
    "huobi-capital-portfolio",
    "alameda-research-portfolio",
    "a16z-portfolio",
    "1confirmation-portfolio",
    "winklevoss-capital-portfolio",
    "usv-portfolio",
    "placeholder-ventures-portfolio",
    "pantera-capital-portfolio",
    "multicoin-capital-portfolio",
    "paradigm-portfolio"
  ],
  "quote": [
    {
      "USD": {
        "price": 39338.785492032424,
        "volume_24h": 23934000867,
        "percent_change_1h": -0.392318904627,
        "percent_change_24h": -0.826814091987,
        "percent_change_7d": 1.552812110667,
        "market_cap": 746821744421.0917,
        "last_updated": "2022-03-15T23:00:00.000Z"
      }
    }
  ]
}
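One way to pull a field out of a payload like this inside a Flink job is plain Jackson in a map function. A sketch, assuming the JSON arrives as a String named json (it uses the tags array for illustration, since the platform field isn't shown in the sample):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Parse the payload and copy one of its array fields into a String[].
ObjectMapper mapper = new ObjectMapper();
JsonNode root = mapper.readTree(json);
JsonNode tags = root.get("tags");
String[] out = new String[tags.size()];
for (int i = 0; i < tags.size(); i++) {
    out[i] = tags.get(i).asText();
}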
Jeff Levesque
06/15/2022, 12:52 AM
Shun Sun
06/15/2022, 9:05 AM
Tymur Yarosh
06/15/2022, 10:48 AM
Aqib Mehmood
06/15/2022, 1:19 PM
CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
The issue is that this code doesn't seem to work on our current Flink version, i.e. 1.13.1, although it works fine in version 1.13.6. These two versions are almost identical, so we aren't able to understand why this is.
Has anyone else faced this issue, or does anyone know how to overcome it?
TIA
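For what it's worth, the same cleanup mode can also be set through the older method name, which has been in CheckpointConfig far longer and may behave more consistently across 1.13.x patch releases (a sketch; worth verifying which of the two methods your 1.13.1 build actually ships):

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Older spelling of the same setting: retain the externalized checkpoint on cancellation.
env.getCheckpointConfig()
   .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);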
DJ
06/15/2022, 2:32 PM
Jeesmon Jacob
06/15/2022, 2:56 PM
Rich Hanes
06/15/2022, 7:27 PM
Theo Diefenthal
06/15/2022, 10:46 PM
topic-partition-offset-states, which doubles in size each time and now already has 4 million entries in the UNION list state. From investigating the Flink code, I see that this state belongs to FlinkKafkaConsumerBase. However, with my Flink 1.14 upgrade, I replaced the FlinkKafkaConsumer with the new KafkaSource, so this state should not be in use at all, which makes me question why it is retained and why it doubles in size on each restart with savepoint. Hence the question: I start my job with allowNonRestoredState. Could it be that, even though unused, the state is still kept in Flink somewhere and used in the next savepoint? Note that I already resolved the issue by simply deleting the state and starting the job from scratch. Now the savepoint works as usual. But I'm just wondering what could have caused this behaviour.
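For anyone who hits the same thing and doesn't want to start from scratch, the State Processor API can drop a single operator's state from an existing savepoint. A sketch against the pre-1.15 (DataSet-based) API; the savepoint paths and the old source's uid are placeholders:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// Load the old savepoint, remove the stale FlinkKafkaConsumer operator state
// by its uid, and write a pruned copy to a new location.
ExistingSavepoint savepoint =
    Savepoint.load(bEnv, "s3://bucket/savepoints/savepoint-old", new HashMapStateBackend());
savepoint.removeOperator("legacy-kafka-consumer-uid")
         .write("s3://bucket/savepoints/savepoint-pruned");
bEnv.execute("prune stale source state");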
Eric Hwang
06/16/2022, 3:05 AM
Zsombor Chikan
06/16/2022, 2:19 PM
historyserver.archive.retained-jobs? Will the oldest archived job be deleted, etc., or will it throw an IllegalConfigurationException?
Rajendra
06/16/2022, 4:24 PM
org.apache.flink.util.FlinkException: Execution XXXXXXXXX is unexpectedly no longer running on task executor 10.227.18.50:6122-34d81a.
Any help is appreciated. Thanks in advance.
DJ
06/16/2022, 4:57 PM
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.
Hong Teoh
06/17/2022, 12:14 PM
Jeesmon Jacob
06/17/2022, 12:28 PM
LastState upgrade mode is not supported for FlinkSessionJob. Is there a plan to support it in the future? Thanks.
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/docs/custom-resource/overview/#limitations
Jeesmon Jacob
06/17/2022, 7:48 PM
FlinkSessionJob in the kubernetes operator: when I use job.args to reference a config file, what will be the context of that config file? Is it going to be the location of the config file in the docker image referenced in FlinkDeployment, or local to the operator?
job:
  jarURI: https://xxx/my-job.jar
  args:
    - -c
    - /path/to/my/job-config.yaml
Sharon Xie
06/17/2022, 11:32 PM
Maciej Obuchowski
06/18/2022, 2:12 PM
StreamExecutionEnvironment.createRemoteEnvironment
Damon Rolfs
06/18/2022, 7:51 PM
We submit the job through the REST API (/jars/:jarid/run). The job has a basic three-stage setup (source -> through -> sink). We explicitly call setParallelism on the source, which draws from Kafka, and do not set parallelism on the through and sink stages; i.e., our intention is for the latter parts to follow the parallelism assigned to the job at start. We do not set parallelism.default in flink-conf.yaml. I've scoured both code and configuration and I can't find what could prevent the latter stages from using the parallelism supplied on job run.
We start the cluster with 16 task managers and manually follow the conventional (without reactive Flink) downward rescale: take a savepoint and cancel; terminate 2 task managers; then restart the jar with the savepoint and parallelism=14. The outcome is that the job attempts to run the latter stages at parallelism 16 and fails with a NoResourceAvailableException because the required resources aren't available. Similarly, attempts to rescale to other sizes also fail, with the job trying to run at 16.
We don't call setMaxParallelism, relying on the default of 128.
I'd appreciate any guidance on where I can look to find what's driving this inelastic parallelism of 16.
Thanks in advance! Damon
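For reference, the inheritance being relied on here looks like this in miniature (a sketch with illustrative operators; fromSequence stands in for the Kafka source):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Job-level default. Submitting with -p 14, or the "parallelism" field of the
// REST /jars/:jarid/run request, sets this same default.
env.setParallelism(14);

env.fromSequence(0, 1_000_000)  // stand-in for the Kafka source
   .setParallelism(4)           // explicitly pinned: stays at 4 whatever the default is
   .map(x -> x * 2)             // no explicit parallelism: inherits the job default (14)
   .print();                    // the sink likewise inherits the default

env.execute("parallelism-inheritance-sketch");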
Jeff Levesque
06/18/2022, 10:08 PM
sliding_window_table = (
    input_table.window(
        Slide.over(sliding_window_over)
        .every(sliding_window_every)
        .on(sliding_window_on)
        .alias(sliding_window_alias)
    )
    .group_by('ticker, {}'.format(sliding_window_alias))
    .select('''
        ticker,
        MIN(price) as min_price,
        MAX(price) as max_price,
        {0}.start as utc_start,
        {0}.end as utc_end
    '''.format(sliding_window_alias))
)
I want to find the first value within the sliding window, as well as the last value within the sliding window. So, I found general SQL syntax for this [1]:
FIRST_VALUE(price) OVER (PARTITION BY CONVERT(date, dateColumn) ORDER BY dateColumn ASC) as first_price,
FIRST_VALUE(price) OVER (PARTITION BY CONVERT(date, dateColumn) ORDER BY dateColumn DESC) as last_price
I did a quick search to see if FIRST_VALUE is a function offered by Flink (with hopes it would be supported by PyFlink). I found that it should be provided [2]. So, I tried the syntax in my sliding window as follows:
sliding_window_table = (
    input_table.window(
        Slide.over(sliding_window_over)
        .every(sliding_window_every)
        .on(sliding_window_on)
        .alias(sliding_window_alias)
    )
    .group_by('ticker, {}'.format(sliding_window_alias))
    .select('''
        FIRST_VALUE(price),
        ticker,
        MIN(price) as min_price,
        MAX(price) as max_price,
        {0}.start as utc_start,
        {0}.end as utc_end
    '''.format(sliding_window_alias))
)
However, I received the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
: org.apache.flink.table.api.ValidationException: Undefined function: FIRST_VALUE
at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:53)
at java.base/java.util.Optional.orElseThrow(Optional.java:408)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:35)
at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
at org.apache.flink.table.api.internal.TableImpl.lambda$preprocessExpressions$0(TableImpl.java:605)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.apache.flink.table.api.internal.TableImpl.preprocessExpressions(TableImpl.java:606)
at org.apache.flink.table.api.internal.TableImpl.access$300(TableImpl.java:66)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:775)
at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:770)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Process finished with exit code 1
I notice that the Flink documentation [2] is for Flink v1.16, yet I'm using apache-flink==1.13.2. So, I searched for the documentation on Flink v1.13 [3] and found that FIRST_VALUE is supported.
[1] https://stackoverflow.com/a/61346024/2063478
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
[3] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/
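One possible explanation for the gap: the stack trace shows the failure inside the Table API's expression resolver (LookupCallResolver), while the docs list FIRST_VALUE among the SQL built-in functions, so it may simply not be exposed through the string-expression DSL in 1.13. A workaround sketch, written in Java for illustration (table and column names are assumptions; the same SQL string should work from PyFlink via table_env.sql_query):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv =
    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
// Same sliding-window aggregate expressed in SQL, where FIRST_VALUE/LAST_VALUE are
// documented built-ins. Assumes a registered table `trades` with columns ticker,
// price and a rowtime attribute `rowtime`.
Table result = tableEnv.sqlQuery(
    "SELECT ticker, "
        + "  FIRST_VALUE(price) AS first_price, "
        + "  LAST_VALUE(price)  AS last_price, "
        + "  MIN(price)         AS min_price, "
        + "  MAX(price)         AS max_price, "
        + "  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS utc_start, "
        + "  HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)   AS utc_end "
        + "FROM trades "
        + "GROUP BY ticker, HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)");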
Ali AIT-BACHIR
06/20/2022, 12:21 PM
java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "com/datastax/driver/core/Cluster$Builder"
Thanks for your help.
Luis Figueiredo
06/20/2022, 7:13 PM