Nikola Stanisavljevic
03/15/2023, 5:16 PM
Yufei Chen
03/15/2023, 6:34 PM
2023-03-15 18:20:01,775 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/myauto] Starting reconciliation
2023-03-15 18:20:01,776 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][default/myauto] Getting service for myauto
2023-03-15 18:20:01,788 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][default/myauto] Resource fully reconciled, nothing to do...
2023-03-15 18:20:01,788 o.a.f.k.o.c.FlinkDeploymentController [INFO ][default/myauto] End of reconciliation
piby 180
03/15/2023, 6:38 PM
env = StreamExecutionEnvironment.get_execution_environment()
jars = "file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-sql-parquet-1.16.1.jar; \
file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-parquet-1.16.1.jar; \
file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-hadoop-fs.jar; \
file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/flink-hadoop-compatibility_2.12-1.16.1.jar; \
file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/hadoop-common-3.3.4.jar; \
file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars/hadoop-client-3.3.4.jar"
env.add_jars(jars)
env.add_classpaths(jars)
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
# write all the data to one file
env.set_parallelism(1)
# FileSource
file_source = FileSource.for_record_stream_format(StreamFormat.text_line_format(), "/mnt/c/Users/piby/Downloads/flink/examples/python/input") \
.monitor_continuously(Duration.of_millis(5)) \
.build()
ds = env.from_source(file_source, watermark_strategy=WatermarkStrategy.no_watermarks(), source_name="File Source")
ds = ds.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.key_by(lambda i: i[0], key_type=Types.STRING())\
.reduce(lambda i, j: (i[0], i[1] + j[1]))
row_type = DataTypes.ROW([
DataTypes.FIELD(name='word', data_type=DataTypes.STRING(), description="word name"),
DataTypes.FIELD(name='count', data_type=DataTypes.INT(), description="total count")
])
file_sink = FileSink.for_bulk_format(base_path=output_path, \
writer_factory=ParquetBulkWriters.for_row_type(
row_type,
hadoop_config=Configuration(),
utc_timestamp=True,
)) \
.with_bucket_assigner(BucketAssigner.date_time_bucket_assigner(format_str="yyyy/MM/dd/HH", timezone_id="UTC")) \
.with_output_file_config(
OutputFileConfig.builder()
.with_part_prefix("prefix")
.with_part_suffix(".parquet")
.build()) \
.with_rolling_policy(RollingPolicy.default_rolling_policy()) \
.build()
ds.sink_to(sink=file_sink)
env.execute()
Here is the error stack
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[1], line 57
46 ds = ds.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
47 .key_by(lambda i: i[0], key_type=Types.STRING())\
48 .reduce(lambda i, j: (i[0], i[1] + j[1]))
50 row_type = DataTypes.ROW([
51 DataTypes.FIELD(name='word', data_type=DataTypes.STRING(), description="word name"),
52 DataTypes.FIELD(name='count', data_type=DataTypes.INT(), description="total count")
53 ])
56 file_sink = FileSink.for_bulk_format(base_path=output_path, \
---> 57 writer_factory=ParquetBulkWriters.for_row_type(
58 row_type,
59 utc_timestamp=True,
60 )) \
61 .with_bucket_assigner(BucketAssigner.date_time_bucket_assigner(format_str="yyyy/MM/dd/HH", timezone_id="UTC")) \
62 .with_output_file_config(
63 OutputFileConfig.builder()
64 .with_part_prefix("prefix")
65 .with_part_suffix(".parquet")
66 .build()) \
67 .with_rolling_policy(RollingPolicy.default_rolling_policy()) \
68 .build()
70 ds.sink_to(sink=file_sink)
71 env.execute()
File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/formats/parquet.py:212, in ParquetBulkWriters.for_row_type(row_type, hadoop_config, utc_timestamp)
208 jvm = get_gateway().jvm
209 JParquetRowDataBuilder = jvm.org.apache.flink.formats.parquet.row.ParquetRowDataBuilder
210 return RowDataBulkWriterFactory(JParquetRowDataBuilder.createWriterFactory(
211 _to_java_data_type(row_type).getLogicalType(),
--> 212 create_hadoop_configuration(hadoop_config),
213 utc_timestamp
214 ), row_type)
File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/datastream/utils.py:39, in create_hadoop_configuration(config)
37 def create_hadoop_configuration(config: Configuration):
38 jvm = get_gateway().jvm
---> 39 hadoop_config = jvm.org.apache.hadoop.conf.Configuration()
40 for k, v in config.to_dict().items():
41 hadoop_config.set(k, v)
File ~/anaconda3/envs/flink/lib/python3.8/site-packages/pyflink/util/exceptions.py:185, in install_py4j_hooks.<locals>.wrapped_call(self, *args, **kwargs)
184 def wrapped_call(self, *args, **kwargs):
--> 185 raise TypeError(
186 "Could not found the Java class '%s'. The Java dependencies could be specified via "
187 "command line argument '--jarfile' or the config option 'pipeline.jars'" % self._fqn)
TypeError: Could not found the Java class 'org.apache.hadoop.conf.Configuration'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
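A minimal sketch of one way to address this, assuming the same jar locations as above: the error means the gateway JVM cannot find org.apache.hadoop.conf.Configuration, and the backslash-continued string above leaves the indentation spaces inside every path after the first, so the Hadoop jars may never actually be registered. Passing each path separately avoids that (dropping the Hadoop jars into $FLINK_HOME/lib is another common approach):
from pyflink.datastream import StreamExecutionEnvironment

# Sketch only: register each jar URL as its own argument so that no whitespace
# from string continuations ends up inside the paths. The jar list is the one
# from the snippet above; whether it is complete for the Parquet/Hadoop setup
# is an assumption.
jar_base = "file:///mnt/c/Users/piby/Downloads/flink/examples/python/jars"
jar_files = [
    f"{jar_base}/flink-sql-parquet-1.16.1.jar",
    f"{jar_base}/flink-parquet-1.16.1.jar",
    f"{jar_base}/flink-hadoop-fs.jar",
    f"{jar_base}/flink-hadoop-compatibility_2.12-1.16.1.jar",
    f"{jar_base}/hadoop-common-3.3.4.jar",
    f"{jar_base}/hadoop-client-3.3.4.jar",
]

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(*jar_files)        # populates pipeline.jars
env.add_classpaths(*jar_files)  # populates pipeline.classpaths for the client side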
Krish Narukulla
03/15/2023, 6:56 PM
2023-03-15 18:45:43,383 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: ics_raw[1] -> (Calc[2] -> Calc[3] -> sink_kafka[4]: Writer -> sink_kafka[4]: Committer, Calc[5]) (28/32)#3754 (7a8b9af226cd4e016196e035ecd2d11a_e3dfc0d7e9ecd8a43f85f0b68ebf3b80_27_3754).
2023-03-15 18:45:43,383 INFO org.apache.flink.runtime.taskmanager.Task [] - Task Source: ics_raw[1] -> (Calc[2] -> Calc[3] -> sink_kafka[4]: Writer -> sink_kafka[4]: Committer, Calc[5]) (28/32)#3754 is already in state FAILED
2023-03-15 18:45:43,383 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: ics_raw[1] -> (Calc[2] -> Calc[3] -> sink_kafka[4]: Writer -> sink_kafka[4]: Committer, Calc[5]) (32/32)#3754 (7a8b9af226cd4e016196e035ecd2d11a_e3dfc0d7e9ecd8a43f85f0b68ebf3b80_31_3754) switched from DEPLOYING to FAILED with failure cause: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-72449e261468158ece284f84d3d13d5995e23b89-295012ce09be8c6d30610c99e0563bf1]
new:[p-72449e261468158ece284f84d3d13d5995e23b89-14ef7154e3312d2ce68d9fb6776e5b7f]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
Chloe He
03/15/2023, 7:06 PM
piby 180
03/15/2023, 10:44 PM
deserialization_schema = AvroRowDeserializationSchema(
avro_schema_string="""
{
"type": "record",
"name": "TestObject",
"namespace": "ca.dataedu",
"fields": [{
"name": "count",
"type": ["null", "int"],
"default": null
}, {
"name": "word",
"type": ["null", "string"],
"default": null
}]
}
"""
)
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers('0.0.0.0:9092') \
.set_topics("test_topic") \
.set_group_id("my-group") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(deserialization_schema) \
.build()
ds = st_env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "Kafka Source")
Error traceback
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -62 out of bounds for length 2
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:164)
... 18 more
Python code on the producer side
from random_word import RandomWords
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
r = RandomWords()
for i in range(10):
producer.send('test_topic', {'word': r.get_random_word(), 'count' : i})
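The ArrayIndexOutOfBoundsException inside Avro's ResolvingDecoder usually indicates that the bytes on the topic are not Avro binary at all: the producer above writes JSON via json.dumps, while AvroRowDeserializationSchema expects Avro-encoded payloads matching the schema. A minimal sketch of a producer that writes Avro binary instead, assuming the fastavro package is available:
import io

from fastavro import parse_schema, schemaless_writer
from kafka import KafkaProducer
from random_word import RandomWords

# Same record schema as on the consumer side
schema = parse_schema({
    "type": "record",
    "name": "TestObject",
    "namespace": "ca.dataedu",
    "fields": [
        {"name": "count", "type": ["null", "int"], "default": None},
        {"name": "word", "type": ["null", "string"], "default": None},
    ],
})

def avro_encode(record):
    # Plain (schemaless) Avro binary encoding of a single record
    buf = io.BytesIO()
    schemaless_writer(buf, schema, record)
    return buf.getvalue()

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=avro_encode)
r = RandomWords()
for i in range(10):
    producer.send('test_topic', {'word': r.get_random_word(), 'count': i})
producer.flush()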
piby 180
03/16/2023, 8:56 AM
file_sink = FileSink.for_bulk_format(base_path=output_path, \
writer_factory=ParquetBulkWriters.for_row_type(
row_type,
hadoop_config=Configuration(),
utc_timestamp=True,
)) \
.with_bucket_assigner(BucketAssigner.date_time_bucket_assigner(format_str="yyyy/MM/dd/HH", timezone_id="UTC")) \
.with_output_file_config(
OutputFileConfig.builder()
.with_part_prefix("prefix")
.with_part_suffix(".parquet")
.build()) \
.with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
.build()
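Note that bulk formats such as Parquet can only roll part files on checkpoint, so the on-checkpoint rolling policy above only finalizes files if checkpointing is enabled on the environment. A one-line sketch against the env from the earlier snippet (the 30-second interval is an arbitrary example):
# Enable checkpointing so the bulk-format FileSink can roll and finalize its
# in-progress part files; the interval is an arbitrary example value.
env.enable_checkpointing(30 * 1000)  # milliseconds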
Evaldas Buinauskas
03/16/2023, 1:57 PM
Slackbot
03/16/2023, 3:01 PM
Abhinav sharma
03/16/2023, 4:18 PM
Thijs van de Poll
03/16/2023, 4:45 PM
groupBy operation and attempt to collect a list of nested objects. This results in: MULTISET<MAP<STRING,STRING>>. I do not necessarily need a multiset; ideally it would be: ARRAY<MAP<STRING,STRING>>, but I cannot find how to do that (except using UDFs, I guess)?
• When I write to Iceberg, I get the following error: Invalid map: MULTISET<MAP<STRING, STRING>> is not a map.
Any ideas?
Adam Augusta
03/16/2023, 6:17 PM
Adam Augusta
03/16/2023, 6:17 PM
Trystan
03/16/2023, 6:21 PM
Adam Augusta
03/16/2023, 8:18 PM
tEnv.createTemporaryView("contractswithevents", "select contract_id, isin, leg1.events as leg1events, leg2.events as leg2events from contracts");
Table leg1Events = tEnv.sqlQuery("select contract_id, isin, 'leg1' as leg, e.event_id, e.quantity from contractswithevents CROSS JOIN UNNEST(leg1events) as e(event_id, quantity)");
Table leg2Events = …
Table events = leg1Events.unionAll(leg2Events);
TablePipeline pipeline = events.groupBy($("contract_id"), $("quantity").sum()).insertInto("results");
The optimized execution plan has 2 `Exchange(distribution=[hash[contract_id]])`s, one after the scan, and one between the union and the aggregate.
What gives? Is the planner getting baffled by the unnest?
Adam Augusta
03/16/2023, 8:19 PM
Adam Augusta
03/16/2023, 8:38 PM
Emmanuel Leroy
03/16/2023, 10:03 PM
Jeff Levesque
03/17/2023, 1:01 AM
DEFAULT value for a column? If my original table is as follows:
return """CREATE TABLE {0} (
ticker VARCHAR(6),
{2} TIMESTAMP(3),
window_end TIMESTAMP(3),
first_price DOUBLE,
last_price DOUBLE,
min_price DOUBLE,
max_price DOUBLE,
notify_email VARCHAR(320),
WATERMARK FOR {2} AS {2} - INTERVAL '{3}' SECOND
) WITH (
'connector' = '{1}'
)""".format(
table_name,
connector,
field_watermark,
watermark_interval
)
How can I perform something like the following:
return """CREATE TABLE {0} (
ticker VARCHAR(6),
{2} TIMESTAMP(3),
window_end TIMESTAMP(3),
first_price DOUBLE,
last_price DOUBLE,
min_price DOUBLE,
max_price DOUBLE,
notify_email VARCHAR(320) METADATA DEFAULT 'my@email.com',
WATERMARK FOR {2} AS {2} - INTERVAL '{3}' SECOND
) WITH (
'connector' = '{1}'
)""".format(
table_name,
connector,
field_watermark,
watermark_interval
)
When I try something like that I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "DEFAULT" at line 9, column 44.
Was expecting one of:
"FROM" ...
"COMMENT" ...
"VIRTUAL" ...
")" ...
"," ...
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:56)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:98)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "DEFAULT" at line 9, column 44.
Was expecting one of:
"FROM" ...
"COMMENT" ...
"VIRTUAL" ...
")" ...
"," ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:452)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:215)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:155)
at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:180)
at org.apache.flink.table.planner.parse.CalciteParser.parse(CalciteParser.java:54)
... 13 more
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 9, column 44.
Was expecting one of:
"FROM" ...
"COMMENT" ...
"VIRTUAL" ...
")" ...
"," ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:39897)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:39708)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5221)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6239)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:20999)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3421)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3924)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:263)
at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:153)
... 15 more
Process finished with exit code 1
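The parser error is expected: Flink SQL's CREATE TABLE does not support a DEFAULT clause on columns (METADATA only maps connector metadata keys). One possible workaround is to keep the physical field under another name and expose a computed column with a literal fallback; a sketch in the same format-string style, where notify_email_raw is a hypothetical column name:
def build_ddl(table_name, connector, field_watermark, watermark_interval):
    # Sketch: notify_email_raw (hypothetical) carries the physical value;
    # notify_email is a computed column that falls back to a literal default.
    return """CREATE TABLE {0} (
        ticker VARCHAR(6),
        {2} TIMESTAMP(3),
        window_end TIMESTAMP(3),
        first_price DOUBLE,
        last_price DOUBLE,
        min_price DOUBLE,
        max_price DOUBLE,
        notify_email_raw VARCHAR(320),
        notify_email AS COALESCE(notify_email_raw, 'my@email.com'),
        WATERMARK FOR {2} AS {2} - INTERVAL '{3}' SECOND
    ) WITH (
        'connector' = '{1}'
    )""".format(
        table_name,
        connector,
        field_watermark,
        watermark_interval
    )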
Jeff Levesque
03/17/2023, 1:06 AM
Chen-Che Huang
03/17/2023, 1:50 AM
incremental checkpointing and configure the state.checkpoints.num-retained parameter to retain only 10 checkpoints in our Flink applications. Although Flink keeps only 10 checkpoints, the number of files under the shared folder keeps increasing. To reduce the storage cost, it seems that the CLAIM mode mentioned in this doc is useful. We configure execution.savepoint-restore-mode to CLAIM but Flink still doesn't delete files under the shared folder. I'm curious: does the CLAIM mode only delete savepoints and checkpoints after the Flink app starts from a new savepoint? If so, does anyone know whether there is some way to limit the growth of files under the shared folder for checkpoints? Any comment is appreciated 🙏
Ta-Chun Shen
03/17/2023, 4:04 AM
last-state upgrade mode. After the job was restored, I noticed that the full checkpoint size drastically increased from 600GB to 900GB. To investigate this issue, I extracted the internal RocksDB database from one of the operator subtask states and used the ldb tool to dump it. I found that there are a large number of deleted rows. The following is the command I use:
$ ldb --db=./db --column_family=name idump --hex | grep 'type:1' | wc -l
134529889
$ ldb --db=./db --column_family=name idump --hex | grep 'type:0' | wc -l
33779429
We can see that most of the rows are deleted. Next, I tried manually running compaction on the database and the size decreased from 16GB to 6.6GB. Why could rescaling create so many deleted rows in this case? Is there any configuration to alleviate the size increase problem, or is it just the expected behavior of rescaling?
Soumya Ghosh
03/17/2023, 7:04 AM
x minutes
Abhinav sharma
03/17/2023, 7:21 AM
Abhinav sharma
03/17/2023, 12:21 PM
Evaldas Buinauskas
03/17/2023, 12:32 PM
Francisco Morillo
03/17/2023, 1:17 PM
Liad Shachoach
03/17/2023, 4:26 PM
Emmanuel Leroy
03/17/2023, 8:37 PM
Caused by: org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /tmp/flink-web-efe42546-a97a-4393-87c2-f61f782595dc/flink-web-upload/5d8e11e0-4736-49b7-ac82-0e2bc9bd3b5b_beam.jar does not exist
Somehow the job ends up working after some time, once the jar is found. With Beam, however, the Beam job fails because of this, and I see it restarting many, many times before it finally works.
Is there any kind of setting to define how to 'wait' for the jar to be present before attempting to launch the job? Or is it a bug?
Deepyaman Datta
03/17/2023, 9:37 PM
unexpected correlate variable error; e.g.:
Flink SQL> SELECT * FROM (VALUES (TIMESTAMP '2023-03-17 22:00:50', '2475090000000000', 0), (TIMESTAMP '2023-03-17 22:00:14', '630423000000', 0)) t0(inference_time, cc_num, is_fraud)
> WHERE inference_time > (
> SELECT max(event_time)
> FROM iceberg_tx_average_transaction_amount_tx_1m t1
> WHERE t0.cc_num = t1.cc_num
> );
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: unexpected correlate variable $cor14 in the plan
---
Hi! I'm trying to execute the following query:
WITH t0 AS (
SELECT t3.`event_time`, t3.`cc_num`,
t3.`amt` AS `average_transaction_amount_tx_1m`
FROM iceberg_tx_average_transaction_amount_tx_1m t3
),
t1 AS (
SELECT t0.`event_time`, t0.`average_transaction_amount_tx_1m`,
t0.`cc_num` AS `_right_by`
FROM t0
)
SELECT `inference_time`, `cc_num`, max(t2.`is_fraud`) AS `is_fraud`,
max(t1.`average_transaction_amount_tx_1m`) AS `average_transaction_amount_tx_1m`
FROM (VALUES (TIMESTAMP '2023-03-17 20:48:49', '6540980000000000', 0), (TIMESTAMP '2023-03-17 20:48:51', '571465000000', 0)) t2(inference_time, cc_num, is_fraud)
LEFT OUTER JOIN t1
ON t2.`cc_num` = t1.`_right_by`
WHERE t1.`event_time` = (
SELECT max(t1.`event_time`) AS `Max(event_time)`
FROM t1
WHERE (t1.`_right_by` = t2.`cc_num`) AND
(t1.`event_time` <= t2.`inference_time`)
)
GROUP BY inference_time, cc_num
and I'm getting some unexpected correlated variable $cor0 in the plan error. Any pointers as to what might be causing this? I assume the SQL is valid, but Flink is having trouble building the query plan for some reason, and I can't find information on it. I have tried significantly reducing the size of my table created from VALUES (from ~200 rows to 2), just in case it was an issue of complexity.
If I change (VALUES (TIMESTAMP '2023-03-17 20:48:49', '6540980000000000', 0), (TIMESTAMP '2023-03-17 20:48:51', '571465000000', 0)) t2(inference_time, cc_num, is_fraud) to spine t2 (an existing table), it works as expected...
I'll also share the stack trace in the thread:
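The unexpected correlate variable error usually means the planner failed to decorrelate the scalar subquery. One thing worth trying is pre-aggregating the latest event_time per cc_num and joining against it instead of correlating; a sketch under that assumption (t_env stands in for whatever TableEnvironment is in use, and it is untested whether the resulting plan is accepted with the Iceberg source):
# Sketch: replace the correlated scalar subquery with a pre-aggregated join.
result = t_env.execute_sql("""
    SELECT t0.inference_time, t0.cc_num, t0.is_fraud
    FROM (VALUES
        (TIMESTAMP '2023-03-17 22:00:50', '2475090000000000', 0),
        (TIMESTAMP '2023-03-17 22:00:14', '630423000000', 0)
    ) t0(inference_time, cc_num, is_fraud)
    JOIN (
        SELECT cc_num, MAX(event_time) AS max_event_time
        FROM iceberg_tx_average_transaction_amount_tx_1m
        GROUP BY cc_num
    ) m ON t0.cc_num = m.cc_num
    WHERE t0.inference_time > m.max_event_time
""")
result.print()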