Huaqiang Kang
02/09/2023, 10:06 PM
Daiyan Chowdhury
02/09/2023, 11:39 PM
Ryner Menezes
02/10/2023, 4:56 AM
kingsathurthi
02/10/2023, 7:04 AM
2023-02-10 06:57:26,917 INFO org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner [] - --------------------------------------------------------------------------------
2023-02-10 06:57:26,977 INFO org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner [] - Registered UNIX signal handlers for [TERM, HUP, INT]
2023-02-10 06:57:26,990 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
2023-02-10 06:57:26,990 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:2
2023-02-10 06:57:26,990 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: job-result-store.storage-path, file:///flink-data/ha/job-result-store/gmf-gnodeb-sa-samsung/6dd3883c-7acc-47b7-8737-29dfea42547c
2023-02-10 06:57:26,990 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.jobmanager.replicas, 2
2023-02-10 06:57:26,990 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.submit-failed-job-on-application-error, true
2023-02-10 06:57:26,991 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.cluster-id, kmf-flink
2023-02-10 06:57:26,991 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: metrics.reporter.prom.factory.class, org.apache.flink.metrics.prometheus.PrometheusReporterFactory
2023-02-10 06:57:26,991 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///flink-data/savepoints
2023-02-10 06:57:26,991 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.taskmanager.cpu, 1.0
2023-02-10 06:57:26,991 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: kubernetes.service-account, flink
Yang LI
02/10/2023, 9:39 AM
flink_taskmanager_job_task_backPressuredTimeMsPerSecond
Nicholas Erasmus
02/10/2023, 11:24 AM
{
  ...
  "our_field_name": {
    "our_key_1": "our_value_1",
    "our_key_2": "our_value_2",
    ...
  }
}
In our Flink (Kotlin) application, we are defining the row as follows:
RowType.RowField(
    "our_field_name",
    MapType(
        VarCharType(VarCharType.MAX_LENGTH),
        VarCharType(VarCharType.MAX_LENGTH)
    )
)
... and we are setting the field with:
rowData.setField(
    OUR_FIELD_POSITION,
    GenericMapData(
        our_field.entries.associate {
            StringData.fromString(it.key) to StringData.fromString(it.value)
        }
    )
)
When we inspect the Parquet records however, we see the following is produced:
{
  ...
  "meta_info": {
    "key_value": [
      {
        "key": "providerCode",
        "value": "declined-provider-code"
      }
    ]
  }
}
I had a look at the source code linked below, and the key_value, key, and value names added above are hardcoded when converting a `MAP`:
https://github.com/apache/flink/blob/ffecb68b11e2cd914f92699112b5adb62a48724e/flin[…]/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
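For reference, those names come from the Parquet format's standard MAP layout rather than from the field definitions in our job; the schema the converter writes looks roughly like this (a sketch based on the Parquet spec, not the converter's exact output):
optional group our_field_name (MAP) {
  repeated group key_value {
    required binary key (UTF8);
    optional binary value (UTF8);
  }
}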
I'd like to ask for advice on this issue, e.g. has anyone else had the same requirement, and has anyone looked into this? Would it be possible to submit a pull request to address this?
Thank you 🙂
Nihar Rao
02/10/2023, 4:18 PM
jar xf <name>.jar
and the path clearly exists and is correctly defined in flink-conf.yaml.
My plugins dir looks like:
plugins/
  metrics-nihar/
    custom_reporter.jar
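For reference, the reporter registration in flink-conf.yaml usually looks roughly like the line below; the reporter name and factory class here are only placeholders, not taken from this setup:
metrics.reporter.nihar.factory.class: com.example.metrics.CustomReporterFactory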
Can you please let me know how to debug this further?
René
02/10/2023, 5:23 PM
Artun Duman
02/10/2023, 6:07 PM
SplitEnumerator, specifically for Kafka sources. I see that it runs on the job manager and round-robins to the task managers. Does that mean we might be bottlenecked by the job manager process, and that there isn't a way to increase the number of Kafka partitions to make the reading/load balancing faster? Has anyone run into a limitation like this? Thanks!
Amir Hossein Sharifzadeh
02/10/2023, 6:46 PM
Joe Crowley
02/11/2023, 12:34 AM
Eugenio Marotti
02/11/2023, 10:22 AM
Table legalFileEventsTable = legalFilesTable.join(eventsTable)
    .where($("id").isEqual($("id_fascicolo")))
    .select(
        $("id").as("id_fascicolo"),
        $("id_evento"),
        $("giudice"),
        $("nrg"),
        $("codice_oggetto"),
        $("ufficio"),
        $("sezione"),
        $("data_evento"),
        $("evento"),
        $("data_registrazione_evento")
    );
Then I convert the joined table to a DataStream to apply some computation on the data. Here's the code I'm using:
DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
    .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
    .process(new PhaseDurationCounterProcessFunction());
phasesDurationsDataStream.print();
The PhaseDurationCounterProcessFunction emits a Row like this:
Row outputRow = Row.withNames(RowKind.INSERT);
outputRow.setField("id_fascicolo", currentState.getId_fascicolo());
outputRow.setField("nrg", currentState.getNrg());
outputRow.setField("giudice", currentState.getGiudice());
outputRow.setField("codice_oggetto", currentState.getCodice_oggetto());
outputRow.setField("ufficio", currentState.getUfficio());
outputRow.setField("sezione", currentState.getSezione());
outputRow.setField("fase", currentState.getPhase());
outputRow.setField("fase_completata", false);
outputRow.setField("durata", currentState.getDurationCounter());
out.collect(outputRow);
After collecting the results from the process function I reconvert the DataStream to a Table and execute the pipeline:
Table phasesDurationsTable = tEnv.fromChangelogStream(
    phasesDurationsDataStream,
    Schema.newBuilder()
        .column("id_fascicolo", DataTypes.BIGINT())
        .column("nrg", DataTypes.STRING())
        .column("giudice", DataTypes.STRING())
        .column("codice_oggetto", DataTypes.STRING())
        .column("ufficio", DataTypes.STRING())
        .column("sezione", DataTypes.STRING())
        .column("fase", DataTypes.STRING())
        .column("fase_completata", DataTypes.BOOLEAN())
        .column("durata", DataTypes.BIGINT())
        .primaryKey("id_fascicolo", "fase")
        .build(),
    ChangelogMode.upsert()
);
env.execute();
But during startup I receive this exception:
Unable to find a field named 'id_fascicolo' in the physical data type derived from the given type information for schema declaration.
Make sure that the type information is not a generic raw type. Currently available fields are: [f0]
It seems that the row information (names and types) isn't available yet, which is why the exception is thrown. I tried invoking env.execute() before the DataStream-to-Table conversion; in that case the job starts, but I get no output if I print phasesDurationsTable. Any suggestions on how to make this work?
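The "Currently available fields are: [f0]" part suggests the Row type coming out of the process function is treated as a generic raw type. A minimal sketch of attaching explicit row type information via returns() (the field names and types here just mirror the schema above; this is an assumption, not code from the thread):
// import org.apache.flink.api.common.typeinfo.Types;
DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
    .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
    .process(new PhaseDurationCounterProcessFunction())
    // declare the named row fields so fromChangelogStream sees them instead of a single generic "f0"
    .returns(Types.ROW_NAMED(
        new String[] {"id_fascicolo", "nrg", "giudice", "codice_oggetto", "ufficio",
                      "sezione", "fase", "fase_completata", "durata"},
        Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
        Types.STRING, Types.STRING, Types.BOOLEAN, Types.LONG));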
Jason Politis
02/11/2023, 3:17 PM
Itamar Dvir
02/11/2023, 6:50 PM
Reme Ajayi
02/11/2023, 8:49 PM
Siddhesh Kalgaonkar
02/12/2023, 8:50 AM
As a general rule, operators are required to completely process a given watermark before forwarding it downstream. For example, WindowOperator will first evaluate all windows that should be fired, and only after producing all of the output triggered by the watermark will the watermark itself be sent downstream. In other words, all elements produced due to occurrence of a watermark will be emitted before the watermark.
Why do we need to send the watermark downstream? Can anyone explain the exact meaning of this paragraph? TIA
Siddhesh Kalgaonkar
02/12/2023, 12:57 PM
Amir Hossein Sharifzadeh
02/12/2023, 9:50 PM
t_env.sql_query(query).execute().print()
but I can't iterate over the results, and it doesn't print anything:
table_result = t_env.sql_query(query).execute().collect()
with table_result as results:
    for result in results:
        print(result)
This is how I start the code:
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
t1 = f"""
CREATE TABLE raw_table(
raw_id INT,
raw_data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'empad_raw',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'MY_GRP',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
t2 = f"""
CREATE TABLE bkgd_table(
raw_id INT,
raw_data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'empad_bkgd',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'MY_GRP',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
t_env.execute_sql(t1)
t_env.execute_sql(t2)
query = "SELECT raw_table.raw_id, raw_table.raw_data, bkgd_table.raw_data " \
"FROM raw_table JOIN bkgd_table ON raw_table.raw_id = bkgd_table.raw_id"
t_env.sql_query(query).execute().print()
I need to get the results from my query and pass them to another function to process them. Thanks for any suggestions.
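A minimal sketch of one way to hand each result row to another function, reusing the tables above (process_row is a placeholder name, not from the thread). Note that with an unbounded Kafka source and 'scan.startup.mode' = 'latest-offset' the iterator only yields rows for records that arrive after the job starts, and the loop runs until the job is cancelled:
def process_row(row):
    # placeholder for whatever downstream processing is needed
    print(row)

table_result = t_env.sql_query(query).execute()
# collect() returns a closeable iterator over the streaming result
with table_result.collect() as results:
    for result in results:
        process_row(result)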
Kyle Ahn
02/12/2023, 11:15 PM
Is it possible to use a ProcessAllWindowFunction prior to keyBy to share state with a downstream ProcessWindowFunction post keyBy?
Jirawech Siwawut
02/13/2023, 2:46 AM
StreamExecutionEnvironment env;
Properties props = new Properties();
props.put("s3.access-key", "myaccess");
props.put("s3.secret-key", "mysecret");
props.put("s3.endpoint", "myendpoint");
Configuration conf = ConfigurationUtils.createConfiguration(props);
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
Here is the error
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider : com.amazonaws.AmazonClientException: Unable to load credentials from Amazon EC2 metadata service
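One thing sometimes tried for local runs (a sketch under the assumption that an S3 filesystem such as flink-s3-fs-hadoop is on the classpath; not code from the thread) is to initialize the file systems with the same configuration before creating the environment, so the s3.* keys are visible to the S3 filesystem factory and not only to the environment:
// import org.apache.flink.core.fs.FileSystem;
Configuration conf = ConfigurationUtils.createConfiguration(props);
// hand the s3.* settings to the filesystem factories; pass a PluginManager instead of
// null if the S3 filesystem is loaded as a plugin rather than from the classpath
FileSystem.initialize(conf, null);
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);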
Warren McKeown
02/13/2023, 2:58 PM
-Djobmanager.memory.process.size=32768mb
-Djobmanager.memory.heap.size=24576mb
-Djobmanager.memory.off-heap.size=7168mb
-Djobmanager.memory.jvm-overhead.min=192mb
-Djobmanager.memory.jvm-overhead.max=1024mb
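For reference, the JobManager memory model breaks the total process size down as heap + off-heap + JVM metaspace + JVM overhead; with these flags, and assuming the default 256mb metaspace, the derived overhead is 32768 - 24576 - 7168 - 256 = 768mb, which falls inside the configured 192-1024mb overhead range.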
Nihar Rao
02/13/2023, 4:34 PM
2023-02-13 10:58:05,901 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Error while registering metric: numRecordsOut.
java.lang.ClassCastException: class org.apache.flink.dropwizard.metrics.FlinkCounterWrapper cannot be cast to class com.codahale.metrics.Metric (org.apache.flink.dropwizard.metrics.FlinkCounterWrapper is in unnamed module of loader 'app'; com.codahale.metrics.Metric is in unnamed module of loader org.apache.flink.core.plugin.PluginLoader$PluginClassLoader @5833f5cd)
Due to this error, the metric is not reported. Looking into this, FlinkCounterWrapper does extend codahale.Counter. Can anyone please tell me how to fix this?
Nitin Agrawal
02/14/2023, 4:46 AM
If record_id is removed, the Flink job loads. If record_id is added, the job fails to load with the error below, even though the data types of the query and the sink match. record_id is created by the UUID() method as part of the Flink SQL.
Query schema: [resource: STRING NOT NULL, attribute: STRING NOT NULL, duration: STRING, duration_type: STRING NOT NULL, id: STRING, value: STRING NOT NULL, year: STRING, month: STRING, day: STRING, timestamp: TIMESTAMP_LTZ(3) NOT NULL, record_id: CHAR(36) NOT NULL, _c11: TIMESTAMP_LTZ(3) NOT NULL]
Sink schema: [resource: STRING, id: STRING, duration: STRING, duration_type: STRING, attribute: STRING, value: STRING, year: STRING, month: STRING, day: STRING, record_id: CHAR(36), timestamp: TIMESTAMP_LTZ(3), _c11: TIMESTAMP_LTZ(3)]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:846)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1090)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1168)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1168)
Caused by: org.apache.flink.table.api.ValidationException: Column types of query result and sink for '*anonymous_datastream_sink$1*' do not match.
Cause: Incompatible types for sink column 'record_id' at position 9.
Query schema: [resource: STRING NOT NULL, attribute: STRING NOT NULL, duration: STRING, duration_type: STRING NOT NULL, id: STRING, value: STRING NOT NULL, year: STRING, month: STRING, day: STRING, timestamp: TIMESTAMP_LTZ(3) NOT NULL, record_id: CHAR(36) NOT NULL, _c11: TIMESTAMP_LTZ(3) NOT NULL]
Sink schema: [resource: STRING, id: STRING, duration: STRING, duration_type: STRING, attribute: STRING, value: STRING, year: STRING, month: STRING, day: STRING, record_id: CHAR(36), timestamp: TIMESTAMP_LTZ(3), _c11: TIMESTAMP_LTZ(3)]
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:453)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:265)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:208)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertExternalToRel(DynamicSinkUtils.java:151)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:276)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
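The exception compares the two schemas by position (record_id is the 10th column in the sink schema but the 11th in the query schema, which is why the error points at position 9), so the column order has to line up with the sink, not just the types. A rough sketch of a SELECT list reordered to follow the sink schema; the sink name and FROM clause are placeholders, and the column list simply copies the sink order above (the error mentions an anonymous DataStream sink, but the same ordering applies however the insert is expressed):
INSERT INTO my_sink  -- placeholder name
SELECT
  resource,
  id,
  duration,
  duration_type,
  attribute,
  `value`,
  `year`,
  `month`,
  `day`,
  CAST(UUID() AS CHAR(36)) AS record_id,
  `timestamp`,
  _c11
FROM ...  -- original source query elided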
Suparn Lele
02/14/2023, 5:41 AM
Pooja Shehrawat
02/14/2023, 7:18 AM
RocksDBException: file is too short (1756 bytes) to be an sstable: /opt/flink/rocksdb/job_
Any help would be appreciated. Thanks.
Superskyyy
02/14/2023, 8:42 AM
Adesh Dsilva
02/14/2023, 10:45 AM
How does jobmanager.execution.failover-strategy=region work? If I have a two-task pipeline A->B and I run it with a parallelism of 5, and suppose 2 of the tasks (of type B) fail, will Flink restart only those 2 tasks (of type B), 4 tasks (both A & B), or all 10 tasks?
Phil Sheets
02/14/2023, 1:38 PM
Yaroslav Bezruchenko
02/14/2023, 2:47 PM
Warning ClusterDeploymentException 2m39s JobManagerDeployment Could not create Kubernetes cluster "pipeline".
Łukasz Maternia
02/14/2023, 2:57 PM
message=Forbidden: may not specify more than 1 volume type, reason=FieldValueForbidden, additionalProperties={}), StatusCause(field=spec.containers[0].volumeMounts[0].name, message=Not found: "local-persistent-volume"
There are also other volumes in the common podTemplate, and I'm wondering whether the operator is able to properly merge the main podTemplate.spec.volumes with taskmanager.podTemplate.spec.volumes.