Elizaveta Batanina
04/06/2023, 3:34 PM
column_by_expression:
org.apache.flink.table.api.TableException: Expression 'parse_bq_datetime(window_start)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
Has anyone faced this problem? I will add more details in the thread.
Thanks!
Elizaveta Batanina
04/06/2023, 3:41 PM
```python
from datetime import datetime

from pyflink.table import DataTypes
from pyflink.table.udf import udf


def parse_bq_datetime(timestamp_str):
    format_1 = '%Y-%m-%d %H:%M:%S.%f %Z'
    format_2 = '%Y-%m-%d %H:%M:%S %Z'
    format_3 = '%Y-%m-%d %H:%M:%S.%f'
    if timestamp_str is None:
        return None
    # strptime raises ValueError when the string does not match the format
    try:
        t = datetime.strptime(timestamp_str, format_1)
    except ValueError:
        try:
            t = datetime.strptime(timestamp_str, format_2)
        except ValueError:
            t = datetime.strptime(timestamp_str, format_3)
    return t


parse_bq_datetime_udf = udf(parse_bq_datetime, result_type=DataTypes.TIMESTAMP())
```
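The fallback chain can be checked in plain Python before it is wrapped as a UDF. The sketch below flattens the nested try/except into a loop over the same three formats, in the same order. The sample strings are hypothetical BigQuery-style values, not taken from the thread. Note that `%Z` in `strptime` only accepts a few zone names (`UTC`, `GMT` and the local zone's names) and produces a naive `datetime`.

```python
from datetime import datetime

# Same formats as parse_bq_datetime, tried in the same order.
BQ_FORMATS = (
    '%Y-%m-%d %H:%M:%S.%f %Z',  # fractional seconds + zone name
    '%Y-%m-%d %H:%M:%S %Z',     # whole seconds + zone name
    '%Y-%m-%d %H:%M:%S.%f',     # fractional seconds, no zone
)


def parse_bq_datetime(timestamp_str):
    """Return a naive datetime, or None for NULL input."""
    if timestamp_str is None:
        return None
    for fmt in BQ_FORMATS:
        try:
            return datetime.strptime(timestamp_str, fmt)
        except ValueError:
            continue  # format did not match, try the next one
    # Like the original, give up with a ValueError if no format matches.
    raise ValueError(f'unrecognized timestamp: {timestamp_str!r}')


# Hypothetical sample values, one per format.
for s in ['2023-03-31 23:35:00.250 UTC', '2023-03-31 23:35:00 UTC', '2023-03-31 23:35:00.5']:
    print(s, '->', parse_bq_datetime(s))
```

Returning from inside the loop keeps the "first matching format wins" behaviour of the nested version while making it easy to add more formats.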
I then use it in the table schema:
```python
from pyflink.table import DataTypes, Schema
from pyflink.table.expressions import col

table_schema = Schema.new_builder() \
    .column('country_code', DataTypes.STRING()) \
    .column('vendor_code', DataTypes.STRING()) \
    .column('window_start', DataTypes.STRING()) \
    .column_by_expression('TM', parse_bq_datetime_udf(col('window_start'))) \
    .build()
```
Then I create a table from my JSON files with this schema:
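```python
from pyflink.table import TableDescriptor

# t_env (a TableEnvironment) and table_path are defined elsewhere
t_env.create_temporary_table(
    'my_table',
    TableDescriptor.for_connector('filesystem')
    .schema(table_schema)
    .option('path', table_path)
    # .option('json.ignore-parse-errors', 'true')
    .format('json')
    .build())
```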
When I query data from this table, I get the following error:
```
>>> table = t_env.from_path('my_table')
>>> table.limit(10).to_pandas()
org.apache.flink.table.api.TableException: Expression 'parse_bq_datetime(window_start)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
	at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
```
I have already used the same function in another table schema and it worked fine. If I query the same string field with this function (without parsing it in the schema builder), it also works:
```
>>> table_without_expression.select(parse_bq_datetime_udf(col('window_start')).cast(DataTypes.TIMESTAMP(3)).alias('window_time')).limit(10).to_pandas()
window_start                    |
--------------------------------|
Timestamp('2023-03-31 23:35:00')|
.....
```
Dian Fu
04/07/2023, 5:05 AM
```python
t_env.create_temporary_function('parse_bq_datetime_udf', parse_bq_datetime_udf)
table_schema = Schema.new_builder() \
    .column('country_code', DataTypes.STRING()) \
    .column('vendor_code', DataTypes.STRING()) \
    .column('window_start', DataTypes.STRING()) \
    .column_by_expression('TM', 'parse_bq_datetime_udf(window_start)') \
    .build()
```
That is, register the UDF under a name with `t_env.create_temporary_function` and change
`.column_by_expression('TM', parse_bq_datetime_udf(col('window_start')))`
to
`.column_by_expression('TM', 'parse_bq_datetime_udf(window_start)')`
Dian Fu
04/07/2023, 5:09 AM
The error comes from `table.limit(10).to_pandas()`, which needs to call `table.get_schema()`, and `get_schema()` currently only supports computed columns defined via a SQL string.
Dian Fu
04/07/2023, 5:15 AM
Elizaveta Batanina
04/11/2023, 8:46 AM