Tilman Krokotsch
05/24/2023, 11:22 AM
import os.path
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment
# Placeholder for the fat jar that example() hands to env.add_jars();
# substitute a real location before running.
JAR_PATH = "<Path to Fat Jar>"
def example():
    """Print the contents of an Iceberg table read through PyFlink.

    Builds a stream execution environment in batch runtime mode, adds the
    jar referenced by ``JAR_PATH``, registers ``iceberg_catalog`` (Glue
    catalog implementation, S3 file IO, assume-role client factory) via
    SQL, converts the source table into a data stream, prints it, and
    executes the job.
    """
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    exec_env.add_jars(JAR_PATH)
    table_env = StreamTableEnvironment.create(exec_env)

    # Catalog DDL; the angle-bracket values are placeholders from the report.
    create_catalog_ddl = (
        "\n"
        "CREATE CATALOG iceberg_catalog WITH (\n"
        "'type'='iceberg',\n"
        "'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
        "'warehouse'='s3://<My Bucket>',\n"
        "'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',\n"
        "'client.factory'='org.apache.iceberg.aws.AssumeRoleAwsClientFactory',\n"
        "'client.assume-role.region'='<My Region>',\n"
        "'client.assume-role.arn'='<My Role>'\n"
        ")\n"
    )
    table_env.execute_sql(create_catalog_ddl)

    iceberg_table = table_env.from_path(
        "iceberg_catalog.<My Database>.<My Table>"
    )
    row_stream = table_env.to_data_stream(iceberg_table)
    row_stream.print()

    exec_env.execute()
if __name__ == "__main__":
    # Run the reproduction only when this file is executed as a script.
    example()
This is the error message:
File "<My venv>/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 58, in decode_from_stream
return self._value_coder.decode_from_stream(data_input_stream)
TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)
Has anyone encountered something similar before? Thanks in advance.
Dian Fu
05/25/2023, 1:55 AM
Tilman Krokotsch
05/30/2023, 7:24 AM
`pip` to build the packages from scratch and not use the cached versions.