Mitchell Jeppson
08/16/2024, 5:10 PMMitchell Jeppson
08/16/2024, 5:11 PMArrayList<Tx> txList = new ArrayList<>();
txList.add(new Tx("fe5642b1-7c1a-4033-b412-5809663882d3", 100, 1));
txList.add(new Tx("fe5642b1-7c1a-4033-b412-5809663882d3", 500, 1));
DataStream<Tx> txStream = streamExecutionEnvironment.fromData(txList);
Schema txSchema = Schema.newBuilder()
.column("id", DataTypes.STRING().notNull())
.column("amount", <http://DataTypes.INT|DataTypes.INT>().notNull())
.column("userId", <http://DataTypes.INT|DataTypes.INT>().notNull())
.primaryKey("id")
.build();
Table t = streamTableEnvironment.fromDataStream(txStream, txSchema);
DataStream<Row> s = streamTableEnvironment.toChangelogStream(t, txSchema, ChangelogMode.upsert());
Table t2 = streamTableEnvironment.fromChangelogStream(s, txSchema, ChangelogMode.upsert());
streamTableEnvironment.createTemporaryView("txs", t2);
streamExecutionEnvironment.execute();
streamTableEnvironment.executeSql("select * from txs;").print();
Mitchell Jeppson
08/16/2024, 5:11 PM+----+--------------------------------+-------------+-------------+
| op | id | amount | userId |
+----+--------------------------------+-------------+-------------+
| +I | fe5642b1-7c1a-4033-b412-580... | 100 | 1 |
| -U | fe5642b1-7c1a-4033-b412-580... | 100 | 1 |
| +U | fe5642b1-7c1a-4033-b412-580... | 500 | 1 |
+----+--------------------------------+-------------+-------------+
Mitchell Jeppson
08/16/2024, 5:12 PMD. Draco O'Brien
08/16/2024, 7:29 PMMitchell Jeppson
08/16/2024, 8:26 PMShortcut for an upsert changelog that describes idempotent updates on a key and thus does not contain RowKind.UPDATE_BEFORE rows
, should that doc be updated? (using flink 1.17 btw)
• The use of the primary key is intended, this job is kind of a running total so when a tx get's updated with a new value we want that to be reflected in the changelog. So that part seems to be working as expected.
• If you only want to see the net effect you'd typically consume this changelog stream into a table or view that materializes these changes into a snapshot
- can you explain this a bit? This Sounds like another call to toChangelogStream followed by fromToChangelogStream, or is it something else?D. Draco O'Brien
08/17/2024, 8:44 AMD. Draco O'Brien
08/17/2024, 8:47 AMD. Draco O'Brien
08/17/2024, 8:52 AM