Brad Murry
08/13/2024, 10:07 PM
I'm trying to implement the ElementConverter interface inside of Python.
I've got this wired up, and it almost works:
from typing import List, Dict
from py4j.java_collections import ListConverter, MapConverter
from pyflink.datastream.connectors import Sink
from pyflink.java_gateway import get_gateway
class DynamoDbSink(Sink):
    """A DynamoDB (DDB) sink that performs async requests against a
    destination table using Flink's buffering (async sink) protocol.

    Thin Python wrapper around the Java
    ``org.apache.flink.connector.dynamodb.sink.DynamoDbSink`` object.
    """

    class Java:
        implements = ['java.io.Serializable']

    def __init__(self, j_dynamodb_sink):
        # Hand the underlying Java sink straight to the PyFlink Sink wrapper.
        super().__init__(sink=j_dynamodb_sink)

    @staticmethod
    def builder() -> 'DynamoDbSinkBuilder':
        """Entry point for fluent construction: ``DynamoDbSink.builder()...build()``."""
        return DynamoDbSinkBuilder()
class DynamoDbSinkBuilder(object):
    """Fluent builder for :class:`DynamoDbSink`.

    Mirrors the Java ``DynamoDbSinkBuilder``: every setter delegates to the
    wrapped Java builder and returns ``self`` so calls can be chained.
    """

    class Java:
        implements = ['java.io.Serializable']

    def __init__(self):
        JDynamoDbSink = get_gateway().jvm.org.apache.flink.connector.dynamodb.sink.DynamoDbSink
        self._j_dynamodb_sink_builder = JDynamoDbSink.builder()

    def set_dynamodb_properties(self, properties: Dict) -> 'DynamoDbSinkBuilder':
        """Copy *properties* into a ``java.util.Properties`` (AWS client config)."""
        j_properties = get_gateway().jvm.java.util.Properties()
        for key, value in properties.items():
            j_properties.setProperty(key, value)
        self._j_dynamodb_sink_builder.setDynamoDbProperties(j_properties)
        return self

    def set_table_name(self, table_name: str) -> 'DynamoDbSinkBuilder':
        """Set the destination DynamoDB table name."""
        self._j_dynamodb_sink_builder.setTableName(table_name)
        return self

    def set_element_converter(self, elementConverter) -> 'DynamoDbSinkBuilder':
        """Register an element converter with the Java builder.

        *elementConverter* may be a Java object or a Python object whose
        ``Java`` inner class declares that it implements
        ``org.apache.flink.connector.base.sink.writer.ElementConverter``
        (a py4j callback). It is passed through to the Java builder as-is.
        """
        # The previous revision also fetched the Java ElementConverter class
        # into an unused local here; removed as dead code.
        self._j_dynamodb_sink_builder.setElementConverter(elementConverter)
        return self

    def set_default_element_converter(self) -> 'DynamoDbSinkBuilder':
        """Install the connector's ``DynamoDbBeanElementConverter``.

        NOTE(review): the converter is constructed with ``Integer.TYPE`` as
        the element class — this looks like a placeholder, since the Java
        converter expects a ``@DynamoDbBean``-annotated class. Confirm
        before relying on this method.
        """
        j_converter_cls = get_gateway().jvm.org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter
        # Was misleadingly named `String` although it holds Integer.TYPE.
        j_element_cls = get_gateway().jvm.Integer.TYPE
        self._j_dynamodb_sink_builder.setElementConverter(j_converter_cls(j_element_cls))
        return self

    def set_overwrite_by_partition_keys(self, keys: List[str]) -> 'DynamoDbSinkBuilder':
        """Set the partition keys used to deduplicate/overwrite within a batch."""
        j_list = ListConverter().convert(keys or [], get_gateway()._gateway_client)
        self._j_dynamodb_sink_builder.setOverwriteByPartitionKeys(j_list)
        return self

    def set_fail_on_error(self, flag: bool) -> 'DynamoDbSinkBuilder':
        """Fail the job on write errors instead of retrying/dropping."""
        self._j_dynamodb_sink_builder.setFailOnError(flag)
        return self

    def set_max_batch_size(self, batch_size: int) -> 'DynamoDbSinkBuilder':
        """Maximum number of records per batch write request."""
        self._j_dynamodb_sink_builder.setMaxBatchSize(batch_size)
        return self

    def set_max_inflight_requests(self, inflight_requests: int) -> 'DynamoDbSinkBuilder':
        """Maximum number of concurrent in-flight batch requests."""
        self._j_dynamodb_sink_builder.setMaxInFlightRequests(inflight_requests)
        return self

    def set_max_buffer_size(self, buffer_size: int) -> 'DynamoDbSinkBuilder':
        """Maximum number of buffered records before backpressure applies."""
        self._j_dynamodb_sink_builder.setMaxBufferedRequests(buffer_size)
        return self

    def set_max_time_in_buffer(self, time_in_buffer: int) -> 'DynamoDbSinkBuilder':
        """Maximum time (milliseconds) a record may sit in the buffer."""
        self._j_dynamodb_sink_builder.setMaxTimeInBufferMS(time_in_buffer)
        return self

    def build(self) -> 'DynamoDbSink':
        """Finalize the Java builder and wrap the result in a DynamoDbSink."""
        return DynamoDbSink(self._j_dynamodb_sink_builder.build())
class DictElementConverter:
    """py4j callback implementing Flink's ``ElementConverter`` interface.

    Converts a Python ``dict`` element into a DynamoDB ``PUT``
    ``DynamoDbWriteRequest`` on the Java side.
    """

    class Java:
        implements = ['org.apache.flink.connector.base.sink.writer.ElementConverter',
                      'java.io.Serializable']

    def apply(self, element: Dict, context=None):
        """Build a PUT DynamoDbWriteRequest from *element*.

        The Java interface method is ``apply(element, SinkWriter.Context)``,
        so the callback must accept the context argument (unused here).
        Defaulting it to ``None`` keeps any single-argument Python callers
        working.
        """
        # Convert the Python dict into a java.util.Map for the Java builder.
        j_map = MapConverter().convert(element or {}, get_gateway()._gateway_client)
        j_request_builder = (get_gateway().jvm.org.apache
                             .flink.connector.dynamodb
                             .sink.DynamoDbWriteRequest.builder())
        j_request_builder.setType(
            get_gateway().jvm.org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.PUT)
        j_request_builder.setItem(j_map)
        return j_request_builder.build()
Brad Murry
08/13/2024, 10:08 PM
import os
from os import path
from typing import Dict
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import (StreamExecutionEnvironment, RuntimeExecutionMode)
from pyflink.datastream.connectors.kinesis import (FlinkKinesisConsumer)
from pyflink.java_gateway import get_gateway
from framework.connectors.dynamodb import DynamoDbSinkBuilder, DynamoDbSink, DictElementConverter
# Treat the LOCAL_DEBUG env var as a boolean flag. os.getenv returns a
# *string* when the variable is set, so the previous truthiness test
# (`os.getenv('LOCAL_DEBUG', False)`) treated e.g. LOCAL_DEBUG=0 or
# LOCAL_DEBUG=false as enabled. Normalize: unset/empty/"0"/"false"/"no" => off.
LOCAL_DEBUG = os.getenv('LOCAL_DEBUG', '').strip().lower() not in ('', '0', 'false', 'no')
def get_source(stream_name: str, config: Dict = None) -> FlinkKinesisConsumer:
    """Build a Kinesis consumer for *stream_name* against LocalStack.

    Defaults target a local LocalStack endpoint with dummy credentials;
    entries in *config* override the defaults.
    """
    props = config or {}
    consumer_config = {
        'aws.region': 'us-east-1',
        'aws.credentials.provider.basic.accesskeyid': 'localstack_ignored',
        'aws.credentials.provider.basic.secretkey': 'localstack_ignored',
        'flink.stream.initpos': 'LATEST',
        # Fixed: the pasted value '<http://localhost:4566>' carried Slack's
        # auto-link angle brackets, which would break the endpoint URL.
        'aws.endpoint': 'http://localhost:4566',
        **props
    }
    return FlinkKinesisConsumer(stream_name, SimpleStringSchema(), consumer_config)
def get_sink(table_name: str, config: Dict = None) -> DynamoDbSink:
    """Build a DynamoDbSink for *table_name* using the dict element converter.

    Defaults target a local DynamoDB endpoint with dummy credentials;
    entries in *config* override the defaults.
    """
    props = config or {}
    return (DynamoDbSink.builder()
            .set_table_name(table_name)
            .set_element_converter(DictElementConverter())
            .set_dynamodb_properties(
                {
                    'aws.region': 'us-east-1',
                    'aws.credentials.provider.basic.accesskeyid': 'localstack_ignored',
                    'aws.credentials.provider.basic.secretkey': 'localstack_ignored',
                    # Fixed: the pasted value '<http://localhost:8000>' carried
                    # Slack's auto-link angle brackets, breaking the URL.
                    'aws.endpoint': 'http://localhost:8000',
                    **props
                }
            ).build())
def run():
    """Build and execute the Kinesis -> DynamoDB streaming job."""
    # Touch the py4j gateway up front so the JVM is available before any
    # connector classes are referenced.
    get_gateway()

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.set_parallelism(1)

    if LOCAL_DEBUG:
        # For local running/debugging, manually add the project's shadow jar
        # that has all the connectors built in.
        # NOTE(review): with an absolute POSIX path this yields
        # "file:////..." — appears to work, but verify the URI form.
        jar = str(path.join(path.dirname(path.abspath(__file__)), "../lib/bin/pyflink-services-1.0.jar"))
        jar_uri = f"file:///{jar}"
        env.add_jars(jar_uri)
        env.add_classpaths(jar_uri)

    # Wire the Kinesis source into a DataStream, attach the DynamoDB sink,
    # and launch the pipeline.
    stream = env.add_source(get_source('input_stream'))
    stream.sink_to(get_sink('PyFlinkTestTable'))
    env.execute("kinesis-2-dynamoDB")


if __name__ == '__main__':
    run()
Ahmed Hamdy
08/14/2024, 10:06 AM
You can't easily implement AttributeMap classes in Python, so I wouldn't recommend that.
Ideally — since I initially intended to pick the ticket up — I would wait for the 4.4 release, which includes FLINK-35022; then you could either instantiate a TypeInformedElementConverter from Python or just use the default.