chunilal kukreja
11/01/2022, 11:10 AM
Tiansu Yu
11/01/2022, 11:16 AM
stream A with WatermarkStrategy.noWatermarks(), stream B with boundedOutOfOrderness(): does the result stream inherit timestamp and watermark from stream B entirely, or will the watermark not be advanced because stream A does not have a watermark?
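For reference, a minimal sketch of the setup being asked about (the Event type and its timestamp field are illustrative, not from the thread). For multi-input operators such as union, connect, and joins, Flink advances the combined watermark to the minimum of the input watermarks, so the noWatermarks() side keeps the result stream's watermark from advancing, while per-record timestamps are carried through unchanged:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkUnionSketch {
    // Illustrative event type; any POJO with an event-time field works.
    public static class Event {
        public long eventTime;
        public String payload;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stream A: no watermarks are ever emitted.
        DataStream<Event> streamA = env.fromElements(new Event())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>noWatermarks());

        // Stream B: watermarks trail the max seen timestamp by 5 seconds.
        DataStream<Event> streamB = env.fromElements(new Event())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((event, ts) -> event.eventTime));

        // The union's watermark is min(watermark A, watermark B); since A never
        // emits one, event-time timers downstream will not fire. Declaring A as
        // idle (withIdleness) is one way to let B's watermark drive the result.
        DataStream<Event> result = streamA.union(streamB);
        result.print();

        env.execute();
    }
}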
Jin S
11/01/2022, 3:16 PM
Alexander Fedulov
11/01/2022, 3:24 PM
Class<? extends UserDefinedFunction> clazz = (Class<? extends UserDefinedFunction>) Class.forName(f.getClassName(), true, loader);
tableEnvironment.createTemporarySystemFunction(f.getName(), clazz);
This now leads to java.lang.ClassNotFoundException. I tried passing the same loader while initializing the TableEnvironment using the newly added method, but it did not help. Any hints?
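For reference, a minimal sketch of the wiring described above, assuming the "newly added method" is Flink 1.16's EnvironmentSettings.Builder#withClassLoader (the function class lookup mirrors the snippet above):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;

public class FunctionRegistrationSketch {
    // Registers a UDF whose class is only reachable through the given loader.
    @SuppressWarnings("unchecked")
    public static void register(ClassLoader loader, String className, String name)
            throws ClassNotFoundException {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance()
                        .inStreamingMode()
                        .withClassLoader(loader) // resolve user classes via this loader
                        .build());

        Class<? extends UserDefinedFunction> clazz =
                (Class<? extends UserDefinedFunction>)
                        Class.forName(className, true, loader);
        tableEnv.createTemporarySystemFunction(name, clazz);
    }
}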
Sachin Saikrishna Manikandan
11/01/2022, 4:51 PM
Adrian Chang
11/01/2022, 7:03 PM
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Prometheus
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: 9250-9260
In the logs I see these lines:
Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
Loading configuration property: $internal.application.main, org.apache.flink.client.python.PythonDriver
Loading configuration property: metrics.reporter.prom.port, 9250-9260
Started PrometheusReporter HTTP server on port 9250.
Reporting metrics for reporter prom of type org.apache.flink.metrics.prometheus.PrometheusReporter.
However, the JobManager and TaskManager pods are not exposing port 9250.
Do I need to set something else to open the port from which Prometheus will read the metrics?
Jason Politis
11/01/2022, 8:05 PM
java.time.DateTimeException: Field DayOfYear cannot be printed as the value 269 exceeds the maximum print width of 2
We are trying to identify this bad data. I'm in Ververica now, running a SQL preview. There are only 2k records in this table. Is there an easy way to export the results of the SQL preview so I can analyze the data in Excel or something?
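For context, this particular exception usually points at a date-format pattern that uses DD (day-of-year, fixed width 2) where dd (day-of-month) was intended; it only surfaces once the data reaches day-of-year 100 or later, which is why it can look like bad data. A minimal Java reproduction:

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DayOfYearRepro {
    public static void main(String[] args) {
        LocalDate lateSeptember = LocalDate.of(2022, 9, 26); // day-of-year 269

        // "DD" means day-of-YEAR padded to width 2; formatting any date past
        // day 99 throws: "Field DayOfYear cannot be printed as the value 269
        // exceeds the maximum print width of 2".
        DateTimeFormatter broken = DateTimeFormatter.ofPattern("yyyy-MM-DD");
        // broken.format(lateSeptember); // throws java.time.DateTimeException

        // "dd" (day-of-month) is almost always what was meant.
        DateTimeFormatter fixed = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        System.out.println(fixed.format(lateSeptember)); // prints 2022-09-26
    }
}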
Jirawech Siwawut
11/02/2022, 2:59 AM
Sachin Saikrishna Manikandan
11/02/2022, 10:26 AM
Aviv Dozorets
11/02/2022, 11:05 AM
forwarding between them, no rebalancing or shuffling.
Is there a way to ensure that all tasks run on the same TM node? Or to understand how tasks are spread between TMs? Thanks
Raghunadh Nittala
11/02/2022, 11:20 AM
Matteo De Martino
11/02/2022, 3:05 PM
CREATE TABLE SOME_TABLE
(
id INT NOT NULL,
type STRING,
...
...
)
WITH (
'connector' = 'filesystem',
'path' = 'file:///path/to/file.csv',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
);
I also added flink-csv as a dependency.
But when I try to executeSql the above, I get:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
What am I missing? My understanding is that there is no explicit dependency to add for the filesystem connector to work. Am I wrong?
Thanks 🙇
Yaroslav Bezruchenko
11/02/2022, 4:21 PM
Rashmin Patel
11/02/2022, 4:23 PM
2022-11-02 10:21:38,370 WARN org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=dp_flink_streaming_1664860215310-0, groupId=dp_flink_streaming_1664860215310] Error while fetching metadata with correlation id 5244076 : {loan_account_service.public.loan_accounts=UNKNOWN_TOPIC_OR_PARTITION}
Is this the intended behaviour, or should we expect the job to fail?
Jeesmon Jacob
11/02/2022, 4:32 PM
Is it possible to set kubernetes.operator.job.restart.failed only for certain FlinkDeployments? Or is it a global flag that applies to all deployments?
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#restart-failed-job-deployments
Krish Narukulla
11/02/2022, 5:49 PM
Amir Halatzi
11/02/2022, 10:41 PM
Huib
11/02/2022, 11:02 PM
Is it possible to do something like max(value) over (partition by window(time, 15 minutes), some_id order by time asc)? The goal is to get a “rolling max” that updates whenever new data comes in, but limits itself to 15-minute windows. Data will arrive mostly in order per key, but there are more keys than partitions in Kafka (Event Hubs). Data that arrives more than (say) 5 minutes late isn’t relevant any more, but I can’t really delay the results. Is this even possible with SQL?
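Possibly relevant: the window(...) pseudo-syntax above isn't supported as written, but an event-time OVER aggregation with a RANGE bound gives a per-row max over the trailing 15 minutes (a sliding lookback rather than aligned 15-minute buckets), and a 5-minute watermark delay matches the stated lateness budget. A sketch, with table and column names assumed:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class RollingMaxSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical source; the watermark bounds how late data may arrive.
        tEnv.executeSql(
                "CREATE TABLE events (" +
                "  some_id STRING," +
                "  val DOUBLE," +
                "  ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' MINUTE" +
                ") WITH ('connector' = 'datagen')");

        // Emits an updated max for every incoming row, computed over the
        // preceding 15 minutes of event time per key.
        Table rollingMax = tEnv.sqlQuery(
                "SELECT some_id, ts, " +
                "  MAX(val) OVER (" +
                "    PARTITION BY some_id ORDER BY ts " +
                "    RANGE BETWEEN INTERVAL '15' MINUTE PRECEDING AND CURRENT ROW" +
                "  ) AS rolling_max " +
                "FROM events");

        rollingMax.execute().print();
    }
}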
Shubho
11/03/2022, 6:07 AM
Slackbot
11/03/2022, 1:14 PM
Abhinav sharma
11/03/2022, 2:12 PM
Raghunadh Nittala
11/03/2022, 2:28 PM
val createDDL = "CREATE TABLE sink_table (" +
" sampleId STRING NOT NULL," +
" location STRING NOT NULL, " +
" dt STRING NOT NULL, " +
" hour STRING NOT NULL)" +
" PARTITIONED BY (sampleId, location, dt, hour)" +
" WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '$filePath'," +
" 'format' = 'parquet'," +
" 'partition.time-extractor.class' = 'HourPartitionTimeExtractor::class.java'," +
" 'sink.rolling-policy.rollover-interval' = '5 m', " +
" 'sink.partition-commit.trigger'='partition-time', " +
" 'sink.partition-commit.policy.kind' = 'success-file' " +
")"
tableEnv.executeSql(createDDL)
I’m populating this from another table using INSERT INTO ... SELECT syntax. I’m trying to use the example given in the Flink docs for HourPartTimeExtractor:
public class HourPartTimeExtractor implements PartitionTimeExtractor {
    @Override
    public LocalDateTime extract(List<String> keys, List<String> values) {
        String dt = values.get(0);
        String hour = values.get(1);
        return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
    }
}
However, I’m not able to generate the partitions in the desired way. I think the class itself is not being called. Do I need to register this as a UDF, using something like createTemporarySystemFunction?
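Probably not a UDF issue: the filesystem connector instantiates the extractor itself via reflection, so nothing needs to be registered; but 'partition.time-extractor.kind' = 'custom' has to be set for the class option to be read at all, and 'partition.time-extractor.class' must be a fully qualified class name string, whereas 'HourPartitionTimeExtractor::class.java' above is a Kotlin expression embedded in the SQL text. Note also that with four partition columns, values.get(0) and values.get(1) in the doc example would read sampleId and location rather than dt and hour. A sketch of the corrected DDL (the com.example package is illustrative):

import org.apache.flink.table.api.TableEnvironment;

public class PartitionedSinkSketch {
    public static void createSink(TableEnvironment tableEnv, String filePath) {
        tableEnv.executeSql(
                "CREATE TABLE sink_table (" +
                "  sampleId STRING NOT NULL," +
                "  location STRING NOT NULL," +
                "  dt STRING NOT NULL," +
                "  `hour` STRING NOT NULL" + // quoted: HOUR is a reserved keyword
                ") PARTITIONED BY (sampleId, location, dt, `hour`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = '" + filePath + "'," +
                "  'format' = 'parquet'," +
                // 'custom' switches the extractor from the default to the class below
                "  'partition.time-extractor.kind' = 'custom'," +
                "  'partition.time-extractor.class' = 'com.example.HourPartitionTimeExtractor'," +
                "  'sink.rolling-policy.rollover-interval' = '5 min'," +
                "  'sink.partition-commit.trigger' = 'partition-time'," +
                "  'sink.partition-commit.policy.kind' = 'success-file'" +
                ")");
    }
}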
Sevvy Yusuf
11/03/2022, 3:56 PM
Sachin Saikrishna Manikandan
11/03/2022, 4:03 PM
Hannah Hagen
11/03/2022, 10:57 PM
ModuleNotFoundError: No module named '_lzma'
whenever I run simple scripts. Is anyone else encountering this?
I get this error when running a tutorial job: /opt/flink/bin/flink run --python examples/python/table/word_count.py
I am running the code in a Docker container using an image nearly the same as this (mine just installs a few additional things). I'm running Flink 1.16.0 and Python 3.7.9.
Thanks in advance for any help!
Maryam
11/04/2022, 12:03 AM
I am running a tumbling window aggregate query following the examples in the Table API (ref) and SQL (ref).
The Table API execution plan uses GroupWindowAggregate, which is deprecated, but the execution plan for SQL uses GlobalWindowAggregate.
I am using Flink 1.15. Is there a reason the Table API is using the deprecated feature?
Here are the queries and their plans:
val windowAgg = jTable
  .window(Tumble over 60.seconds() on $"rowtime" as "w")
  .groupBy($"currency", $"w")
  .select($"currency", $"totalPrice".sum as "totalSales")
// == Optimized Execution Plan ==
// Calc(select=[currency, EXPR$0 AS totalSales])
// +- GroupWindowAggregate(groupBy=[currency], window=[TumblingGroupWindow('w, rowtime, 60000)], select=[currency, SUM(totalPrice) AS EXPR$0])
val windowAggSQL = tableEnv.sqlQuery(
  s"""
     |Select currency, SUM(totalPrice) AS totalSales
     |From TABLE(
     |  TUMBLE(TABLE jtable, DESCRIPTOR(rowtime), INTERVAL '60' SECONDS))
     |Group By currency, window_start, window_end
     |""".stripMargin
)
// == Optimized Execution Plan ==
// Calc(select=[currency, totalSales])
// +- GlobalWindowAggregate(groupBy=[currency], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[currency, SUM(sum$0) AS totalSales, start('w$) AS window_start, end('w$) AS window_end])
shuaiqi xu
11/04/2022, 3:22 AM
shuaiqi xu
11/04/2022, 3:22 AM
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /tmp/flink-io-75b93805-1057-4177-842a-2279ead561a8/job_118acccbcd2559d9acdfc8be402e693a_op_IntervalJoinOperator_10766d48b60612eb868124fba19e114a__1_1__uuid_2eae3845-9d46-4c66-9b8c-3359bb9b17bd/db/OPTIONS-000014: No such file or directory
Krish Narukulla
11/04/2022, 4:16 AM
[ERROR] org.apache.flink.core.fs.LimitedConnectionsFileSystemDelegationTest.testDelegateOutStreamMethods Time elapsed: 0.013 s <<< ERROR!
org.mockito.exceptions.base.MockitoException:
Mockito cannot mock this class: class org.apache.flink.core.fs.FSDataOutputStream.
If you're not sure why you're getting this error, please report to the mailing list.
Java : 17
JVM vendor name : Amazon.com Inc.
JVM vendor version : 17.0.4.1+9-LTS
JVM name : OpenJDK 64-Bit Server VM
JVM version : 17.0.4.1+9-LTS
JVM info : mixed mode, sharing
OS name : Mac OS X
OS version : 12.6.1
You are seeing this disclaimer because Mockito is configured to create inlined mocks.
You can learn about inline mocks and their limitations under item #39 of the Mockito class javadoc.
Underlying exception : org.mockito.exceptions.base.MockitoException: Could not modify all classes [interface java.io.Closeable, class java.lang.Object, class org.apache.flink.core.fs.FSDataOutputStream, interface java.io.Flushable, class java.io.OutputStream, interface java.lang.AutoCloseable]
at org.apache.flink.core.fs.LimitedConnectionsFileSystemDelegationTest.testDelegateOutStreamMethods(LimitedConnectionsFileSystemDelegationTest.java:168)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.lambda$execute$1(JUnitPlatformProvider.java:199)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:193)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:120)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
Caused by: org.mockito.exceptions.base.MockitoException: Could not modify all classes [interface java.io.Closeable, class java.lang.Object, class org.apache.flink.core.fs.FSDataOutputStream, interface java.io.Flushable, class java.io.OutputStream, interface java.lang.AutoCloseable]
at net.bytebuddy.TypeCache.findOrInsert(TypeCache.java:152)
at net.bytebuddy.TypeCache$WithInlineExpunction.findOrInsert(TypeCache.java:365)
at net.bytebuddy.TypeCache.findOrInsert(TypeCache.java:174)
at net.bytebuddy.TypeCache$WithInlineExpunction.findOrInsert(TypeCache.java:376)
... 42 more
Sumit Nekar
11/04/2022, 5:20 AM