David Wisecup
06/12/2023, 6:21 PM
public class GroupedAttributesFunction extends KeyedProcessFunction<String, TransactionAttribute, GroupedAttributes> {

    @Override
    public void processElement(
            TransactionAttribute txnAttr,
            Context ctx,
            Collector<GroupedAttributes> out) throws Exception {
        ...
        ctx.timerService().registerEventTimeTimer(
                ctx.timestamp() + Time.days(maxWindowForAllRules).toMilliseconds());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<GroupedAttributes> out) throws Exception {
        // FIXME HELP I never enter this code!
        GroupedAttributes groupedAttributes = this.groupedAttributesValueState.value();
        if (groupedAttributes != null) {
            log.info("TODO clean out groupedAttribute state if older than 30 days");
        }
    }
}
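A hedged note on the FIXME above: an event-time timer only fires once the operator's watermark passes the timer's timestamp, so onTimer is never entered if the pipeline assigns no timestamps/watermarks, or if the watermark never advances past a timer set Time.days(maxWindowForAllRules) into the future (idle sources are a common culprit). A minimal sketch of wiring a watermark strategy upstream; getEventTime() and getKey() are hypothetical accessors on TransactionAttribute:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // Sketch: without assigned timestamps/watermarks, event-time timers never fire.
    source
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<TransactionAttribute>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                        // tell Flink which field carries the event timestamp
                        .withTimestampAssigner((txn, recordTs) -> txn.getEventTime()))
        .keyBy(txn -> txn.getKey())
        .process(new GroupedAttributesFunction());

If the goal is only wall-clock cleanup of 30-day-old state, ctx.timerService().registerProcessingTimeTimer(...) fires independently of watermarks, and Flink's state TTL is another option.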
David Wisecup
06/12/2023, 7:42 PM

Arijit Dasgupta
06/12/2023, 7:52 PM

Arijit Dasgupta
06/12/2023, 8:20 PM

Arijit Dasgupta
06/12/2023, 9:14 PM
FROM flink:1.17.1
# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python
# install PyFlink
RUN pip3 install apache-flink==1.17.1
However, I received the error below. What do I need to change? It works fine on my Linux machine, but I get this error on my Mac:
#0 35.81 Downloading pemja-0.3.0.tar.gz (48 kB)
#0 35.81 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 48.5/48.5 KB 14.6 MB/s eta 0:00:00
#0 35.90 Installing build dependencies: started
#0 39.29 Installing build dependencies: finished with status 'done'
#0 39.30 Getting requirements to build wheel: started
#0 39.36 Getting requirements to build wheel: finished with status 'error'
#0 39.37 error: subprocess-exited-with-error
#0 39.37
#0 39.37 × Getting requirements to build wheel did not run successfully.
#0 39.37 │ exit code: 255
#0 39.37 ╰─> [1 lines of output]
#0 39.37 Include folder should be at '/opt/java/openjdk/include' but doesn't exist. Please check you've installed the JDK properly.
#0 39.37 [end of output]
#0 39.37
#0 39.37 note: This error originates from a subprocess, and is likely not a problem with pip.
#0 39.37 error: subprocess-exited-with-error
#0 39.37
#0 39.37 × Getting requirements to build wheel did not run successfully.
#0 39.37 │ exit code: 255
#0 39.37 ╰─> See above for output.
#0 39.37
#0 39.37 note: This error originates from a subprocess, and is likely not a problem with pip.
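A hedged reading of the failure above: on an Apple Silicon Mac, Docker pulls the arm64 variant of flink:1.17.1; PyPI ships no prebuilt arm64 pemja wheel, so pip compiles it from source and needs JDK headers that the JRE-only base image lacks (hence the missing /opt/java/openjdk/include). Two common workarounds: build for amd64 (docker build --platform linux/amd64 ...) so the prebuilt wheel is used, or install a full JDK in the image. A sketch of the latter; the package name and JAVA_HOME path assume an Ubuntu/Debian-based arm64 image:

    FROM flink:1.17.1
    # install python3, pip3, and a full JDK (the base image ships only a JRE)
    RUN apt-get update -y && \
        apt-get install -y python3 python3-pip python3-dev openjdk-11-jdk-headless && \
        rm -rf /var/lib/apt/lists/*
    RUN ln -s /usr/bin/python3 /usr/bin/python
    # point pemja's source build at a JDK that actually has an include/ folder
    ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64
    RUN pip3 install apache-flink==1.17.1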
Amir Hossein Sharifzadeh
06/12/2023, 9:21 PM
MapState here: in my code I have created a map state like

    MapStateDescriptor<String, Integer> totalMapStateDescriptor =
            new MapStateDescriptor<>(
                    "totalMapState",
                    Types.STRING,
                    Types.INT);
    totalMap = getRuntimeContext().getMapState(totalMapStateDescriptor);

After I insert some values like totalMap.put(rawPath, rawTotalChunk); I see that totalMap.get(rawPath) is null for one of those values. In fact, Iterable<String> signalKeys = totalMap.keys(); always returns one key and the other keys disappear! So, why is it like that? I forgot to mention that I get rawPath from a joined stream:

    public void processElement(Row left, Row right, ProcessJoinFunction<Row, Row, List<double[][][]>>.Context ctx, Collector<List<double[][][]>> out) throws Exception {
        String rawPath = String.valueOf(left.getField(SUBDIR_STR)).toLowerCase();
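A hedged explanation for the disappearing keys: state obtained via getRuntimeContext().getMapState(...) in a keyed operator (including a ProcessJoinFunction on a keyed join) is scoped to the current key of the stream, not shared globally. Each put() lands in the map belonging to the key of the element being processed, so totalMap.keys() only iterates the entries stored under the current key; entries written under other stream keys are invisible there, not lost. A minimal sketch of the scoping; Event and its accessors are hypothetical:

    // state below is partitioned by the keyBy key: one logical map per key
    stream
        .keyBy(e -> e.getUserId())
        .process(new KeyedProcessFunction<String, Event, String>() {
            private transient MapState<String, Integer> totalMap;

            @Override
            public void open(Configuration parameters) {
                totalMap = getRuntimeContext().getMapState(
                        new MapStateDescriptor<>("totalMapState", Types.STRING, Types.INT));
            }

            @Override
            public void processElement(Event e, Context ctx, Collector<String> out) throws Exception {
                totalMap.put(e.getPath(), e.getChunk());
                // keys() here returns only the entries stored under ctx.getCurrentKey()
            }
        });

If one global map across all keys is really intended, keying everything to a single constant key (at the cost of parallelism) or broadcast state are the usual alternatives.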
RICHARD JOY
06/12/2023, 10:34 PM
"InformerWrapper [ERROR] Informer startup error. Operator will be stopped. Informer: flink.apache.org/b1beta1/namespaces/namespace/flinksessionjobs
java.util.concurrent.ExecutionException: java.net.ConnectException: Failed to connect to /10.9.20.1:443"
Ishan
06/13/2023, 3:40 AM
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'mydatabase',
'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
cc @Danny Chen
Hu Guang
06/13/2023, 6:29 AM
Exception in thread "main" java.lang.IncompatibleClassChangeError: class com.google.protobuf.Descriptors$OneofDescriptor has interface com.google.protobuf.Descriptors$GenericDescriptor as super class
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2729)
at java.lang.Class.getDeclaredMethod(Class.java:2156)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1643)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:490)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1213)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1120)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.LinkedHashMap.internalWriteEntries(LinkedHashMap.java:332)
at java.util.HashMap.writeObject(HashMap.java:1363)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
at org.apache.flink.runtime.jobgraph.JobGraphBuilder.setExecutionConfig(JobGraphBuilder.java:85)
at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:264)
at org.apache.flink.client.PlanTranslator.compilePlan(PlanTranslator.java:87)
at org.apache.flink.client.PlanTranslator.translateToJobGraph(PlanTranslator.java:50)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:82)
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1053)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
Can anyone tell me how I can fix it? Is it related to a protobuf version issue?
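A hedged pointer on the IncompatibleClassChangeError above: it usually means two different protobuf-java versions are on the classpath, so Descriptors$OneofDescriptor from one version is linked against a Descriptors$GenericDescriptor whose shape (class vs. interface) changed in another. Assuming a Maven build, a common first step is to see who pulls protobuf in:

    mvn dependency:tree -Dincludes=com.google.protobuf

and then pin a single protobuf-java version via dependencyManagement, or shade/relocate protobuf in the job jar so it cannot clash with the version a connector or Flink itself ships.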
Vivek
06/13/2023, 1:58 PM

Robert Steele
06/13/2023, 2:27 PM

Yaroslav Bezruchenko
06/13/2023, 3:05 PM

Zhang Zhao
06/13/2023, 3:48 PM

Mingfeng Tan
06/13/2023, 7:38 PM
org.rocksdb.RocksDBException: file is too short (7451 bytes) to be an sstable. I searched this channel and this has happened to others as well. Does anybody know how to resolve it?
Zhong Chen
06/13/2023, 10:04 PM

Zhong Chen
06/13/2023, 10:04 PM
ERROR org.apache.flink.runtime.metrics.ReporterSetup [] - Could not instantiate metrics reporter dghttp. Metrics might not be exposed/reported.
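A hedged note on this one: since Flink 1.16, metrics reporters are configured through a factory class, and the reporter jar must be visible to the runtime (the Datadog reporter ships in opt/ and is typically copied into plugins/datadog/ or lib/). A sketch of the flink-conf.yaml shape, with a placeholder API key:

    metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
    metrics.reporter.dghttp.apikey: <your-datadog-api-key>

If the config still uses the pre-1.16 metrics.reporter.dghttp.class key, or the jar never made it into the image, this exact "Could not instantiate" error is the usual symptom.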
Herat Acharya
06/13/2023, 10:09 PM

HJK nomad
06/14/2023, 3:43 AM

dp api
06/14/2023, 5:52 AM
def log_processing():
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)
    t_env.get_config().set("pipeline.jars", "file:///flink-sql-connector-kafka-1.17.1.jar")
    t_env.get_config().set("table.exec.source.idle-timeout", "1000")

    source_ddl = """
        CREATE TABLE restuarant_live_pending_orders(
            rest_id VARCHAR,
            status VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'live_order_status',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'rest_group',
            'scan.startup.mode' = 'specific-offsets',
            'scan.startup.specific-offsets' = 'partition:0,offset:0',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'format' = 'json'
        )
    """

    t_env.execute_sql(source_ddl)
    tbl = t_env.from_path('restuarant_live_pending_orders')
    tbl.print_schema()

    orders_sum = t_env.sql_query("SELECT rest_id, SUM(CASE WHEN status = 'NEW' THEN 1 ELSE -1 END) AS status_count FROM %s GROUP BY rest_id" % tbl).execute()
    orders_sum.print_schema()
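A hedged observation on the last two lines: sql_query(...) returns a Table, but chaining .execute() turns it into a TableResult, which has print() rather than print_schema() (print_schema() lives on Table). A sketch that shows both the schema and the rows:

    orders_sum = t_env.sql_query(
        "SELECT rest_id, SUM(CASE WHEN status = 'NEW' THEN 1 ELSE -1 END) AS status_count "
        "FROM %s GROUP BY rest_id" % tbl)
    orders_sum.print_schema()        # schema comes from the Table
    orders_sum.execute().print()     # TableResult.print() streams rows to stdout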
dev Jiang
06/14/2023, 7:55 AM
2023-06-14 15:35:36,897 WARN org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics [] - Error when getting Kafka consumer metric "records-lag" for partition "topic-21". Metric "pendingRecords" may not be reported correctly.
java.lang.IllegalStateException: Cannot find Kafka metric matching current filter.
at org.apache.flink.connector.kafka.MetricUtil.lambda$getKafkaMetric$1(MetricUtil.java:63) ~[flink-iceberg-sink-1.0.jar:?]
at java.util.Optional.orElseThrow(Optional.java:290) ~[?:1.8.0_202]
at org.apache.flink.connector.kafka.MetricUtil.getKafkaMetric(MetricUtil.java:61) ~[flink-iceberg-sink-1.0.jar:?]
at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.getRecordsLagMetric(KafkaSourceReaderMetrics.java:308) ~[flink-iceberg-sink-1.0.jar:?]
at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.lambda$maybeAddRecordsLagMetric$4(KafkaSourceReaderMetrics.java:231) ~[flink-iceberg-sink-1.0.jar:?]
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) [?:1.8.0_202]
at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.maybeAddRecordsLagMetric(KafkaSourceReaderMetrics.java:230) [flink-iceberg-sink-1.0.jar:?]
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:139) [flink-iceberg-sink-1.0.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) [flink-iceberg-sink-1.0.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) [flink-iceberg-sink-1.0.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-iceberg-sink-1.0.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Have you ever met this?
Hangyu Wang
06/14/2023, 8:41 AM

kingsathurthi
06/14/2023, 10:27 AM

Michael Parkin
06/14/2023, 11:18 AM
compacted- prefix?
https://github.com/apache/flink/blob/c270a741526def82699a9accbda2e99f42b5a121/flin[…]link/connector/file/sink/compactor/operator/CompactService.java
BX B
06/14/2023, 2:28 PM
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import *


def log_processing():
    env_settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(env_settings)
    ##### specify connector and format jars
    t_env.get_config().set("pipeline.jars", "file:///Users/karanbawejapro/Desktop/flink-sql-connector-kafka-1.17.0.jar")
    t_env.get_config().set("table.exec.source.idle-timeout", "1000")

    source_ddl = """
        CREATE TABLE restaurant_live_pending_orders(
            rest_id VARCHAR,
            status VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'live_order_status',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'rest_group',
            'scan.startup.mode' = 'specific-offsets',
            'scan.startup.specific-offsets' = 'partition:0,offset:0',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'format' = 'json'
        )
    """

    tbl = t_env.execute_sql(source_ddl)
    tbl = t_env.from_path('restaurant_live_pending_orders')
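A hedged note on the snippet above: execute_sql(DDL) returns a TableResult (reusing the tbl name for it is harmless but confusing), and as pasted the script never runs a query against the source, so the job does no visible work. A sketch of a minimal tail, assuming the goal is just to watch rows arrive:

    # inside log_processing(): run a continuous query and print rows as they arrive
    tbl.execute().print()

    if __name__ == '__main__':
        log_processing()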
André Luiz Diniz da Silva
06/14/2023, 5:48 PM
Caused by: java.lang.NoClassDefFoundError: com/google/common/util/concurrent/internal/InternalFutureFailureAccess
(complete stack trace in the thread). The context is that I have a job that writes parquet files, and after adding a new sink that writes to a Kafka topic using Avro with Schema Registry this error began to happen. Any idea what it could be? Am I missing some dependency? More details in the thread.
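A hedged pointer: InternalFutureFailureAccess lives in Guava's separate com.google.guava:failureaccess artifact, which Guava 27+ depends on at runtime; Avro/Schema Registry serializer stacks pull Guava in, and if shading or a dependency exclusion drops failureaccess, exactly this NoClassDefFoundError appears. Re-adding it explicitly is a common fix (version below is an example):

    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>failureaccess</artifactId>
        <version>1.0.1</version> <!-- example version -->
    </dependency>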
mralfredgui
06/14/2023, 5:51 PM

Ilya Sterin
06/14/2023, 7:12 PM

Matt Wagner
06/15/2023, 12:37 AM

Raghunadh Nittala
06/15/2023, 3:51 AM

Zhang Zhao
06/15/2023, 9:24 AM