Anh Tan Le Dinh
08/02/2024, 10:17 AMAnh Tan Le Dinh
08/02/2024, 10:18 AM---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[2], line 38
7 t_env.execute_sql(r"""
8 create table if not exists kafka_orders (
9 order_id string,
(...)
21 );
22 """)
24 t_env.execute_sql(r"""
25 create table if not exists opensearch_orders (
26 order_id string,
(...)
35 );
36 """)
---> 38 t_env.execute_sql(r"""
39 insert into opensearch_orders
40 select
41 order_id,
42 customer_id,
43 order_status
44 from kafka_orders;
45 """).wait()
File /usr/local/lib/python3.10/dist-packages/pyflink/table/table_environment.py:837, in TableEnvironment.execute_sql(self, stmt)
823 """
824 Execute the given single statement, and return the execution result.
825
(...)
834 .. versionadded:: 1.11.0
835 """
836 self._before_execute()
--> 837 return TableResult(self._j_tenv.executeSql(stmt))
File /usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py:146, in capture_java_exception.<locals>.deco(*a, **kw)
144 def deco(*a, **kw):
145 try:
--> 146 return f(*a, **kw)
147 except Py4JJavaError as e:
148 from pyflink.java_gateway import get_gateway
File /usr/local/lib/python3.10/dist-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o67.executeSql.
: java.lang.ArrayStoreException: arraycopy: element type mismatch: can not cast one of the elements of java.lang.Object[] to the type of the destination array, org.apache.http.HttpHost
at java.base/java.lang.System.arraycopy(Native Method)
at java.base/java.util.Arrays.copyOf(Unknown Source)
at java.base/java.util.ArrayList.toArray(Unknown Source)
at org.apache.flink.connector.opensearch.table.Opensearch2DynamicSink.getSinkRuntimeProvider(Opensearch2DynamicSink.java:122)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:151)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:214)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
Anh Tan Le Dinh
08/02/2024, 10:18 AM
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven project used to collect the Flink connector jars for the PyFlink job:
  the maven-dependency-plugin execution below copies every resolved dependency
  into ${project.build.directory}/dependency during the package phase.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.flink</groupId>
  <artifactId>flink</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>
  <properties>
    <java.version>17</java.version>
    <flink.version>1.19.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>
  <dependencies>
    <!-- Kafka -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro-confluent-registry</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Opensearch -->
    <!--
      NOTE(review): this section declares both the plain connector
      (flink-connector-opensearch2) and the SQL connector jar
      (flink-sql-connector-opensearch2), plus a separately versioned
      OpenSearch client (2.15.0). Copying all of them into one directory
      likely puts more than one copy of the connector and Apache HTTP client
      classes on the classpath, which would be consistent with the
      "ArrayStoreException ... org.apache.http.HttpHost" reported in this
      thread. TODO confirm, then keep only the artifact the job needs.
    -->
    <dependency>
      <groupId>org.opensearch</groupId>
      <artifactId>opensearch</artifactId>
      <version>2.15.0</version>
    </dependency>
    <dependency>
      <groupId>org.opensearch.client</groupId>
      <artifactId>opensearch-rest-high-level-client</artifactId>
      <version>2.15.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-opensearch2</artifactId>
      <version>2.0.0-1.19</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-sql-connector-opensearch2</artifactId>
      <version>2.0.0-1.19</version>
    </dependency>
    <!-- Jackson -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.17.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.17.2</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.17.2</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- Java Compiler -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
      <!-- Copies all resolved dependencies (log4j artifacts excluded) next to the build output -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${project.build.directory}/dependency</outputDirectory>
              <excludeGroupIds>org.apache.logging.log4j</excludeGroupIds>
              <excludeArtifactIds>log4j*</excludeArtifactIds>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Anh Tan Le Dinh
08/02/2024, 10:29 AM
from pyflink.table import expressions as fl
from pyflink.table import EnvironmentSettings, TableEnvironment
# Pipeline: read order records from a Kafka topic and write them to an
# OpenSearch index using Flink SQL through the PyFlink Table API.

# Table environment configured for streaming execution.
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Source table: Kafka topic read from the earliest offset; record values use
# the 'avro-confluent' format with schemas fetched from the registry URL.
# The URL must be a plain http://host:port string (no surrounding <...>).
t_env.execute_sql(r"""
create table if not exists kafka_orders (
order_id string,
customer_id string,
order_status string
) with (
'connector' = 'kafka',
'topic' = 'orders.public.orders',
'properties.bootstrap.servers' = 'cluster-kafka-brokers.kafka:9092',
'properties.group.id' = 'opensearch-orders',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry.kafka:8081'
);
""")

# Sink table: documents are written as JSON to the 'orders' index.
# 'hosts' must be a plain http://host:port string (no surrounding <...>).
t_env.execute_sql(r"""
create table if not exists opensearch_orders (
order_id string,
customer_id string,
order_status string
) with (
'connector' = 'opensearch-2',
'hosts' = 'http://opensearch.io:80',
'allow-insecure' = 'true',
'index' = 'orders',
'format' = 'json'
);
""")

# Submit the continuous insert job; wait() blocks the caller on the submitted
# job instead of returning immediately.
t_env.execute_sql(r"""
insert into opensearch_orders
select
order_id,
customer_id,
order_status
from kafka_orders;
""").wait()
This error happens and I have no way to figure out why (I have provided more details in the chat thread):
Py4JJavaError: An error occurred while calling o67.executeSql.
: java.lang.ArrayStoreException: arraycopy: element type mismatch: can not cast one of the elements of java.lang.Object[] to the type of the destination array, org.apache.http.HttpHost
at java.base/java.lang.System.arraycopy(Native Method)
at java.base/java.util.Arrays.copyOf(Unknown Source)
at java.base/java.util.ArrayList.toArray(Unknown Source)
at org.apache.flink.connector.opensearch.table.Opensearch2DynamicSink.getSinkRuntimeProvider(Opensearch2DynamicSink.java:122)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:151)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:214)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1296)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:874)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1112)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
D. Draco O'Brien
08/03/2024, 5:47 AMD. Draco O'Brien
08/03/2024, 5:49 AMD. Draco O'Brien
08/03/2024, 5:52 AM