# troubleshooting
  • g

    Gunjan Kumar

    09/01/2022, 10:32 AM
    Is there a way to convert a DataStream into a pandas DataFrame? I know there is an API to convert a Table into pandas using to_pandas.
  • c

    Christos Hadjinikolis

    09/01/2022, 12:35 PM
    Hi all, I am trying to introduce Flink to my company, and after doing a bit of reading it felt appropriate to do so through Apache Beam using Flink as the runner. The reasons were:
    • We have use cases that require batch and stream processing applied to different inputs to solve the same problem (e.g. historical vs. live prediction generation). We wanted a framework that supports a unified API.
    • Beam has a Python API, which is aligned with our tech stack as far as ML work in the company goes.
    So we ended up trying Python + Beam on a Flink runner. Looking at this https://docs.google.com/spreadsheets/d/1KDa_FGn1ShjomGd-UUDOhuh2q73de2tPz6BqHpzqvNI/edit#gid=0 I was not able to identify any issues. Still, we are constantly running into issues that are hard to troubleshoot when writing stream processing applications that read from and write to Kafka. For example this: https://www.mail-archive.com/user@beam.apache.org/msg06804.html I am now in a position where the project is frozen and we need to decide on the steps forward. I want to know whether Python + Apache Beam + Flink is a robust choice (it seems not). If not, is Flink's Python API well supported? I understand Flink does not come with a unified API though (correct)? I would really appreciate some help.
  • a

    Ali AIT-BACHIR

    09/01/2022, 12:46 PM
    Hi, I am implementing a PyFlink job on Flink 1.15.2 and I want to read from RabbitMQ as a source. I followed this code example: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/rabbitmq/ but got stuck on the connector dependency. I tried to add this JAR:
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///home/ali/softs/flink-connector-rabbitmq-1.15.2.jar")
    But I got this error:
    py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
    Exception in thread "Thread-4" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
    at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
    at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
    Thanks for your help.
  • f

    Fábio Santos

    09/01/2022, 12:52 PM
    Hi, I'm trying to understand and use watermarks on event time. Starting from the word count example in the Flink documentation, I made some changes to use watermarks and assign timestamps based on the event value. But I think this doesn't work: the watermark is always the same (negative value -9223372036854775808). I have already read some Stack Overflow posts about this problem, but I am still unable to reach a solution. My code: Code result:
    8> (fabio,3)
    Watermark: -9223372036854775808
    Current Timestamp: 2022-09-01T12:51:20.002962Z Start Window: 2022 09 01 13:50:40 End Window: 2022 09 01 13:51:20
    8> (fabio,2)
    Watermark: -9223372036854775808
    Current Timestamp: 2022-09-01T12:52:00.001547Z Start Window: 2022 09 01 13:51:20 End Window: 2022 09 01 13:52:00
    Another thing that confuses me: the window is 40 seconds and the watermark is 20 seconds, so shouldn't the window be processed 20 seconds after the window end? Thanks for your help.
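For the timing question above, a minimal sketch in plain Python (not PyFlink) of how a bounded-out-of-orderness watermark relates to window firing. It assumes the watermark is the largest event timestamp seen so far, minus the allowed out-of-orderness, minus 1 ms, and that an event-time window fires once the watermark reaches the window's last millisecond. The function names are hypothetical. The value -9223372036854775808 is Long.MIN_VALUE, which is what is reported while no watermark has been generated yet.

```python
LONG_MIN = -(2 ** 63)  # -9223372036854775808, the value seen in the output above


def bounded_watermark(max_event_ts_ms, max_out_of_orderness_ms):
    """Watermark for a bounded-out-of-orderness strategy (assumed formula)."""
    if max_event_ts_ms is None:
        return LONG_MIN  # no event timestamp has been seen yet
    return max_event_ts_ms - max_out_of_orderness_ms - 1


def window_fires(window_end_ms, watermark_ms):
    """An event-time window [start, end) fires once the watermark reaches end - 1."""
    return watermark_ms >= window_end_ms - 1


def first_firing_event_ts(window_end_ms, max_out_of_orderness_ms):
    """Smallest event timestamp that moves the watermark far enough to fire the window."""
    return window_end_ms + max_out_of_orderness_ms


WINDOW_MS = 40_000
DELAY_MS = 20_000
window_end = WINDOW_MS  # first window is [0, 40000)

# Without any assigned timestamps the watermark never moves and nothing fires.
assert not window_fires(window_end, bounded_watermark(None, DELAY_MS))
# An event stamped exactly at the window end is not enough ...
assert not window_fires(window_end, bounded_watermark(40_000, DELAY_MS))
# ... the window fires only after an event stamped 20 s past its end arrives.
assert window_fires(window_end, bounded_watermark(60_000, DELAY_MS))
```

Under these assumptions, a 40 s window with 20 s of out-of-orderness produces output only once events about 20 s newer than the window end have been seen, measured in event time rather than wall-clock time.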
  • a

    Adrian Chang

    09/01/2022, 3:56 PM
    Hello, can I use the Kubernetes Operator for PyFlink? In spec.job I don't know how to specify the Python file to run. https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/custom-resource/reference/#jobspec
  • s

    Sharon Xie

    09/01/2022, 4:42 PM
    Hi~ A question about CDC processing with transactions. Does anyone have experience with processing CDC/Debezium data and making sure the output is only emitted when a transaction is committed? That is, if a few tables in the same DB are updated in a single transaction, the output records of the stream processing (e.g. a join of those tables) should reflect all the changes. In this case, the sink-side "materialized view" of the processing won't have partial updates.
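One possible approach, sketched in plain Python rather than Flink: buffer change events per transaction and release them only when the transaction's END marker has arrived and the expected number of events has been collected. The event shapes below (a `transaction` id on each change event, and BEGIN/END markers carrying `event_count`) are modelled on Debezium's optional transaction metadata. Treat the exact field names and the `TransactionBuffer` class as assumptions.

```python
from collections import defaultdict


class TransactionBuffer:
    """Holds change events until their transaction is known to be complete."""

    def __init__(self):
        self._events = defaultdict(list)  # txn id -> buffered change events
        self._expected = {}               # txn id -> event count from the END marker

    def on_change(self, event):
        """Buffer one change event; return the full batch if it completes a txn."""
        txn_id = event["transaction"]["id"]
        self._events[txn_id].append(event)
        return self._emit_if_complete(txn_id)

    def on_marker(self, marker):
        """Handle a BEGIN/END marker; END may arrive before the last change event."""
        if marker["status"] != "END":
            return None
        self._expected[marker["id"]] = marker["event_count"]
        return self._emit_if_complete(marker["id"])

    def _emit_if_complete(self, txn_id):
        expected = self._expected.get(txn_id)
        if expected is None or len(self._events[txn_id]) < expected:
            return None  # still waiting for END or for more events
        self._expected.pop(txn_id)
        return self._events.pop(txn_id)


buf = TransactionBuffer()
assert buf.on_change({"table": "orders", "transaction": {"id": "tx1"}}) is None
assert buf.on_marker({"status": "END", "id": "tx1", "event_count": 2}) is None
batch = buf.on_change({"table": "items", "transaction": {"id": "tx1"}})
assert [e["table"] for e in batch] == ["orders", "items"]
```

In a Flink job this logic would presumably live in keyed state, keyed by transaction id, so that it survives restarts. The sketch ignores that and keeps everything in memory.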
  • s

    Shen Zhu

    09/01/2022, 6:16 PM
    Hi team, one of our Flink jobs failed and we found the following logs (sorry for the long log). First we observed some checkpoint timeouts:
    2022-09-01 06:21:46,168 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 7624 of job a655e16574c6ac2801d80346fa24f450 expired before completing.
    2022-09-01 06:21:46,451 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 7625 (type=CHECKPOINT) @ 1662013306179 for job a655e16574c6ac2801d80346fa24f450.
    2022-09-01 06:21:46,714 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Received late message for now expired checkpoint attempt 7624 from task 24134624cd1401ae0a6f7d55a41410c3 of job a655e16574c6ac2801d80346fa24f450 at container_1660894388534_0005_01_000028 @ ip-172-30-91-83.ec2.internal (dataPort=37881).
    ...
    And then there were some Kafka errors and the job failed (attached image). We did some investigation online and found one related Stack Overflow answer. In this case, should we keep increasing request.timeout.ms and the checkpoint timeout (currently 1 minute)? We are using Flink 1.12, thanks for your help!
  • y

    Yahor Paulikau

    09/01/2022, 7:53 PM
    I'm trying to implement an approach where the Avro schema is read from a registry and no statically generated Avro classes are used. The schema is not known in advance because the source is provided as a parameter. The problem with a static class is that it has to be baked into the jar, and if the schema changes the class must be regenerated, which adds extra complexity. Ideally, the schema provided through a URL should be enough to digest the data and provide flexibility. I'm trying to avoid any custom logic with GenericRecord deserializers as well. In Flink 1.12 we could implement something like the code below; we still have to provide the DataSource schema, but at least the deserializer schema is provided through a string, kafkaSourceTopicSchema. Now we can generate the DataSource schema from Avro using .fromRowDataType(AvroSchemaConverter.convertToDataType(kafkaSourceTopicSchema)), but flexible deserialization seems to be a problem. Any ideas?
    tableEnv.
         connect(new Kafka()
            .version("universal")
            .topic(topicName)
            .startFromLatest()
            .property("bootstrap.servers", config.getString("kafka_brokers"))
          )
          .withFormat(new Avro().avroSchema(kafkaSourceTopicSchema)).
          withSchema(new Schema()
            .field("event_ts", DataTypes.BIGINT())
            .field("event_type", DataTypes.STRING())
            .field("device_id", DataTypes.STRING())
            .field("proctime", DataTypes.TIMESTAMP(3)).proctime()
          ).
          createTemporaryTable( temporaryTableName )
  • s

    Sucheth Shivakumar

    09/01/2022, 9:31 PM
    Is there a way to read data from Snowflake in batch mode using the DataStream API?
  • m

    Marco Villalobos

    09/02/2022, 5:14 PM
    Too many open files, what is the proper way to fix this?
    2022-09-02 10:04:56
    java.io.IOException: Could not perform checkpoint 119366 for operator tag enrichment (3/4)#104.
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:968)
    	at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
    	at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156)
    	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:178)
    	at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    	at java.lang.Thread.run(Thread.java:750)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 119366 for operator tag enrichment (3/4)#104. Failure reason: Checkpoint was declined.
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:951)
    	... 13 more
    Caused by: org.rocksdb.RocksDBException: While open a file for appending: /mnt/yarn/usercache/hadoop/appcache/application_1631124824249_0061/flink-io-7f392e48-d086-492b-960b-1c56d0f864a0/job_a5b70dea0d3c27b2798c53df49065433_op_KeyedProcessOperator_a91e7e58fb0d0cb4a427ff0c6489016c__3_4__uuid_252bcc06-8857-4153-a866-2e6b3f50c4bb/chk-119366.tmp/MANIFEST-423131: Too many open files
  • m

    Marco Villalobos

    09/02/2022, 6:13 PM
    If I programmatically assign a state back end like this:
    final RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, incrementalCheckpointingEnabled);
    streamEnv.setStateBackend(rocksDBStateBackend);
    Will it still read the other configuration options in the YAML file, such as state.backend.rocksdb.predefined-options, as defined at https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#advanced-rocksdb-state-backends-options?
  • g

    Gaurav Miglani

    09/03/2022, 9:29 AM
    I am trying a manual-recovery chaos scenario. Reading this doc, I am not able to understand this statement completely: "When Kubernetes HA is enabled the savepoint upgrade mode may fall back to the last-state behaviour in cases where the job is in an unhealthy state." So if I restart my spec with the savepoint upgrade mode and it fails based on kubernetes.operator.deployment.readiness.timeout, will my job restart with last-state? I tried it by changing the operator state in a job (using a SQL query change: DISTINCT on a different column in a tumble window; NOTE: I am not setting any operator ID etc., as I am using the Flink Table API throughout), but it doesn't work. In this case, if I want to restart my job stateless, is that possible, or do I need to run kubectl delete flinkdeployment my-deployment and resubmit the job with last-state (which is ideally an empty state, as I have deleted the spec)?
  • h

    Herat Acharya

    09/03/2022, 2:33 PM
    👋 Hello, team! I just joined Flink, and I am trying to evaluate Flink for my use case, as described below. I have a paginated API, like http://someurl.com/next=abc, where next is a pointer to the next set of records. The API returns a pointer to the next set of records in the response, and I then need to pass that pointer in the next parameter of the URL. My questions are:
    • Since Async I/O in Flink provides a mechanism to use an HTTP client to call external APIs, how can I use it to call a paginated API, in batch or streaming mode?
    • I also need to store the next pointer in a database to keep an audit trail of what I have processed. Does Flink allow that? I know we can use the Table API or Flink's batch processing mode for DataStreams. Does anyone know how to do that?
    Any help is greatly appreciated. https://stackoverflow.com/questions/73562030/using-async-i-o-in-flink-to-call-paginated-http-api
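The pagination loop described above can be sketched independently of Flink: call the API, hand on the records, record the pointer for the audit trail, and repeat until no pointer is returned. `fetch_page` stands in for the real HTTP call and `audit_log` for the database table. Both are hypothetical, as is the response shape (`records`, `next`).

```python
def read_all_pages(fetch_page, audit_log, start=None):
    """Follow `next` pointers until the API stops returning one.

    fetch_page(pointer) -> {"records": [...], "next": pointer or None}
    audit_log.append(pointer) is called once per page that was processed.
    """
    pointer = start
    while True:
        page = fetch_page(pointer)
        yield from page["records"]
        audit_log.append(pointer)  # record the page only after it was processed
        pointer = page.get("next")
        if pointer is None:
            break


# Fake API with three pages, used only to exercise the loop.
PAGES = {
    None: {"records": [1, 2], "next": "abc"},
    "abc": {"records": [3], "next": "def"},
    "def": {"records": [4, 5], "next": None},
}

audit = []
records = list(read_all_pages(PAGES.__getitem__, audit))
assert records == [1, 2, 3, 4, 5]
assert audit == [None, "abc", "def"]
```

Because each request depends on the previous response, the calls for a single cursor are inherently sequential.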
  • e

    Echo Lee

    09/05/2022, 5:06 AM
    Hi all, I am using temporal join, rocksdb for state backend, using mini cluster locally for debug execution, and limiting the rate of 4000 rows/s to the test source. When I use single parallelism for this operator, the processing performance can reach 2800 rows/s, but when I adjust the parallelism to 2, the performance of a single task is only 400 rows/s. I found that when the parallelism is 2, there are many writes to the local disk, and I am confused about the phenomenon.
  • a

    Ali AIT-BACHIR

    09/05/2022, 9:51 AM
    Hi all, I am writing a PyFlink job in Python on Flink 1.15.2, and I read messages from RabbitMQ with the following connector: flink-sql-connector-rabbitmq-1.15.2.jar. However, when I try to sink to RabbitMQ with this code, following this link: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/rabbitmq/#installing-rabbitmq
    stream.add_sink(RMQSink(
        connection_config,      # config for the RabbitMQ connection
        'queueName',            # name of the RabbitMQ queue to send messages to
        SimpleStringSchema()))
    I got the following error trace:
    File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
    raise self
    grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1662371359.807069114","description":"Error received from peer ipv6:[::1]:44295","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
    >
    Traceback (most recent call last):
    File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
    and more logs here:
    RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.
    at org.apache.flink.runtime.state.StateSerializerProvider$EagerlyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:344)
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)
    Thanks for your help
  • f

    Fábio Santos

    09/05/2022, 10:36 AM
    Hi guys, I'm using the flink-s3-fs-hadoop.jar plugin to use S3 as a sink. I want to test it locally (using a JetBrains IDE), but I'm always getting "No credentials provided". After some research, it seems I need to create a flink-conf.yaml to provide the S3 credentials and use the FLINK_CONF_DIR environment variable to load the file. But I still get the same error (AWS "No credentials provided"). Could someone help? Thanks and regards
  • b

    Bhavay Pahuja

    09/05/2022, 2:42 PM
    Hello, I am using Flink 1.15.1 and am trying to run a job on it using yarn-session and the sql-client. I see the following exception when I execute it. It doesn't happen again if I delete the /tmp directory, but that is not something that should be done again and again. So I wanted to raise this issue here to see whether it is an actual bug or whether something is going wrong on my side.
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/tez/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    	at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    	at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
    Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    	at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
    	at org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
    	at org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
    	at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
    	at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
    	at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
    	at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
    	at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    	... 1 more
    Caused by: org.apache.flink.table.api.TableException: Unexpected error when trying to load service provider for factories.
    	at org.apache.flink.table.factories.FactoryUtil.lambda$discoverFactories$19(FactoryUtil.java:813)
    	at java.util.ArrayList.forEach(ArrayList.java:1259)
    	at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:799)
    	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517)
    	at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
    	... 8 more
    Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.table.planner.loader.DelegateExecutorFactory could not be instantiated
    	at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    	at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    	at org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42)
    	at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798)
    	... 10 more
    Caused by: java.lang.ExceptionInInitializerError
    	at org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135)
    	at org.apache.flink.table.planner.loader.DelegateExecutorFactory.<init>(DelegateExecutorFactory.java:34)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at java.lang.Class.newInstance(Class.java:442)
    	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    	... 14 more
    Caused by: org.apache.flink.table.api.TableException: Could not initialize the table planner components loader.
    	at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:123)
    	at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:52)
    	at org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.<clinit>(PlannerModule.java:131)
    	... 22 more
    Caused by: java.nio.file.FileAlreadyExistsException: /tmp
    	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
    	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    	at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
    	at java.nio.file.Files.createDirectory(Files.java:674)
    	at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
    	at java.nio.file.Files.createDirectories(Files.java:727)
    	at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:96)
    	... 24 more
  • k

    Kwangin Jung

    09/06/2022, 7:54 AM
    Hello, I'm working with Flink 1.13 on Kinesis Data Analytics. I'm trying to write out data in XML format using StreamingFileSink, as follows:
    OutputFileConfig outputConfig = OutputFileConfig
                    .builder()
                    .withPartPrefix("output")
                    .withPartSuffix(".xml")
                    .build();
    
    StreamingFileSink<String> streamingFileSink = StreamingFileSink
                    .forRowFormat(
                            new Path("s3://my-output-bucket/"),
                            new SimpleStringEncoder<String>("UTF-8")
                    )
                    .withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd_HH:mm"))
                    .withOutputFileConfig(outputConfig)
                    .build();
    
    processStream.keyBy(SinkOutputModel::getId)
                    .map(SinkOutputModel::toString)
                    .addSink(streamingFileSink);
    I want each record in the stream to be written to a separate file. But in this case, files contain multiple records (some contain 1 record, while others contain 2, 3 or more). How should I set up the rolling policy, or is there another way?
  • p

    Paul Lam

    09/06/2022, 10:34 AM
    Hi all, I'm developing a tool to monitor Flink job status in session clusters via REST API. When a Flink job fails and exits, it would send alerts. But the problem is that the Flink job status might not be final. For example, the job could turn into FAILED status, and then get restarted by JobManager based on the restart strategies. So if we send alerts when the job is in FAILED status, it would be a false alert. Is there any better way to judge a job? Thanks a lot!
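One way to reduce false alerts, sketched in Python: treat only a globally terminal status as final, and additionally require that status to persist for a few consecutive polls before alerting. The status names follow Flink's JobStatus enum. The `confirmations` threshold, the choice of which statuses to alert on, and the helper names are hypothetical.

```python
# Globally terminal statuses in Flink's JobStatus enum. Other values such as
# FAILING or RESTARTING describe a job that may still come back.
GLOBALLY_TERMINAL = {"FAILED", "CANCELED", "FINISHED"}


def is_final(status):
    """True if the status is one the job is not expected to leave."""
    return status in GLOBALLY_TERMINAL


def should_alert(status_history, alert_on=("FAILED",), confirmations=3):
    """Alert only if the last `confirmations` polls all show the same bad final status."""
    if len(status_history) < confirmations:
        return False
    recent = status_history[-confirmations:]
    return len(set(recent)) == 1 and recent[0] in alert_on and is_final(recent[0])


assert not should_alert(["RUNNING", "FAILING", "RESTARTING", "RUNNING"])
assert not should_alert(["RUNNING", "FAILED"])  # seen once, not confirmed yet
assert should_alert(["RUNNING", "FAILED", "FAILED", "FAILED"])
```

The debounce trades alert latency for fewer false positives: with a 10 s poll interval and `confirmations=3`, an alert is delayed by roughly 20 to 30 s.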
  • s

    Sreekhar Reddy Kandala

    09/06/2022, 11:56 AM
    Hey folks, I am trying to write a Flink application with the StreamNative Flink Pulsar connector and Pulsar as a data source. We have 150+ tenants on Pulsar, and each tenant has 1 namespace and 1 topic (1 partition only). The rate of consumption in the Flink app is too slow (we are able to consume only about 2k msgs over an hour or so, whereas at least 10k+ msgs are being produced).
    • Increased the parallelism of the Flink application (from 10 to 60, thinking that 60 consumers would consume in parallel) for the Pulsar source too, but it didn't work.
    • Upgraded the Pulsar infra (added more CPU and memory: 8-core CPU and 16 GB memory for each node, and 3 such nodes in total for all Pulsar components).
    • Pulsar CPU or memory usage never goes beyond 65%.
    • 2 cores for each broker component, so 6 cores in total for all broker replica sets.
    • 1.5 cores for each bookie component, so 4.5 cores in total for all bookie replica sets.
    (Pulsar deployed on k8s, version 2.9.2.) Is there some metric I can track to understand the latency? Is there something I can tweak in my Flink Pulsar connector? Is there a dynamic config I have to tweak on the Pulsar backend? Help needed please! A little urgent.
  • j

    Jeesmon Jacob

    09/06/2022, 5:12 PM
    Hi team, we are seeing some issues when upgrading a Flink job with a new Docker image. We are using Flink 1.14 and operator version 1.0.1. What happens is that the JobManager submits the job, but we immediately see the log message "Application completed SUCCESSFULLY". So the cluster gets shut down and is not able to recover. Does anyone know about this issue or have any suggestion to fix the problem? The relevant log is in the thread.
  • k

    Krish Narukulla

    09/06/2022, 10:32 PM
    Has anyone worked with Flink 1.15 and the Hortonworks schema registry?
    tEnv.executeSql(String.format(" CREATE TEMPORARY TABLE `table` "
            + "(upload_time BIGINT, log_id STRING)" +
            " WITH (  'connector' = 'kafka',\n"
            + " 'topic' = 'topic1',\n"
            + " 'properties.bootstrap.servers' = '%s', "
            + "  'properties.group.id' = 'kafka-krish-test', "
            + "  'scan.startup.mode' = 'earliest-offset', "
            + " 'value.format' = 'avro', "
            +"  'properties.schema.registry.url' = '%s',"
            +"  'properties.specific.avro.reader' = 'true',"
            +"'properties.value.deserializer' = 'com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer'"
            + ")","xxxxxx",
                "xxxx")
        );
  • n

    Nipuna Shantha

    09/07/2022, 4:12 AM
    I have run into an issue when setting a JVM system property with the flink run command. I tried to set extra JVM_OPTS like "-Dxxx=yyy", but these are not working. Note: I do not use YARN, and my Flink cluster is deployed in standalone mode. Is there any way to add a JVM system property with the run command? The following is an example:
    ~/bin/flink run --detached --jobmanager IP:port -Dallow.func=true -c Processor ~/applicaion/app.jar
  • p

    Pedro Mázala

    09/07/2022, 2:05 PM
    Has anyone had problems with the socket keep-alive configuration for the ES7 sink? This ES issue describes well what is going on in my k8s deployment.
  • r

    Rommel

    09/07/2022, 5:36 PM
    I have a question about using MapValue in Flink. I need to save a map as part of the state, and since the state needs to be serializable/deserializable, I extend my class from MapValue. Very simple:
    public class HashMapValue<K extends Value, V extends Value> extends MapValue<K, V> {
        public HashMapValue() {
            super();
        }
    }
    When I try to use this class, I can't instantiate it.
    HashMapValue<IntValue, BooleanValue> hashMapValue = new HashMapValue<IntValue, BooleanValue>();
    java.lang.AssertionError
    	at org.apache.flink.util.ReflectionUtil.getTemplateTypes(ReflectionUtil.java:141)
    	at org.apache.flink.util.ReflectionUtil.getSuperTemplateTypes(ReflectionUtil.java:98)
    	at org.apache.flink.util.ReflectionUtil.getTemplateType(ReflectionUtil.java:44)
    	at org.apache.flink.util.ReflectionUtil.getTemplateType1(ReflectionUtil.java:54)
    	at org.apache.flink.types.MapValue.<init>(MapValue.java:55)
    I can't find anything related to MapValue, online or in the Flink source code, to use as an example. Can anyone help me by pointing out how to use MapValue in Flink?
  • i

    Ikvir Singh

    09/07/2022, 8:21 PM
    Hello 👋🏽. I’m currently using the FileSystem SQL connector to read files as a source. Is there a convenient way to name a table source so it has a more descriptive name appear in the Flink Web UI?
  • k

    Krish Narukulla

    09/07/2022, 8:51 PM
    Is there a Flink SQL connector for Amazon S3? Does this work for S3 paths?
    CREATE TABLE MyUserTable (
      column_name1 INT,
      column_name2 STRING,
      ...
      part_name1 INT,
      part_name2 STRING
    ) PARTITIONED BY (part_name1, part_name2) WITH (
      'connector' = 'filesystem',           -- required: specify the connector
      'path' = 'file:///path/to/whatever',  -- required: path to a directory
      'format' = '...',                     -- required: file system connector requires to specify a format,
                                            -- Please refer to Table Formats
                                            -- section for more details
      'partition.default-name' = '...',     -- optional: default partition name in case the dynamic partition
                                            -- column value is null/empty string
    
      -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly
      -- reduce the number of file for filesystem sink but may lead data skew, the default value is false.
      'sink.shuffle-by-partition.enable' = '...',
      ...
    )
  • k

    Krish Narukulla

    09/07/2022, 10:00 PM
    I can't find the sources for any of the SQL connectors. https://github.com/apache/flink/tree/master/flink-connectors/flink-sql-connector-kafka
  • b

    Bastien DINE

    09/08/2022, 7:31 AM
    Hello everyone, I am having serious trouble with the new KafkaSink API while the job is INITIALIZING, specifically with the TransactionAborter. It tries to abort a lot of transactions (which were never used), and this prevents the job from starting. In the abortTransactionOfSubtask method, this loop runs for a very long time: for (long checkpointId = startCheckpointId; ; checkpointId++, numTransactionAborted++) { because of the stop condition if (producer.getEpoch() == 0) { Even after changing the transactional-id, I still get the almost infinite loop. Can someone help me with that? Am I missing something? By the way, parallelism is 1.
  • p

    Prasanth Kothuri

    09/08/2022, 7:51 AM
    Hi all, I am having trouble getting the day window to fire/close. The following is my custom window assigner:
    class TimePeriodWindowAssigner(timePeriod: String) extends WindowAssigner[Object, TimeWindow] with Logging{
      override def assignWindows(element: Object, timestamp: Long, context: WindowAssigner.WindowAssignerContext):
      util.Collection[TimeWindow] = {
        var startTime: Long = 0
        var endTime: Long = 0
        var calenderUnit: Integer = Calendar.DAY_OF_MONTH
    
        if (timePeriod == "Week") {
          calenderUnit = Calendar.DAY_OF_WEEK
        } else if (timePeriod == "Year") {
          calenderUnit = Calendar.DAY_OF_YEAR
        } else if (timePeriod == "Day") {
          calenderUnit = Calendar.HOUR_OF_DAY
        } else {
          calenderUnit = Calendar.DAY_OF_MONTH
        }

        logger.info("Get start of " + timePeriod + " for timestamp " + timestamp)
        val startCalender = CalenderHelpers.getCalendar(timestamp)
        startCalender.set(calenderUnit, startCalender.getActualMinimum(calenderUnit))
        CalenderHelpers.setTimeToBeginningOfDay(startCalender)
        startTime = startCalender.getTimeInMillis
        logger.info("Got start of " + timePeriod + " for timestamp " + timestamp + " as " + startTime)

        logger.info("Get end of " + timePeriod + " for timestamp " + timestamp)
        val endCalender = CalenderHelpers.getCalendar(timestamp)
        endCalender.set(calenderUnit, endCalender.getActualMaximum(calenderUnit))
        CalenderHelpers.setTimeToEndOfDay(endCalender)
        endTime = endCalender.getTimeInMillis
        logger.info("Got end of " + timePeriod + " for timestamp " + timestamp + " as " + endTime)
    
        // emitting the corresponding time window
        util.Collections.singletonList(new TimeWindow(startTime, endTime))
      }
    
      override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[Object, TimeWindow] = {
        ProcessingTimeTrigger.create()
      }
    
      override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = {
        new TimeWindow.Serializer
      }
    
      override def isEventTime: Boolean = false
    }
    And this is how it is used in the operator:
    testSource
          .keyBy(_.input1)
          .window(new TimePeriodWindowAssigner("Day"))
          .aggregate(new TestInputStatsAggregator(), new TestInputProcessWindowFunction())
          .name("testSink")
          .print
    I couldn't figure out why the window is not firing/closing. Any input is highly appreciated, thanks.
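A quick way to reason about this assigner, sketched in Python with UTC times (the original uses java.util.Calendar and an unspecified time zone): with "Day" the window runs from the start to the end of the element's calendar day, and a processing-time trigger fires only once the wall clock reaches the window's max timestamp, which is end - 1. The helper names are hypothetical, and the end of day is assumed to be 23:59:59.999.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MS = timedelta(milliseconds=1)


def to_ms(moment):
    """Epoch milliseconds of an aware datetime, using exact integer arithmetic."""
    return (moment - EPOCH) // ONE_MS


def day_window_ms(timestamp_ms):
    """(start, end) in epoch ms of the UTC calendar day containing timestamp_ms."""
    ts = EPOCH + timedelta(milliseconds=timestamp_ms)
    start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    end = start + timedelta(days=1) - ONE_MS  # 23:59:59.999, assumed end of day
    return to_ms(start), to_ms(end)


def processing_time_fire_ms(window_end_ms):
    """A processing-time trigger fires at the window's max timestamp, end - 1."""
    return window_end_ms - 1


# An element processed at 2022-09-08 07:51:00 UTC ...
now_ms = to_ms(datetime(2022, 9, 8, 7, 51, tzinfo=timezone.utc))
start_ms, end_ms = day_window_ms(now_ms)
fire_ms = processing_time_fire_ms(end_ms)

assert start_ms == to_ms(datetime(2022, 9, 8, tzinfo=timezone.utc))
# ... is not emitted until just before midnight, about 16 hours later.
assert (fire_ms - now_ms) // 3_600_000 == 16
```

Under these assumptions the assigner may well be working as written: a "Day" window produces no output until the end of the day in processing time, so a short-running test never sees it fire.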