עידן קונלי
10/17/2023, 9:28 AMfrom pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema
from pyflink.table.udf import udf
from pyflink.table.expressions import col
import pymongo
client = pymongo.MongoClient("<mongodb://localhost:27017/>")
db = client["local"]
collection = db["example_collection"]
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1) # Adjust the desired parallelism
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
class MongoConnector:
def __init__(self, host, port, database, collection):
self.host = host
self.port = port
self.database = database
self.collection = collection
def query(self, query):
result = self.collection.find(query)
return result
# Initialize the connector
mongo_connector = MongoConnector("localhost", 27017, db, collection)
print(mongo_connector.port)
# Define a UDF to query MongoDB
@udf(result_type=DataTypes.STRING(), func_type="pandas")
def query_mongodb_1(query):
print(mongo_connector.port)
return pd.DataFrame.from_records(mongo_connector.query(query))
t_env.create_temporary_system_function("query_mongodb", query_mongodb_1)
# Example query
t_env.sql_query("""
SELECT * FROM TABLE(query_mongodb('{"Test":1})')
""")
i am getting the following error
TypeError Traceback (most recent call last)
Cell In[44], line 39
35 print(mongo_connector.port)
36 return pd.DataFrame.from_records(mongo_connector.query(query))
---> 39 t_env.create_temporary_system_function("query_mongodb", query_mongodb_1)
41 # Example query
42 t_env.sql_query("""
43 SELECT * FROM TABLE(query_mongodb('{"Test":1})')
44 """)
TypeError: cannot pickle '_thread.lock' object
can anyone help me?