Kishan Huliyar Jagadeesh
03/13/2023, 2:45 PM
Theodore Curtil
03/13/2023, 4:07 PM
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-only-example
spec:
  deploymentName: basic-session-deployment-only-example
  job:
    jarURI: https://<private-github-repo-file>
    parallelism: 3
    upgradeMode: stateless
    state: running
I know you can curl a raw file using GitHub personal access tokens, but I don't think the FlinkSessionJob CRD allows us to run commands (like curling the file from a private repo).
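One possible angle (a sketch, assuming the operator's kubernetes.operator.user.artifacts.http.header option is available in your operator version and applies to session-job artifact fetching) would be to pass the token as an HTTP header via the operator configuration, e.g. in the Helm values:
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # Header sent by the operator when it downloads the jarURI artifact;
    # <github-pat> is a placeholder for a personal access token.
    kubernetes.operator.user.artifacts.http.header: Authorization: token <github-pat>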
Thanks for your help
Andrew Otto
03/13/2023, 4:35 PM
Gerald
03/13/2023, 5:01 PM
CREATE TABLE Person (
  Id int,
  Firstname string,
  Lastname string,
  DateOfBirth date,
  primary key (Id) not enforced
) WITH ( ... )
CREATE TABLE Address (
  Id int,
  Street string,
  ZipCode string,
  City string,
  Country string,
  PersonId int,
  primary key (Id) not enforced
) WITH ( ... )
It doesn't matter what gets updated (Person or Address); for both source triggers I want to emit a new combined PersonUpdate event that contains information about the person and its corresponding addresses:
CREATE TABLE PersonUpdate (
  Id int,
  Firstname string,
  Lastname string,
  DateOfBirth date,
  Addresses ARRAY<ROW(Street string, City string, ZipCode string, Country string)>,
  primary key (Id) not enforced
) WITH (...)
One option I read about would be to use temporal joins, with the right side of the join doing lookups on, for example, the Address table (getting all addresses for a specific person X). However, as far as I understand the semantics of temporal joins, the join wouldn't trigger if there is only an update on the Address table (e.g., an address is added with PersonId X)?
This brings me to the question of whether in such cases I would need to duplicate the same query, but this time doing the temporal lookup on the Person table. But what if I need to join multiple tables to get all the enrichment info for my event? Would I need to duplicate the query multiple times and flip the join directions to set all the required triggers?
Another alternative I currently see would be to rely on the DataStream API: connecting all involved source streams, creating KeyedStreams, and then doing the state work on my own (e.g., in .process(...)) using ValueState, MapState, etc. Would that be a better-performing or recommended solution compared to the one using temporal joins?
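For reference, a regular (non-temporal) join of the two changelog tables re-evaluates whenever either side updates; a rough sketch of that shape (untested — ARRAY_AGG only exists in newer Flink releases, so older versions would need COLLECT or a custom aggregate instead):
INSERT INTO PersonUpdate
SELECT
  p.Id, p.Firstname, p.Lastname, p.DateOfBirth, a.Addresses
FROM Person AS p
LEFT JOIN (
  -- aggregate all addresses per person; updates on Address re-trigger the join
  SELECT PersonId,
         ARRAY_AGG(ROW(Street, City, ZipCode, Country)) AS Addresses
  FROM Address
  GROUP BY PersonId
) AS a ON p.Id = a.PersonId;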
Or did I miss any other alternative? What I clearly want to avoid are lookup joins doing the lookup on the database again.
Thijs van de Poll
03/13/2023, 7:04 PM
CREATE TABLE group (
  group_id INT NOT NULL,
  representative INT,
  score INT,
  PRIMARY KEY (group_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://host.docker.internal:5432/doc_data',
  'table-name' = 'xml.t_group',
  'username' = 'root',
  'password' = 'root',
  'driver' = 'org.postgresql.Driver'
);
The insert statement:
INSERT INTO group (group_id, representative, score) SELECT * FROM tmp_docs
ON CONFLICT (group_id) DO UPDATE SET
representative = CASE WHEN group.score >= tmp_docs.score THEN group.representative ELSE tmp_docs.representative;
According to the JDBC docs, this is how it is done for Postgres: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/jdbc/
But instead of it working, I get the following error message:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ON'
Or does it mean that Flink uses this logic internally, and it is not possible to customize the ON CONFLICT clause?
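(For context: the linked page describes the statement the connector generates internally once the sink table declares a primary key, so a plain INSERT may be all that is needed — a sketch, which would however lose the conditional CASE logic unless it is folded into the SELECT:)
-- Sketch: with PRIMARY KEY (group_id) declared on the sink, the JDBC
-- connector emits the ON CONFLICT ... DO UPDATE clause itself.
INSERT INTO group SELECT group_id, representative, score FROM tmp_docs;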
Amir Hossein Sharifzadeh
03/13/2023, 9:23 PM
dataFileChunk.data. In Python deserialization, they used unpackb, and I used the same component in Java for deserialization, but I get different results vs. the Python deserialization. This is my deserialization class in Java:
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.msgpack.core.MessageBufferPacker;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.value.ImmutableValue;
import org.msgpack.value.Value;
import org.msgpack.value.ValueType;

public class DataFileChunkDeserializer extends AbstractDeserializationSchema<DataFileChunk> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    @Override
    public void open(InitializationContext context) {
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    @Override
    public DataFileChunk deserialize(byte[] message) throws IOException {
        // List<DataFileChunk> dataFileChunks = new ArrayList<>();
        DataFileChunk dataFileChunk = new DataFileChunk();
        MessageDigest md = null;
        MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(message);
        if (unpacker.hasNext()) {
            try {
                md = MessageDigest.getInstance("SHA-512");
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
            ImmutableValue value = unpacker.unpackValue();
            List<Value> list = value.asArrayValue().list();
            dataFileChunk.filename = "" + list.get(0);
            dataFileChunk.chunk_hash = "" + list.get(2);
            dataFileChunk.chunk_i = Long.parseLong(String.valueOf(list.get(4)));
            dataFileChunk.n_total_chunks = Long.parseLong(String.valueOf(list.get(5)));
            dataFileChunk.subdir_str = "" + list.get(6);
            dataFileChunk.filename_append = "" + list.get(7);
            ValueType vt = list.get(8).getValueType();
            dataFileChunk.data = list.get(8).asBinaryValue();
            dataFileChunk.dataByteArray = dataFileChunk.data.asByteArray();
            assert md != null;
            try {
                byte[] sh1 = MessageDigest.getInstance("SHA-512").digest(dataFileChunk.dataByteArray);
                byte[] sh2 = dataFileChunk.chunk_hash.getBytes(StandardCharsets.UTF_8);
                System.out.println();
            } catch (NoSuchAlgorithmException e) {
                e.printStackTrace();
            }
        }
        return dataFileChunk;
    }
}
and this is part of my pom.xml:
<dependency>
  <groupId>org.msgpack</groupId>
  <artifactId>msgpack-core</artifactId>
  <version>0.8.18</version>
</dependency>
DataFileChunk class:
import org.msgpack.value.BinaryValue;
import org.msgpack.value.RawValue;

public class DataFileChunk {

    public long chunk_i;
    public String filename;
    public String chunk_hash;
    public String chunk_offset_write;
    public long n_total_chunks;
    public String subdir_str;
    public String filename_append;
    public RawValue data;
    public byte[] dataByteArray;

    public DataFileChunk() {
    }

    public DataFileChunk(long chunk_i, String filename) {
        this.chunk_i = chunk_i;
        this.filename = filename;
    }

    public DataFileChunk(long chunk_i, String filename, String chunk_hash, String chunk_offset_write,
                         long n_total_chunks, String subdir_str, String filename_append, BinaryValue data) {
        this.chunk_i = chunk_i;
        this.filename = filename;
        this.chunk_hash = chunk_hash;
        this.chunk_offset_write = chunk_offset_write;
        this.n_total_chunks = n_total_chunks;
        this.subdir_str = subdir_str;
        this.filename_append = filename_append;
        this.data = data;
    }

    @Override
    public String toString() {
        return "Event{" + "chunk_i=" + chunk_i + ", n_total_chunks=" + n_total_chunks +
                ", filename='" + filename + '\'' +
                ", chunk_hash='" + chunk_hash + '\'' +
                ", chunk_offset_write='" + chunk_offset_write + '\'' +
                ", subdir_str='" + subdir_str + '\'' +
                ", filename_append='" + filename_append + '\'' +
                ", data='" + data + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DataFileChunk dataFileChunk = (DataFileChunk) o;
        return chunk_i == dataFileChunk.chunk_i &&
                n_total_chunks == dataFileChunk.n_total_chunks &&
                filename.equals(dataFileChunk.filename) &&
                chunk_hash.equals(dataFileChunk.chunk_hash) &&
                chunk_offset_write.equals(dataFileChunk.chunk_offset_write) &&
                subdir_str.equals(dataFileChunk.subdir_str) &&
                filename_append.equals(dataFileChunk.filename_append) &&
                data.equals(dataFileChunk.data);
    }
}
Does anyone have experience with deserialization using msgpack?
Amir Hossein Sharifzadeh
03/14/2023, 1:55 AM
DataFileChunk class: I will have to change the types for chunk_hash and data from String to BinaryValue. In my deserialize method, I should hash the byte array of data and then compare it with the byte array of chunk_hash:
dataFileChunk.data = list.get(8).asBinaryValue();
output.write(dataFileChunk.data.asByteArray());
dataFileChunk.dataByteArray = dataFileChunk.data.asByteArray();
assert md != null;
md.update(dataFileChunk.data.asByteArray());
byte[] bts = md.digest();
String s1 = Base64.getEncoder().encodeToString(bts);
String s2 = Base64.getEncoder().encodeToString(dataFileChunk.chunk_hash.asByteArray());
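A minimal sketch of the comparison itself (assuming both sides are now raw digest bytes; MessageDigest.isEqual avoids the Base64 detour):
byte[] computed = md.digest();                               // SHA-512 over the chunk payload
byte[] expected = dataFileChunk.chunk_hash.asByteArray();    // hash bytes from the msgpack message
boolean matches = MessageDigest.isEqual(computed, expected); // constant-time byte comparison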
Prathit Malik
03/14/2023, 4:09 AM
JDBC sink, wherein we are trying to exclude some fields if their value is null in the final upsert/insert query which is provided to the sink function.
The prepared statement currently accepts only a pre-prepared query, which is provided via the JDBC statement builder.
Wanted to know if there is any alternate way to provide queries to the JDBC sink based on the incoming record from the stream, so we can build our own query at runtime instead of using a prepared template query for insert/upsert.
Flink version: 1.14
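One possible escape hatch (a sketch, not JdbcSink itself; MyRecord and the two build/bind helpers are hypothetical) would be a custom RichSinkFunction that composes the statement per record:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class DynamicJdbcSink extends RichSinkFunction<MyRecord> {
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Illustrative connection settings; batching and retries omitted for brevity.
        connection = DriverManager.getConnection("jdbc:postgresql://host:5432/db", "user", "pass");
    }

    @Override
    public void invoke(MyRecord record, Context context) throws Exception {
        // buildInsertSkippingNulls / bindNonNullFields are hypothetical helpers
        // that would enumerate only the non-null fields of the record.
        String sql = buildInsertSkippingNulls(record);
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            bindNonNullFields(ps, record);
            ps.executeUpdate();
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}
This trades away JdbcSink's built-in batching and retry handling, so it is only a starting point.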
Thanks for the help!
Akshata Shivaprasad
03/14/2023, 4:29 AM
Flink-1.15 in EKS, and while testing we observed pod disk usage growing on subsequent deploys (e.g., upgradeMode last-state/suspend, before the checkpoint completes or even after it completes). We found the taskmanager local state being retained and not getting cleaned up under /data, and hence the disk usage is doubled.
I would like to understand which folders will be retained on subsequent deploys.
Below is sample data being retained across all subsequent deploys.
1001M 2023-03-08 17:58 /data/flink-local-data/tm_flink-personas-experiment-taskmanager-1-2/localState/aid_6b6fd573128d17d190e0b4fdc33e6387/jid_f92a1778225e260ac9febc8dea7aba5e/vtx_f7c20420360e0172e54933e6abdd481a_sti_1/chk_475590/b603b845d9f14110a6d8b32562e04fe0
1001M 2023-03-08 17:58 /data/flink-local-data/tm_flink-personas-experiment-taskmanager-1-2/localState/aid_6b6fd573128d17d190e0b4fdc33e6387/jid_f92a1778225e260ac9febc8dea7aba5e/vtx_f7c20420360e0172e54933e6abdd481a_sti_1/chk_475590
1001M 2023-03-08 17:58 /data/flink-local-data/tm_flink-personas-experiment-taskmanager-1-2/localState/aid_6b6fd573128d17d190e0b4fdc33e6387/jid_f92a1778225e260ac9febc8dea7aba5e/vtx_f7c20420360e0172e54933e6abdd481a_sti_1
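If the retained localState directories turn out to be task-local recovery copies, one knob to experiment with (a sketch; the trade-off is slower recovery from local disk):
# Task-local recovery keeps an extra copy of keyed state on the TM disk.
state.backend.local-recovery: false
# Or point local state at a pod-scoped emptyDir path so it is discarded
# with the pod (path below is illustrative):
taskmanager.state.local.root-dirs: /tmp/flink-local-state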
Siva Family
03/14/2023, 5:15 AM
Xiaorong Tai
03/14/2023, 7:57 AM
Thijs van de Poll
03/14/2023, 9:52 AM
Ting Yin
03/14/2023, 9:57 AM
Ting Yin
03/14/2023, 9:58 AM
Gil Kirkpatrick
03/14/2023, 2:53 PM
Felix Angell
03/14/2023, 3:06 PM
Thijs van de Poll
03/14/2023, 8:26 PM
Hygor Knust
03/14/2023, 10:00 PM
col1 are the same. If any other column has changed, send it downstream.
My idea was to create a UDTF for that, but it seems there is no way to make it stateful.
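For reference, the closest stateful construct in Flink SQL is an AggregateFunction, whose accumulator is Flink-managed state that can remember the previously seen value per key (a rough sketch; class and field names are illustrative):
import org.apache.flink.table.functions.AggregateFunction;

// Emits the latest value while keeping the previously seen one in the
// accumulator, which Flink manages as state.
public class LastValue extends AggregateFunction<String, LastValue.Acc> {
    public static class Acc {
        public String last;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public String getValue(Acc acc) {
        return acc.last;
    }

    public void accumulate(Acc acc, String value) {
        acc.last = value;
    }
}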
Is there any way to create stateful UD(T)Fs to use in Flink SQL?
Liad Shachoach
03/15/2023, 12:00 AM
state.savepoints.dir: wasbs://savepoints@$<account_name>.blob.core.windows.net/my-app/savepoints
fs.azure.account.key.<account_name>.blob.core.windows.net: xxxxxxx
state.checkpoints.dir: wasbs://checkpoints@$<account_name>.blob.core.windows.net/my-app/checkpoints
As well as changing to wasb and removing the $ sign.
I followed https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/ and copied the jar to the plugins/azure-fs-hadoop directory.
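For comparison, the shape a real WASB URI usually takes (a sketch, untested here; <account_name> and the key are placeholders, and the $ in the docs looks like a templating artifact rather than part of the URI):
state.checkpoints.dir: wasbs://checkpoints@<account_name>.blob.core.windows.net/my-app/checkpoints
state.savepoints.dir: wasbs://savepoints@<account_name>.blob.core.windows.net/my-app/savepoints
fs.azure.account.key.<account_name>.blob.core.windows.net: <storage-account-key>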
I get a different behavior for each combination I try, if anyone can share a working example it would be awesome 🙂
Richard Diekema
03/15/2023, 12:03 AM
RMQConnectionConfig.Builder even in the v1.18 docs. I also don't see the getConnectionFactory method looking at the protocol of the URI (if supplied) to enable SSL if the protocol is amqps vs amqp.
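A long shot worth testing (a sketch, unverified against the Flink connector): the RabbitMQ client's own ConnectionFactory.setUri enables TLS for amqps URIs, so if the connector passes the URI straight through, this might be enough:
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setUri("amqps://user:pass@rabbit-host:5671/vhost") // amqps scheme implies TLS in the RabbitMQ Java client
    .build();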
Reme Ajayi
03/15/2023, 12:51 AM
RichFlatMapFunction, I need some guidance on how to load all the records from the enrichment stream into state. What sort of data structure would be used? How do I shut off the stream after all the records have been loaded? The bounded enrichment stream is from a Kafka topic. Is there a better approach? Thank you.
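One common shape for this (a sketch; Enrichment, Event, and EnrichedEvent are illustrative types) is to connect the two streams, key both by the join key, and hold the enrichment record in keyed state:
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class EnrichFunction
        extends KeyedCoProcessFunction<String, Enrichment, Event, EnrichedEvent> {

    private transient ValueState<Enrichment> lookup;

    @Override
    public void open(Configuration parameters) {
        lookup = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lookup", Enrichment.class));
    }

    @Override
    public void processElement1(Enrichment e, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        lookup.update(e); // enrichment side: remember the latest record per key
    }

    @Override
    public void processElement2(Event ev, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        out.collect(new EnrichedEvent(ev, lookup.value())); // may be null if not loaded yet
    }
}
With keyed state there is no single "all records loaded" moment to detect; each key simply enriches once its record has arrived.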
bo ray
03/15/2023, 12:56 AM
Zhiyu Tian
03/15/2023, 4:28 AM
01 ico
03/15/2023, 4:30 AM
Vitalii
03/15/2023, 8:02 AM
Aman Sharma
03/15/2023, 12:21 PM
João Nicola
03/15/2023, 1:39 PM
João Nicola
03/15/2023, 1:39 PM
Jeesmon Jacob
03/15/2023, 2:13 PM
piby 180
03/15/2023, 3:27 PM
word_count_data = ["To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"And by opposing end them?--To die,--to sleep,--",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks",
"That flesh is heir to,--'tis a consummation",
"Devoutly to be wish'd. To die,--to sleep;--",
"To sleep! perchance to dream:--ay, there's the rub;",
"For in that sleep of death what dreams may come,",
"When we have shuffled off this mortal coil,",
"Must give us pause: there's the respect",
"That makes calamity of so long life;",
"For who would bear the whips and scorns of time,",
"The oppressor's wrong, the proud man's contumely,",
"The pangs of despis'd love, the law's delay,",
"The insolence of office, and the spurns",
"That patient merit of the unworthy takes,",
"When he himself might his quietus make",
"With a bare bodkin? who would these fardels bear,",
"To grunt and sweat under a weary life,",
"But that the dread of something after death,--",
"The undiscover'd country, from whose bourn",
"No traveller returns,--puzzles the will,",
"And makes us rather bear those ills we have",
"Than fly to others that we know not of?",
"Thus conscience does make cowards of us all;",
"And thus the native hue of resolution",
"Is sicklied o'er with the pale cast of thought;",
"And enterprises of great pith and moment,",
"With this regard, their currents turn awry,",
"And lose the name of action.--Soft you now!",
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."]
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
# write all the data to one file
env.set_parallelism(1)
ds = env.from_collection(word_count_data)
ds = ds.flat_map(split) \
    .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    .key_by(lambda i: i[0]) \
    .reduce(lambda i, j: (i[0], i[1] + j[1]))
a = list(ds.execute_and_collect())
env.close()
In [58]: a
Out[58]:
[('a', 5),
('Be', 1),
('Is', 1),
('No', 2),
('Or', 1),
('To', 4),
('be', 1),
But this doesn't work
In [59]: word_count_data = [...]  # same word_count_data lines as above
    ...: env = StreamExecutionEnvironment.get_execution_environment()
    ...: env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    ...: # write all the data to one file
    ...: env.set_parallelism(1)
    ...: ds = env.from_collection(word_count_data)
    ...: ds = ds.flat_map(split) \
    ...:     .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    ...:     .key_by(lambda i: i[0])
    ...: a = list(ds.execute_and_collect())
    ...: env.close()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[59], line 9
      5 ds = env.from_collection(word_count_data)
      6 ds = ds.flat_map(split) \
      7     .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
      8     .key_by(lambda i: i[0])
----> 9 a = list(ds.execute_and_collect())
     10 env.close()
File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/data_stream.py:2920, in CloseableIterator.__next__(self)
   2919 def __next__(self):
-> 2920     return self.next()
File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/data_stream.py:2931, in CloseableIterator.next(self)
   2929 if not self._j_closeable_iterator.hasNext():
   2930     raise StopIteration('No more data.')
-> 2931 return convert_to_python_obj(self._j_closeable_iterator.next(), self._type_info)
File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/utils.py:72, in convert_to_python_obj(data, type_info)
     70     fields.append(None)
     71 else:
---> 72     fields.append(pickled_bytes_to_python_converter(data, field_type))
     73 if isinstance(type_info, RowTypeInfo):
     74     return Row.of_kind(RowKind(int.from_bytes(pickle_bytes[0], 'little')), *fields)
File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/utils.py:91, in pickled_bytes_to_python_converter(data, field_type)
     89     return row
     90 else:
---> 91     data = pickle.loads(data)
     92 if field_type == Types.SQL_TIME():
     93     seconds, microseconds = divmod(data, 10 ** 6)
TypeError: a bytes-like object is required, not 'JavaList'
Why can't I run key_by without reduce?
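One reading of the traceback is that collecting a bare keyed stream hands back Java objects the collector cannot unpickle; a sketch of a possible workaround (unverified) is to force the keyed stream through an operation with an explicit output_type before collecting:
# Hypothetical workaround: add an identity map with a declared output type
# after key_by so collected records are plain Python tuples.
ds = ds.flat_map(split) \
    .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    .key_by(lambda i: i[0]) \
    .map(lambda i: i, output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
a = list(ds.execute_and_collect())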