Gunjan Kumar
09/01/2022, 10:32 AM

Christos Hadjinikolis
09/01/2022, 12:35 PM

Ali AIT-BACHIR
09/01/2022, 12:46 PM
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///home/ali/softs/flink-connector-rabbitmq-1.15.2.jar")
But I got this error:
py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
Exception in thread "Thread-4" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
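I suspect the NoClassDefFoundError means the RabbitMQ client library itself is missing from the classpath, since the plain connector jar does not bundle it. Perhaps I also need to add the amqp-client jar alongside it (version and path below are just an example):
env.add_jars(
    "file:///home/ali/softs/flink-connector-rabbitmq-1.15.2.jar",
    "file:///home/ali/softs/amqp-client-5.13.1.jar",  # example version/path, not verified
)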
Thanks for your help.

Fábio Santos
09/01/2022, 12:52 PM
8> (fabio,3)
Watermark: -9223372036854775808
Current Timestamp: 2022-09-01T12:51:20.002962Z Start Window: 2022 09 01 13:50:40 End Window: 2022 09 01 13:51:20
8> (fabio,2)
Watermark: -9223372036854775808
Current Timestamp: 2022-09-01T12:52:00.001547Z Start Window: 2022 09 01 13:51:20 End Window: 2022 09 01 13:52:00
Another thing that confuses me: the window is 40 seconds and the watermark delay is 20 seconds, so shouldn't the window be processed 20 seconds after the window end?
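The watermark printed here is Long.MIN_VALUE, so I suspect no watermark is ever generated, which could explain the strange firing. I would expect an event-time assignment like this to be needed (Java sketch; MyEvent and getEventTime are placeholders for my types):
stream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime()));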
Thanks for your help.

Adrian Chang
09/01/2022, 3:56 PM
spec.job
I don't know how to specify the Python file to run. https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/custom-resource/reference/#jobspec
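From the examples I have seen, I think a Python job is submitted through the PythonDriver entry class of the flink-python jar, something like this (paths and version are guesses on my side):
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.15.2.jar
    entryClass: org.apache.flink.client.python.PythonDriver
    args: ["-py", "/opt/flink/usrlib/my_job.py"]
    parallelism: 1
Is that the intended way?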
Sharon Xie
09/01/2022, 4:42 PM

Shen Zhu
09/01/2022, 6:16 PM
2022-09-01 06:21:46,168 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 7624 of job a655e16574c6ac2801d80346fa24f450 expired before completing.
2022-09-01 06:21:46,451 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 7625 (type=CHECKPOINT) @ 1662013306179 for job a655e16574c6ac2801d80346fa24f450.
2022-09-01 06:21:46,714 WARN org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Received late message for now expired checkpoint attempt 7624 from task 24134624cd1401ae0a6f7d55a41410c3 of job a655e16574c6ac2801d80346fa24f450 at container_1660894388534_0005_01_000028 @ ip-172-30-91-83.ec2.internal (dataPort=37881).
...
And then there are some Kafka errors and the job failed (attached image). We did some investigation online and found a related Stack Overflow answer. In this case, should we keep increasing request.timeout.ms and the checkpoint timeout (currently 1 minute)?
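If raising the checkpoint timeout is the way to go, I assume it is just this on our side (the value is only an example):
env.getCheckpointConfig().setCheckpointTimeout(300000L); // e.g. 5 minutes instead of the current 1
plus request.timeout.ms in the Kafka client properties.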
We are using Flink 1.12, thanks for your help!

Yahor Paulikau
09/01/2022, 7:53 PM
We have an Avro schema, kafkaSourceTopicSchema. Now we can generate a DataSource schema from it using .fromRowDataType(AvroSchemaConverter.convertToDataType(kafkaSourceTopicSchema)),
but flexible deserialization seems to be a problem. Any ideas?
tableEnv
    .connect(new Kafka()
        .version("universal")
        .topic(topicName)
        .startFromLatest()
        .property("bootstrap.servers", config.getString("kafka_brokers")))
    .withFormat(new Avro().avroSchema(kafkaSourceTopicSchema))
    .withSchema(new Schema()
        .field("event_ts", DataTypes.BIGINT())
        .field("event_type", DataTypes.STRING())
        .field("device_id", DataTypes.STRING())
        .field("proctime", DataTypes.TIMESTAMP(3)).proctime())
    .createTemporaryTable(temporaryTableName);

Sucheth Shivakumar
09/01/2022, 9:31 PM

Marco Villalobos
09/02/2022, 5:14 PM
2022-09-02 10:04:56
java.io.IOException: Could not perform checkpoint 119366 for operator tag enrichment (3/4)#104.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:968)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:178)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 119366 for operator tag enrichment (3/4)#104. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:951)
... 13 more
Caused by: org.rocksdb.RocksDBException: While open a file for appending: /mnt/yarn/usercache/hadoop/appcache/application_1631124824249_0061/flink-io-7f392e48-d086-492b-960b-1c56d0f864a0/job_a5b70dea0d3c27b2798c53df49065433_op_KeyedProcessOperator_a91e7e58fb0d0cb4a427ff0c6489016c__3_4__uuid_252bcc06-8857-4153-a866-2e6b3f50c4bb/chk-119366.tmp/MANIFEST-423131: Too many open files
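The root cause looks like RocksDB hitting the process file-descriptor limit. My working assumption is to raise the nofile ulimit on the YARN nodes and/or cap how many files RocksDB keeps open, e.g. in flink-conf.yaml (the value is just an example; this option maps to RocksDB's max_open_files):
state.backend.rocksdb.files.open: 4096
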
Marco Villalobos
09/02/2022, 6:13 PM
final RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(checkpointDataUri, incrementalCheckpointingEnabled);
streamEnv.setStateBackend(rocksDBStateBackend);
Will it still read the other configuration options in the YAML file, such as state.backend.rocksdb.predefined-options, as defined at https://nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#advanced-rocksdb-state-backends-options?
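Or would we have to set those options explicitly in code instead? For example (just illustrating the API):
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);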
Gaurav Miglani
09/03/2022, 9:29 AM
The docs say: "When Kubernetes HA is enabled the savepoint upgrade mode may fall back to the last-state behaviour in cases where the job is in an unhealthy state." So if I restart my spec with savepoint upgrade mode but it fails based on kubernetes.operator.deployment.readiness.timeout, will my job restart with last-state? I tried it by changing the operator state in a job (via a SQL query change: distinct on a different column in a tumble window; NOTE: not setting any operator IDs, since we use the Flink Table API exclusively), but it doesn't work. In this case, if I want to restart my job stateless, is that possible, or do I need to do kubectl delete flinkdeployment my-deployment and resubmit the job with last-state (which is ideally an empty state since I deleted the spec)?

Herat Acharya
09/03/2022, 2:33 PM

Echo Lee
09/05/2022, 5:06 AM

Ali AIT-BACHIR
09/05/2022, 9:51 AM
I am now using flink-sql-connector-rabbitmq-1.15.2.jar. However, when I try to sink to RabbitMQ with this code, following https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/rabbitmq/#installing-rabbitmq:
stream.add_sink(RMQSink(
connection_config, # config for the RabbitMQ connection
'queueName', # name of the RabbitMQ queue to send messages to
SimpleStringSchema()))
I got the following error trace:
File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.CANCELLED
details = "Multiplexer hanging up"
debug_error_string = "{"created":"@1662371359.807069114","description":"Error received from peer ipv6:[::1]:44295","file":"src/core/lib/surface/call.cc","file_line":966,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
Traceback (most recent call last):
File "/home/ali/.virtualenvs/LAB_920_log_parser_more_investigation-DQLOhTET/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
and more logs here:
RuntimeError: java.lang.UnsupportedOperationException: A serializer has already been registered for the state; re-registration is not allowed.
at org.apache.flink.runtime.state.StateSerializerProvider$EagerlyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:344)
at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)
Thanks for your help.

Fábio Santos
09/05/2022, 10:36 AM

Bhavay Pahuja
09/05/2022, 2:42 PM
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/flink/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/tez/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
at org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
at org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: org.apache.flink.table.api.TableException: Unexpected error when trying to load service provider for factories.
at org.apache.flink.table.factories.FactoryUtil.lambda$discoverFactories$19(FactoryUtil.java:813)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:799)
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:517)
at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
... 8 more
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider org.apache.flink.table.planner.loader.DelegateExecutorFactory could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.flink.table.factories.ServiceLoaderUtil.load(ServiceLoaderUtil.java:42)
at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:798)
... 10 more
Caused by: java.lang.ExceptionInInitializerError
at org.apache.flink.table.planner.loader.PlannerModule.getInstance(PlannerModule.java:135)
at org.apache.flink.table.planner.loader.DelegateExecutorFactory.<init>(DelegateExecutorFactory.java:34)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 14 more
Caused by: org.apache.flink.table.api.TableException: Could not initialize the table planner components loader.
at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:123)
at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:52)
at org.apache.flink.table.planner.loader.PlannerModule$PlannerComponentsHolder.<clinit>(PlannerModule.java:131)
... 22 more
Caused by: java.nio.file.FileAlreadyExistsException: /tmp
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384)
at java.nio.file.Files.createDirectory(Files.java:674)
at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781)
at java.nio.file.Files.createDirectories(Files.java:727)
at org.apache.flink.table.planner.loader.PlannerModule.<init>(PlannerModule.java:96)
... 24 more
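The last Caused by shows the planner loader failing to create its working directory under /tmp, so maybe /tmp is not a plain directory on these nodes (a symlink, perhaps?). As a guess, pointing Flink's temp directory elsewhere might work around it, e.g. in flink-conf.yaml (path is just an example):
io.tmp.dirs: /mnt/tmp
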
Kwangin Jung
09/06/2022, 7:54 AM
OutputFileConfig outputConfig = OutputFileConfig
.builder()
.withPartPrefix("output")
.withPartSuffix(".xml")
.build();
StreamingFileSink<String> streamingFileSink = StreamingFileSink
.forRowFormat(
new Path("<s3://my-output-bucket/>"),
new SimpleStringEncoder<String>("UTF-8")
)
.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd_HH:mm"))
.withOutputFileConfig(outputConfig)
.build();
processStream.keyBy(SinkOutputModel::getId)
.map(SinkOutputModel::toString)
.addSink(streamingFileSink)
I want each record in the stream to be written out as a separate file, but with this setup files contain multiple records (some contain 1 record, others 2, 3 or more).
How should I set up the rolling policy, or something else?
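I was wondering whether a custom rolling policy that rolls on every record would do it; a sketch of what I have in mind (untested; RollingPolicy and PartFileInfo are from org.apache.flink.streaming.api.functions.sink.filesystem):
StreamingFileSink<String> perRecordSink = StreamingFileSink
        .forRowFormat(
                new Path("s3://my-output-bucket/"),
                new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd_HH:mm"))
        .withRollingPolicy(new RollingPolicy<String, String>() {
            @Override
            public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
                return true; // close any open part file on checkpoint
            }
            @Override
            public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, String element) {
                return true; // start a new part file after every record
            }
            @Override
            public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
                return false; // no time-based rolling
            }
        })
        .withOutputFileConfig(outputConfig)
        .build();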
Paul Lam
09/06/2022, 10:34 AM

Sreekhar Reddy Kandala
09/06/2022, 11:56 AM

Jeesmon Jacob
09/06/2022, 5:12 PM
The job logs "Application completed SUCCESSFULLY" immediately, so the cluster is getting shut down and is not able to recover. Does anyone know about this issue, or have any suggestion to fix the problem? The relevant log is in the thread.

Krish Narukulla
09/06/2022, 10:32 PM
tEnv.executeSql(String.format(" CREATE TEMPORARY TABLE `table` "
+ "(upload_time BIGINT, log_id STRING)" +
" WITH ( 'connector' = 'kafka',\n"
+ " 'topic' = 'topic1',\n"
+ " 'properties.bootstrap.servers' = '%s', "
+ " 'properties.group.id' = 'kafka-krish-test', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'value.format' = 'avro', "
+" 'properties.schema.registry.url' = '%s',"
+" 'properties.specific.avro.reader' = 'true',"
+"'properties.value.deserializer' = 'com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer'"
+ ")","xxxxxx",
"xxxx)
);
Nipuna Shantha
09/07/2022, 4:12 AM
~/bin/flink run --detached --jobmanager IP:port -Dallow.func=true -c Processor ~/applicaion/app.jar
Pedro Mázala
09/07/2022, 2:05 PM

Rommel
09/07/2022, 5:36 PM
public class HashMapValue<K extends Value, V extends Value> extends MapValue<K, V> {
public HashMapValue() {
super();
}
}
When I try to use this class, I can't instantiate it.
HashMapValue<IntValue, BooleanValue> hashMapValue = new HashMapValue<IntValue, BooleanValue>();
java.lang.AssertionError
at org.apache.flink.util.ReflectionUtil.getTemplateTypes(ReflectionUtil.java:141)
at org.apache.flink.util.ReflectionUtil.getSuperTemplateTypes(ReflectionUtil.java:98)
at org.apache.flink.util.ReflectionUtil.getTemplateType(ReflectionUtil.java:44)
at org.apache.flink.util.ReflectionUtil.getTemplateType1(ReflectionUtil.java:54)
at org.apache.flink.types.MapValue.<init>(MapValue.java:55)
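Reading ReflectionUtil, my guess is that MapValue resolves its key/value types reflectively from the generic superclass, so the subclass probably has to bind concrete types rather than stay generic. Something like this might work (untested; names are mine):
public class IntBooleanMapValue extends MapValue<IntValue, BooleanValue> {
    public IntBooleanMapValue() {
        super();
    }
}

IntBooleanMapValue hashMapValue = new IntBooleanMapValue();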
I can't find anything online or in the Flink source code that uses MapValue as an example. Can anyone point out how to use MapValue in Flink?

Ikvir Singh
09/07/2022, 8:21 PM

Krish Narukulla
09/07/2022, 8:51 PM
s3 paths
CREATE TABLE MyUserTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
'connector' = 'filesystem', -- required: specify the connector
'path' = 'file:///path/to/whatever', -- required: path to a directory
'format' = '...', -- required: file system connector requires to specify a format,
-- Please refer to Table Formats
-- section for more details
'partition.default-name' = '...', -- optional: default partition name in case the dynamic partition
-- column value is null/empty string
-- optional: enable shuffling data by dynamic partition fields in the sink phase; this can greatly
-- reduce the number of files for the filesystem sink, but may lead to data skew; the default value is false
'sink.shuffle-by-partition.enable' = '...',
...
)
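For S3 I assume the path option simply takes an s3 URI (bucket and prefix below are placeholders), as long as one of the S3 filesystem plugins (flink-s3-fs-hadoop or flink-s3-fs-presto) is available on the cluster:
'path' = 's3://my-bucket/path/to/table/'
Is that all there is to it?
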
Krish Narukulla
09/07/2022, 10:00 PM

Bastien DINE
09/08/2022, 7:31 AM

Prasanth Kothuri
09/08/2022, 7:51 AM
class TimePeriodWindowAssigner(timePeriod: String) extends WindowAssigner[Object, TimeWindow] with Logging {
override def assignWindows(element: Object, timestamp: Long, context: WindowAssigner.WindowAssignerContext):
util.Collection[TimeWindow] = {
var startTime: Long = 0
var endTime: Long = 0
var calenderUnit: Integer = Calendar.DAY_OF_MONTH
if (timePeriod == "Week") {
calenderUnit = Calendar.DAY_OF_WEEK
} else if (timePeriod == "Year") {
calenderUnit = Calendar.DAY_OF_YEAR
} else if (timePeriod == "Day") {
calenderUnit = Calendar.HOUR_OF_DAY
} else {
calenderUnit = Calendar.DAY_OF_MONTH
}
<http://logger.info|logger.info>("Get start of " + timePeriod + " for timestamp " + timestamp)
val startCalender = CalenderHelpers.getCalendar(timestamp)
startCalender.set(calenderUnit, startCalender.getActualMinimum(calenderUnit))
CalenderHelpers.setTimeToBeginningOfDay(startCalender)
startTime = startCalender.getTimeInMillis
<http://logger.info|logger.info>("Got start of " + timePeriod + " for timestamp " + timestamp + " as " + startTime)
<http://logger.info|logger.info>("Get end of " + timePeriod + " for timestamp " + timestamp)
val endCalender = CalenderHelpers.getCalendar(timestamp)
endCalender.set(calenderUnit, endCalender.getActualMaximum(calenderUnit))
CalenderHelpers.setTimeToEndOfDay(endCalender)
endTime = endCalender.getTimeInMillis
<http://logger.info|logger.info>("Got end of " + timePeriod + " for timestamp " + timestamp + " as " + endTime)
// emitting the corresponding time window
util.Collections.singletonList(new TimeWindow(startTime, endTime))
}
override def getDefaultTrigger(env: StreamExecutionEnvironment): Trigger[Object, TimeWindow] = {
ProcessingTimeTrigger.create()
}
override def getWindowSerializer(executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = {
new TimeWindow.Serializer
}
override def isEventTime: Boolean = false
}
and here is how it is used in the operator:
testSource
.keyBy(_.input1)
.window(new TimePeriodWindowAssigner("Day"))
.aggregate(new TestInputStatsAggregator(), new TestInputProcessWindowFunction())
.name("testSink")
.print
I couldn't figure out why the window is not firing/closing; any input is highly appreciated, thanks.
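One hypothesis from my side: since the default trigger here is ProcessingTimeTrigger, the window only fires when processing time reaches the window's end, i.e. the end of the day/week/year, so nothing is emitted before that. If intermediate results are wanted, a periodic trigger might help (sketch, untested; the interval is just an example):
testSource
  .keyBy(_.input1)
  .window(new TimePeriodWindowAssigner("Day"))
  .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.minutes(10))) // fire every 10 minutes within the window
  .aggregate(new TestInputStatsAggregator(), new TestInputProcessWindowFunction())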