# troubleshooting
m
Hi all 👋 I am trying to convert a stream of data to an upsert table. My understanding is that the upsert table will only have insert/update_after/delete events. However, when I do this I also get update_before events. I'll put the code in the 🧵. Am I misunderstanding something, or am I doing the conversion incorrectly? Appreciate any help!
ArrayList<Tx> txList = new ArrayList<>();
txList.add(new Tx("fe5642b1-7c1a-4033-b412-5809663882d3", 100, 1));
txList.add(new Tx("fe5642b1-7c1a-4033-b412-5809663882d3", 500, 1));

DataStream<Tx> txStream = streamExecutionEnvironment.fromData(txList);

Schema txSchema = Schema.newBuilder()
    .column("id", DataTypes.STRING().notNull())
    .column("amount", DataTypes.INT().notNull())
    .column("userId", DataTypes.INT().notNull())
    .primaryKey("id")
    .build();

Table t = streamTableEnvironment.fromDataStream(txStream, txSchema);
DataStream<Row> s = streamTableEnvironment.toChangelogStream(t, txSchema, ChangelogMode.upsert());

Table t2 = streamTableEnvironment.fromChangelogStream(s, txSchema, ChangelogMode.upsert());
streamTableEnvironment.createTemporaryView("txs", t2);

streamExecutionEnvironment.execute();

streamTableEnvironment.executeSql("select * from txs;").print();
When I run the above, the output of the last SQL statement is:
+----+--------------------------------+-------------+-------------+
| op |                             id |      amount |      userId |
+----+--------------------------------+-------------+-------------+
| +I | fe5642b1-7c1a-4033-b412-580... |         100 |           1 |
| -U | fe5642b1-7c1a-4033-b412-580... |         100 |           1 |
| +U | fe5642b1-7c1a-4033-b412-580... |         500 |           1 |
+----+--------------------------------+-------------+-------------+
When converting using upsert mode I would not expect that second -U row.
d
Might be slightly different than one might expect, but I think it's normal. When you define ChangelogMode.upsert(), the resulting changelog stream will represent changes to the table as INSERT and UPDATE (or DELETE followed by INSERT, which is effectively an UPDATE) events. However, it does not mean you will see only "net" changes after every event; you'll see each individual change event up to the final state. In your example:
1. The first +I row signifies an insert of a new record with ID fe5642b1-7c1a-4033-b412-5809663882d3, amount 100, and userId 1.
2. The -U row you're seeing is actually a delete operation followed by an update (since UPSERT combines delete and insert into an update). It doesn't mean the record is updated from 100 to 100; it signifies that the old version of the record with amount 100 has been logically deleted and is immediately replaced with a new version in the next step. The reason is that you're reusing the same key (id) in your data stream, which triggers a deletion of the existing entry followed by insertion of the updated one.
3. The +U signifies an insert of the updated record with the same ID but a new amount (500) and the same userId.
So the -U is correctly indicating the deletion of the previous version of the record before the updated version is inserted, all within a single changelog sequence. If you only want to see the net effect, you'd typically consume this changelog stream into a table or view that materializes these changes into a snapshot.
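To illustrate the "materialize into a snapshot" idea outside of Flink, here's a minimal, self-contained Java sketch (not Flink API; the `Event`/`Row` records and `materialize` method are illustrative stand-ins) that replays the three changelog rows from the output above into a map keyed by the primary key. The -U/+U pair nets out, leaving only the final version of the row:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSnapshot {
    // A changelog event: an op ("+I", "-U", "+U", "-D") plus the row values.
    record Event(String op, String id, int amount, int userId) {}
    record Row(int amount, int userId) {}

    // Apply each event in order to a snapshot keyed by the primary key (id).
    // "-U"/"-D" remove the old version; "+I"/"+U" (re)insert the new one.
    static Map<String, Row> materialize(List<Event> events) {
        Map<String, Row> snapshot = new LinkedHashMap<>();
        for (Event e : events) {
            switch (e.op()) {
                case "+I", "+U" -> snapshot.put(e.id(), new Row(e.amount(), e.userId()));
                case "-U", "-D" -> snapshot.remove(e.id());
                default -> throw new IllegalArgumentException("unknown op " + e.op());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        String id = "fe5642b1-7c1a-4033-b412-5809663882d3";
        List<Event> changelog = List.of(
                new Event("+I", id, 100, 1),  // initial insert
                new Event("-U", id, 100, 1),  // retract old version
                new Event("+U", id, 500, 1)); // insert updated version
        Map<String, Row> snapshot = materialize(changelog);
        // The -U/+U pair nets out: one row remains, with the final amount.
        System.out.println(snapshot.size());           // 1
        System.out.println(snapshot.get(id).amount()); // 500
    }
}
```

This is roughly what a downstream table does with the stream: after applying every event, querying the snapshot shows only the net state.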
m
Draco, thanks for the super detailed answer! A few follow-up thoughts:
• This definitely feels counterintuitive at best and plain wrong at worst. The doc for upsert changelog mode says
Shortcut for an upsert changelog that describes idempotent updates on a key and thus does not contain RowKind.UPDATE_BEFORE rows
Should that doc be updated? (Using Flink 1.17, btw.)
• The use of the primary key is intended; this job is kind of a running total, so when a tx gets updated with a new value we want that to be reflected in the changelog. That part seems to be working as expected.
•
If you only want to see the net effect you'd typically consume this changelog stream into a table or view that materializes these changes into a snapshot
Can you explain this a bit? This sounds like another call to toChangelogStream followed by fromChangelogStream, or is it something else?
d
I think you could open an issue about the documentation so it gets attention and clarification.
Materializing the changes into a snapshot means consuming the changelog stream and applying each change to maintain a current snapshot of the data. In Flink, you'd do this by using the changelog stream as the source for a table and then querying that table; the Table API manages the materialization internally. In other words, the table is created from the changelog stream in upsert mode, and when you query it, you effectively get the latest state after all the changes have been applied.
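For contrast with the doc quote above, here's a small non-Flink Java sketch of what a "true" upsert history looks like when applied: each +U simply overwrites the current value for its key, so no -U (UPDATE_BEFORE) row is needed to reach the same snapshot. The names here (`UpsertReplay`, `applyUpsert`) are illustrative, not Flink API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UpsertReplay {
    record Event(String op, String id, int amount) {}

    // In upsert semantics there is no UPDATE_BEFORE: an update is just a new
    // value for an existing key, and only "-D" removes a key.
    static Map<String, Integer> applyUpsert(List<Event> events) {
        Map<String, Integer> state = new HashMap<>();
        for (Event e : events) {
            if ("-D".equals(e.op())) {
                state.remove(e.id());
            } else { // "+I" or "+U": upsert by key
                state.put(e.id(), e.amount());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        String id = "fe5642b1-7c1a-4033-b412-5809663882d3";
        // Same logical history as the thread's example, encoded as upsert
        // events: no -U row is required to end up at the same final state.
        Map<String, Integer> state = applyUpsert(List.of(
                new Event("+I", id, 100),
                new Event("+U", id, 500)));
        System.out.println(state.get(id)); // 500
    }
}
```

The point is that whether the stream carries -U/+U pairs or plain +U upserts, applying the whole changelog by key yields the same final snapshot, which is what querying the resulting table gives you.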