# troubleshooting
Tilman Krokotsch:
Hey channel, I ran into a problem with converting an Iceberg table source to a datastream. I get an error message that looks like a bug in PyFlink, but I want to make sure that I did everything correctly before I open a ticket. Reading the table with the Table API works without issue.

Environment:
• Flink 1.17
• Python 3.9
• Java 11
• macOS ARM64

Minimal example to reproduce:
```python
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment

# add_jars expects a URL, e.g. file:///path/to/fat.jar
JAR_PATH = "<Path to Fat Jar>"


def example():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.add_jars(JAR_PATH)
    t_env = StreamTableEnvironment.create(env)

    t_env.execute_sql(
        """
        CREATE CATALOG iceberg_catalog WITH (
            'type'='iceberg',
            'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
            'warehouse'='s3://<My Bucket>',
            'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
            'client.factory'='org.apache.iceberg.aws.AssumeRoleAwsClientFactory',
            'client.assume-role.region'='<My Region>',
            'client.assume-role.arn'='<My Role>'
        )
        """
    )
    source_table = t_env.from_path("iceberg_catalog.<My Database>.<My Table>")
    t_env.to_data_stream(source_table).print()
    env.execute()


if __name__ == "__main__":
    example()
```
This is the error message:
```
File "<My venv>/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 58, in decode_from_stream
    return self._value_coder.decode_from_stream(data_input_stream)
TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)
```
Did anyone encounter something similar before? Thanks in advance.
Dian Fu:
@Tilman Krokotsch Hey, you are right that this is a known issue on Apple Silicon (M1) machines: https://issues.apache.org/jira/browse/FLINK-28786. It will be fixed in 1.16.2 and 1.17.1. Both versions are currently in the voting phase and will be published in the next few days.
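Once those releases land, upgrading the Python package should be all that is needed. A minimal sketch, assuming 1.17.1 is available on PyPI by the time you run it:

```shell
# Upgrade to a release that contains the FLINK-28786 fix
# (1.17.1 per the message above; adjust if you are on the 1.16 line)
pip install --upgrade "apache-flink==1.17.1"
```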
Tilman Krokotsch:
@Dian Fu thanks for the reply. I found your comments in an older thread and was able to apply the workaround. The important part was getting `pip` to build the packages from scratch and not use the cached versions.
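For anyone finding this later, the rebuild-from-scratch step mentioned above can be done roughly like this (a sketch, not the exact commands from the older thread; package names and the 1.17.0 pin are assumptions based on this setup):

```shell
# Remove the installed packages, then reinstall while bypassing pip's cache
# so the native extensions are rebuilt for this machine instead of being
# restored from previously cached wheels
pip uninstall -y apache-flink apache-flink-libraries
pip install --no-cache-dir "apache-flink==1.17.0"
```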