# troubleshooting
Hi guys, can someone help me with this issue when writing to OpenSearch please 😔 The script runs perfectly fine in a SQL session (using ./bin/start-cluster.sh and ./bin/sql-client.sh to submit SQL queries directly). Edited: downgrading Flink to 1.18.1 (including the Docker image and pom.xml) works.
Here are the full logs:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[2], line 38
      7 t_env.execute_sql(r"""
      8 create table if not exists kafka_orders (
      9     order_id string,
   (...)
     21 );
     22 """)
     24 t_env.execute_sql(r"""
     25 create table if not exists opensearch_orders (
     26     order_id string,
   (...)
     35 );
     36 """)
---> 38 t_env.execute_sql(r"""
     39 insert into opensearch_orders
     40 select 
     41     order_id,
     42     customer_id,
     43     order_status
     44 from kafka_orders;
     45 """).wait()

File /usr/local/lib/python3.10/dist-packages/pyflink/table/table_environment.py:837, in TableEnvironment.execute_sql(self, stmt)
    823 """
    824 Execute the given single statement, and return the execution result.
    825 
   (...)
    834 .. versionadded:: 1.11.0
    835 """
    836 self._before_execute()
--> 837 return TableResult(self._j_tenv.executeSql(stmt))

File /usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py:146, in capture_java_exception.<locals>.deco(*a, **kw)
    144 def deco(*a, **kw):
    145     try:
--> 146         return f(*a, **kw)
    147     except Py4JJavaError as e:
    148         from pyflink.java_gateway import get_gateway

File /usr/local/lib/python3.10/dist-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o67.executeSql.
: java.lang.ArrayStoreException: arraycopy: element type mismatch: can not cast one of the elements of java.lang.Object[] to the type of the destination array, org.apache.http.HttpHost
	at java.base/java.lang.System.arraycopy(Native Method)
	at java.base/java.util.Arrays.copyOf(Unknown Source)
	at java.base/java.util.ArrayList.toArray(Unknown Source)
	at org.apache.flink.connector.opensearch.table.Opensearch2DynamicSink.getSinkRuntimeProvider(Opensearch2DynamicSink.java:122)
	at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:151)
	at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:214)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
	at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
	at scala.collection.Iterator.foreach(Iterator.scala:937)
	at scala.collection.Iterator.foreach$(Iterator.scala:937)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
	at scala.collection.IterableLike.foreach(IterableLike.scala:70)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike.map(TraversableLike.scala:233)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)
And here is the pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.flink</groupId>
    <artifactId>flink</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <java.version>17</java.version>
        <flink.version>1.19.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>


    <dependencies>
        <!-- Kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.2.0-1.19</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Opensearch -->
        <dependency>
            <groupId>org.opensearch</groupId>
            <artifactId>opensearch</artifactId>
            <version>2.15.0</version>
        </dependency>
        <dependency>
            <groupId>org.opensearch.client</groupId>
            <artifactId>opensearch-rest-high-level-client</artifactId>
            <version>2.15.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-opensearch2</artifactId>
            <version>2.0.0-1.19</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-opensearch2</artifactId>
            <version>2.0.0-1.19</version>
        </dependency>

        <!-- Jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.17.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.17.2</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.17.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/dependency</outputDirectory>
                            <excludeGroupIds>org.apache.logging.log4j</excludeGroupIds>
                            <excludeArtifactIds>log4j*</excludeArtifactIds>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Here is the original message: Hi guys, can someone help me with this issue when writing to OpenSearch please 😔 The script runs perfectly fine in a SQL session (using ./bin/start-cluster.sh and ./bin/sql-client.sh to submit SQL queries directly). But when I run it in a Jupyter Notebook as follows:
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

t_env.execute_sql(r"""
create table if not exists kafka_orders (
    order_id string,
    customer_id string,
    order_status string
) with (
    'connector' = 'kafka',
    'topic' = 'orders.public.orders',
    'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
    'properties.group.id' = 'opensearch-orders',
    'scan.startup.mode' = 'earliest-offset',
    
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

t_env.execute_sql(r"""
create table if not exists opensearch_orders (
    order_id string,
    customer_id string,
    order_status string
) with (
    'connector' = 'opensearch-2',
    'hosts' = 'http://opensearch.io:80',
    'allow-insecure' = 'true',
    'index' = 'orders',
    'format' = 'json'
);
""")

t_env.execute_sql(r"""
insert into opensearch_orders
select 
    order_id,
    customer_id,
    order_status
from kafka_orders;
""").wait()
This error happens and I can't figure out why (I have provided more details in the chat thread):
Py4JJavaError: An error occurred while calling o67.executeSql.
: java.lang.ArrayStoreException: arraycopy: element type mismatch: can not cast one of the elements of java.lang.Object[] to the type of the destination array, org.apache.http.HttpHost
The error you're encountering, java.lang.ArrayStoreException: arraycopy: element type mismatch, comes from the OpenSearch connector (Opensearch2DynamicSink) copying its host list into an org.apache.http.HttpHost[] array. An ArrayStoreException at ArrayList.toArray like this is usually a classpath problem: two copies of the HttpHost class have been loaded by different classloaders, so the objects in the list are not instances of the HttpHost class the destination array expects, and the store fails.
Since downgrading seemed to solve this, that points to a compatibility issue between the Flink version and the OpenSearch connector. Check the release notes for Flink and for the connector to make sure the connector build matches your Flink version, and that the cluster, the Docker image, and the pom.xml all agree on that version. Also note that your pom pulls in the standalone OpenSearch client jars alongside both flink-connector-opensearch2 and the flink-sql-connector-opensearch2 uber jar; shipping all of those together can easily put duplicate copies of the HTTP client classes on the classpath.
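If the duplicate-jar theory holds, one way to test it from the notebook is to hand PyFlink exactly one connector jar. A minimal sketch, assuming the uber jar lives at a path like the one below (the path is hypothetical; adjust it to wherever the jar sits in your image):

from pyflink.table import EnvironmentSettings, TableEnvironment

settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Hypothetical jar path: ship only the SQL uber jar, matched to the
# cluster's Flink version, instead of layering it over
# flink-connector-opensearch2 and the standalone OpenSearch client
# jars, so only one copy of org.apache.http.HttpHost gets loaded.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-opensearch2-2.0.0-1.19.jar",
)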
Also double-check the OpenSearch sink configuration in your Flink SQL DDL. In particular, make sure the 'hosts' option is correctly formatted: it should be a URL string (or a list of URLs) that the connector can parse into HttpHost objects. It might be an interpretation issue, e.g. 'hosts' = 'http://opensearch.io:80' might need to be adjusted if the expected format has changed with the new Flink version.
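For reference, a minimal sketch of the sink DDL with the hosts value written as a plain URL, using the same table and endpoint as in the original post. Note that the angle brackets in the pasted snippets are just chat-client link markup and must not end up in the actual option value:

# Reuses t_env from the snippet above. The connector is expected to
# parse the 'hosts' value (semicolon-separated if there are several
# hosts) into org.apache.http.HttpHost instances.
t_env.execute_sql("""
create table if not exists opensearch_orders (
    order_id string,
    customer_id string,
    order_status string
) with (
    'connector' = 'opensearch-2',
    'hosts' = 'http://opensearch.io:80',
    'allow-insecure' = 'true',
    'index' = 'orders',
    'format' = 'json'
)
""")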