Viktor Hrtanek
06/06/2023, 2:15 PMAttributeError: 'Test' object has no attribute '_set_takes_row_as_input'
and when i add it, then the error says TypeError: 'Test' object is not callable
def method1(input):
return "#".join([input, input])
class Test(MapFunction):
def __init__(self, testValue):
self.key = None
self.testValue = testValue
# def _set_takes_row_as_input(self):
# self._takes_row_as_input = True
# return self
def map(self, data, testValue, output_type=DataTypes.ROW([DataTypes.FIELD("FIRST_NAME", DataTypes.STRING()),
DataTypes.FIELD("FIRST_NAM", DataTypes.STRING()),
DataTypes.FIELD("FIRST_NA", DataTypes.STRING())])):
return Row(
FIRST_NAME = data['FIRST_NAME'],
FIRST_NAM = data['EMAIL'],
FIRST_NA = method1(self.testValue)
)
source_table = table_env.from_path("table")
source_table.map(Test(testValue="DummyValue")).execute().print()
then i tried with udf, but no success. Error says TypeError: __call__() got an unexpected keyword argument 'dummyVariable'
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("FIELD_1", DataTypes.STRING()),
DataTypes.FIELD("FIELD_2", DataTypes.STRING()),
DataTypes.FIELD("FIELD_3", DataTypes.STRING())]))
def func1(data: Row, dummyVariable: str) -> Row:
return Row(
FIELD_1 = data['FIRST_NAME'],
FIELD_2 = data['EMAIL'],
FIELD_3 = method1(dummyVariable)
)
source_table = table_env.from_path("table")
source_table.map(func1(dummyVariable = "dummyValue")).execute().print()