Hi all, I am learning flink but got stuck with the...
# troubleshooting
j
Hi all, I am learning Flink but got stuck with the following operation. Let's say I have the following input stream:
env.fromElements(
            Tuple4.of("BRENT_FUTURE", "BOM", QuotationPriceType.ASK, 10.0),
            Tuple4.of("RBOB_FUTURE", "BOM", QuotationPriceType.ASK, 10.0),
            Tuple4.of("BRENT_FUTURE", "BOM", QuotationPriceType.BID, 20.0),
            Tuple4.of("BRENT_FUTURE", "BOM", QuotationPriceType.ASK, 15.0),
            Tuple4.of("BRENT_FUTURE", "BOM", QuotationPriceType.ASK, 20.0),
            Tuple4.of("RBOB_FUTURE", "BOM", QuotationPriceType.BID, 10.0));
I want to compute (BID + ASK) / 2, grouped by the first field of the tuple, and emit a result every time a new value can be calculated, ending up with something like:
env.fromElements(
        Tuple4.of("A", "BOM", QuotationPriceType.VALUE, 15.0),
        Tuple4.of("A", "BOM", QuotationPriceType.VALUE, 17.5),
        Tuple4.of("A", "BOM", QuotationPriceType.VALUE, 20.0),
        Tuple4.of("B", "BOM", QuotationPriceType.VALUE, 10.0));
any advice? thank you!
any ideas? thanks!
f
Hi Javi 👋 You can use a KeyedProcessFunction for this: key the stream by the first field of the tuple, use keyed state to keep track of the latest BID and ASK values, and output a Tuple2 with the key and the new value each time both are available.
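A minimal sketch of that per-key logic in plain Java, so it runs without a Flink dependency. The class `MidPriceSketch`, the enum `PriceType`, and the method `onQuote` are hypothetical names for illustration, and the `HashMap` only stands in for Flink keyed state. In a real job the same update would presumably live in `KeyedProcessFunction.processElement`, with the latest BID and ASK held in keyed state such as `ValueState`.

```java
import java.util.HashMap;
import java.util.Map;

public class MidPriceSketch {
    // Hypothetical stand-in for the QuotationPriceType enum in the question.
    public enum PriceType { BID, ASK }

    // Latest BID and ASK seen for one key; null means "not seen yet".
    static final class Latest {
        Double bid;
        Double ask;
    }

    // Stand-in for Flink keyed state: one Latest entry per instrument.
    private final Map<String, Latest> stateByKey = new HashMap<>();

    // Updates the state for the key and returns the new mid price,
    // or null while one side (BID or ASK) is still missing.
    public Double onQuote(String key, PriceType type, double price) {
        Latest latest = stateByKey.computeIfAbsent(key, k -> new Latest());
        if (type == PriceType.BID) {
            latest.bid = price;
        } else {
            latest.ask = price;
        }
        if (latest.bid == null || latest.ask == null) {
            return null;
        }
        return (latest.bid + latest.ask) / 2.0;
    }

    public static void main(String[] args) {
        MidPriceSketch sketch = new MidPriceSketch();
        Object[][] quotes = {
            {"BRENT_FUTURE", PriceType.ASK, 10.0},
            {"RBOB_FUTURE", PriceType.ASK, 10.0},
            {"BRENT_FUTURE", PriceType.BID, 20.0},
            {"BRENT_FUTURE", PriceType.ASK, 15.0},
            {"BRENT_FUTURE", PriceType.ASK, 20.0},
            {"RBOB_FUTURE", PriceType.BID, 10.0},
        };
        for (Object[] q : quotes) {
            Double mid = sketch.onQuote((String) q[0], (PriceType) q[1], (Double) q[2]);
            if (mid != null) {
                System.out.println(q[0] + " -> " + mid);
            }
        }
        // Prints:
        // BRENT_FUTURE -> 15.0
        // BRENT_FUTURE -> 17.5
        // BRENT_FUTURE -> 20.0
        // RBOB_FUTURE -> 10.0
    }
}
```

The first two quotes produce nothing because each key has only seen an ASK so far. After that, every new quote for a key that already has both sides yields an updated value, which matches the four results in the question.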