Youssef Gamal
06/29/2022, 7:23 PM
(pyflink.datastream.connectors) in the Python API? For example, can we sink data to Cassandra through the Python API? I am struggling to figure out how to even import CassandraSink.
jason Wang
06/30/2022, 4:02 AM
Hello everyone, I want to ask a question: is anyone using Flink for batch processing? Can you share some experience? Has anyone analyzed the differences between Spark and Flink for batch processing, in terms of capabilities, resource usage, optimization, or other dimensions, when you:
1) develop with the Table API / Flink SQL vs. Spark SQL
2) develop with the DataSet or DataStream API
Any advice and shared experience will be appreciated.
Veeramani Moorthy
06/30/2022, 6:58 AM
Veeramani Moorthy
06/30/2022, 7:00 AM
Musy Tom
06/30/2022, 7:43 AM
Dylan Wylie
06/30/2022, 10:50 AM
attachAsDataStream for statement sets, which I think would help, but it doesn't look like it is available in the Python API, and we're also stuck on an older version (1.13) with AWS KDA.
The context is wanting to simultaneously:
• write to s3 in a bulk-encoded format.
◦ datastream connector: only row-encoded formats look to be supported in pyflink's datastream file sink ❌
◦ table api connector: supports this just fine ✅
• write to JDBC using custom upserts to ensure exactly once
◦ datastream connector: allows this ✅
◦ table api connector: builds the upsert itself, so it isn't able to add the extra conditions needed to make this exactly-once ❌
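For reference, a minimal Java sketch of the StreamStatementSet.attachAsDataStream() approach mentioned above (Java/Scala Table API, Flink 1.14+); the table names, the SQL, and the JDBC placeholder are hypothetical:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Table API side: bulk-encoded S3 sink via the filesystem connector.
// "s3_parquet_sink" and "events" are hypothetical tables assumed to be defined elsewhere.
StreamStatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO s3_parquet_sink SELECT * FROM events");

// Attach the Table API pipeline to the StreamExecutionEnvironment so it is
// submitted together with the DataStream sinks as a single job.
stmtSet.attachAsDataStream();

// DataStream side: a JDBC sink with a custom upsert statement would be added here,
// e.g. someStream.addSink(JdbcSink.sink(upsertSql, statementBuilder, execOptions, connOptions));

env.execute("combined table and datastream job");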
shishal singh
06/30/2022, 2:30 PM
dario bonino
06/30/2022, 3:17 PM
org.apache.flink.util.FlinkRuntimeException: Error while adding value to RocksDB
at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:80)
at org.apache.flink.contrib.streaming.state.RocksDBReducingState.add(RocksDBReducingState.java:96)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:413)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.rocksdb.RocksDBException: block checksum mismatch: stored = 225416896, computed = 263486412 in /tmp/flink-io-5f30e001-0b1f-443f-93d3-6a3cb1cd68d2/job_9d5c118e8b6643dfa7ff188e099b55e1_op_WindowOperator_1e4c6222d1927931766f7327355009c0__27_90__uuid_1228482d-13e5-4253-bfa2-c19c74c591db/db/000021.sst offset 1541261 size 34277
at org.rocksdb.RocksDB.put(Native Method)
at org.rocksdb.RocksDB.put(RocksDB.java:955)
at org.apache.flink.contrib.streaming.state.AbstractRocksDBAppendingState.updateInternal(AbstractRocksDBAppendingState.java:78)
... 15 more
in a pipeline running on Flink 1.14.3 (and also on Flink 1.14.4). The pipeline was running without issues up to today. After re-running the pipeline without changes in code (and, theoretically, in the running environment), we are consistently encountering this issue. Any ideas on this? Could it be caused by a bug in state handling?
Yannick - Co-Founder Flike
06/30/2022, 4:29 PM
Alex Cruise
06/30/2022, 6:00 PM
Krishna Chaithanya M A
06/30/2022, 6:16 PM
Sucheth Shivakumar
06/30/2022, 10:01 PM
Trystan
07/01/2022, 12:49 AM
Row-encoded Format with a FileSink using OnCheckpointRollingPolicy, and I ran into an exception during checkpoint thrown here. AFAIK there aren't many levers I can turn here, nor do I know why the parts fall outside these bounds. Is there a config I should look for? Perhaps the MPU threshold is too low and it's creating 10k+ parts? What config would adjust that?
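For context, a minimal Java sketch of the kind of setup being described (a row-encoded FileSink rolling on every checkpoint); the S3 path and the input stream are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

// Row-encoded file sink that rolls a new part file on every checkpoint.
// Writing to an s3:// path goes through S3 multipart uploads under the hood.
FileSink<String> sink = FileSink
    .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

// "stream" is a DataStream<String> defined elsewhere.
stream.sinkTo(sink);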
E Leonard
07/01/2022, 8:45 AM
Hasan Masood
07/01/2022, 10:59 AM
shuaiqi xu
07/01/2022, 11:47 AM
Emile Alberts
07/01/2022, 1:04 PM
java.io.IOException: Class class com.amazonaws.auth.WebIdentityTokenCredentialsProvider does not implement AWSCredentialsProvider
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:662) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:605) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:268) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3375) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.14.5.jar:1.14.5]
at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:222) ~[?:?]
at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:207) ~[?:?]
at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala) ~[?:?]
at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:136) ~[?:?]
at io.delta.flink.sink.internal.committer.DeltaGlobalCommitter.commit(DeltaGlobalCommitter.java:215) ~[?:?]
This only happens when it is running in Kubernetes. The delta log file is successfully created when running Flink with its MiniCluster. Any ideas of what the issue may be, or can you point me in the right direction?
Thank you in advance for the assistance.
George Chen
07/01/2022, 7:00 PM
For the CheckpointedFunction interface, is the snapshotState API invoked somehow in a synchronous fashion with the main API call (invoke/map/flatMap, etc.)? It is reasonable to infer from the BufferingSink example in the docs that it might be the case, but I hope there is some confirmation.
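For reference, a minimal sketch modeled on the BufferingSink example from the Flink docs; the class and state names are illustrative. snapshotState() runs in the synchronous part of the checkpoint, in the same task (mailbox) thread that runs invoke(), which is why the buffer in this sketch is accessed without extra locking.

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private transient ListState<Tuple2<String, Integer>> checkpointedState;
    private final List<Tuple2<String, Integer>> bufferedElements = new ArrayList<>();

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) {
        // Runs in the task's main (mailbox) thread.
        bufferedElements.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Also runs in the task's main thread during the synchronous checkpoint phase,
        // so it does not race with invoke() above.
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>(
            "buffered-elements", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}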
Sree
07/02/2022, 1:49 PM
shuaiqi xu
07/04/2022, 3:28 AM
Zain Haider Nemati
07/04/2022, 10:44 AM
Shqiprim Bunjaku
07/04/2022, 12:09 PM
Nithin kharvi
07/04/2022, 1:19 PM
Nithin kharvi
07/04/2022, 1:19 PM
Nithin kharvi
07/04/2022, 1:20 PM
仰望星空
07/05/2022, 2:51 AM
Mrinal Ekka
07/05/2022, 6:05 AM
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
Caused by: java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug. org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
Omar Izhar
07/05/2022, 9:16 AM
flink version = 1.15
Hi Flink community,
I am trying to create a Flink application that processes incoming records from a DataStream and performs an aggregated lookup on a table. E.g. an incoming DataStream record with a batch_id has to be joined with the minimum delivery_priority for the same batch_id in the lookup table. The following example denotes what I want to achieve:
Incoming stream record: (TYPE: DATASTREAM)
batch_id, latitude, longitude
2134, 24.812, 67.0034
Lookup table: (TYPE: TABLE)
batch_id, delivery_priority, order_id
2134, 4, 197168
2134, 2, 197438
2134, 1, 201426
Result stream record:
batch_id, delivery_priority, order_id, latitude, longitude
2134, 1, 201426, 24.812, 67.0034
How can I extract a single record from the table after applying MIN(delivery_priority) aggregation and use its data to enrich the incoming stream record?
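A hedged sketch of one way to express this with the Java Table API; the view/table names ("incoming", "lookup_table") are hypothetical and assumed to be registered elsewhere. It deduplicates the lookup table to the row with the lowest delivery_priority per batch_id (Flink's Top-N pattern) and joins that row to each stream record:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// "incoming" (the DataStream registered as a view) and "lookup_table" are hypothetical names.
Table enriched = tEnv.sqlQuery(
    "SELECT i.batch_id, l.delivery_priority, l.order_id, i.latitude, i.longitude " +
    "FROM incoming AS i " +
    "JOIN ( " +
    "  SELECT batch_id, delivery_priority, order_id " +
    "  FROM ( " +
    "    SELECT *, ROW_NUMBER() OVER ( " +
    "      PARTITION BY batch_id ORDER BY delivery_priority ASC) AS rn " +
    "    FROM lookup_table " +
    "  ) WHERE rn = 1 " +
    ") AS l ON i.batch_id = l.batch_id");

Note that the deduplicated subquery is an updating result, so the joined output is an updating stream; it may need to be consumed as a changelog stream downstream.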
Mrinal Ekka
07/05/2022, 10:32 AM
DataStream<Events> sideOutputStreamCustomerEvent = stringInputStream.flatMap(new RichFlatMapFunction<String, Events>() {
    private TableEnvironment tableEnvironment;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .build();
        tableEnvironment = TableEnvironment.create(settings);
    }
    .......
The application is throwing the following error at "TableEnvironmentImpl.executeSql" while reading a Hudi table:
java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:144)
at org.apache.calcite.rel.metadata.RelMetadataQuery.<init>(RelMetadataQuery.java:108)
at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.<init>(FlinkRelMetadataQuery.java:78)
at org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:59)
at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
at org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
at org.apache.calcite.rel.logical.LogicalFilter.create(LogicalFilter.java:108)
at org.apache.calcite.rel.core.RelFactories$FilterFactoryImpl.createFilter(RelFactories.java:344)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertWhere(SqlToRelConverter.java:1042)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
Jonathan Thomm
07/05/2022, 12:07 PM
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
I’m new to Flink. Any help would be great!
My SQL query is:
CREATE TABLE interactions (
  userId varchar,
  `properties.id` varchar,
  `properties.sessionId` varchar,
  `itemId` AS COALESCE(`properties.sessionId`, `properties.id`),
  `properties.hotSpotName` varchar,
  `properties.name` varchar,
  `hotSpotName` AS COALESCE(`properties.hotSpotName`, `properties.name`),
  `timestamp` TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'some-raw-events',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'properties.bootstrap.servers' = 'localhost:9092'
);