Alex Brekken
04/18/2023, 7:11 PM
I’m writing an integration test for my Flink job using MiniClusterWithClientResource. At a high level, my pipeline does a map operation, then a keyBy, and then a process function immediately after the keyBy. The process function contains an aggregator that puts records into a state store. In my integration test, I’m feeding in 2 records using env.fromElements(). With parallelism set to 1, the test passes every time. With parallelism set to 2 (or really anything greater than 1), the test will intermittently fail. After debugging, what I’m seeing is that occasionally my process function (which extends KeyedProcessFunction) will get the 2nd test record first; in other words, the input records occasionally arrive out of order. (I have logic in the process function that expects these records to arrive in the correct order, which is why the test fails.)
After arriving at the keyBy function, both test records get re-keyed to the same key. My assumption was that they should get processed in timestamp order by the subsequent process function, even if the record with the higher timestamp arrives first. In my test, I’m using a watermark strategy of boundedOutOfOrderness with a duration of 20 seconds. I’ve checked the elements in the process function by debugging, and they do have timestamps set on each record, and the timestamps are correct.
I’m not sure if I’m doing something wrong, or misunderstanding how ordering works. When I deploy this same application “for real” in a Flink cluster (where the source data is actually Kafka), I don’t see this issue at all. I only see the problem in the integration test. Thanks for any ideas or advice!
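For what it’s worth, a minimal sketch (Java, not from this thread) of one way to make the keyed function tolerant of out-of-order input: buffer elements in keyed state and drain them, sorted by timestamp, from an event-time timer. "Event" and getTimestamp() are placeholders for the real record type, and the String key type is only for illustration.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Assumes record timestamps are assigned (as confirmed above).
public class OrderedProcessFunction extends KeyedProcessFunction<String, Event, Event> {

    private transient ListState<Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        // Park the element and ask to be called back once the watermark reaches its
        // timestamp; by then every earlier element within the out-of-orderness bound
        // has also been buffered, regardless of which subtask delivered it first.
        buffer.add(value);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        List<Event> ready = new ArrayList<>();
        List<Event> pending = new ArrayList<>();
        for (Event e : buffer.get()) {
            if (e.getTimestamp() <= timestamp) {
                ready.add(e);
            } else {
                pending.add(e);
            }
        }
        ready.sort(Comparator.comparingLong(Event::getTimestamp));
        for (Event e : ready) {
            out.collect(e); // or run the existing order-sensitive aggregation here
        }
        buffer.update(pending);
    }
}

With a 20-second boundedOutOfOrderness strategy, the timer for an element only fires once the watermark has passed its timestamp, so the order-sensitive logic sees elements sorted no matter which subtask delivered them first.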
Z Mario
04/19/2023, 3:30 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: basic-example
spec:
  image: flink:1.15
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
How do I add the beam worker pool in this yaml file, or how should I create it?
Would greatly appreciate your support!
Soumya Ghosh
04/19/2023, 6:54 AM
I’m using the Elasticsearch (elasticsearch-7) table connector and exploring the below configurations mentioned in the documentation:
• connection.request-timeout
• connection.timeout
• socket.timeout
When I use the above connector options in the Flink table DDL, I get an error saying the options are not supported. Is there a specific way to utilize these configs?
Error:
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'elasticsearch-7'.
Unsupported options:
connection.request-timeout
connection.timeout
socket.timeout
Supported options:
connection.path-prefix
connector
document-id.key-delimiter
failure-handler
format
hosts
index
json.encode.decimal-as-plain-number
json.fail-on-missing-field
json.ignore-parse-errors
json.map-null-key.literal
json.map-null-key.mode
json.timestamp-format.standard
password
property-version
sink.bulk-flush.backoff.delay
sink.bulk-flush.backoff.max-retries
sink.bulk-flush.backoff.strategy
sink.bulk-flush.interval
sink.bulk-flush.max-actions
sink.bulk-flush.max-size
sink.flush-on-checkpoint
username
at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:631)
at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:923)
at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.createDynamicTableSink(Elasticsearch7DynamicSinkFactory.java:93)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:266)
... 18 more
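Those timeout keys are not in the connector’s supported list above, which is why validation fails. For reference, a minimal sketch (Java; table name, fields, and host are made up) of a DDL that only uses options from that list:

// "tableEnv" is an assumed TableEnvironment; only options from the
// "Supported options" list shown in the error are used here.
tableEnv.executeSql(
        "CREATE TABLE es_sink (" +
        "  user_id STRING," +
        "  event_time TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector' = 'elasticsearch-7'," +
        "  'hosts' = 'http://localhost:9200'," +
        "  'index' = 'users'," +
        "  'sink.bulk-flush.max-actions' = '1000'" +
        ")");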
Alokh P
04/19/2023, 8:47 AM
We’re hitting the following exception when writing Parquet output:
Caused by: org.apache.parquet.io.ParquetEncodingException: empty fields are illegal, the field should be ommited completely instead
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endField(MessageColumnIO.java:329)
at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$ArrayWriter.write(ParquetRowDataWriter.java:420)
at org.apache.flink.formats.parquet.row.ParquetRowDataWriter$RowWriter.write(ParquetRowDataWriter.java:456)
As far as I have checked, we can avoid this by using AvroParquetWriter and setting the config parquet.avro.write-old-list-structure to false.
Can anybody here help with how this can be achieved?
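One possible direction (a sketch under assumptions, not a verified recipe): build the bulk writer through Flink’s ParquetWriterFactory with a custom ParquetBuilder, so a Hadoop Configuration carrying parquet.avro.write-old-list-structure=false reaches AvroParquetWriter. It assumes GenericRecord payloads and an Avro schema available as a String; the class and method names here are illustrative.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetWriter;

public class OldListStructureSink {

    public static FileSink<GenericRecord> build(String schemaString, String outputPath) {
        // The schema travels as a String so the builder lambda only captures
        // serializable state when it is shipped to the task managers.
        ParquetWriterFactory<GenericRecord> factory = new ParquetWriterFactory<>(out -> {
            Configuration conf = new Configuration();
            // Write the newer list structure instead of the old two-level one,
            // which (per the poster) is what triggers the "empty fields are illegal" check.
            conf.setBoolean("parquet.avro.write-old-list-structure", false);
            return AvroParquetWriter.<GenericRecord>builder(out)
                    .withSchema(new Schema.Parser().parse(schemaString))
                    .withConf(conf)
                    .build();
        });
        return FileSink.forBulkFormat(new Path(outputPath), factory).build();
    }
}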
Felix Angell
04/19/2023, 10:33 AM
How does release-1.15.2-rc2 differ from release-1.15? I’ve noticed that rc2 does not contain the FlinkKinesisConsumer pyflink bindings whereas release-1.15 does.
Thijs van de Poll
04/19/2023, 10:57 AM
I’m writing a changelog stream to a database sink and I see the following change events:
-D[1,83819211,545900928]
+I[-83819211,83819211,545900928]
This is expected, it should remove the record having primary key 1, and insert the record having primary key -83819211.
If I check the database after this change, I see the following:
-----------+-----------+-----------
 -83819211 |  83819211 | 545900928
         1 |  83819211 | 545900928
It suggests that the delete statement has not been executed. Anyone who can help?
Peter Esselius
04/19/2023, 1:31 PM
I’ve got a DataStream[RowData] from a connector in Scala, but I have no idea how to use it. 😕
I enjoyed the rockthejvm course a lot, so I’m very motivated to figure this out!
Previously I’ve dabbled with Debezium & Kafka as input, but my current input is a Delta Lake table. The connector doesn’t support the Table API, so I guess I either need to turn the stream into a DataStream[Row] to enable using tEnv.fromChangelogStream(), or figure out how to specify the schema with tEnv.fromDataStream(), but how?
Can I avoid specifying schema when the delta lake connector supposedly already knows what the schema is?
I also wanted to turn DataStream[RowData] into DataStream[MyCaseClass] somehow, hopefully without manually pointing out and casting field-by-field in a ds.map().
I’ve tried using CaseClassConverter and RowRowConverter, but I couldn’t really understand how to use either of them 😖
Am I missing something? Am I failing to progress because I’m trying too hard to avoid retyping the input schema multiple times?
The DataStream API Integration docs mention RowData -> Row conversion, but I haven’t been able to find any examples 😕
Is using Scala + case classes making it worse? I can switch to the third-party Scala API implementation if that helps, or switch to Java or Kotlin.
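Not a Scala-specific answer, but a sketch (in Java) of the RowData -> Row step using RowRowConverter, assuming the connector can hand you the row’s DataType; the class name is illustrative.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

// Maps the connector's internal RowData records to external Row objects.
public class RowDataToRow extends RichMapFunction<RowData, Row> {

    private final DataType rowType;          // the row type reported by the connector
    private transient RowRowConverter converter;

    public RowDataToRow(DataType rowType) {
        this.rowType = rowType;
    }

    @Override
    public void open(Configuration parameters) {
        converter = RowRowConverter.create(rowType);
        converter.open(getRuntimeContext().getUserCodeClassLoader());
    }

    @Override
    public Row map(RowData value) {
        return converter.toExternal(value);
    }
}

From the resulting stream of Row you should be able to use tEnv.fromChangelogStream / fromDataStream, and mapping into a case class can then be a plain map over the Row fields.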
Felix Angell
04/19/2023, 2:56 PM
2023-04-19 15:49:21
java.lang.NoSuchMethodError: 'org.apache.flink.metrics.MetricGroup org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()'
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:765)
We have a pyflink app that bundles a fat jar of some extra utilities where we do stuff in Java-land, e.g. deserialisation, and so on.
flink-streaming-java is the only dependency in the pom.xml that is marked as provided. Our dependencies are: flink-streaming-java, flink-java, flink-core, flink-connector-kafka, kafka-clients (2.8.1). Everything is set to version 1.15.2 (other than kafka-clients).
Any ideas what is up here? Thanks!
Thijs van de Poll
04/19/2023, 3:40 PM
# This works
table.select(col("parties_id"), col("publication_id"), col("nested_obj"))
# This works
table.select(col("nested_obj").get("value_from_nested"))
# This does NOT work
table.select(col("parties_id"), col("publication_id"), col("nested_obj").get("value_from_nested"))
The last one raises:
py4j.protocol.Py4JJavaError: An error occurred while calling o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
at java.base/java.util.Objects.checkIndex(Unknown Source)
at java.base/java.util.ArrayList.get(Unknown Source)
at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:975)
at org.apache.calcite.rex.RexProgramBuilder$RegisterInputShuttle.visitLocalRef(RexProgramBuilder.java:924)
at org.apache.calcite.rex.RexLocalRef.accept(RexLocalRef.java:75)
at org.apache.calcite.rex.RexShuttle.visitFieldAccess(RexShuttle.java:198)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:904)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitFieldAccess(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:889)
at org.apache.calcite.rex.RexProgramBuilder$RegisterShuttle.visitCall(RexProgramBuilder.java:887)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.rex.RexProgramBuilder.registerInput(RexProgramBuilder.java:295)
at org.apache.calcite.rex.RexProgramBuilder.addProject(RexProgramBuilder.java:206)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:224)
at org.apache.calcite.rex.RexProgram.create(RexProgram.java:193)
at org.apache.flink.table.planner.plan.rules.logical.PythonCalcSplitRuleBase.onMatch(PythonCalcSplitRule.scala:98)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:64)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:78)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
Can anyone help?
Yaroslav Bezruchenko
04/19/2023, 4:42 PM
class AttributePretraining(ProcessFunction):
    def process_element(self, value, ctx: 'ProcessFunction.Context'):
        id = value['id']
        training_dto = TrainingDTO(id)
        return training_dto
But I got: TypeError: AttributePretraining.open() missing 1 required positional argument: 'runtime_context'
Any idea why this is happening?
Kevin L
04/19/2023, 6:17 PM
t_row = t.select(row("key", "value").alias('my_row'))
I expect the schema (when I call t_row.get_schema()) to be
root
|-- my_row : ROW<`key` VARCHAR NOT NULL> NOT NULL
but what I get is (see screenshot)
How do I get the expected schema?
Nathanael England
04/19/2023, 6:27 PM
I’m looking for an alternative to SimpleStringSchema; I don’t want my bytes stringified.
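Assuming this is about the value deserializer of a Kafka-style source, a minimal pass-through schema (Java) that keeps the payload as raw bytes:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hands the raw record value to the pipeline as byte[] instead of decoding it to a String.
public class RawBytesDeserializationSchema implements DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) {
        return message;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        return false;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}

It would be plugged in via something like .setValueOnlyDeserializer(new RawBytesDeserializationSchema()) on the source builder.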
Eric Xiao
04/19/2023, 9:05 PM
While converting our setParallelism calls to setMaxParallelism, I noticed that for sinks (DataStreamSink) there is no setMaxParallelism function. We do not want to use a global parallelism setting to control the sink’s parallelism, as other operators will have a higher parallelism count.
Is there another way we can set a max parallelism on a sink in our pipeline?
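One workaround sketch, assuming the goal is simply to keep the sink below the job-wide parallelism: pin the sink operator’s parallelism explicitly rather than relying on the global setting (the stream, sink, and names below are illustrative).

// DataStreamSink does expose setParallelism, so the sink can be pinned to a
// lower value than the environment default.
stream.sinkTo(kafkaSink)
        .setParallelism(2)
        .name("low-parallelism-sink");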
Elizaveta Batanina
04/20/2023, 11:36 AM
When I open ./bin/sql-client.sh, I cannot see the tables that were created, but the job is successful. Is there a better way to do this?
Max Dubinin
04/20/2023, 3:57 PM
Վահե Քարամյան
04/20/2023, 4:04 PM
Max Dubinin
04/20/2023, 4:53 PM
Ali Zia
04/20/2023, 5:12 PM
akka.ask.timeout
akka.lookup.timeout
akka.tcp.timeout
heartbeat.interval
heartbeat.rpc-failure-threshold
heartbeat.timeout
Zachary Schmerber
04/20/2023, 5:38 PM
static void runJob() throws Exception {
    KafkaSource<Message> source =
            KafkaSource.<Message>builder()
                    .setBootstrapServers(BROKER)
                    .setTopics(TOPIC)
                    .setGroupId("KafkaExactlyOnceRecipeApplication")
                    .setStartingOffsets(
                            OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                    .setValueOnlyDeserializer(new JsonDeserializationSchema<>(Message.class))
                    .build();
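The snippet stops at the source; for context, a sketch of how the matching exactly-once sink side is commonly wired up, assuming a String payload and a made-up topic name and transactional-id prefix:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

KafkaSink<String> sink =
        KafkaSink.<String>builder()
                .setBootstrapServers(BROKER)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("kafka-exactly-once-recipe")
                .build();

Exactly-once delivery also relies on checkpointing being enabled, and the Kafka transaction timeout generally needs to be larger than the checkpoint interval.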
Rion Williams
04/20/2023, 5:42 PM
Our Flink image currently pulls in the GCS connector and the shaded Hadoop uber jar like this:
ENV GCS_CONNECTOR_VERSION="hadoop2-2.1.1"
ENV FLINK_HADOOP_VERSION="2.8.3-10.0"
ENV GCS_CONNECTOR_NAME="gcs-connector-${GCS_CONNECTOR_VERSION}.jar"
ENV GCS_CONNECTOR_URI="$artifactStorage/${GCS_CONNECTOR_NAME}"
ENV FLINK_HADOOP_JAR_NAME="flink-shaded-hadoop-2-uber-${FLINK_HADOOP_VERSION}.jar"
ENV FLINK_HADOOP_JAR_URI="$artifactStorage/flink-shaded-hadoop-2-uber/${FLINK_HADOOP_VERSION}/${FLINK_HADOOP_JAR_NAME}"
I was hoping to upgrade this to a more recent version, but I’m struggling to find the latest shaded plugin. I see the Flink Shaded Hadoop 3 Uber artifacts in Maven Central, but I’m not entirely sure which version of the connector or Hadoop those correspond to.
Any advice? Just trying to upgrade this to the latest supported version as of 1.15 while I’m already touching the job and related parts of the deployment.
Zachary Piazza
04/20/2023, 8:46 PMflinkConfiguration:
collect-sink.batch-size.max: "8388608"
taskmanager.numberOfTaskSlots: "2"
metrics.reporters: prom
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: "9999"
Do I need to use the podTemplate attribute of the FlinkDeployment spec to add the additional port mapping?
EDIT: I figured it out. The config properties mentioned in the docs were incorrect, and I added an additional container port using the podTemplate param in the spec.
Amenreet Singh Sodhi
04/21/2023, 5:38 AM
苏超腾
04/21/2023, 7:36 AM
张奇文
04/21/2023, 7:44 AM
Maryam
04/21/2023, 3:41 PM
Urs Schoenenberger
04/21/2023, 3:57 PM
We came across CountTumblingWindowAssigner recently: it is backed by a non-purging CountTrigger and therefore produces a memory leak unless one replaces the trigger by a purging one using .trigger(). Are we missing something? Is there a use case where one can use CountTumblingWindowAssigner with its current default trigger? I feel the default should be FIRE_AND_PURGE here, but wanted to ask before I file a JIRA.
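For comparison, a Java DataStream sketch of the purging variant, which is essentially what KeyedStream#countWindow sets up; the Tuple2 input type and counts are just for illustration.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

// Tumbling count window of 100 elements per key, with a purging trigger so the
// window contents are cleared after every firing instead of accumulating.
static DataStream<Tuple2<String, Long>> countWindowSums(DataStream<Tuple2<String, Long>> input) {
    return input
            .keyBy(t -> t.f0)
            .window(GlobalWindows.create())
            .trigger(PurgingTrigger.of(CountTrigger.of(100)))
            .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
}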
Daniel Craig
04/21/2023, 4:05 PM
I’m seeing “Kryo serializer scala extensions are not available” in my Flink application’s logs. Should I be concerned about this?
Felix Angell
04/21/2023, 4:39 PM
“It does not work for legacy or if applied after the source via DataStream#assignTimestampsAndWatermarks.” is another thing noted. Does this mean that if I specify a WatermarkStrategy via assign_timestamps_and_watermarks it will not let me use watermark alignment?
Does this mean I need a watermark strategy per source? Since we are technically re-using the strategy and then passing it in, e.g. assign_timestamps_and_watermarks(watermark_strategy).
Last question: is there any rule of thumb for how many seconds I should set my max_drift + watermark_interval? The docs specify 20 seconds and 1 second respectively. We have events that can be as late as 10-30 minutes. Would this mean we are throttling sources for that long?
Thanks!
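As far as I can tell, watermark alignment is configured on the WatermarkStrategy that is handed to the source itself (env.fromSource / from_source), not on one applied afterwards via assignTimestampsAndWatermarks. A Java sketch, with a made-up event type, timestamp accessor, and group name:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// MyEvent, getEventTime(), and "alignment-group" are placeholders.
static WatermarkStrategy<MyEvent> alignedStrategy() {
    return WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(30))
            .withTimestampAssigner((event, recordTs) -> event.getEventTime())
            .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
}

// Applied where the source is created, e.g.:
// env.fromSource(kafkaSource, alignedStrategy(), "events");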
Zachary Piazza
04/22/2023, 3:46 PM
RICHARD JOY
04/23/2023, 2:38 AM