Abhinav sharma
08/09/2022, 1:50 PM
Adesh Dsilva
08/09/2022, 3:35 PM
datastream.map(someFunctionThatDoesXandY).sinkTo(sink)
vs
datastream.map(someFunctionThatDoesX).map(someFunctionThatDoesY).sinkTo(sink)
I guess Flink will automatically chain the second sequence, so both should give similar performance? Is there any use case for splitting your code into multiple operators?
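A minimal sketch of the trade-off: Flink's default operator chaining fuses the two consecutive map operators into a single task, so both forms should perform similarly; splitting mainly pays off when you want separate names, metrics, parallelism, or an explicit chain break per step (the .name and .disableChaining calls below are just one illustration):

datastream.map(someFunctionThatDoesXandY).sinkTo(sink)
datastream.map(someFunctionThatDoesX).map(someFunctionThatDoesY).sinkTo(sink) // chained into one task by default

// splitting lets each step be tuned and observed individually
datastream
  .map(someFunctionThatDoesX).name("doX")
  .map(someFunctionThatDoesY).name("doY").disableChaining() // forces this map into its own task
  .sinkTo(sink)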
Ty Brooks
08/09/2022, 3:59 PM
Stefan
08/09/2022, 6:32 PM
Justin
08/09/2022, 8:34 PM
Jirawech Siwawut
08/09/2022, 9:03 PM
Caused by: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed
Derek Liu
08/10/2022, 7:21 AM
Jaya Ananthram
08/10/2022, 9:24 AM
"The Flink Kubernetes Operator (Operator) extends the Flink Metric System"
so I assume it should support Graphite. Am I right?
3. In the logs, I am seeing some exceptions while starting the job (which runs on top of the K8s operator):
Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:35: "$internal.application.program-args: "
Is it something to worry about? Or is the dynamic configuration just skipped, and there is nothing to worry about for the job behavior?
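For the Graphite part, a hedged sketch of a reporter configuration in flink-conf.yaml (host, port, and interval are placeholders; this assumes the operator simply forwards flink-conf.yaml entries to the job and task managers and that flink-metrics-graphite is on the classpath):
metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.grph.host: <graphite-host>
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 60 SECONDS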
Abhinav sharma
08/10/2022, 2:22 PM
Abhinav sharma
08/10/2022, 2:36 PM
Akhlaq Malik
08/10/2022, 3:39 PM
.join(otherStream).where(selectedMutualKey).equalTo(selectedMutualKey).window(TumblingEventTimeWindows.of(Time.seconds(20)))
This is not emitting any data. When switching to, e.g., TumblingProcessingTimeWindows,
it emits data. I've also configured the .withTimestampAssigner
to use the event time of the message (long value in ms). Flink version 1.15.x
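Event-time tumbling windows only fire once the watermark passes the window end, so the usual suspect is the watermark setup rather than the join itself. A minimal sketch, assuming a placeholder MyEvent type with an eventTimeMs field and placeholder input streams (Flink 1.15):
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val watermarks = WatermarkStrategy
  .forBoundedOutOfOrderness[MyEvent](Duration.ofSeconds(5))
  .withTimestampAssigner(new SerializableTimestampAssigner[MyEvent] {
    override def extractTimestamp(event: MyEvent, recordTimestamp: Long): Long = event.eventTimeMs
  })
  .withIdleness(Duration.ofMinutes(1)) // keep watermarks advancing if a partition goes idle

// both inputs need timestamps/watermarks; the joined window fires on the minimum of the two watermarks
val left = leftStream.assignTimestampsAndWatermarks(watermarks)
val right = rightStream.assignTimestampsAndWatermarks(watermarks)

left.join(right)
  .where(selectedMutualKey)
  .equalTo(selectedMutualKey)
  .window(TumblingEventTimeWindows.of(Time.seconds(20)))
  .apply((l, r) => (l, r))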
Ivan M
08/10/2022, 4:23 PM
Cause: Partitioned tables are not supported yet.
Flink version 1.15.0
Does it mean Flink doesn't support partitioned tables? If so, why is it in the documentation?
sap1ens
08/10/2022, 4:46 PM
RowKind
of the underlying Row/RowData object?
Jin Yi
08/10/2022, 9:59 PM
Jirawech Siwawut
08/11/2022, 10:21 AM
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.
Basically, I am trying to use HiveSink and I get this error. It runs fine in IntelliJ, but I cannot make it work with the Docker image. I only add flink-sql-connector-hive-2.3.6_2.12
to build the shaded jar.
Krishna Chaithanya M A
08/11/2022, 12:28 PM
Adesh Dsilva
08/11/2022, 3:19 PM
You can test whether your class adheres to the POJO requirements via org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo() from the flink-test-utils.
Where do I find this? It doesn't seem to be in flink-test-utils?
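A small sketch of the intended usage, assuming a Flink version whose flink-test-utils artifact actually ships this class (MyPojo is a placeholder); if the class is missing, the flink-test-utils version may simply be older than the docs page:
// test-scoped dependency: org.apache.flink:flink-test-utils (same version as the Flink runtime)
import org.apache.flink.types.PojoTestUtils

PojoTestUtils.assertSerializedAsPojo(classOf[MyPojo])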
Adrian Chang
08/11/2022, 6:14 PM
SELECT
statement?
Aeden Jameson
08/11/2022, 7:42 PM
iLem0n
08/11/2022, 8:48 PM
Samin Ahbab
08/12/2022, 8:21 AM
string1
, and for those n only, find the m most occurring <string1,string2> tuples, but I am not sure how this would be represented with Flink functions.
I was thinking that it would be done through multiple keyBy
and aggregation
operations. This feels like a simple case, but as I am new to Flink, I am not really sure how to do this.
Does anyone have any idea?
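A rough, partial sketch of the counting part, assuming a placeholder events: DataStream[Event], processing-time windows, and a placeholder m; selecting the global top-n string1 values would still need an extra non-keyed (parallelism-1) aggregation on top of this:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(string1: String, string2: String)
case class PairCount(string1: String, string2: String, count: Long)
val m = 10

// 1) count occurrences of each (string1, string2) pair per window
val pairCounts = events
  .map(e => PairCount(e.string1, e.string2, 1L))
  .keyBy(p => (p.string1, p.string2))
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .reduce((a, b) => a.copy(count = a.count + b.count))

// 2) re-key by string1 and keep the m most frequent pairs for that key
val topMPerString1 = pairCounts
  .keyBy(_.string1)
  .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
  .process(new ProcessWindowFunction[PairCount, PairCount, String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[PairCount], out: Collector[PairCount]): Unit =
      elements.toSeq.sortBy(-_.count).take(m).foreach(out.collect)
  })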
Aviv Dozorets
08/14/2022, 4:23 PM
broker.rack
on the Kafka side, client.rack
during client init, but still getting:
"[Consumer clientId=consumer-Consumer-perseus2-7, groupId=Consumer-perseus2] Discovered group coordinator kafka-us-east-1-stage-1.s1:9093 (id: 2147483646 rack: null)"
Tom Alkalak
08/15/2022, 10:48 AM
1.14.3
and both flink-metrics-datadog
and flink-metrics-dropwizard
. I am trying to have control over which metrics are being sent to DataDog.
I have tried modifying the JobManager's config to look like this:
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.excludes: <metricName>
metrics.reporter.dghttp.useSubstringMatching: true
but unfortunately I was unable to filter out the metrics I wanted.
Does anyone have a clue whether it's even possible, and how to achieve it?
Mustafa Akur
08/15/2022, 10:54 AM
Prasanth Kothuri
08/15/2022, 1:39 PM
testHarness.getOutput.poll().asInstanceOf[myCaseClass].operatorDecision should contain ("threat")
gives
org.apache.flink.streaming.runtime.streamrecord.StreamRecord cannot be cast to stats.myCaseClass
Any clues on how to convert the output to what I want? Thanks a ton
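A small sketch of the usual fix: the harness output queue holds StreamRecord wrappers, so cast to StreamRecord and unwrap it with getValue before asserting:
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

val record = testHarness.getOutput.poll().asInstanceOf[StreamRecord[myCaseClass]]
record.getValue.operatorDecision should contain ("threat")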
Ron Cohen
08/15/2022, 2:46 PM
HashMap<String, String>
and java.util.ImmutableCollections
map, but getting
it cannot be used as a POJO type and must be processed as GenericType
I've read https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/serialization/types_serialization/ and tried to add type hints, but to no avail. Is it correctly understood that Flink does not support native serialization of any kind of map?
Krish Narukulla
08/15/2022, 3:46 PM
default_database
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(sourceSql)
tEnv.executeSql(sinkSql)
tEnv.sqlQuery("insert into <sink table> select .... from <source table>")
env.execute("FlinkApp")
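A hedged sketch of the same pipeline with the INSERT submitted via executeSql (table names elided as in the original); executeSql triggers the insert job itself, so neither sqlQuery nor the trailing env.execute is needed for this part:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(sourceSql) // CREATE TABLE <source table> ...
tEnv.executeSql(sinkSql)   // CREATE TABLE <sink table> ...
tEnv.executeSql("insert into <sink table> select .... from <source table>")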
Stefan
08/15/2022, 4:51 PM
val salesOrderFactory =
s"""
|SELECT VBAK.MANDT || VBAK.VBELN AS ID,
| 'Sales Order' AS Type,
| TO_TIMESTAMP(VBAK.ERDAT || ' ' || VBAK.ERZET, 'yyyy-MM-dd HH:mm:ss') AS CreationTime,
| CAST (NULL AS TIMESTAMP) AS DeletionTime,
| VBAK.ERNAM AS Creator,
| VBAK.OBJNR AS ObjectNumber,
| VBAK.AUART AS SalesDocumentTypeID,
| TVAKT.BEZEI AS SalesDocumentType,
| VBAK_SQ.MANDT || VBAK_SQ.VBELN AS QuotationID,
| T001.BUTXT AS CompanyName,
| VBUK.GBSTK AS Status,
| VBUK.BESTK AS OrderConfirmationStatus,
| VBKD.VSART AS ShippingType
|FROM VBAK AS VBAK
| LEFT JOIN VBUK AS VBUK
| ON VBAK.MANDT = VBUK.MANDT
| AND VBAK.VBELN = VBUK.VBELN
| LEFT JOIN VBKD AS VBKD
| ON VBAK.MANDT = VBKD.MANDT
| AND VBAK.VBELN = VBKD.VBELN
| AND VBKD.POSNR = '000000'
| LEFT JOIN VBAK AS VBAK_SQ
| ON VBAK.MANDT = VBAK_SQ.MANDT
| AND VBAK.VGBEL = VBAK_SQ.VBELN
| AND VBAK_SQ.VBTYP = 'B'
| LEFT JOIN T001
| ON VBAK.MANDT = T001.MANDT
| AND VBAK.BUKRS_VF = T001.BUKRS
| LEFT JOIN TVAKT AS TVAKT
| ON VBAK.MANDT = TVAKT.MANDT
| AND VBAK.AUART = TVAKT.AUART
| AND TVAKT.SPRAS = 'E'
|WHERE VBAK.VBTYP = 'C';
|""".stripMargin
val factoryQueryTable = tenv.sqlQuery(salesOrderFactory)
val salesOrderTable = tenv.from("SALESORDER")
val stream = tenv.toChangelogStream(
factoryQueryTable,
Schema.newBuilder().fromResolvedSchema(salesOrderTable.getResolvedSchema).build(),
ChangelogMode.newBuilder.addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(
RowKind.DELETE,
).build,
)
stream.print()
When I update a record in TVAKT, I get the following output:
>-D[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc, 0, ZSV, BEZEI2, null, Company, GBSTK3, BESTK2, VSART3]
>+I[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc, 0, V, null, null, Company , GBSTK3, BESTK2, VSART3]
>-D[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc, 0, V, null, null, Company, GBSTK3, BESTK2, VSART3]
>+I[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc, 0, V, BEZEI2_updated, null, Company, GBSTK3, BESTK2, VSART3]
I would expect to have only one +I (insert), or actually a +U (update after). I don't understand why the first three entries are created; ultimately I updated a record in TVAKT, and there was no delete?
In Kafka Streams, I would get one record in the output with the updated value. How can I achieve the same with Flink? Thanks
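One thing that might be worth trying, sketched here under the assumption that the result schema carries a unique key (ID): asking for an upsert changelog, so Flink does not have to encode updates as delete/insert pairs:
val stream = tenv.toChangelogStream(
  factoryQueryTable,
  Schema.newBuilder().fromResolvedSchema(salesOrderTable.getResolvedSchema).build(),
  ChangelogMode.upsert() // INSERT, UPDATE_AFTER and DELETE only; requires a unique key on the result
)
stream.print()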
Saurabh Khare
08/15/2022, 7:38 PM
Source_
and Sink_
respectively.
Though we see the metrics for the source and sink operators in the Flink UI, they do not show up in Wavefront. I do see all the metrics correctly for the intermediate operator in Wavefront. Can someone help me with some insights? Let me know if you need more details from my end. Thanks
Ildar Almakaev
08/15/2022, 8:10 PM
Orders
will be stateless?
2. What about the function rates
? Will it delete old records from RateHistory
table, since its old records are not relevant for new joins with Orders
for a given PK?
3. How does Flink manage the state under the hood?
Overall, I have the following use case.
There is transaction data (a hot Kafka topic, mostly append-only) which should be enriched with user data (a slowly changing table).
In Kafka Streams/ksqlDB it would be a stream-table join, so I'd like to implement similar logic using Flink.
What do you think about using a Temporal Table Function for the users
table with a timestamp column PROCTIME()
and then joining it with the transactions table?
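A minimal sketch of that approach with assumed names (a users table with user_id and a proc_time attribute, and a transactions table with user_id and proc_time): the temporal table function keeps the latest users row per key in state, and each transaction is joined against the version valid at its processing time:
import org.apache.flink.table.api._

// register the users table as a temporal table function keyed by user_id over processing time
val usersFunction = tEnv
  .from("users")
  .createTemporalTableFunction($"proc_time", $"user_id")
tEnv.createTemporarySystemFunction("users_versioned", usersFunction)

val enriched = tEnv.sqlQuery(
  """
    |SELECT t.*, u.user_name
    |FROM transactions AS t,
    |     LATERAL TABLE (users_versioned(t.proc_time)) AS u
    |WHERE t.user_id = u.user_id
    |""".stripMargin)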