# troubleshooting
Elizaveta Batanina:
Hi! I have encountered a weird error when using a Flink UDF in the schema builder method column_by_expression:
org.apache.flink.table.api.TableException: Expression 'parse_bq_datetime(window_start)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
Has anyone faced this problem? I will add more details in the thread. Thanks!
So, I have a UDF function:
from datetime import datetime

from pyflink.table import DataTypes
from pyflink.table.udf import udf


def parse_bq_datetime(timestamp_str):
    # Timestamp formats to try, from most to least specific.
    format_1 = '%Y-%m-%d %H:%M:%S.%f %Z'
    format_2 = '%Y-%m-%d %H:%M:%S %Z'
    format_3 = '%Y-%m-%d %H:%M:%S.%f'
    if timestamp_str is None:
        return None
    try:
        t = datetime.strptime(timestamp_str, format_1)
    except ValueError:
        try:
            t = datetime.strptime(timestamp_str, format_2)
        except ValueError:
            t = datetime.strptime(timestamp_str, format_3)
    return t


parse_bq_datetime_udf = udf(parse_bq_datetime, result_type=DataTypes.TIMESTAMP())
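For context, these three formats presumably cover the timestamp strings exported from BigQuery. A quick standalone check of the function (outside of Flink, with made-up sample values) might look like this:

from datetime import datetime

# Hypothetical sample values, one per format the UDF tries in order.
samples = [
    '2023-03-31 23:35:00.123456 UTC',  # format_1: fractional seconds + timezone name
    '2023-03-31 23:35:00 UTC',         # format_2: no fractional seconds
    '2023-03-31 23:35:00.123456',      # format_3: fractional seconds, no timezone
]

for s in samples:
    # Each sample should be parsed by exactly one of the fallbacks above.
    print(s, '->', parse_bq_datetime(s))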
I later use it in a schema:
from pyflink.table import Schema
from pyflink.table.expressions import col

table_schema = Schema.new_builder() \
    .column('country_code', DataTypes.STRING()) \
    .column('vendor_code', DataTypes.STRING()) \
    .column('window_start', DataTypes.STRING()) \
    .column_by_expression('TM', parse_bq_datetime_udf(col('window_start'))) \
    .build()
I then use this schema to create a table from my JSON files:
from pyflink.table import TableDescriptor

t_env.create_temporary_table(
    'my_table',
    TableDescriptor.for_connector('filesystem')
        .schema(table_schema)
        .option('path', table_path)
        # .option('json.ignore-parse-errors', 'true')
        .format('json')
        .build())
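(table_path points at my JSON data. For completeness, not part of the original setup: a rough, hypothetical sketch of how matching input could be generated, with each line being a JSON object carrying the three declared string fields and window_start in one of the formats the UDF handles. This would run before create_temporary_table above.)

import json
import os
import tempfile

# Hypothetical sample records; field names match the schema above, values are made up.
records = [
    {'country_code': 'DE', 'vendor_code': 'v001', 'window_start': '2023-03-31 23:35:00 UTC'},
    {'country_code': 'SG', 'vendor_code': 'v002', 'window_start': '2023-03-31 23:40:00.500000 UTC'},
]

table_path = tempfile.mkdtemp()
with open(os.path.join(table_path, 'part-0.json'), 'w') as f:
    for r in records:
        f.write(json.dumps(r) + '\n')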
And when I query data from this table, I get the following error:
>>> table = t_env.from_path('my_table')
>>> table.limit(10).to_pandas()

org.apache.flink.table.api.TableException: Expression 'parse_bq_datetime(window_start)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
	at org.apache.flink.table.expressions.ResolvedExpression.asSerializableString(ResolvedExpression.java:51)
I have already used the same function in another table schema and it worked fine. And if I query the same string field with this function directly (without parsing it in the schema builder), it also works:
>>> table_without_expression.select(parse_bq_datetime_udf(col('window_start')).cast(DataTypes.TIMESTAMP(3)).alias('window_time')).limit(10).to_pandas()

window_time                     |
--------------------------------|
Timestamp('2023-03-31 23:35:00')|
...
Dian Fu:
@Elizaveta Batanina Hey, the problem is a little complicated. To work around this issue, you could make the following changes:
# Register the Python UDF under a name so it can be referenced from a SQL-string expression.
t_env.create_temporary_function('parse_bq_datetime_udf', parse_bq_datetime_udf)

table_schema = Schema.new_builder() \
    .column('country_code', DataTypes.STRING()) \
    .column('vendor_code', DataTypes.STRING()) \
    .column('window_start', DataTypes.STRING()) \
    .column_by_expression('TM', 'parse_bq_datetime_udf(window_start)') \
    .build()
That is, change
.column_by_expression('TM', parse_bq_datetime_udf(col('window_start')))
to
.column_by_expression('TM', 'parse_bq_datetime_udf(window_start)')
Root cause of this issue: the error happens inside table.limit(10).to_pandas(), which needs to call table.get_schema(), and get_schema() currently only supports computed columns that were defined via a SQL string.
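In other words (a minimal sketch of the same point, reusing the table name from above), the failure can be reproduced without to_pandas() at all:

# Reproducing the failure directly, without to_pandas():
table = t_env.from_path('my_table')

# If 'TM' was declared with the Expression API (parse_bq_datetime_udf(col('window_start'))),
# this call raises the "not string serializable" TableException:
# table.get_schema()

# If 'TM' was declared with the SQL string 'parse_bq_datetime_udf(window_start)',
# the schema can be serialized, so get_schema() and therefore to_pandas() work again:
print(table.get_schema())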
Elizaveta Batanina:
Thanks a lot, Dian Fu! It worked!