# troubleshooting
  • Kishan Huliyar Jagadeesh (03/13/2023, 2:45 PM)
    Hi! I am using the flink-kubernetes-operator and trying to push custom metrics to Datadog. I have the Datadog API key in a container environment variable; any advice on how to pass it into flink-conf.yaml?
  • Theodore Curtil (03/13/2023, 4:07 PM)
    Hey! I have been playing around with the FlinkSessionJob CRD and could not figure out whether there is a way to use a private GitHub repo as the file host for the jarURI, see below:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: basic-session-job-only-example
    spec:
      deploymentName: basic-session-deployment-only-example
      job:
        jarURI: https://<private-github-repo-file>
        parallelism: 3
        upgradeMode: stateless
        state: running
    I know you can curl a raw file using a GitHub personal access token, but I don't think the FlinkSessionJob CRD allows running commands (like curling the file from a private repo). Thanks for your help!
  • Andrew Otto (03/13/2023, 4:35 PM)
    Next question: I need (in PyFlink) a function that can both emit metrics and also produce to a side output. Is this not possible?
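    For reference, a minimal Java sketch of the combination being asked about: a ProcessFunction that both updates a counter metric and writes to a side output. This is an assumption about the shape of the solution rather than PyFlink-specific advice (PyFlink's ProcessFunction exposes the same two concepts), and the names ("errors", ERRORS) are illustrative.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class MetricsAndSideOutputFunction extends ProcessFunction<String, String> {

        // Side-output tag; declared as an anonymous subclass so the element type is captured.
        public static final OutputTag<String> ERRORS = new OutputTag<String>("errors") {};

        private transient Counter errorCounter;

        @Override
        public void open(Configuration parameters) {
            errorCounter = getRuntimeContext().getMetricGroup().counter("errors");
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            if (value.startsWith("ERROR")) {
                errorCounter.inc();        // metric
                ctx.output(ERRORS, value); // side output
            } else {
                out.collect(value);        // main output
            }
        }
    }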
  • Gerald (03/13/2023, 5:01 PM)
    Hey all, I have a more conceptual question on how we could handle some specific use cases with Flink. We have multiple Flink source tables ingested via Kafka in the Debezium changelog format. These tables are keyed, and per key, we are only interested in the latest event (similar to Kafka log compaction). I now want to join these tables to create events that are enriched with all information from these source tables. Let me provide an example:
    CREATE TABLE Person (
        Id int,
        Firstname string,
        Lastname string,
        DateOfBirth date,
        primary key (Id) not enforced
    ) WITH ( ... )
    
    CREATE TABLE Address (
        Id int,
        Street string,
        ZipCode string,
        City string,
        Country string,
        PersonId int,
        primary key (Id) not enforced
    ) WITH ( ... )
    It doesn't matter what gets updated (Person or Address); for both source triggers I want to emit a new combined PersonUpdate event that contains information about the person and its corresponding addresses:
    CREATE TABLE PersonUpdate (
        Id int,
        Firstname string,
        Lastname string,
        DateOfBirth date,
        Addresses ARRAY<ROW(Street string, City string, ZipCode string, Country string)>,
        primary key (Id) not enforced
    ) WITH (...)
    One option I read about would be to use temporal joins, having the right side of the join do lookups on, for example, the Address table (getting all addresses for a specific person X). However, as far as I understood the semantics of temporal joins, the join wouldn't trigger if there is only an update on the Address table (e.g., an address is added with PersonId X). This brings me to the question whether in such cases I would need to duplicate the same query, but this time doing the temporal lookup on the Person table. And what if I need to join multiple tables to get all the enrichment info for my event? Would I need to duplicate the query multiple times and flip the join directions to cover all the required triggers? Another alternative I currently see would be to rely on the DataStream API by connecting all involved source streams, creating KeyedStreams and then doing the state work on my own (e.g., in .process(...)) using ValueState or MapState etc. Would that be a better-performing or recommended solution compared to the one using temporal joins? Or did I miss any other alternative? What I clearly want to avoid are lookup joins doing the lookup on the database again.
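    One alternative worth noting here (a sketch under assumptions, not a definitive answer): a regular streaming join over the two changelog tables keeps both sides in state and re-emits results whenever either Person or Address changes, unlike a temporal join, which only triggers on the probe side. Collapsing the addresses into the ARRAY column would still need COLLECT or a user-defined aggregate, which is left out; the Table API wrapper below assumes the DDL above is already registered.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PersonUpdateJob {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Person and Address are assumed to be registered exactly as in the DDL above,
            // e.g. via tEnv.executeSql("CREATE TABLE Person (...) WITH (...)").

            // A regular (non-temporal) join: an update on either input produces an
            // updated joined row downstream.
            tEnv.executeSql(
                    "SELECT p.Id, p.Firstname, p.Lastname, p.DateOfBirth, "
                            + "a.Street, a.City, a.ZipCode, a.Country "
                            + "FROM Person AS p LEFT JOIN Address AS a ON a.PersonId = p.Id")
                    .print();
        }
    }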
  • Thijs van de Poll (03/13/2023, 7:04 PM)
    Hi all, I am trying to upsert data into a Postgres JDBC sink. The sink:
    CREATE TABLE group (
                group_id INT NOT NULL,
                representative INT,
                score INT,
                PRIMARY KEY (group_id) NOT ENFORCED 
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:postgresql://host.docker.internal:5432/doc_data',
                'table-name' = 'xml.t_group',
                'username' = 'root',
                'password' = 'root',
                'driver' = 'org.postgresql.Driver'
            );
    The insert statement:
    INSERT INTO group (group_id, representative, score) SELECT * FROM tmp_docs
            ON CONFLICT (group_id) DO UPDATE SET
                representative = CASE WHEN group.score >= tmp_docs.score THEN group.representative ELSE tmp_docs.representative;
    According to the JDBC connector docs (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/jdbc/) this is how it is done for Postgres, but instead of working it fails with the following error:
    org.apache.flink.table.api.SqlParserException: SQL parse failed. Incorrect syntax near the keyword 'ON'
    Or does it mean that Flink applies this logic internally, and it is not possible to customize the ON CONFLICT clause?
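    For context: the ON CONFLICT statement shown in the connector docs describes what the JDBC sink generates internally for the Postgres dialect when the sink table declares a PRIMARY KEY; it is not part of Flink SQL's grammar, which is why the parser rejects it. A minimal Table API sketch of the plain INSERT that leaves conflict handling to the connector (assuming the DDL above is already registered; the score-based "keep the old representative" rule would have to be expressed in the query itself and is not shown here):
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class GroupUpsertJob {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // `group` and tmp_docs are assumed to be registered as in the thread above.
            // Because `group` declares a primary key, the JDBC sink itself issues
            // INSERT ... ON CONFLICT (group_id) DO UPDATE ... against Postgres.
            tEnv.executeSql(
                    "INSERT INTO `group` (group_id, representative, score) "
                            + "SELECT group_id, representative, score FROM tmp_docs");
        }
    }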
  • Amir Hossein Sharifzadeh (03/13/2023, 9:23 PM)
    Hello everybody. I'm kind of stuck with a problem: in our original Kafka project we implemented our own functions for serialization and deserialization. In the Flink project (in Java) I need to follow the same deserialization approach. Everything is fine except the binary chunk, dataFileChunk.data. The Python deserialization uses "unpackb", and I used the equivalent component in Java, but I get different results than the Python deserialization. This is my deserialization class in Java:
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.json.JsonMapper;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.ArrayList;
    import java.util.Base64;
    import java.util.List;
    
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    import org.msgpack.core.MessageBufferPacker;
    import org.msgpack.core.MessageFormat;
    import org.msgpack.core.MessagePack;
    
    import org.msgpack.core.MessageUnpacker;
    import org.msgpack.value.ImmutableValue;
    import org.msgpack.value.Value;
    import org.msgpack.value.ValueType;
    
    
    public class DataFileChunkDeserializer extends AbstractDeserializationSchema<DataFileChunk> {
    
        private static final long serialVersionUID = 1L;
    
        private transient ObjectMapper objectMapper;
    
        @Override
        public void open(InitializationContext context) {
            objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
        }
    
        @Override
        public DataFileChunk deserialize(byte[] message) throws IOException {
    //        List<DataFileChunk> dataFileChunks = new ArrayList<>();
            DataFileChunk dataFileChunk = new DataFileChunk();
            MessageDigest md = null;
            MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(message);
            if (unpacker.hasNext()) {
                try {
                    md = MessageDigest.getInstance("SHA-512");
                } catch (NoSuchAlgorithmException e) {
                    e.printStackTrace();
                }
                ImmutableValue value = unpacker.unpackValue();
    
                List<Value> list = value.asArrayValue().list();
    
                dataFileChunk.filename = "" + list.get(0);
                dataFileChunk.chunk_hash = "" + list.get(2);
                dataFileChunk.chunk_i = Long.parseLong(String.valueOf(list.get(4)));
                dataFileChunk.n_total_chunks = Long.parseLong(String.valueOf(list.get(5)));
                dataFileChunk.subdir_str = "" + list.get(6);
                dataFileChunk.filename_append = "" + list.get(7);
                ValueType vt = list.get(8).getValueType();
    
                dataFileChunk.data = list.get(8).asBinaryValue();
                dataFileChunk.dataByteArray = dataFileChunk.data.asByteArray();
                assert md != null;
    
                try {
                    byte[] sh1 = MessageDigest.getInstance("SHA-512").digest(dataFileChunk.dataByteArray);
                    byte[] sh2 = dataFileChunk.chunk_hash.getBytes(StandardCharsets.UTF_8);
                    System.out.println();
                } catch (NoSuchAlgorithmException e) {
                    e.printStackTrace();
                }
    
            }
    
            return dataFileChunk;
    
        }
    }
    and this is part of my pom.xml:
    <dependency>
        <groupId>org.msgpack</groupId>
        <artifactId>msgpack-core</artifactId>
        <version>0.8.18</version>
    </dependency>
    DataFileChunk class:
    import org.msgpack.value.BinaryValue;
    import org.msgpack.value.RawValue;
    
    public class DataFileChunk {
        public long chunk_i;
        public String filename;
        public String chunk_hash;
        public String chunk_offset_write;
        public long n_total_chunks;
        public String subdir_str;
        public String filename_append;
        public RawValue data;
        public byte[] dataByteArray;
    
        public DataFileChunk() {
        }
    
        public DataFileChunk(long chunk_i, String filename) {
            this.chunk_i = chunk_i;
            this.filename = filename;
        }
        public DataFileChunk(long chunk_i, String filename, String chunk_hash, String chunk_offset_write,
                             long n_total_chunks, String subdir_str, String filename_append, BinaryValue data) {
            this.chunk_i = chunk_i;
            this.filename = filename;
            this.chunk_hash = chunk_hash;
            this.chunk_offset_write = chunk_offset_write;
            this.n_total_chunks = n_total_chunks;
            this.subdir_str = subdir_str;
            this.filename_append = filename_append;
            this.data = data;
        }
    
        @Override
        public String toString() {
            return "Event{" + "chunk_i=" + chunk_i + ", n_total_chunks=" + n_total_chunks +
                    ", filename='" + filename + '\'' +
                    ", chunk_hash='" + chunk_hash + '\'' +
                    ", chunk_offset_write='" + chunk_offset_write + '\'' +
                    ", subdir_str='" + subdir_str + '\'' +
                    ", filename_append='" + filename_append + '\'' +
                    ", data='" + data + '\'' +
                    '}';
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            DataFileChunk dataFileChunk = (DataFileChunk) o;
            return chunk_i == dataFileChunk.chunk_i &&
                    n_total_chunks == dataFileChunk.n_total_chunks &&
                    filename.equals(dataFileChunk.filename) &&
                    chunk_hash.equals(dataFileChunk.chunk_hash) &&
                    chunk_offset_write.equals(dataFileChunk.chunk_offset_write) &&
                    subdir_str.equals(dataFileChunk.subdir_str) &&
                    filename_append.equals(dataFileChunk.filename_append) &&
                    data.equals(dataFileChunk.data);
        }
    
    
    }
    Does anyone have experience with msgpack deserialization?
  • Amir Hossein Sharifzadeh (03/14/2023, 1:55 AM)
    I figured out the issue: in the DataFileChunk class I have to change the types of chunk_hash and data from String to BinaryValue. In my deserialize method I should hash the byte array of data and then compare it with the byte array of chunk_hash:
    dataFileChunk.data = list.get(8).asBinaryValue();
    output.write(dataFileChunk.data.asByteArray());

    dataFileChunk.dataByteArray = dataFileChunk.data.asByteArray();

    // hash the payload and compare it with the hash shipped in the message
    assert md != null;
    md.update(dataFileChunk.data.asByteArray());
    byte[] bts = md.digest();

    String s1 = Base64.getEncoder().encodeToString(bts);
    String s2 = Base64.getEncoder().encodeToString(dataFileChunk.chunk_hash.asByteArray());
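    A small follow-up sketch (an assumption about the intent, not part of the original code): once both sides are raw bytes, the digests can be compared directly with MessageDigest.isEqual instead of Base64-encoding them first. bts and dataFileChunk refer to the variables in the snippet above.
    // Compare the computed SHA-512 with the hash carried in the chunk, byte for byte.
    boolean hashMatches = java.security.MessageDigest.isEqual(
            bts, dataFileChunk.chunk_hash.asByteArray());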
  • Prathit Malik (03/14/2023, 4:09 AM)
    Hi all, we have a use case for the JDBC sink where we are trying to exclude some fields from the final upsert/insert query if their value is null. The prepared statement currently only accepts a pre-built query, which is provided via the JDBC statement builder. Is there any alternative way to provide queries to the JDBC sink based on the incoming record from the stream, so we can build our own query at runtime instead of using a fixed template query for insert/upsert? Flink version: 1.14. Thanks for the help!
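    For reference, one workaround sometimes used when the built-in JdbcSink's fixed SQL template is too rigid is a custom RichSinkFunction that builds the statement per record (a minimal sketch; class, table and field names are illustrative, and connection pooling, batching and checkpoint integration are left out):
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class DynamicInsertSink extends RichSinkFunction<DynamicInsertSink.MyRecord> {

        private transient Connection connection;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Connection details are placeholders.
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://host:5432/db", "user", "pass");
        }

        @Override
        public void invoke(MyRecord r, Context context) throws Exception {
            // Build the column list at runtime, skipping null fields.
            List<String> cols = new ArrayList<>();
            List<Object> vals = new ArrayList<>();
            cols.add("id");                          vals.add(r.id);
            if (r.name != null)  { cols.add("name");  vals.add(r.name); }
            if (r.score != null) { cols.add("score"); vals.add(r.score); }

            String sql = "INSERT INTO my_table (" + String.join(", ", cols) + ") VALUES ("
                    + String.join(", ", Collections.nCopies(cols.size(), "?")) + ")";
            try (PreparedStatement ps = connection.prepareStatement(sql)) {
                for (int i = 0; i < vals.size(); i++) {
                    ps.setObject(i + 1, vals.get(i));
                }
                ps.executeUpdate();
            }
        }

        @Override
        public void close() throws Exception {
            if (connection != null) {
                connection.close();
            }
        }

        // Illustrative record type.
        public static class MyRecord {
            public Integer id;
            public String name;
            public Integer score;
        }
    }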
  • Akshata Shivaprasad (03/14/2023, 4:29 AM)
    Dear Flink team, here is my case; assistance on priority would be greatly appreciated. As part of an EMR to EKS migration we deployed Flink 1.15 in EKS, and while testing we observed pod disk usage growing on subsequent deploys (e.g. with upgradeMode last-state or suspend, whether or not the checkpoint completes first). We found that the task manager local state under /data is being retained and not cleaned up, so the disk usage doubles. I would like to understand which folders are retained on subsequent deploys. Below is sample data retained across all subsequent deploys:
    1001M	2023-03-08 17:58	/data/flink-local-data/tm_flink-personas-experiment-taskmanager-1-2/localState/aid_6b6fd573128d17d190e0b4fdc33e6387/jid_f92a1778225e260ac9febc8dea7aba5e/vtx_f7c20420360e0172e54933e6abdd481a_sti_1/chk_475590/b603b845d9f14110a6d8b32562e04fe0
    1001M	2023-03-08 17:58	/data/flink-local-data/tm_flink-personas-experiment-taskmanager-1-2/localState/aid_6b6fd573128d17d190e0b4fdc33e6387/jid_f92a1778225e260ac9febc8dea7aba5e/vtx_f7c20420360e0172e54933e6abdd481a_sti_1/chk_475590
    1001M	2023-03-08 17:58	/data/flink-local-data/tm_flink-personas-experiment-taskmanager-1-2/localState/aid_6b6fd573128d17d190e0b4fdc33e6387/jid_f92a1778225e260ac9febc8dea7aba5e/vtx_f7c20420360e0172e54933e6abdd481a_sti_1
  • Siva Family (03/14/2023, 5:15 AM)
    Hi everyone, I am trying to create a PyFlink app and deploy it on AWS KDA. My source is a Kinesis stream and the sink is DynamoDB. I understand I have to create an uber jar, as AWS KDA only supports configuring a single jar. The behavior is the same locally and on KDA with an uber jar: I can get either the Kinesis connector or the DynamoDB connector to work, but not both at the same time, depending on which jar I include first in my pom.xml when building the uber jar. As I searched further, this appears to depend on the first entry in META-INF/services/org.apache.flink.table.factories.Factory, which has two entries, one for KinesisDynamicTableFactory and another for KinesisDynamicTableSinkFactory. Whichever is on top works fine; separating them by ", " or a new line doesn't help. Screenshot attached.
  • Xiaorong Tai (03/14/2023, 7:57 AM)
    Hello, we are trying to replace KSQL with Flink Stateful Functions. The idea is to send a data stream to Flink either through Kafka or HTTP. Our system is implemented in Golang. When we try to send HTTP messages, it always fails. We think it is because the HTTP handler expects Protobuf data and we don't have the proto file the handler was built with. We haven't tried Kafka yet, but looking at the Golang SDK it seems the message will be handled the same way, and then we will be stuck on the missing proto file again. Could we get some help getting the simplest example working?
  • Thijs van de Poll (03/14/2023, 9:52 AM)
    @sap1ens I have seen your talk "Storing State Forever: Why It Can Be Good For Your Analytics". Very informative! I have a couple of questions about it: • If I am not mistaken, Flink CDC source operators require a parallelism of 1 to ensure that the order of update events is correct. Is it correct then that you used a parallelism of 1 for all sources, and a high parallelism for the sink to maximise resources for the join? • Any work in progress on backfilling the state? 🙂
  • Ting Yin (03/14/2023, 9:57 AM)
    Hi team, I'm trying to run a Flink streaming job that writes to a Cassandra sink in a flink-operator environment. I included the flink-cassandra-connector in the dependencies, but I hit:
    java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
        at com.datastax.driver.core.Metrics.<init>(Metrics.java:146) ~[?:?]
        at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1667) ~[?:?]
        at com.datastax.driver.core.Cluster.init(Cluster.java:214) ~[?:?]
        at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:387) ~[?:?]
        at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:366) ~[?:?]
        at com.datastax.driver.core.Cluster.connect(Cluster.java:311) ~[?:?]
        at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:160) ~[?:?]
        at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:105) ~[?:?]
        at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49) ~[?:?]
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
  • Ting Yin (03/14/2023, 9:58 AM)
    Where should I add the com/codahale/metrics classes? I included them in the application jar file, but it doesn't work.
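    One workaround that is sometimes suggested for this (an assumption, not verified against this exact setup): com.codahale.metrics.JmxReporter was removed in Dropwizard Metrics 4.x, which the DataStax 3.x driver still references when JMX reporting is enabled, so either bundle metrics-core 3.2.x in the job jar or disable JMX reporting in the ClusterBuilder so the class is never loaded. A minimal Java sketch of the latter (contact point, keyspace and query are illustrative):
    import com.datastax.driver.core.Cluster;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
    import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

    public class CassandraSinkWithoutJmx {
        public static void attach(DataStream<Tuple2<String, Integer>> resultStream) throws Exception {
            CassandraSink.addSink(resultStream)
                    .setQuery("INSERT INTO ks.my_table (id, value) VALUES (?, ?);")
                    .setClusterBuilder(new ClusterBuilder() {
                        @Override
                        protected Cluster buildCluster(Cluster.Builder builder) {
                            return builder.addContactPoint("cassandra-host")
                                    .withoutJMXReporting() // avoids loading com.codahale.metrics.JmxReporter
                                    .build();
                        }
                    })
                    .build();
        }
    }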
  • Gil Kirkpatrick (03/14/2023, 2:53 PM)
    Greetings all, I'm trying to get the SQL Gateway working, but I'm not finding the sql-gateway.sh script anywhere... can anyone point me in the right direction?
  • Felix Angell (03/14/2023, 3:06 PM)
    hey flink friends 👋, is it possible at scale that events could be leaked across operators of a keyed stream? For instance, we have Kafka and Kinesis data sources that are keyed into streams by 'user', and this data stream then gets sessionised. We have noticed (at high traffic volume) output data where it looks as though the data of one user (in theory, one keyed stream) is cut into two overlapping streams. Note: we are sessionising here, so essentially we end up with two session windows where there should be one. I'm still trying to investigate this more thoroughly, so I'm just wondering if anyone has any thoughts off the top of their head? Thanks!
  • Thijs van de Poll (03/14/2023, 8:26 PM)
    Hi all! I have data stored in Postgres tables where a column is of XML data type. Is Flink compatible with this data type? And which Flink data type should be used to read it? Thanks!
  • Hygor Knust (03/14/2023, 10:00 PM)
    Hi everyone, I have a Flink SQL use case where I need to ignore changes made to a particular column to reduce traffic. If a new changelog row arrives for an existing key, it should be ignored if all columns except col1 are the same; if any other column has changed, send it downstream. My idea was to create a UDTF for that, but it seems there is no way to make it stateful. Is there any way to create stateful UD(T)Fs to use in Flink SQL?
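    One workaround sometimes used for this kind of change suppression (a sketch, an assumption rather than a Flink SQL feature): convert the changelog to a DataStream, key it by the primary key, and keep the last observed row in ValueState, forwarding an element only when a column other than col1 changed. Names (MyRow, col1, col2) are illustrative.
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class SuppressCol1OnlyChanges
            extends KeyedProcessFunction<String, SuppressCol1OnlyChanges.MyRow, SuppressCol1OnlyChanges.MyRow> {

        private transient ValueState<MyRow> lastSeen;

        @Override
        public void open(Configuration parameters) {
            lastSeen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-seen", MyRow.class));
        }

        @Override
        public void processElement(MyRow row, Context ctx, Collector<MyRow> out) throws Exception {
            MyRow previous = lastSeen.value();
            lastSeen.update(row);
            // Forward the first row for a key, or any row whose non-col1 columns changed.
            if (previous == null || !previous.equalsIgnoringCol1(row)) {
                out.collect(row);
            }
        }

        // Illustrative POJO.
        public static class MyRow {
            public String key;
            public String col1;
            public String col2;

            public boolean equalsIgnoringCol1(MyRow other) {
                return java.util.Objects.equals(key, other.key)
                        && java.util.Objects.equals(col2, other.col2);
            }
        }
    }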
  • Liad Shachoach (03/15/2023, 12:00 AM)
    Hey all, does anyone have a working example of Flink checkpoints + savepoints to Azure Blob Storage? I tried the configuration below:
    state.savepoints.dir: wasbs://savepoints@$<account_name>.blob.core.windows.net/my-app/savepoints
    fs.azure.account.key.<account_name>.blob.core.windows.net: xxxxxxx
    state.checkpoints.dir: wasbs://checkpoints@$<account_name>.blob.core.windows.net/my-app/checkpoints
    I also tried changing to wasb and removing the $ sign. I followed https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/ and copied the jar to the plugins/azure-fs-hadoop directory. I get a different behavior for each combination I try; if anyone can share a working example it would be awesome 🙂
  • Richard Diekema (03/15/2023, 12:03 AM)
    Hey all, maybe I'm missing something, but does the RabbitMQ connector have SSL/TLS support? I'm building apps for Kinesis Data Analytics, so I'm currently on v1.13.2 with the option to run v1.15.x, but I don't see an SSL/TLS option exposed on RMQConnectionConfig.Builder even in the v1.18 docs. I also don't see the getConnectionFactory method looking at the protocol of the URI (if supplied) to enable SSL if the protocol is amqps vs amqp.
  • Reme Ajayi (03/15/2023, 12:51 AM)
    Hey all, I am trying to implement an enriched DataStream. I would like to enrich a datastream (the joined output of two streams) with another stream that has relatively few records. Basically, I would like to store all of the records from the enrichment stream in state and do a lookup from the joined stream. The enrichment stream is bounded. I have seen a few examples where lookups are done with external data sources, like this Flink Forward talk example and this AWS example, where the enrichment is done in a RichFlatMapFunction. I need some guidance on how to load all the records from the enrichment stream into state. What sort of data structure would be used? How do I shut off the stream after all the records have been loaded? The bounded enrichment stream is from a Kafka topic. Is there a better approach? Thank you.
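    One common pattern for this kind of enrichment (a sketch, not necessarily the only option): broadcast the small enrichment stream and keep it in broadcast MapState, then look it up for every element of the main (joined) stream. The types, key choice and names below are illustrative.
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class EnrichmentJob {

        static final MapStateDescriptor<String, String> ENRICHMENT_STATE =
                new MapStateDescriptor<>("enrichment", Types.STRING, Types.STRING);

        public static DataStream<String> enrich(DataStream<String> joined, DataStream<String> enrichment) {
            BroadcastStream<String> broadcast = enrichment.broadcast(ENRICHMENT_STATE);

            return joined.connect(broadcast).process(
                    new BroadcastProcessFunction<String, String, String>() {
                        @Override
                        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                                throws Exception {
                            // Look up enrichment data for this record's key (here the record itself).
                            String extra = ctx.getBroadcastState(ENRICHMENT_STATE).get(value);
                            out.collect(extra == null ? value : value + "," + extra);
                        }

                        @Override
                        public void processBroadcastElement(String value, Context ctx, Collector<String> out)
                                throws Exception {
                            // Store/refresh the enrichment record; the record is used as its own key here.
                            ctx.getBroadcastState(ENRICHMENT_STATE).put(value, value);
                        }
                    });
        }
    }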
  • bo ray (03/15/2023, 12:56 AM)
    Hello everyone,
    val result: DataStream[OutOfLimitSpeedInfo_Par2] = stream1.connect(stream2)
      .process(new BroadcastProcessFunction[TrafficInfo_Part2, MonitorInfo_Part2, OutOfLimitSpeedInfo_Par2] {
        override def processElement(value: TrafficInfo_Part2, ctx: BroadcastProcessFunction[TrafficInfo_Part2, MonitorInfo_Part2, OutOfLimitSpeedInfo_Par2]#ReadOnlyContext, out: Collector[OutOfLimitSpeedInfo_Par2]) = {
          if (value.monitorId == "5815") {
            println(">>>>>>>TrafficInfo_Part2 :" + value)
          }
          val info: MonitorInfo_Part2 = ctx.getBroadcastState(GlobalConstants_Part2.MONITOR_STATE_DESCRIPTOR).get(value.monitorId)
          if (info != null) {
            val limitSpeed = info.limitSpeed
            val speed = value.speed
            if (limitSpeed * 1.1 < speed) {
              out.collect(new OutOfLimitSpeedInfo_Par2(value.car, value.monitorId, value.roadId, speed, limitSpeed, value.actionTime))
            }
          }
        }

        override def processBroadcastElement(value: MonitorInfo_Part2, ctx: BroadcastProcessFunction[TrafficInfo_Part2, MonitorInfo_Part2, OutOfLimitSpeedInfo_Par2]#Context, out: Collector[OutOfLimitSpeedInfo_Par2]) = {
          if (value.monitorId == "5815") {
            println(">>>>>>>>>>5815: " + value)
          }
          ctx.getBroadcastState(GlobalConstants_Part2.MONITOR_STATE_DESCRIPTOR).put(value.monitorId, value)
        }
      })
    The printed output shows that there is data in the broadcast stream, but ctx.getBroadcastState(GlobalConstants_Part2.MONITOR_STATE_DESCRIPTOR).get(value.monitorId) in processElement cannot find the corresponding state written by processBroadcastElement. What is the reason for this? Flink version: 1.9
  • Zhiyu Tian (03/15/2023, 4:28 AM)
    Hello, I am trying the Flink-Kubernetes-Operator on an AKS cluster and hit an error when submitting the example job https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic.yaml. Could you help with this? The error message from the operator:
    2023-03-15 04:20:25,969 o.a.f.c.d.a.c.ApplicationClusterDeployer [INFO ] [magnetar.basic-example] Submitting application in 'Application Mode'.
    2023-03-15 04:20:26,010 o.a.f.k.u.KubernetesUtils [INFO ] [magnetar.basic-example] Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
    2023-03-15 04:20:26,010 o.a.f.k.u.KubernetesUtils [INFO ] [magnetar.basic-example] Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
    2023-03-15 04:20:26,011 o.a.f.c.Configuration [INFO ] [magnetar.basic-example] Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.jobmanager.service-account'
    2023-03-15 04:20:26,030 o.a.f.k.KubernetesClusterDescriptor [WARN ] [magnetar.basic-example] Failed to create the Kubernetes cluster "basic-example", try to clean up the residual resources.
    2023-03-15 04:20:26,048 o.a.f.k.o.l.AuditUtils [INFO ] [magnetar.basic-example] >>> Event | Warning | CLUSTERDEPLOYMENTEXCEPTION | Could not create Kubernetes cluster "basic-example".
    2023-03-15 04:20:26,048 o.a.f.k.o.r.ReconciliationUtils [WARN ] [magnetar.basic-example] Attempt count: 0, last attempt: false
    2023-03-15 04:20:26,084 o.a.f.k.o.l.AuditUtils [INFO ] [magnetar.basic-example] >>> Status | Error | UPGRADING | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"basic-example\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure executing: POST at: https://10.0.0.1/apis/apps/v1/namespaces/magnetar/deployments. Message: Internal error occurred: failed calling webhook \"deploymentwebhook.microsoft.com\": failed to call webhook: yaml: mapping values are not allowed in this context. Received status: Status(apiVersion=v1, code=500, details=StatusDetails(causes=[StatusCause(field=null, message=failed calling webhook \"deploymentwebhook.microsoft.com\": failed to call webhook: yaml: mapping values are not allowed in this context, reason=null, additionalProperties={})], group=null, kind=null, name=null, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Internal error occurred: failed calling webhook \"deploymentwebhook.microsoft.com\": failed to call webhook: yaml: mapping values are not allowed in this context, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=InternalError, status=Failure, additionalProperties={})."}]}
    2023-03-15 04:20:26,084 i.j.o.p.e.ReconciliationDispatcher [ERROR] [magnetar.basic-example] Error during event processing ExecutionScope{ resource id: ResourceID{name='basic-example', namespace='magnetar'}, version: 124702780} failed.
    org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster "basic-example".
        at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:142)
  • 01 ico (03/15/2023, 4:30 AM)
    Is there any configuration in Flink that can be used to limit the message rate? This way, I can prevent a sudden burst of traffic from causing an operator to crash.
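    For reference, one lightweight workaround sometimes used for this (an assumption, not a built-in Flink setting): a pass-through RichMapFunction that throttles each parallel subtask, which then propagates backpressure upstream. The sketch below assumes Guava is available on the classpath, and the rate value is illustrative.
    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class ThrottlingMap<T> extends RichMapFunction<T, T> {

        private final double recordsPerSecondPerSubtask;
        private transient RateLimiter rateLimiter;

        public ThrottlingMap(double recordsPerSecondPerSubtask) {
            this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
        }

        @Override
        public void open(Configuration parameters) {
            rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
        }

        @Override
        public T map(T value) {
            // Blocks until a permit is available, creating backpressure on the source.
            rateLimiter.acquire();
            return value;
        }
    }
    Usage would look like stream.map(new ThrottlingMap<>(1000.0)) placed right after the source.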
  • Vitalii (03/15/2023, 8:02 AM)
    Hello. We set up an Apache Flink (Statefun) high-availability cluster in Kubernetes. Currently, when the leader jobmanager fails, the new jobmanager restarts the job from the last snapshot. Q: Is it possible to configure the jobmanagers in a fault-tolerance mode? Expected behavior: the job continues when the leader is lost, instead of restarting from a snapshot.
  • Aman Sharma (03/15/2023, 12:21 PM)
    Hi, we created a Flink CEP streaming application; it was running successfully with the multiple patterns we had added. We stopped the job with a savepoint and removed one pattern from the job. Now, when restarting it from the savepoint, it throws this exception:
    org.apache.flink.util.FlinkRuntimeException: State newround_1:157 does not exist in the NFA. NFA has states [Final State $endState$ [ ]), Start State login_0 [ StateTransition(TAKE, from login_0 to $endState$, with condition), ])]
        at org.apache.flink.cep.nfa.NFA.isStartState(NFA.java:162)
        at org.apache.flink.cep.nfa.NFA.isStateTimedOut(NFA.java:342)
        at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:294)
        at org.apache.flink.cep.operator.CepOperator.advanceTime(CepOperator.java:429)
        at org.apache.flink.cep.operator.CepOperator.onEventTime(CepOperator.java:310)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:599)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239)
        at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
        at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:748)
  • João Nicola (03/15/2023, 1:39 PM)
    Hi, I am considering using Apache Flink and, more specifically, Flink Stateful Functions, to structure an indexing application. So far it seems to be an ideal architecture for the application. However, there are a few concerns I need to address before making the final decision, which I haven't been able to clear up by reading the documentation and searching the web. I was hoping someone could shed some light on them for me:
    1. Is there a (standard) way to share state between (embedded/co-located/remote) statefun function instances? In my particular case, there would be many (millions of) stateful function instances whose processing depends on some global parameters that may eventually change.
    2. Is there a way to broadcast (or multicast) messages to all (or a group of) stateful functions?
    3. Is there a way to access the stateful functions' states from OUTSIDE the stateful functions themselves, e.g. by making some query against the whole set of instance states?
    4. I am trying to decide whether to use Kafka or Pulsar for my streaming storage needs. Besides their pros and cons relative to each other, I've noticed that Flink's documentation seems to favor Kafka. Does that mean that Kafka support in Flink is much more mature than Pulsar's?
  • João Nicola (03/15/2023, 1:39 PM)
    Thanks in advance!
  • Jeesmon Jacob (03/15/2023, 2:13 PM)
    Hi team, we are running our Flink job with Kubernetes HA services and 2 replicas for the JobManager. If we kill a JM pod, we see a full job restart with 20-30 seconds of downtime. Is that expected behavior? I was expecting that when one JM is killed, the second JM would take control immediately. Is there any config you can suggest for a faster recovery? We are using S3 for HA storage. Thanks!
  • piby 180 (03/15/2023, 3:27 PM)
    Hi all, I am new to Flink and playing with this official PyFlink example, and I am currently facing an error: https://github.com/apache/flink/blob/7fccd5992f6222df62ed850542ef50b0714cd647/flink-python/pyflink/examples/datastream/word_count.py#L89 I can run this fine:
    word_count_data = ["To be, or not to be,--that is the question:--",
                       "Whether 'tis nobler in the mind to suffer",
                       "The slings and arrows of outrageous fortune",
                       "Or to take arms against a sea of troubles,",
                       "And by opposing end them?--To die,--to sleep,--",
                       "No more; and by a sleep to say we end",
                       "The heartache, and the thousand natural shocks",
                       "That flesh is heir to,--'tis a consummation",
                       "Devoutly to be wish'd. To die,--to sleep;--",
                       "To sleep! perchance to dream:--ay, there's the rub;",
                       "For in that sleep of death what dreams may come,",
                       "When we have shuffled off this mortal coil,",
                       "Must give us pause: there's the respect",
                       "That makes calamity of so long life;",
                       "For who would bear the whips and scorns of time,",
                       "The oppressor's wrong, the proud man's contumely,",
                       "The pangs of despis'd love, the law's delay,",
                       "The insolence of office, and the spurns",
                       "That patient merit of the unworthy takes,",
                       "When he himself might his quietus make",
                       "With a bare bodkin? who would these fardels bear,",
                       "To grunt and sweat under a weary life,",
                       "But that the dread of something after death,--",
                       "The undiscover'd country, from whose bourn",
                       "No traveller returns,--puzzles the will,",
                       "And makes us rather bear those ills we have",
                       "Than fly to others that we know not of?",
                       "Thus conscience does make cowards of us all;",
                       "And thus the native hue of resolution",
                       "Is sicklied o'er with the pale cast of thought;",
                       "And enterprises of great pith and moment,",
                       "With this regard, their currents turn awry,",
                       "And lose the name of action.--Soft you now!",
                       "The fair Ophelia!--Nymph, in thy orisons",
                       "Be all my sins remember'd."]
    
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)
    ds = env.from_collection(word_count_data)
    ds = ds.flat_map(split) \
               .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
               .key_by(lambda i: i[0])\
               .reduce(lambda i, j: (i[0], i[1] + j[1]))
    a = list(ds.execute_and_collect())
    env.close()
    
    
    In [58]: a
    Out[58]: 
    [('a', 5),
     ('Be', 1),
     ('Is', 1),
     ('No', 2),
     ('Or', 1),
     ('To', 4),
     ('be', 1),
    But this doesn't work
    In [59]: word_count_data = [...]  # same word_count_data as in the first snippet
        ...: env = StreamExecutionEnvironment.get_execution_environment()
        ...: env.set_runtime_mode(RuntimeExecutionMode.BATCH)
        ...: # write all the data to one file
        ...: env.set_parallelism(1)
        ...: ds = env.from_collection(word_count_data)
        ...: ds = ds.flat_map(split) \
        ...:            .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
        ...:            .key_by(lambda i: i[0])
        ...: a = list(ds.execute_and_collect())
        ...: env.close()
    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    Cell In[59], line 9
          5 ds = env.from_collection(word_count_data)
          6 ds = ds.flat_map(split) \
          7            .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
          8            .key_by(lambda i: i[0])
    ----> 9 a = list(ds.execute_and_collect())
         10 env.close()
    
    File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/data_stream.py:2920, in CloseableIterator.__next__(self)
       2919 def __next__(self):
    -> 2920     return self.next()
    
    File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/data_stream.py:2931, in CloseableIterator.next(self)    
       2929 if not self._j_closeable_iterator.hasNext():
       2930     raise StopIteration('No more data.')
    -> 2931 return convert_to_python_obj(self._j_closeable_iterator.next(), self._type_info)
    
    File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/utils.py:72, in convert_to_python_obj(data, type_info)  
         70         fields.append(None)
         71     else:
    ---> 72         fields.append(pickled_bytes_to_python_converter(data, field_type))
         73 if isinstance(type_info, RowTypeInfo):
         74     return Row.of_kind(RowKind(int.from_bytes(pickle_bytes[0], 'little')), *fields)
    
    File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/utils.py:91, in pickled_bytes_to_python_converter(data, field_type)
         89     return row
         90 else:
    ---> 91     data = pickle.loads(data)
         92     if field_type == Types.SQL_TIME():
         93         seconds, microseconds = divmod(data, 10 ** 6)
    
    TypeError: a bytes-like object is required, not 'JavaList
    Why can't I run key_by without reduce?