Varun Sayal
10/20/2022, 2:35 PM

Eric Xiao
10/20/2022, 2:38 PM
DataStream? i.e.
val source: DataStream[...] = ...
val inputTable = tableEnv.fromDataStream(rows)
When we printed out the schema, we noticed it had set all the string columns to nullable and all the integer/timestamp columns to not nullable.
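(A hedged aside, not from the thread: fromDataStream also accepts an explicit Schema, which is one way to pin the declared types instead of relying on derivation. A minimal sketch; "user_name" is a hypothetical column, and whether a nullability override is accepted for a given physical type is worth verifying:)
import org.apache.flink.table.api.{DataTypes, Schema}

// Sketch only: declare the column's type (incl. nullability) explicitly
// rather than letting it be derived from the stream's TypeInformation.
val inputTable = tableEnv.fromDataStream(
  source,
  Schema.newBuilder()
    .column("user_name", DataTypes.STRING().notNull())
    .build()
)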
Nikhil Mishra
10/20/2022, 5:42 PM

Slackbot
10/20/2022, 9:06 PM

Matt Fysh
10/21/2022, 12:55 AM
I'm trying to create a DataStream[RowData] type to pass into a Delta Sink, but the following is not working, I think because RowData is an abstract class. Does anyone know which class I should be using here?
kinesis.map(value => {
  val jsonNode: JsonNode = jsonParser.readValue(value, classOf[JsonNode])
  new RowData(jsonNode.get("user_id").toString(), 1)
})
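(A hedged pointer, not from the thread: GenericRowData is the standard concrete implementation of RowData in flink-table, with strings wrapped as StringData. A sketch adapted from the snippet above:)
import org.apache.flink.table.data.{GenericRowData, StringData}

kinesis.map(value => {
  val jsonNode: JsonNode = jsonParser.readValue(value, classOf[JsonNode])
  // GenericRowData is a concrete RowData; table strings must be StringData,
  // and numeric fields are passed as boxed values.
  GenericRowData.of(
    StringData.fromString(jsonNode.get("user_id").asText()),
    Integer.valueOf(1)
  )
})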
chunilal kukreja
10/21/2022, 4:39 AM

Slackbot
10/21/2022, 6:23 AM

Soumya Ghosh
10/21/2022, 7:33 AM
FROM flink:1.13.6-scala_2.12-java11
RUN mkdir -p $FLINK_HOME/usrlib
RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.13.6/flink-sql-avro-confluent-registry-1.13.6.jar -P $FLINK_HOME/lib/
RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.6/flink-connector-jdbc_2.12-1.13.6.jar -P $FLINK_HOME/lib/
RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch7_2.12/1.13.6/flink-connector-elasticsearch7_2.12-1.13.6.jar -P $FLINK_HOME/lib/
RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base_2.12/1.13.6/flink-connector-elasticsearch-base_2.12-1.13.6.jar -P $FLINK_HOME/lib/
RUN mkdir $FLINK_HOME/plugins/s3-fs-presto && cp $FLINK_HOME/opt/flink-s3-fs-presto-1.13.6.jar $FLINK_HOME/plugins/s3-fs-presto/
RUN mkdir $FLINK_HOME/plugins/s3-fs-hadoop && cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.6.jar $FLINK_HOME/plugins/s3-fs-hadoop/
RUN mkdir $FLINK_HOME/plugins/s3 && cp $FLINK_HOME/opt/flink-s3-fs-presto-1.13.6.jar $FLINK_HOME/plugins/s3/ && cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.6.jar $FLINK_HOME/plugins/s3/
COPY my-project/target/jars/my-project-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/my-project-1.0-SNAPSHOT-jar-with-dependencies.jar
Eventually I kept adding more and more transitive dependencies to the Docker image, and after a point the task manager boots and then fails with NoClassDefFoundError: Could not initialize class org.elasticsearch.client.RestClient.
Any idea how to address this? As per Flink's documentation, external connectors are not part of the Flink distribution and should either be added to the project's fat jar or placed on Flink's lib path.
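(One hedged option, not from the thread: for 1.13 there is a fat flink-sql-connector-elasticsearch7 artifact that bundles the Elasticsearch client and its transitive dependencies, which may avoid chasing individual jars; worth verifying it fits your usage, e.g.)
RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.13.6/flink-sql-connector-elasticsearch7_2.12-1.13.6.jar -P $FLINK_HOME/lib/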
Bastien DINE
10/21/2022, 8:26 AM
The TypeInfo javadoc says a factory can be registered globally (TypeExtractor#registerFactory(Type, Class)).
But registerFactory does not exist anymore.
How can I register a global factory?
https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/typeinfo/TypeInfo.html
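(A hedged sketch of the remaining mechanism: instead of a global registration, the factory is attached to the type itself with the @TypeInfo annotation, which the TypeExtractor picks up wherever the type appears; MyType and MyTypeInfoFactory are placeholders:)
import java.lang.reflect.Type
import java.util
import org.apache.flink.api.common.typeinfo.{TypeInfo, TypeInfoFactory, TypeInformation}

// Placeholder factory: must produce the TypeInformation for MyType.
class MyTypeInfoFactory extends TypeInfoFactory[MyType] {
  override def createTypeInfo(
      t: Type,
      genericParameters: util.Map[String, TypeInformation[_]]): TypeInformation[MyType] =
    ??? // construct and return the custom TypeInformation here
}

// The annotation replaces the removed TypeExtractor#registerFactory call.
@TypeInfo(classOf[MyTypeInfoFactory])
class MyType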
Rashmin Patel
10/21/2022, 9:48 AM

Jasmin Redzepovic
10/21/2022, 12:03 PM

Tiansu Yu
10/21/2022, 1:40 PM
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.java.functions.KeySelector' interface. Otherwise the type has to be specified explicitly using type information.
In general, is it a bad idea to use Tuple2 + lambda in a keySelector at all?
.keyBy(u -> new Tuple2<>(u.getUserId(), u.getProductId()))
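(Sketching the exception's own suggestion, in Scala against the Java DataStream API; User and its getters are assumed from the snippet above. There is also a keyBy overload that takes an explicit TypeInformation for the key:)
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2

stream.keyBy(new KeySelector[User, Tuple2[String, String]] {
  // A real (anonymous) class, unlike a lambda, retains the generic
  // signature that the type extractor needs to see Tuple2's parameters.
  override def getKey(u: User): Tuple2[String, String] =
    new Tuple2(u.getUserId, u.getProductId)
})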
RM
10/21/2022, 3:27 PM
taskmanager.memory.network.max
java.io.IOException: Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 65536 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
Curious to know if there is an easier way to pre-determine the minimum number of network buffers needed as we scale out the job. I tried following this link https://stackoverflow.com/questions/49283934/flink-ioexception-insufficient-number-of-network-buffers but the math doesn't add up:
#slots-per-TM^2 * #TMs * 4 * #no_of_shuffles
==> ( 8 * 8 ) * 10 * 4 * 10 ?
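(For reference: that formula evaluates to 8^2 * 10 * 4 * 10 = 25,600 buffers, i.e. roughly 25,600 * 32 KiB ≈ 800 MiB of network memory, which is already below the 65,536 buffers reported in the error, so the formula alone doesn't explain the shortage.)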
Abel Lamjiri
10/21/2022, 10:23 PM
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2022-10-21T22:07:46Z"
    message: '0/3 nodes are available: 1 node(s) had volume node affinity conflict,
      2 Insufficient cpu, 2 Insufficient memory.'
    reason: Unschedulable
    status: "False"
    type: PodScheduled
  phase: Pending
  qosClass: Guaranteed
Although the cluster autoscaler (CA) plugin is there, it does not add new nodes.
CA does work fine if I scale up any normal deployment, though.
Matt Fysh
10/21/2022, 11:00 PM
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object MyJob {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements("a", "b", "c")
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3://myflink-sandbox/tmp/d1"), new SimpleStringEncoder[String]("UTF-8"))
      .build()
    input.addSink(sink)
    env.execute()
  }
}
MyJob.main(Array())
Thiruvenkadesh Someswaran
10/22/2022, 6:21 PM

Krish Narukulla
10/22/2022, 6:59 PM
How can I query an array datatype using Flink SQL? I want to query people from the protobuf below.
message PersonRecord {
  string name = 1;
  int32 age = 2;
  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }
  message PhoneNumber {
    string number = 1;
    PhoneType type = 2;
  }
  repeated PhoneNumber phones = 4;
}

// Our address book file is just one of these.
message AddressBookRecord {
  repeated PersonRecord people = 1;
}
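(A hedged sketch of the usual Flink SQL pattern for repeated/array fields, assuming the protobuf-backed table is registered as address_book and people is derived as ARRAY<ROW<name, age, phones>>:)
-- CROSS JOIN UNNEST flattens each element of the repeated field into a row.
SELECT p.name, p.age
FROM address_book
CROSS JOIN UNNEST(address_book.people) AS p (name, age, phones)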
Akashkiran Shivakumar
10/22/2022, 11:12 PM

Owen Lee
10/24/2022, 6:02 AM

ding bei
10/24/2022, 6:07 AM

salvalcantara
10/24/2022, 9:40 AM
Given that returning null on the deserializer skips the record, it makes sense to consider the same outcome/strategy on the serializer. However, as explained in the ticket, returning null on the serializer makes the job crash at runtime (instead of simply skipping the message).
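(A hedged workaround sketch, not from the ticket: drop the records that would serialize to null before they reach the sink, since the sink does not tolerate a null from the serializer. trySerialize and kafkaSink are hypothetical stand-ins:)
stream
  .filter(record => trySerialize(record) != null) // hypothetical pre-check mirroring the serialization schema
  .sinkTo(kafkaSink)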
haim ari
10/24/2022, 11:58 AM

Steven Zhang
10/24/2022, 6:38 PM
FROM apache/flink-kubernetes-operator:1.2.0
ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
COPY flink-s3-fs-presto-1.15.2.jar $FLINK_PLUGINS_DIR/flink-s3-fs-presto/
And I deployed my custom image into my k8s cluster. When I exec into the operator pod, I see the plugin present
flink@flink-kubernetes-operator-855bb64d96-t6ztb:~/plugins$ pwd
/opt/flink/plugins
flink@flink-kubernetes-operator-855bb64d96-t6ztb:~/plugins$ ls
flink-metrics-datadog flink-metrics-graphite flink-metrics-influxdb flink-metrics-jmx flink-metrics-prometheus flink-metrics-slf4j flink-metrics-statsd flink-s3-fs-presto
but my sessionJob still fails with
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory.
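(A hedged guess, not from the thread: the plugin has to be available on the Flink session cluster's own pods, not the operator pod; with the official Flink images one route is the ENABLE_BUILT_IN_PLUGINS entrypoint variable, e.g. in the FlinkDeployment spec:)
spec:
  podTemplate:
    spec:
      containers:
        # flink-main-container is the operator's reserved name for the Flink container
        - name: flink-main-container
          env:
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.15.2.jar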
Zgeorge2
10/24/2022, 7:00 PM
class Foo {
  String attr; // will be used for the first keyBy
  String global; // several Foo instances with the same attr value will have a common global value
}
class ReducedFoo {
  Set<Foo> fooList;
  String key; // all the fooList Foo instances have the same key
  String global; // several Foo instances with the same attr value will have a common global value
}
class GlobalFoo {
  Set<Foo> fooList;
  String global; // several Foo instances with the same attr value will have a common global value
}
Assume the following pipeline:
Ex. SRC<Foo> --> Some Oper1<Foo> --> keyBy(Foo::attr) --> reduce(ReducedFoo with Key = Foo::attr)
My understanding is that at the end of the above pipeline, depending on the number of unique Foo::attr values in the actual stream that's processed, I would get as many ReducedFoo instances (each carrying the same Foo::attr value).
E.g., assume that the stream has the following objects of class Foo generated at the SRC<Foo> starting point:
Foo1(attr = AA; global = g1)
Foo2(attr = BB; global = g1)
Foo3(attr = CC; global = g1)
Foo4(attr = AA; global = g1)
Foo5(attr = BB; global = g1)
Notice that all the Foo instances have the same value for global.
Hence, assume also that the reduce operation at the end of the pipeline creates the following:
ReducedFoo( List.of(Foo1, Foo4); key = AA; global = g1 )
ReducedFoo( List.of(Foo2, Foo5); key = BB; global = g1 )
ReducedFoo( List.of(Foo3) ; key = CC; global = g1 )
What I'd like to do here is "join" the keyed streams that produce the three ReducedFoo instances into a single GlobalFoo.
E.g., in the above pipeline only one GlobalFoo would be produced (not 3, one for each unique value of attr):
GlobalFoo( List.of(Foo1, Foo2, Foo3, Foo4, Foo5); global = g1 )
In other words, is there a way to take the pipeline and do a keyBy on the "global" attribute in ReducedFoo, so that the final result from the pipeline is a SINGLE GlobalFoo for each value of global?
Is this possible, or am I completely misunderstanding what can be done with stream processing?
(code snippets are available in the thread -> )
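(A hedged sketch of one answer, not from the thread: yes, key the ReducedFoo stream by global and merge again. The caveat is that an unbounded stream never "finishes", so the merge needs a window or an end-of-input signal to know when a GlobalFoo is complete. reducedFoos, the field accessors, and mergeIntoGlobalFoo are hypothetical:)
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val globalFoos: DataStream[GlobalFoo] = reducedFoos
  .keyBy(r => r.global)                                       // second keyBy, on the global attribute
  .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))  // some cut-off is required on an unbounded stream
  .aggregate(mergeIntoGlobalFoo)                              // hypothetical AggregateFunction[ReducedFoo, GlobalFoo, GlobalFoo] unioning the fooLists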
Erwin Cabral
10/24/2022, 8:55 PM

Matt Fysh
10/25/2022, 12:51 AM
I can't get apache-flink to install on my machine, as it wants to build pandas and numpy from source, which takes a long time and then eventually throws build errors (numpy). Does anyone know an existing docker env I can use to run pyflink?
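(A hedged sketch of building one; version numbers are assumptions, and the Python version must be one that PyFlink supports. Inside a Linux container pip can usually fetch prebuilt numpy/pandas wheels instead of compiling them:)
FROM python:3.8-bullseye
# JRE for the embedded Flink runtime that the pip package ships with.
RUN apt-get update -y && apt-get install -y openjdk-11-jre-headless
RUN pip install apache-flink==1.15.2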
Matt Fysh
10/25/2022, 4:13 AM

Prathit Malik
10/25/2022, 5:24 AM
A join B---\
            \
             \
A join C------>---->E
             /
            /
A join D-->/
A is the common source being joined with sources B, C, D, while E is a common sink (I am using UNION ALL to combine the above 3 streams into 1 and writing to the sink).
• A is initialised using the DataStream API, while B, C, D use the Table API.
• For the join operation, A is converted to a view via DataStream-to-Table API conversion.
But when running the job, 2 separate DAGs are created: 1 with operators created with the DataStream API, the other created using the Table API. Is there a way to chain them into a single DAG?
Thanks in advance!
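(A hedged thought, not from the thread: if the Table-side results are converted back with toDataStream and everything reaches the sink through the DataStream graph, a single env.execute() should submit one job, whereas a separate Table executeInsert() submits its own. Table names, join conditions, and sink below are hypothetical:)
// Bring each Table-API join result back into the DataStream graph.
val joinedB = tableEnv.toDataStream(tableEnv.sqlQuery("SELECT * FROM a_view JOIN b ON a_view.id = b.id"))
val joinedC = tableEnv.toDataStream(tableEnv.sqlQuery("SELECT * FROM a_view JOIN c ON a_view.id = c.id"))
val joinedD = tableEnv.toDataStream(tableEnv.sqlQuery("SELECT * FROM a_view JOIN d ON a_view.id = d.id"))

// One union, one sink, one execute => one job graph.
joinedB.union(joinedC, joinedD).sinkTo(sink) // `sink` stands in for the common sink E
env.execute("single-dag")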
Matt Fysh
10/25/2022, 6:43 AM
The precision specified in DataTypes.TIMESTAMP(p) must be 3 currently
Ivan M
10/25/2022, 10:02 AM