Felix Angell
04/26/2023, 10:54 AM

Gintaras Matulas
04/26/2023, 11:01 AM

Юрий Смирнов
04/26/2023, 11:30 AM
Original error was: dlopen(/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so, 0x0002): tried: '/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64')), '/System/Volumes/Preboot/Cryptexes/OS/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (no such file), '/Users/ysmirnov/Documents/Repos/flink-python-jobs/venv/lib/python3.9/site-packages/numpy/core/_multiarray_umath.cpython-39-darwin.so' (mach-o file, but is an incompatible architecture (have 'arm64', need 'x86_64'))
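A minimal sketch, not from the thread: this dlopen failure means the numpy extension in the venv was built for arm64 while the process importing it runs as x86_64 (for example under Rosetta, or when launched from an x86_64 JVM). Checking which architecture the interpreter actually reports, then reinstalling numpy from that same interpreter so pip resolves a matching wheel, is one way to line the two up:

import platform
import subprocess
import sys

# The architecture the compiled numpy wheel must match.
print(platform.machine())  # 'arm64' or 'x86_64'

# Reinstall numpy using the exact interpreter that will import it,
# so pip picks a wheel for that interpreter's architecture.
subprocess.check_call([sys.executable, "-m", "pip", "install",
                       "--force-reinstall", "--no-cache-dir", "numpy"])
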
Alexis Leclerc
04/26/2023, 2:02 PM
3.0.0-1.17
1.17.0

David Wisecup
04/26/2023, 3:11 PM

Aeden Jameson
04/26/2023, 6:30 PM

Tan Kim
04/27/2023, 1:05 AM

Amenreet Singh Sodhi
04/27/2023, 10:03 AM
2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Node 1 disconnected.
2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Cancelled in-flight FETCH request with correlation id 8759 due to node 1 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms)
2023-04-10 13:48:39,366 INFO  org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1:
org.apache.kafka.common.errors.DisconnectException: null
2023-04-10 13:48:40,317 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-122] Node 19 disconnected.
2023-04-10 13:51:20,059 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Node 16 disconnected.
2023-04-10 13:51:20,060 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Cancelled in-flight FETCH request with correlation id 10312 due to node 16 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms)
2023-04-10 13:51:20,060 INFO  org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=event-executor-client-1, groupId=event-executor-grp] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 16:
org.apache.kafka.common.errors.DisconnectException: null
2023-04-10 13:51:48,469 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 4
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
2023-04-10 13:51:48,829 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 4
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
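A hedged note on the log above: Flink's KafkaSource keeps the offsets it needs for recovery in its own checkpoints, and the commit back to Kafka on checkpoint completion is best-effort (mainly useful for lag monitoring), so a RetriableCommitFailedException by itself does not lose data. A minimal PyFlink sketch, with hypothetical broker and topic names, showing where that commit behaviour is configured:

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60000)  # offset commits piggyback on checkpoints

source = (KafkaSource.builder()
          .set_bootstrap_servers("broker:9092")   # hypothetical
          .set_topics("events")                   # hypothetical
          .set_group_id("event-executor-grp")
          .set_starting_offsets(KafkaOffsetsInitializer.latest())
          .set_value_only_deserializer(SimpleStringSchema())
          # best-effort commit of consumed offsets back to Kafka:
          .set_property("commit.offsets.on.checkpoint", "true")
          .build())

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source")
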
Erlend F
04/27/2023, 11:05 AM

Rashmin Patel
04/27/2023, 12:00 PM

chunilal kukreja
04/27/2023, 1:06 PM

Mehul Batra
04/27/2023, 2:51 PM
detect_noop[true]} of bulk request failed with status 409.
	at org.apache.flink.connector.opensearch.sink.OpensearchWriter.extractFailures(OpensearchWriter.java:346)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
	at 
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Single action update
[OpenSearch exception [type=version_conflict_engine_exception, reason=[69404946617821]: version conflict, document already exists (current version [1])]]

Elizaveta Batanina
04/27/2023, 3:19 PM
t_env.execute_sql("""
CREATE VIEW response_time AS (
SELECT KEY_COUNTRY_CODE, GLOBAL_ENTITY_ID, VENDOR_CODE, TM,
TIMESTAMPDIFF(SECOND,`ORDER_PARSED`.SENT_TO_VENDOR_AT, `ORDER_PARSED`.ACCEPTED_BY_VENDOR_AT) as RESPONSE_TIME
from delivery_events_liza)
""")An error occurred while calling o498.executeSql.
: java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" KEY_COUNTRY_CODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" GLOBAL_ENTITY_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" VENDOR_CODE, TIMESTAMP(3) *ROWTIME* TM, INTEGER RESPONSE_TIME) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" KEY_COUNTRY_CODE, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" GLOBAL_ENTITY_ID, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" VENDOR_CODE, TIMESTAMP(3) TM, INTEGER RESPONSE_TIME) NOT NULL
rel:
LogicalProject(KEY_COUNTRY_CODE=[$2], GLOBAL_ENTITY_ID=[$3], VENDOR_CODE=[JSON_VALUE($1, _UTF-16LE'$.VENDOR_CODE')], TM=[CAST(parse_bq_datetime_udf($0)):TIMESTAMP(3)], RESPONSE_TIME=[CAST(/INT(Reinterpret(-(parse_order($1).ACCEPTED_BY_VENDOR_AT, parse_order($1).SENT_TO_VENDOR_AT)), 1000)):INTEGER])
  LogicalTableScan(table=[[default_catalog, default_database, delivery_events_liza]])
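A hedged reading of the failure: the validated type keeps TM as a rowtime attribute (TIMESTAMP(3) *ROWTIME*) while the converted plan, after the UDF-based cast, produces a plain TIMESTAMP(3). One common workaround is to materialize the time attribute explicitly in the view so both sides agree on a regular timestamp; a sketch reusing the statement above:

t_env.execute_sql("""
CREATE VIEW response_time AS (
SELECT KEY_COUNTRY_CODE, GLOBAL_ENTITY_ID, VENDOR_CODE,
CAST(TM AS TIMESTAMP(3)) AS TM,
TIMESTAMPDIFF(SECOND,`ORDER_PARSED`.SENT_TO_VENDOR_AT, `ORDER_PARSED`.ACCEPTED_BY_VENDOR_AT) as RESPONSE_TIME
from delivery_events_liza)
""")
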
Krzysztof Chmielewski
04/27/2023, 4:06 PM
Kryo serializer scala extensions are not available
I'm not using Scala in my sandbox job, pure Java. What should I do to turn this off? The same thing was reported by another user on April 21st but unfortunately got no response: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1682093159771169

Raghunadh Nittala
04/27/2023, 4:12 PM
CREATE TABLE sink_table_s3 (
  event_id STRING NOT NULL,
  event_type STRING NOT NULL,
  event_name STRING NOT NULL,
  results_count BIGINT,
  record_date DATE
) WITH (
  'connector' = 'filesystem',
  'path' = '<path>',
  'format' = 'parquet'
);
INSERT INTO sink_table_s3
SELECT event_id, event_type, event_name, DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '1' HOUR), 'yyyy-MM-dd') AS record_date, COUNT(*) results_count
FROM source_table
GROUP BY event_id, event_type, event_name, TUMBLE(proc_time, INTERVAL '1' HOUR);
event_id, event_type, proc_time
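A hedged sketch, assuming the reconstructed DDL above (results_count declared before a DATE-typed record_date): INSERT INTO without a column list maps SELECT columns to sink columns by position and by type, so the SELECT would need to emit results_count fourth and produce a DATE rather than the STRING that DATE_FORMAT returns; shown via PyFlink's t_env for illustration:

t_env.execute_sql("""
INSERT INTO sink_table_s3
SELECT event_id, event_type, event_name,
       COUNT(*) AS results_count,
       CAST(TUMBLE_END(proc_time, INTERVAL '1' HOUR) AS DATE) AS record_date
FROM source_table
GROUP BY event_id, event_type, event_name, TUMBLE(proc_time, INTERVAL '1' HOUR)
""")
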
Mali
04/27/2023, 5:09 PM

Abolfazl Ghahremani
04/28/2023, 5:10 AM

Suriya Krishna Mariappan
04/28/2023, 5:52 AM

Dheeraj Panangat
04/28/2023, 11:04 AM
org.apache.flink.table.api.ValidationException: Column types of query result and sink for 'default_catalog.default_database.table_name' do not match.
Cause: Incompatible types for sink column 'summary_date' at position 2.
Query schema: [*********, summary_date: TIMESTAMP_LTZ(3) NOT NULL, **********]
Sink schema:  [*********, summary_date: DATE, *******************************]
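A hedged sketch (the masked columns are stand-ins, so id, payload, and summary_source here are hypothetical): the query side yields TIMESTAMP_LTZ(3) where the sink declares DATE, and an explicit cast on the query side reconciles the two schemas:

t_env.execute_sql("""
INSERT INTO table_name
SELECT id, CAST(summary_date AS DATE) AS summary_date, payload
FROM summary_source
""")
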
Tan Kim
04/28/2023, 2:59 PM

Ashwin Gupta
04/28/2023, 4:28 PM
NPE
PriorityQueue

Mayur Sharma
04/28/2023, 5:31 PM
user_id, price, event_timestamp
min(price) and max(price)
user_id
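Reading the fragments above as "min(price) and max(price) per user_id", with a hypothetical table name: a plain GROUP BY keeps a continuously updating min/max per user, and event_timestamp only enters the picture if the aggregate should be windowed; a minimal sketch via PyFlink's t_env:

t_env.execute_sql("""
SELECT user_id, MIN(price) AS min_price, MAX(price) AS max_price
FROM events
GROUP BY user_id
""").print()
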
04/29/2023, 3:16 AM// broadcast
        harness0.processElement(new CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0);
        harness0.processElement(new CompactionUnit(1, "p0", Collections.singletonList(f3)), 0);
        harness0.processElement(new CompactionUnit(2, "p0", Arrays.asList(f2, f5)), 0);
        harness0.processElement(new CompactionUnit(3, "p0", Collections.singletonList(f6)), 0);
        harness0.processElement(new EndCompaction(1), 0);
        // check compacted file generated
        assertThat(fs.exists(new Path(folder, "compacted-f0"))).isTrue();
        assertThat(fs.exists(new Path(folder, "compacted-f2"))).isTrue();
        // f3 and f6 are in the charge of another task
        assertThat(fs.exists(new Path(folder, "compacted-f3"))).isFalse();
        assertThat(fs.exists(new Path(folder, "compacted-f6"))).isFalse();

CG
04/29/2023, 1:11 PM

Leandro Garcia
04/30/2023, 2:15 AM

Srivatsav Gorti
04/30/2023, 1:12 PM
public class LookupFlinkApp implements Serializable {
  private static final String CONFIG_FILE = "/configuration.properties";
  private static ParameterTool applicationProperties;
  private static final StreamExecutionEnvironment STREAM_ENVIRONMENT =
      StreamExecutionEnvironment.getExecutionEnvironment();
  private static final Logger logger = LoggerFactory.getLogger(LookupFlinkApp.class);
  static EnvironmentSettings settings = EnvironmentSettings
          .newInstance()
          .inStreamingMode()
          .build();
  static StreamTableEnvironment streamTableEnvironment =
      StreamTableEnvironment.create(STREAM_ENVIRONMENT, settings);

  public static void main(String[] args) throws Exception {
    ParameterTool parameter;
    try (InputStream in = LookupFlinkApp.class.getResourceAsStream(CONFIG_FILE)) {
      parameter = ParameterTool.fromPropertiesFile(in);
    }
    try (InputStream in =
                 LookupFlinkApp.class.getResourceAsStream(AppConstants.APPLICATION_PROPERTIES_YAML_FILE)) {
      applicationProperties = ParameterTool.fromPropertiesFile(in);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    Properties properties = parameter.getProperties();
    Properties appProperties = applicationProperties.getProperties();

    KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
            .setBootstrapServers(applicationProperties.get("input.bootstrap.servers"))
            .setTopics(applicationProperties.get("kafka.input.topic"))
            .setGroupId(applicationProperties.get("input.group.id"))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setStartingOffsets(OffsetsInitializer.latest())
            .build();

    final SingleOutputStreamOperator<String> kafkaData =
        STREAM_ENVIRONMENT
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource")
            .setParallelism(1);

    SingleOutputStreamOperator<FlowEntity> dataFromSource =
        kafkaData.process(new StringToFlowEntity()).setParallelism(1);

    // Stream of JSONObjects
    DataStream<JSONObject> content = dataFromSource.map(new CustomMapFunction(properties));

    // convert the content datastream into a table
    DataStream<Row> rows = content.flatMap(new BuildTableRowsFunc());
    Table postingsTable = streamTableEnvironment.fromDataStream(rows);
    postingsTable.printSchema();
    Table result = postingsTable.select($("*"));
    streamTableEnvironment.toDataStream(result, Row.class).map(new PrintFlinkTable(result));
    STREAM_ENVIRONMENT.execute();
  }

  public static class BuildTableRowsFunc implements FlatMapFunction<JSONObject, Row> {
    @Override
    public void flatMap(JSONObject jsonObject, Collector<Row> collector) throws Exception {
      Set<String> keySet = jsonObject.keySet();
      Row row = new Row(keySet.size());
      int i = 0;
      for (String key : keySet) {
        row.setField(i++, jsonObject.get(key));
      }
      collector.collect(row);
    }
  }

  public static class CustomMapFunction extends RichMapFunction<JSONObject, JSONObject> {
    private final Properties properties;

    public CustomMapFunction(Properties properties) {
      this.properties = properties;
    }

    @Override
    public JSONObject map(JSONObject jsonObject) throws Exception {
      jsonObject.put("tagId", "a1b2c3d4");
      return jsonObject;
    }
  }
}

| f0 |
| -- |
| +I[142, data.posting, dateTime, 2015-11-03T09:34:45, 201, 2023-04-30T12:48:15Z, 2016-11-30T20:59:59 application/json, Edm.Int64,_agency, a1b2c3d4e5] |

Abolfazl Ghahremani
05/01/2023, 11:02 AM

Олег Спица
05/01/2023, 8:55 PM
triggerSavepoint()
build/junit15822280378836783130/archive-output/2_shards/0/2023/04/26/17/.part-0-0.inprogress.8a05ee6b-c969-4e15-88ba-9a3b78f46035
23:41:38.838 [Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 1 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1682973698824 for job e63ff74fc4bc34a54c3f1f05855d149c.
23:41:38.948 [jobmanager-io-thread-3] INFO  o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 1 for job e63ff74fc4bc34a54c3f1f05855d149c (15345 bytes, checkpointDuration=121 ms, finalizationTime=3 ms).
23:42:38.955 [Checkpoint Timer] INFO  o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1682973758954 for job e63ff74fc4bc34a54c3f1f05855d149c.
23:42:38.991 [jobmanager-io-thread-5] INFO  o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 2 for job e63ff74fc4bc34a54c3f1f05855d149c (15388 bytes, checkpointDuration=33 ms, finalizationTime=4 ms).
23:42:39.528 [Test worker] INFO  c.s.d.t.h.FlinkHarness$Companion - Cancelling the job e63ff74fc4bc34a54c3f1f05855d149c...

Slackbot
05/02/2023, 4:08 AM

Chirag Dewan
05/02/2023, 5:21 AM