# troubleshooting
**b:**
Howdy folks πŸ‘‹ I'm attempting to implement a py4j wrapper around the DynamoDB sink so I can use from PyFlink + Datastream API. I'd like to use Py4J's callback mechanism and implement the
ElementConverter
interface inside of python. I've got this wired up, and it almost works:
```python
from typing import List, Dict

from py4j.java_collections import ListConverter, MapConverter
from pyflink.datastream.connectors import Sink
from pyflink.java_gateway import get_gateway


class DynamoDbSink(Sink):
    """
    A Dynamo DB (DDB) Sink that performs async requests against a destination table
    using the buffering protocol.
    """

    class Java:
        implements = ['java.io.Serializable']

    def __init__(self, j_dynamodb_sink):
        super(DynamoDbSink, self).__init__(sink=j_dynamodb_sink)

    @staticmethod
    def builder() -> 'DynamoDbSinkBuilder':
        return DynamoDbSinkBuilder()


class DynamoDbSinkBuilder(object):
    """
    Builder to construct DynamoDbSink.

    """

    class Java:
        implements = ['java.io.Serializable']

    def __init__(self):
        JDynamoDbSink = get_gateway().jvm.org.apache.flink.connector.dynamodb.sink.DynamoDbSink
        self._j_dynamodb_sink_builder = JDynamoDbSink.builder()

    def set_dynamodb_properties(self, properties: Dict) -> 'DynamoDbSinkBuilder':
        j_properties = get_gateway().jvm.java.util.Properties()
        for key, value in properties.items():
            j_properties.setProperty(key, value)
        self._j_dynamodb_sink_builder.setDynamoDbProperties(j_properties)
        return self

    def set_table_name(self, table_name: str) -> 'DynamoDbSinkBuilder':
        self._j_dynamodb_sink_builder.setTableName(table_name)
        return self

    def set_element_converter(self, element_converter) -> 'DynamoDbSinkBuilder':
        # element_converter is a Python object implementing the Java
        # org.apache.flink.connector.base.sink.writer.ElementConverter interface via Py4J.
        self._j_dynamodb_sink_builder.setElementConverter(element_converter)
        return self

    def set_default_element_converter(self) -> 'DynamoDbSinkBuilder':
        element_converter = get_gateway().jvm.org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter
        # DynamoDbBeanElementConverter is constructed with the element's Class
        j_string_class = get_gateway().jvm.Class.forName('java.lang.String')
        self._j_dynamodb_sink_builder.setElementConverter(element_converter(j_string_class))
        return self

    def set_overwrite_by_partition_keys(self, keys: List[str]) -> 'DynamoDbSinkBuilder':
        j_list = ListConverter().convert(keys or [], get_gateway()._gateway_client)
        self._j_dynamodb_sink_builder.setOverwriteByPartitionKeys(j_list)
        return self

    def set_fail_on_error(self, flag: bool) -> 'DynamoDbSinkBuilder':
        self._j_dynamodb_sink_builder.setFailOnError(flag)
        return self

    def set_max_batch_size(self, batch_size: int) -> 'DynamoDbSinkBuilder':
        self._j_dynamodb_sink_builder.setMaxBatchSize(batch_size)
        return self

    def set_max_inflight_requests(self, inflight_requests: int) -> 'DynamoDbSinkBuilder':
        self._j_dynamodb_sink_builder.setMaxInFlightRequests(inflight_requests)
        return self

    def set_max_buffer_size(self, buffer_size: int) -> 'DynamoDbSinkBuilder':
        self._j_dynamodb_sink_builder.setMaxBufferedRequests(buffer_size)
        return self

    def set_max_time_in_buffer(self, time_in_buffer: int) -> 'DynamoDbSinkBuilder':
        self._j_dynamodb_sink_builder.setMaxTimeInBufferMS(time_in_buffer)
        return self

    def build(self) -> 'DynamoDbSink':
        return DynamoDbSink(self._j_dynamodb_sink_builder.build())


class DictElementConverter:
    class Java:
        implements = ['org.apache.flink.connector.base.sink.writer.ElementConverter',
                      'java.io.Serializable']

    def apply(self, element: Dict, context=None):
        # The Java ElementConverter interface calls apply(element, SinkWriter.Context),
        # so accept the context argument even though it is unused here.
        j_map = MapConverter().convert(element or {}, get_gateway()._gateway_client)
        _j_dynamodb_write_request_builder = (get_gateway().jvm.org.apache
                                             .flink.connector.dynamodb
                                             .sink.DynamoDbWriteRequest.builder())
        _j_dynamodb_write_request_builder.setType(
            get_gateway().jvm.org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT)
        # Note: setItem() expects a Map<String, AttributeValue>, not a plain map of strings.
        _j_dynamodb_write_request_builder.setItem(j_map)
        return _j_dynamodb_write_request_builder.build()
```
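For reference, the nested `class Java: implements = [...]` declarations above are Py4J's generic callback pattern: a Python object can stand in for a Java interface, provided a callback server is running on the gateway. A minimal, Flink-independent sketch of that mechanism (the interface name is a placeholder, not a real class):

```python
from py4j.java_gateway import JavaGateway, CallbackServerParameters


class PythonListener(object):
    """A Python object the JVM can call back into through Py4J."""

    def notify(self, event):
        # Invoked from the Java side; the return value is passed back to Java.
        print("received:", event)
        return "ack"

    class Java:
        # Fully qualified name of the Java interface being implemented.
        # 'com.example.Listener' is a placeholder, not a real Flink class.
        implements = ['com.example.Listener']


# The callback server is what lets the JVM invoke methods on Python objects.
gateway = JavaGateway(
    callback_server_parameters=CallbackServerParameters())
listener = PythonListener()
# Any Java method expecting a com.example.Listener could now be handed `listener`.
```

This pattern only works when the gateway in use actually has a callback server running.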
I'm using these classes in the following job:
```python
import os
from os import path
from typing import Dict

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import (StreamExecutionEnvironment, RuntimeExecutionMode)
from pyflink.datastream.connectors.kinesis import (FlinkKinesisConsumer)

from pyflink.java_gateway import get_gateway

from framework.connectors.dynamodb import DynamoDbSinkBuilder, DynamoDbSink, DictElementConverter

LOCAL_DEBUG = os.getenv('LOCAL_DEBUG', False)


def get_source(stream_name: str, config: Dict = None) -> FlinkKinesisConsumer:
    props = config or {}
    consumer_config = {
        'aws.region': 'us-east-1',
        'aws.credentials.provider.basic.accesskeyid': 'localstack_ignored',
        'aws.credentials.provider.basic.secretkey': 'localstack_ignored',
        'flink.stream.initpos': 'LATEST',
        'aws.endpoint': 'http://localhost:4566',
        **props
    }
    return FlinkKinesisConsumer(stream_name, SimpleStringSchema(), consumer_config)


def get_sink(table_name: str, config: Dict = None) -> DynamoDbSink:
    props = config or {}
    return (DynamoDbSink.builder()
            .set_table_name(table_name)
            # .set_default_element_converter()
            .set_element_converter(DictElementConverter())
            .set_dynamodb_properties(
                {
                    'aws.region': 'us-east-1',
                    'aws.credentials.provider.basic.accesskeyid': 'localstack_ignored',
                    'aws.credentials.provider.basic.secretkey': 'localstack_ignored',
                    'aws.endpoint': 'http://localhost:8000',
                    **props
                }
            )
            .build())


def run():
    get_gateway()
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    # To enable local running/debugging, we manually add the project's shadow jar that has all the connectors built in
    if LOCAL_DEBUG:
        jar_location = str(path.join(path.dirname(path.abspath(__file__)), "../lib/bin/pyflink-services-1.0.jar"))
        env.add_jars(f"file:///{jar_location}")
        env.add_classpaths(f"file:///{jar_location}")

    # Build a DataStream from the Kinesis source
    stream = env.add_source(get_source('input_stream'))

    # DynamoDB sink definition
    sink = get_sink('PyFlinkTestTable')

    # Sink the DataStream from Kinesis into DynamoDB
    stream.sink_to(sink)

    env.execute("kinesis-2-dynamoDB")


if __name__ == '__main__':
    run()
```

**a:**
This is probably not going to work (i.e. implementing the `ElementConverter` in Python). You could try to implement preprocessing the way the Kafka connector does, but I don't think that would work either: you would still have to instantiate the connector with an element converter and, more painfully, you would have to recreate the Dynamo `AttributeMap` classes in Python, so I wouldn't recommend that. Ideally (this is what I would do, since I initially intended to pick the ticket up) wait for the 4.4 release, which includes FLINK-35022; then you could either instantiate a `TypeInformedElementConverter` from Python or just use the default.
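For a sense of what recreating the Dynamo attribute classes in Python would involve, here is a rough sketch over py4j. It assumes the AWS SDK v2 `AttributeValue` class the connector uses is reachable on the classpath under its usual package name (which may not hold if the connector shades it); the helper itself is illustrative, not something the connector provides:

```python
from typing import Dict

from py4j.java_collections import MapConverter
from pyflink.java_gateway import get_gateway


def to_attribute_value_map(element: Dict):
    """Illustrative only: wrap each value of a flat dict as an AWS SDK v2 AttributeValue,
    so that DynamoDbWriteRequest.Builder#setItem() receives the Map<String, AttributeValue>
    it expects instead of a map of plain strings."""
    jvm = get_gateway().jvm
    attribute_value = jvm.software.amazon.awssdk.services.dynamodb.model.AttributeValue
    wrapped = {k: attribute_value.builder().s(str(v)).build() for k, v in element.items()}
    return MapConverter().convert(wrapped, get_gateway()._gateway_client)
```

Even then this only covers string attributes; numbers, binary, lists and nested maps would each need their own `AttributeValue` variants, which is the pain point called out above.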