Felix Angell
04/26/2023, 10:54 AM
Gintaras Matulas
04/26/2023, 11:01 AM
Юрий Смирнов
04/26/2023, 11:30 AM
04/26/2023, 11:30 AMOriginal error was: dlopen(/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so, 0x0002): tried: '/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64')), '/System/Volumes/Preboot/Cryptexes/OS/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (no such file), '/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64'))
I've tried using conda, reinstalling numpy and pyflink, setting "-arch arm64" for pip install, every Python from 3.7 to 3.10, and running under Rosetta. Nothing helps.
If anyone knows how to solve this issue, please help.
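The dlopen message says the loading process needs x86_64 while the numpy extension in the venv is arm64, so something in the chain (often an x86_64 Python build, or a terminal/JVM started under Rosetta that launches the Python worker) is not running natively on Apple Silicon. A minimal diagnostic sketch, run with the same interpreter the venv uses (nothing here is from the original thread):

# Rough check: compare the architecture of the running interpreter with the
# platform tag pip uses when picking wheels for this environment.
import platform
import sysconfig

print("interpreter architecture:", platform.machine())   # e.g. 'arm64' or 'x86_64'
print("wheel platform tag:", sysconfig.get_platform())    # e.g. 'macosx-11.0-arm64'

If the interpreter reports x86_64 while the venv holds arm64 wheels (or the other way around), recreating the venv with a natively matching interpreter is usually simpler than forcing architectures through pip flags.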
Alexis Leclerc
04/26/2023, 2:02 PM
(3.0.0-1.17) that does not seem to exist (see https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc). Same applies to version 1.17.0 of the connector, which is usually the format we use.
David Wisecup
04/26/2023, 3:11 PM
Aeden Jameson
04/26/2023, 6:30 PM
Tan Kim
04/27/2023, 1:05 AM
Amenreet Singh Sodhi
04/27/2023, 10:03 AM
2023-04-10 13:48:39,366 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Node 1 disconnected.
2023-04-10 13:48:39,366 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Cancelled in-flight FETCH request with correlation id 8759 due to node 1 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms)
2023-04-10 13:48:39,366 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1:
org.apache.kafka.common.errors.DisconnectException: null
2023-04-10 13:48:40,317 INFO org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-122] Node 19 disconnected.
2023-04-10 13:51:20,059 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Node 16 disconnected.
2023-04-10 13:51:20,060 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Cancelled in-flight FETCH request with correlation id 10312 due to node 16 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms)
2023-04-10 13:51:20,060 INFO org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 16:
org.apache.kafka.common.errors.DisconnectException: null
2023-04-10 13:51:48,469 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 4
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2023-04-10 13:51:48,829 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 4
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
Also, I found this ticket describing the above issue: https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
It seems the issue is still not resolved. If anyone knows how to solve this issue, please help. Thanks
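The "Failed to commit consumer offsets for checkpoint" warnings come from the best-effort commit the KafkaSource makes back to Kafka when a checkpoint completes; the source keeps its offsets in Flink's checkpointed state, so a failed commit does not affect the job's consistency, only the lag reported for the consumer group. If the warnings are too noisy, the commit can be disabled on the source. A hedged PyFlink sketch; the broker address and topic are placeholders, only the group id is taken from the logs above:

# Sketch only: disable the best-effort offset commit to Kafka on checkpoints.
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

source = (
    KafkaSource.builder()
    .set_bootstrap_servers("broker:9092")             # placeholder
    .set_topics("events")                             # placeholder
    .set_group_id("event-executor-grp")
    .set_starting_offsets(KafkaOffsetsInitializer.latest())
    .set_value_only_deserializer(SimpleStringSchema())
    # Offsets are restored from Flink checkpoints anyway; this only silences the
    # "Failed to commit consumer offsets for checkpoint ..." warnings.
    .set_property("commit.offsets.on.checkpoint", "false")
    .build()
)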
Erlend F
04/27/2023, 11:05 AM
Rashmin Patel
04/27/2023, 12:00 PM
chunilal kukreja
04/27/2023, 1:06 PM
Mehul Batra
04/27/2023, 2:51 PM
detect_noop[true]} of bulk request failed with status 409.
at org.apache.flink.connector.opensearch.sink.OpensearchWriter.extractFailures(OpensearchWriter.java:346)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
at
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Single action update
[OpenSearch exception [type=version_conflict_engine_exception, reason=[69404946617821]: version conflict, document already exists (current version [1])]]
Elizaveta Batanina
04/27/2023, 3:19 PM
t_env.execute_sql("""
CREATE VIEW response_time AS (
SELECT KEY_COUNTRY_CODE, GLOBAL_ENTITY_ID, VENDOR_CODE, TM,
TIMESTAMPDIFF(SECOND,`ORDER_PARSED`.SENT_TO_VENDOR_AT, `ORDER_PARSED`.ACCEPTED_BY_VENDOR_AT) as RESPONSE_TIME
from delivery_events_liza)
""")
An error occurred while calling o498.executeSql.
: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" KEY_COUNTRY_CODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" GLOBAL_ENTITY_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" VENDOR_CODE, TIMESTAMP(3) *ROWTIME* TM, INTEGER RESPONSE_TIME) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" KEY_COUNTRY_CODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" GLOBAL_ENTITY_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" VENDOR_CODE, TIMESTAMP(3) TM, INTEGER RESPONSE_TIME) NOT NULL
rel:
LogicalProject(KEY_COUNTRY_CODE=[$2], GLOBAL_ENTITY_ID=[$3], VENDOR_CODE=[JSON_VALUE($1, _UTF-16LE'$.VENDOR_CODE')], TM=[CAST(parse_bq_datetime_udf($0)):TIMESTAMP(3)], RESPONSE_TIME=[CAST(/INT(Reinterpret(-(parse_order($1).ACCEPTED_BY_VENDOR_AT, parse_order($1).SENT_TO_VENDOR_AT)), 1000)):INTEGER])
LogicalTableScan(table=[[default_catalog, default_database, delivery_events_liza]])
How can I fix this problem?
If I just remove the TM field, which is the watermark column, the query finishes without an error.
Thanks for the help!
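One workaround that often helps with this "failed to preserve datatypes" assertion is to cast the rowtime column explicitly inside the view, so the validated and converted row types both carry a plain TIMESTAMP(3) instead of one of them keeping the *ROWTIME* attribute. The cast drops the time attribute, so downstream event-time operations on TM would need a new watermark. A hedged sketch reusing the query above:

# Sketch: strip the rowtime attribute with an explicit cast so both sides of the
# view definition agree on a plain TIMESTAMP(3).
t_env.execute_sql("""
    CREATE VIEW response_time AS (
        SELECT KEY_COUNTRY_CODE, GLOBAL_ENTITY_ID, VENDOR_CODE,
               CAST(TM AS TIMESTAMP(3)) AS TM,
               TIMESTAMPDIFF(SECOND, `ORDER_PARSED`.SENT_TO_VENDOR_AT,
                             `ORDER_PARSED`.ACCEPTED_BY_VENDOR_AT) AS RESPONSE_TIME
        FROM delivery_events_liza)
""")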
Krzysztof Chmielewski
04/27/2023, 4:06 PM
Kryo serializer scala extensions are not available
I'm not using Scala in my sandbox job, it's pure Java. What should I do to turn this off? The same thing was reported by another user on April 21st but unfortunately got no response: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1682093159771169
Raghunadh Nittala
04/27/2023, 4:12 PM
CREATE TABLE sink_table_s3 (
    event_id STRING NOT NULL,
    event_type STRING NOT NULL,
    event_name STRING NOT NULL,
    `date` STRING,
    results_count BIGINT
) PARTITIONED BY (event_id, event_type, `date`) WITH (
    'connector' = 'filesystem',
    'path' = '<path>',
    'format' = 'parquet'
);
I’m using the below SQL query to sink data to S3 in parquet format:
INSERT INTO sink_table_s3
SELECT event_id, event_type, event_name, DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '1' HOUR), 'yyyy-MM-dd') AS record_date, COUNT(*) results_count
FROM source_table
GROUP BY event_id, event_type, event_name, TUMBLE(proc_time, INTERVAL '1' HOUR);
I am partitioning the table on the event_id, event_type and date columns. I observed that the parquet files are getting saved correctly per event_id and event_type, but the date is wrong: data processed today is being saved to the 2023-04-26 folder. Since I am using proc_time to derive the date, I expect the data to be saved to the 2023-04-27 folder.
Can someone help me identify the issue?
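One plausible cause, assuming the session time zone was left at its default: DATE_FORMAT over the processing-time window end is rendered in Flink's SQL session time zone ('table.local-time-zone'), which defaults to UTC, so in a zone ahead of UTC the derived date can lag the local calendar day for part of the day. A hedged PyFlink sketch; the zone id is a placeholder:

# Sketch: render the partition date in a specific zone by setting the SQL session
# time zone before running the INSERT.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().get_configuration().set_string(
    "table.local-time-zone", "Asia/Kolkata"   # placeholder zone, not from the thread
)
# DATE_FORMAT(TUMBLE_END(proc_time, ...), 'yyyy-MM-dd') now follows that zone, so the
# derived `date` partition matches the local calendar day.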
Mali
04/27/2023, 5:09 PM
Abolfazl Ghahremani
04/28/2023, 5:10 AM
Suriya Krishna Mariappan
04/28/2023, 5:52 AM
Dheeraj Panangat
04/28/2023, 11:04 AM
org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'default_catalog.default_database.table_name' do not match.
Cause: Incompatible types for sink column 'summary_date' at position 2.
Query schema: [*********, summary_date: TIMESTAMP_LTZ(3) NOT NULL, **********]
Sink schema: [*********, summary_date: DATE, *******************************]
Can someone please help take a look at this?
Thanks
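The exception says the query produces TIMESTAMP_LTZ(3) NOT NULL for summary_date while the sink column is DATE, so either the sink column can be widened to TIMESTAMP_LTZ(3) or the query can cast the value down. A hedged sketch of the cast; every name except summary_date is a placeholder, and if the direct cast is rejected, casting to TIMESTAMP(3) first and then to DATE should work:

# Sketch: align the query type with the sink schema by casting TIMESTAMP_LTZ(3)
# down to DATE (the conversion uses the session time zone).
t_env.execute_sql("""
    INSERT INTO table_name
    SELECT
        metric_a,                                   -- placeholder for the masked columns
        CAST(summary_date AS DATE) AS summary_date,
        metric_b                                    -- placeholder for the masked columns
    FROM source_table
""")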
Tan Kim
04/28/2023, 2:59 PM
Ashwin Gupta
04/28/2023, 4:28 PM
NPE when Kryo is trying to serialise PriorityQueue. Stack trace in thread.
Mayur Sharma
04/28/2023, 5:31 PM
1. Input - user_id, price, event_timestamp
2. Output - every 1 minute, aggregate min(price) and max(price) by user_id for the past 60 days
Was hoping to use sliding/hop windows of size 60 days that slide every 1 min, but it seems the first aggregation won't be available until the first 60 days are completed. Is OVER the only windowing option for this case, or am I missing something about sliding windows?
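For a 60-day lookback, an OVER aggregation with a RANGE bound is usually more practical than a hop window with an 86,400-pane size/slide ratio; the trade-off is that it emits an updated min/max per incoming row rather than on a fixed 1-minute schedule. A hedged sketch; the table name trades and its event-time attribute definition are assumptions:

# Sketch: 60-day rolling min/max per user via an event-time RANGE OVER window.
t_env.execute_sql("""
    SELECT
        user_id,
        price,
        event_timestamp,
        MIN(price) OVER w AS min_price_60d,
        MAX(price) OVER w AS max_price_60d
    FROM trades
    WINDOW w AS (
        PARTITION BY user_id
        ORDER BY event_timestamp
        RANGE BETWEEN INTERVAL '60' DAY PRECEDING AND CURRENT ROW)
""")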
Jirawech Siwawut
04/29/2023, 3:16 AM
// broadcast
harness0.processElement(new CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0);
harness0.processElement(new CompactionUnit(1, "p0", Collections.singletonList(f3)), 0);
harness0.processElement(new CompactionUnit(2, "p0", Arrays.asList(f2, f5)), 0);
harness0.processElement(new CompactionUnit(3, "p0", Collections.singletonList(f6)), 0);
harness0.processElement(new EndCompaction(1), 0);
// check compacted file generated
assertThat(fs.exists(new Path(folder, "compacted-f0"))).isTrue();
assertThat(fs.exists(new Path(folder, "compacted-f2"))).isTrue();
// f3 and f6 are in the charge of another task
assertThat(fs.exists(new Path(folder, "compacted-f3"))).isFalse();
assertThat(fs.exists(new Path(folder, "compacted-f6"))).isFalse();
CG
04/29/2023, 1:11 PM
Leandro Garcia
04/30/2023, 2:15 AM
Srivatsav Gorti
04/30/2023, 1:12 PM
public class LookupFlinkApp implements Serializable {
private static final String CONFIG_FILE = "/configuration.properties";
private static ParameterTool applicationProperties;
private static final StreamExecutionEnvironment STREAM_ENVIRONMENT =
StreamExecutionEnvironment.getExecutionEnvironment();
private static final Logger logger = LoggerFactory.getLogger(LookupFlinkApp.class);
static EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
static StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(STREAM_ENVIRONMENT, settings);
public static void main(String[] args) throws Exception {
ParameterTool parameter;
try (InputStream in = LookupFlinkApp.class.getResourceAsStream(CONFIG_FILE)) {
parameter = ParameterTool.fromPropertiesFile(in);
}
try (InputStream in =
LookupFlinkApp.class.getResourceAsStream(AppConstants.APPLICATION_PROPERTIES_YAML_FILE)) {
applicationProperties = ParameterTool.fromPropertiesFile(in);
} catch (IOException e) {
throw new RuntimeException(e);
}
Properties properties = parameter.getProperties();
Properties appProperties = applicationProperties.getProperties();
KafkaSource<String> kafkaSource =
KafkaSource.<String>builder()
.setBootstrapServers(applicationProperties.get("input.bootstrap.servers"))
.setTopics(applicationProperties.get("kafka.input.topic"))
.setGroupId(applicationProperties.get("input.group.id"))
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();
final SingleOutputStreamOperator<String> kafkaData =
STREAM_ENVIRONMENT
.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")
.setParallelism(1);
SingleOutputStreamOperator<FlowEntity> dataFromSource =
kafkaData.process(new StringToFlowEntity()).setParallelism(1);
// Stream of JSONObjects
DataStream<JSONObject> content = dataFromSource.map(new CustomMapFunction(properties));
// convert the content datastream into a table
DataStream<Row> rows = content.flatMap(new BuildTableRowsFunc());
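// Note: BuildTableRowsFunc returns Row without field names/types, so Flink's type
// extraction sees a generic Row and fromDataStream(rows) below exposes the whole
// record as a single RAW column "f0" (visible in the sample output). Supplying type
// info, e.g. .returns(Types.ROW_NAMED(...)), should yield named columns instead.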
Table postingsTable = streamTableEnvironment.fromDataStream(rows);
postingsTable.printSchema();
Table result = postingsTable.select($("*"));
streamTableEnvironment.toDataStream(result, Row.class).map(new PrintFlinkTable(result));
STREAM_ENVIRONMENT.execute();
}
public static class BuildTableRowsFunc implements FlatMapFunction<JSONObject, Row> {
@Override
public void flatMap(JSONObject jsonObject, Collector<Row> collector) throws Exception {
Set<String> keySet = jsonObject.keySet();
Row row = new Row(keySet.size());
int i = 0;
for (String key : keySet) {
row.setField(i++, jsonObject.get(key));
}
collector.collect(row);
}
}
public static class CustomMapFunction extends RichMapFunction<JSONObject, JSONObject> {
private final Properties properties;
public CustomMapFunction(Properties properties) {
this.properties = properties;
}
@Override
public JSONObject map(JSONObject jsonObject) throws Exception {
jsonObject.put("tagId", "a1b2c3d4");
return jsonObject;
}
}
}
Sample Output
| f0 |
| -- |
| +I[142, data.posting, dateTime, 2015-11-03T09:34:45, 201, 2023-04-30T12:48:15Z, 2016-11-30T20:59:59 application/json, Edm.Int64,_agency, a1b2c3d4e5]
Abolfazl Ghahremani
05/01/2023, 11:02 AM
Олег Спица
05/01/2023, 8:55 PM
triggerSavepoint() method, but I see that after the savepoint is created, the file in the filesystem is still hidden and in progress, like build/junit15822280378836783130/archive-output/2_shards/0/2023/04/26/17/.part-0-0.inprogress.8a05ee6b-c969-4e15-88ba-9a3b78f46035, but I'm expecting the file to be permanent (closed, not hidden).
I've made some attempts to find out what's going on and how to fix it, and I found one interesting thing: if I increase the timeout in the test so that the job triggers a scheduled checkpoint (not a savepoint triggered by hand, but the checkpoint scheduled every 1 min), then after that checkpoint everything works as expected: the file is renamed and stored.
23:41:38.838 [Checkpoint Timer] INFO o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 1 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1682973698824 for job e63ff74fc4bc34a54c3f1f05855d149c.
23:41:38.948 [jobmanager-io-thread-3] INFO o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 1 for job e63ff74fc4bc34a54c3f1f05855d149c (15345 bytes, checkpointDuration=121 ms, finalizationTime=3 ms).
23:42:38.955 [Checkpoint Timer] INFO o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1682973758954 for job e63ff74fc4bc34a54c3f1f05855d149c.
23:42:38.991 [jobmanager-io-thread-5] INFO o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 2 for job e63ff74fc4bc34a54c3f1f05855d149c (15388 bytes, checkpointDuration=33 ms, finalizationTime=4 ms).
23:42:39.528 [Test worker] INFO c.s.d.t.h.FlinkHarness$Companion - Cancelling the job e63ff74fc4bc34a54c3f1f05855d149c...
Does anyone know why this could be? What changed, and how should I use it to fix my tests? I hope that in production it will work with checkpoints enabled, but what about savepoints? Thanks a lot for any help and suggestions.
Slackbot
05/02/2023, 4:08 AM
Chirag Dewan
05/02/2023, 5:21 AM