Hi, I started to use flink with pyflink, i am tryi...
# random
u
Hi, I started to use flink with pyflink, i am trying to connect to mongo and define udf with the following code
Copy code
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema
from pyflink.table.udf import udf
from pyflink.table.expressions import col
import pymongo

client = pymongo.MongoClient("<mongodb://localhost:27017/>")
db = client["local"]
collection = db["example_collection"]
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # Adjust the desired parallelism

settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

class MongoConnector:
       def __init__(self, host, port, database, collection):
           self.host = host
           self.port = port
           self.database = database
           self.collection = collection

       def query(self, query):
           result = self.collection.find(query)
           return result

# Initialize the connector
mongo_connector = MongoConnector("localhost", 27017, db, collection)
print(mongo_connector.port)
# Define a UDF to query MongoDB
@udf(result_type=DataTypes.STRING(), func_type="pandas")
def query_mongodb_1(query):
    print(mongo_connector.port)
    return pd.DataFrame.from_records(mongo_connector.query(query))


t_env.create_temporary_system_function("query_mongodb", query_mongodb_1)

# Example query
t_env.sql_query("""
   SELECT * FROM TABLE(query_mongodb('{"Test":1})')
""")
i am getting the following error
Copy code
TypeError                                 Traceback (most recent call last)
Cell In[44], line 39
     35     print(mongo_connector.port)
     36     return pd.DataFrame.from_records(mongo_connector.query(query))
---> 39 t_env.create_temporary_system_function("query_mongodb", query_mongodb_1)
     41 # Example query
     42 t_env.sql_query("""
     43    SELECT * FROM TABLE(query_mongodb('{"Test":1})')
     44 """)
TypeError: cannot pickle '_thread.lock' object
can anyone help me?