饶俊
12/24/2022, 3:22 PM
Sudhan Madhavan
12/25/2022, 7:18 AM
Ashutosh Joshi
12/26/2022, 11:19 AM
ROW<`topic` STRING, `partition` INT, `offset` BIGINT, `kafka_timestamp` TIMESTAMP(3), `source_name` STRING, `source_id` INT, `event_type` STRING, `version` INT, `source_timestamp` BIGINT, `event_timestamp` TIMESTAMP(3), `payload` ARRAY<MAP<STRING, STRING>>> NOT NULL(org.apache.flink.types.Row, org.apache.flink.table.runtime.typeutils.ExternalSerializer)
Can anyone please help?
饶俊
12/24/2022, 3:19 PM
Nithin Kumar Vokkarla
12/26/2022, 2:43 PM
2022-12-26 14:32:49,188 i.j.o.p.e.ReconciliationDispatcher [DEBUG][flink-operator/<application-name>] Reconciling resource <application-name> with version: 154257956 with execution scope: ExecutionScope{ resource id: ResourceID{name='<application-name>', namespace='flink-operator'}, version: 154257956}
2022-12-26 14:32:49,188 i.j.o.p.e.ReconciliationDispatcher [DEBUG][flink-operator/<application-name>] Handling dispatch for resource <application-name>
2022-12-26 14:32:49,188 i.j.o.p.e.EventProcessor [DEBUG][flink-operator/<application-name>] Executing events for custom resource. Scope: ExecutionScope{ resource id: ResourceID{name='<application-name>', namespace='flink-operator'}, version: 154257956}
2022-12-26 14:32:49,187 i.j.o.p.e.s.i.ManagedInformerEventSource [DEBUG][flink-operator/<application-name>] Resource not found in temporal cache reading it from informer cache, for Resource ID: ResourceID{name='<application-name>', namespace='flink-operator'}
2022-12-26 14:32:49,187 i.j.o.p.e.EventProcessor [DEBUG][flink-operator/<application-name>] Marking event received for: ResourceID{name='<application-name>', namespace='flink-operator'}
2022-12-26 14:32:49,187 i.j.o.p.e.EventProcessor [DEBUG] Received event: Event{relatedCustomResource=ResourceID{name='<application-name>', namespace='flink-operator'}}
2022-12-26 14:32:49,187 i.j.o.p.e.s.t.TimerEventSource [DEBUG] Producing event for custom resource id: ResourceID{name='<application-name>', namespace='flink-operator'}
2022-12-26 14:32:34,187 i.j.o.p.e.EventProcessor [DEBUG][flink-operator/<application-name>] ReScheduling event for resource: ResourceID{name='<application-name>', namespace='flink-operator'} with delay: 15000
2022-12-26 14:32:34,187 i.j.o.p.e.EventProcessor [DEBUG][flink-operator/<application-name>] Cleanup for successful execution for resource: <application-name>
2022-12-26 14:32:34,187 i.j.o.p.e.EventProcessor [DEBUG][flink-operator/<application-name>] Event processing finished. Scope: ExecutionScope{ resource id: ResourceID{name='<application-name>', namespace='flink-operator'}, version: 154257956}, PostExecutionControl: PostExecutionControl{onlyFinalizerHandled=false, updatedCustomResource=null, runtimeException=null}
2022-12-26 14:32:34,187 o.a.f.k.o.u.StatusRecorder [DEBUG][flink-operator/<application-name>] No status change.
2022-12-26 14:32:34,187 o.a.f.k.o.c.FlinkDeploymentController [INFO ][flink-operator/<application-name>] End of reconciliation
2022-12-26 14:32:34,187 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][flink-operator/<application-name>] Resource fully reconciled, nothing to do...
2022-12-26 14:32:34,187 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [WARN ][flink-operator/<application-name>] Could not recover lost deployment without HA enabled
2022-12-26 14:32:34,186 o.a.f.k.o.u.StatusRecorder [DEBUG][flink-operator/<application-name>] No status change.
2022-12-26 14:32:34,186 o.a.f.k.o.o.d.ApplicationObserver [INFO ][flink-operator/<application-name>] Observing JobManager deployment. Previous status: MISSING
2022-12-26 14:32:34,186 o.a.f.k.o.c.FlinkDeploymentController [INFO ][flink-operator/<application-name>] Starting reconciliation
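The WARN a few lines up ("Could not recover lost deployment without HA enabled") suggests the likely cause: the operator can only recover a lost JobManager deployment when HA metadata exists. A hedged sketch of the relevant FlinkDeployment settings, assuming Kubernetes HA is acceptable (the storage path is a placeholder):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  flinkConfiguration:
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    # placeholder path; any durable storage (S3/GCS/HDFS) works
    high-availability.storageDir: s3://<bucket>/flink-ha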
Slackbot
12/27/2022, 4:07 AM
Shira Bodenstein
12/27/2022, 6:34 AM
final ActionRequestFailureHandler failureHandler = (action, failure, restStatusCode, indexer) -> {
    if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
        indexer.add(action);
    } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
        log.warn("Got malformed document, action {}", action);
        // malformed document; simply drop it without failing the sink
    } else if (failure instanceof IOException && failure.getCause() instanceof NullPointerException && failure.getMessage().contains("Unable to parse response body")) {
        // issue with ES 7 and opensearch, which does not send a type while the response parsing expects it
        // at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127) -- this.type = Objects.requireNonNull(type);
        log.debug("known issue parsing the response for ES 7.5.1 and DB OS (opensearch): {}", failure.getMessage());
    } else {
        // for all other failures, log and don't fail the sink
        log.error("Got error while trying to perform ES action {}", action, failure);
    }
};
final ElasticsearchSink.Builder<T> builder = new ElasticsearchSink.Builder<>(transportNodes, elasticsearchSinkFunction);
In the new version the class ActionRequestFailureHandler is deprecated, and after investigation I can't find any way to handle failures.
For all failures the sink fails.
Is there anything I didn't see?
Thanks in advance!
Suriya Krishna Mariappan
12/27/2022, 3:49 PM
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.metrics.ShortIdMap
But the dependency for this is packaged as part of the job fat JAR, and yet we get this error saying the class was not found.
• So, is Flink not able to find the classes from the fat JAR?
• Should we put the dependency in a path where Flink can find it? If so, how do we make sure some other dependency is not missed like this?
This is the full stack trace. Any help on this would be greatly appreciated.
2022-12-27 15:16:59,450 WARN org.apache.flink.runtime.accumulators.AccumulatorRegistry [] - Failed to serialize accumulators for task.
java.lang.NoClassDefFoundError: org/apache/beam/runners/core/metrics/ShortIdMap
at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
at java.lang.Class.getDeclaredMethod(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.getPrivateMethod(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass$2.run(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass$2.run(Unknown Source) ~[?:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at java.io.ObjectStreamClass.<init>(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass$Caches$1.computeValue(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass$Caches$1.computeValue(Unknown Source) ~[?:?]
at java.io.ClassCache$1.computeValue(Unknown Source) ~[?:?]
at java.io.ClassCache$1.computeValue(Unknown Source) ~[?:?]
at java.lang.ClassValue.getFromHashMap(Unknown Source) ~[?:?]
at java.lang.ClassValue.getFromBackup(Unknown Source) ~[?:?]
at java.lang.ClassValue.get(Unknown Source) ~[?:?]
at java.io.ClassCache.get(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.lookup(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeSerialData(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeObject(Unknown Source) ~[?:?]
at java.util.concurrent.ConcurrentHashMap.writeObject(Unknown Source) ~[?:?]
at jdk.internal.reflect.GeneratedMethodAccessor93.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeWriteObject(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeSerialData(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeObject0(Unknown Source) ~[?:?]
at java.io.ObjectOutputStream.writeObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:51) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:54) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskexecutor.TaskExecutor.unregisterTaskAndNotifyFinalState(TaskExecutor.java:1883) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$2600(TaskExecutor.java:181) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskexecutor.TaskExecutor$TaskManagerActionsImpl.lambda$updateTaskExecutionState$1(TaskExecutor.java:2289) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.6.jar:1.13.6]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.6.jar:1.13.6]
Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.core.metrics.ShortIdMap
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
... 62 more
@Arun
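One way to narrow this down is to probe the user-code classloader at runtime, since the Caused by shows the lookup failing inside ChildFirstClassLoader. A hypothetical debugging sketch to drop into any RichFunction's open() in the same job (LOG is an assumed logger):
@Override
public void open(Configuration parameters) throws Exception {
    // Runs with the same user-code classloader that later fails to resolve the class.
    ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
    try {
        Class.forName("org.apache.beam.runners.core.metrics.ShortIdMap", false, cl);
        LOG.info("ShortIdMap is visible to the user-code classloader");
    } catch (ClassNotFoundException e) {
        // If this fires, the class is genuinely absent from the fat JAR at runtime,
        // e.g. relocated by shading or a different Beam version than expected.
        LOG.error("ShortIdMap is not on the user-code classpath", e);
    }
}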
Liubov
12/27/2022, 5:09 PM
Mehul Batra
12/28/2022, 11:10 AM
Ari Huttunen
12/28/2022, 1:34 PM
table_env.execute_sql("""
CREATE TABLE print_aggregated_data (
EVENT_DAY STRING,
EVENT_HOUR STRING,
..other fields..
) WITH (
'connector' = 'print'
)
""")
table_env.execute_sql("""
INSERT INTO print_aggregated_data
SELECT
*
FROM aggregated_data
LIMIT 10
""").wait(10000) # 10 seconds = 10000 milliseconds
If I copy and paste all the code into the ptpython console, it displays 10 lines of selected data, and after 10 seconds it fails with java.util.concurrent.TimeoutException.
This works, but feels a bit awkward, so maybe I'm missing something. I'd like a more interactive way of seeing the data it is gathering.
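One more interactive option, a sketch assuming the same aggregated_data table: pull rows on demand from TableResult.collect() instead of LIMIT plus wait(); exiting the with-block closes the iterator and cancels the underlying job, so no timeout juggling is needed.
# Iterate results lazily; closing the iterator cancels the job.
with table_env.execute_sql("SELECT * FROM aggregated_data").collect() as results:
    for i, row in enumerate(results):
        print(row)
        if i >= 9:  # stop after 10 rows
            break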
Ashutosh Joshi
12/28/2022, 3:34 PM
DataStream<Row> outStream = stream.map(new ParsePayload(functionMap, typeInformation))
        .uid("ParseNestedColumn");
private static class ParsePayload extends RichMapFunction<Row, Row> implements Serializable {
    @Override
    public Row map(Row row) throws Exception {
        <business logic>
        …….
        return resultRow;
    }
}
The issue is that I can determine the type information of a row only after evaluating the map function, or by creating the output row, because the fields in the nested column are not fixed. I have tried both Types.ROW_NAMED() and the ResultTypeQueryable interface, but both check the type information before the map function is evaluated, so this way I can't supply the type information to the stream.
Can anyone please help with a solution and an example?
P.S - I do not want to enable Generic Types for my job.
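Flink needs the output type when the job graph is built, so it cannot be derived per record inside map(). If the field names and types can be computed up front from the same inputs ParsePayload receives (an assumption here), they can be attached with returns(). A sketch, where deriveFieldNames/deriveFieldTypes are hypothetical helpers:
// Compute the schema before building the stream (assumption: derivable from functionMap).
String[] fieldNames = deriveFieldNames(functionMap);             // hypothetical helper
TypeInformation<?>[] fieldTypes = deriveFieldTypes(functionMap); // hypothetical helper

DataStream<Row> outStream = stream
        .map(new ParsePayload(functionMap, typeInformation))
        .returns(Types.ROW_NAMED(fieldNames, fieldTypes)) // explicit type info, no generic-type fallback
        .uid("ParseNestedColumn");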
Gerald Schmidt
12/28/2022, 4:24 PM
kafka-clients-2.6.0.jar, flink-sql-connector-kafka-1.16.0.jar and aws-msk-iam-auth-1.1.5-all.jar in /opt/flink/lib.
I have also set up and tested IAM access for the service account.
The creation of table mytable from a topic using sql-client.sh succeeds, but then SELECT * FROM mytable; fails after ~2m with:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
Am I missing client.properties? I'd like to specify:
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
If that's a sensible next step, where on my jobmanager pod should the file go?
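Possibly no separate file is needed: the SQL Kafka connector forwards any table option prefixed with properties. to the underlying Kafka client, so the SASL settings can go into the table DDL itself. A sketch (column, topic, broker and format are placeholders):
CREATE TABLE mytable (
  msg STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'mytopic',
  'properties.bootstrap.servers' = '<broker>:9098',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'format' = 'json'
);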
Jonathan Weaver
12/29/2022, 1:22 AM
java.lang.LinkageError: loader constraint violation: when resolving method 'void org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper.<init>(com.codahale.metrics.Histogram)' the class loader org.apache.flink.util.ChildFirstClassLoader @61697817 of the current class, com/maxar/di/fids/catalog/fids_sink/FidsSinkWriter, and the class loader 'app' for the method's defining class, org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper, have different Class objects for the type com/codahale/metrics/Histogram used in the signature (com.maxar.di.fids.catalog.fids_sink.FidsSinkWriter is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @61697817, parent loader 'app'; org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper is in unnamed module of loader 'app')
Which forces me to set the Dropwizard metrics dependency to provided in the POM, but that prevents me from running the jar, say with the sql-client locally without the Hadoop dependencies. Anyone have any pointers on resolving this conflict?
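The error indicates com.codahale.metrics.Histogram is loaded twice: once by the 'app' classloader (from a jar in /opt/flink/lib) and once child-first from the fat jar. Besides provided scope, another option worth trying is forcing those classes to resolve parent-first so both sides see the same Class object. A sketch for flink-conf.yaml:
classloader.parent-first-patterns.additional: com.codahale.metrics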
Doğaç Eldenk
12/29/2022, 5:57 PM
gRPC endpoints. We need to be able to write thousands of requests per second. I am trying to use streaming rpc to forward data to the service, but I have no luck. (Using Kotlin)
1. When I use RichSinkFunction, the endpoints time out unless I spin up a co-routine that ends each stream after 5-10 seconds.
2. When I use ProcessingWindow functions, the issue is that a processing window processes thousands of requests at once, and I am not sure how to retry if some requests fail without retrying the whole window (which can create an absurd spike on the service, making it fail again).
3. Flow in Kotlin is tricky to work with, because rpc streams are bidirectional and responses come back asynchronously, making them very hard to capture.
Any suggestions on how I can use bidirectional streams to sink data with an at-least-once guarantee?
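One pattern that may fit better than a sink or window is Flink's async I/O: each element completes its own ResultFuture, a capacity bound limits in-flight requests, and with checkpointing the in-flight records are replayed after a failure, which gives at-least-once. A sketch in Java, where Event, Ack, GrpcClient and sendAsync are hypothetical stand-ins for your types and the generated async stub:
public class GrpcAsyncWriter extends RichAsyncFunction<Event, Ack> {
    private transient GrpcClient client;  // hypothetical client wrapper

    @Override
    public void open(Configuration parameters) {
        client = GrpcClient.connect("service-host:6565");  // hypothetical
    }

    @Override
    public void asyncInvoke(Event event, ResultFuture<Ack> resultFuture) {
        // hypothetical: async stub call returning CompletableFuture<Ack>
        client.sendAsync(event).whenComplete((ack, err) -> {
            if (err != null) {
                // surfaces the error; on restart the record is replayed (at-least-once)
                resultFuture.completeExceptionally(err);
            } else {
                resultFuture.complete(Collections.singleton(ack));
            }
        });
    }
}

// capacity (1000) bounds in-flight requests; the timeout applies per record
DataStream<Ack> acks = AsyncDataStream.unorderedWait(
        events, new GrpcAsyncWriter(), 5, TimeUnit.SECONDS, 1000);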
Lee xu
12/30/2022, 3:33 AM
Lucas Alcântara Freire
12/30/2022, 10:23 AM
Jirawech Siwawut
12/30/2022, 10:40 AM
Eric Laguer
12/30/2022, 4:03 PM
Eric Laguer
12/30/2022, 7:17 PM
flink 1 0 74 19:21 ? 00:00:13 /opt/java/openjdk/bin/java -XX:+UseG1GC -Xmx697932173 -Xms697932173 -XX:MaxDirectMemorySize=300647712 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/flink--kubernetes-taskmanager-0-basic-session-deployment-only-example-taskmanager-1-1.log -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml -classpath /opt/flink/lib/flink-cep-1.15.3.jar:/opt/flink/lib/flink-connector-files-1.15.3.jar:/opt/flink/lib/flink-csv-1.15.3.jar:/opt/flink/lib/flink-json-1.15.3.jar:/opt/flink/lib/flink-scala_2.12-1.15.3.jar:/opt/flink/lib/flink-shaded-zookeeper-3.5.9.jar:/opt/flink/lib/flink-table-api-java-uber-1.15.3.jar:/opt/flink/lib/flink-table-planner-loader-1.15.3.jar:/opt/flink/lib/flink-table-runtime-1.15.3.jar:/opt/flink/lib/log4j-1.2-api-2.17.1.jar:/opt/flink/lib/log4j-api-2.17.1.jar:/opt/flink/lib/log4j-core-2.17.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.17.1.jar:/opt/flink/lib/flink-dist-1.15.3.jar::: org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner --configDir /opt/flink/conf -Djobmanager.memory.jvm-overhead.min=214748368b -Dtaskmanager.resource-id=basic-session-deployment-only-example-taskmanager-1-1 -Djobmanager.memory.off-heap.size=134217728b -Dweb.tmpdir=/tmp/flink-web-98d0f80f-0660-47b6-ba8a-466e9bde4723 -Djobmanager.memory.jvm-metaspace.size=268435456b -Djobmanager.memory.heap.size=1530082096b -Djobmanager.memory.jvm-overhead.max=214748368b -D taskmanager.memory.network.min=166429984b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=214748368b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=166429984b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=665719939b -D taskmanager.memory.task.heap.size=563714445b -D taskmanager.numberOfTaskSlots=2 -D taskmanager.memory.jvm-overhead.max=214748368b
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
.......
logConfiguration:
"logback-console.xml": |
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<appender name="rolling" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.file}</file>
<append>false</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${log.file}.%i</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>10</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>
<!-- This affects logging for both user code and Flink -->
<root level="INFO">
<appender-ref ref="console"/>
<appender-ref ref="rolling"/>
</root>
<!-- Uncomment this if you want to only change Flink's logging -->
<!--<logger name="org.apache.flink" level="INFO"/>-->
<!-- The following lines keep the log level of common libraries/connectors on
log level INFO. The root logger does not override this. You have to manually
change the log levels here. -->
<logger name="akka" level="INFO"/>
<logger name="org.apache.kafka" level="INFO"/>
<logger name="org.apache.hadoop" level="INFO"/>
<logger name="org.apache.zookeeper" level="INFO"/>
<!-- Suppress the irrelevant (wrong) warnings from the Netty channel handler -->
<logger name="org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR"/>
</configuration>
Jonathan Weaver
12/30/2022, 8:07 PM
LEFT OUTER JOIN sos FOR SYSTEM_TIME AS OF cdc.proc_time soss ON ios.demand_identifier IS NOT NULL AND ios.order_routing LIKE 'StandingOrder' AND soss.ofs_identifier = ios.demand_identifier
No matter what value order_routing is set to, it still performs the lookup.
申凯(lordk)
12/31/2022, 12:23 AM
TableResult result = tEnv.executeSql("select * from " +
"(select ddate, userMAC, bssid, dtime, userName, apName, action, ssid, rawValue, ROW_NUMBER() OVER (" +
"PARTITION BY window_start, window_end, userName ORDER BY eventTime DESC" +
") as row_num from " +
"TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))" +
") where row_num <= 1");
result.print();
but no result prints to the console.
When I try:
TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime), INTERVAL '10' SECONDS))");
result.print();
I can see the result printed to the console each time a checkpoint completes. I don't know what mistake I've made.
Rafael Jeon
12/31/2022, 12:06 PM
Rafael Jeon
12/31/2022, 12:08 PM
Gaurav Miglani
01/02/2023, 4:10 AM
numRestarts metric, but I'm unable to find it in the JM URL. Am I missing some configuration to enable it? It was previously present on Flink 1.13 🤔
Ari Huttunen
01/02/2023, 11:06 AM
./sql-client.sh
[Flink SQL Client ASCII-art startup banner]
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /home/user/.flink-sql-history
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.
Flink SQL> SET 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.
Flink SQL> SELECT
> name,
> COUNT(*) AS cnt
> FROM
> (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
> GROUP BY name;
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink_1.16_rc1/lib/flink-dist-1.16.0.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: Connection refused
Flink SQL>
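A hedged guess at the Connection refused: the SQL client plans the query locally, but executing a SELECT submits a job to a cluster, and in the default embedded mode it expects a local session cluster to be running. If none is up, starting one first should help:
./bin/start-cluster.sh   # then re-run ./bin/sql-client.sh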
Sami Badawi
01/02/2023, 6:24 PM
def setup_catalog(t_env: TableEnvironment) -> None:
    jar_filenames = ['flink-connector-jdbc-1.16.0.jar', 'postgresql-9.4.1212.jar']
    jar_path = make_jar_paths(jar_filenames)
    print("JAR PATH: ", jar_path)
    t_env.get_config().set("pipeline.jars", jar_path)
    name = "public"
    default_database = "postgres"
    username = "postgres"
    password = "postgres"
    base_url = "localhost"
    print(f"{name}, {default_database}, {username}, {password}, {base_url}")
    catalog = JdbcCatalog(name, default_database, username, password, base_url)
    t_env.register_catalog("my_catalog", catalog)
    t_env.use_catalog("my_catalog")
generates this error:
python pystreaming/event_files_jdbc.py
Running event_files_jdbc.py
JAR PATH: file:///Users/sami/code/pystreaming/jars/flink-connector-jdbc-1.16.0.jar;file:///Users/sami/code/pystreaming/jars/postgresql-9.4.1212.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker (file:/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/opt/flink-python-1.16.0.jar) to method java.net.URLClassLoader.addURL(java.net.URL)
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
public, postgres, postgres, postgres, localhost
Traceback (most recent call last):
File "/Users/sami/code/pystreaming/pystreaming/event_files_jdbc.py", line 96, in <module>
setup_catalog(t_env)
File "/Users/sami/code/pystreaming/pystreaming/event_files_jdbc.py", line 37, in setup_catalog
catalog = JdbcCatalog(name, default_database, username, password, base_url)
File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/table/catalog.py", line 1192, in __init__
j_jdbc_catalog = gateway.jvm.org.apache.flink.connector.jdbc.catalog.JdbcCatalog(
File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/java_gateway.py", line 1585, in __call__
return_value = get_return_value(
File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 146, in deco
return f(*a, **kw)
File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.flink.connector.jdbc.catalog.JdbcCatalog.
: java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.flink.connector.jdbc.catalog.JdbcCatalogUtils.validateJdbcUrl(JdbcCatalogUtils.java:37)
at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.<init>(AbstractJdbcCatalog.java:110)
at org.apache.flink.connector.jdbc.catalog.JdbcCatalog.<init>(JdbcCatalog.java:76)
at org.apache.flink.connector.jdbc.catalog.JdbcCatalog.<init>(JdbcCatalog.java:50)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:238)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
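The IllegalArgumentException is raised by JdbcCatalogUtils.validateJdbcUrl, and the likely trigger is base_url = "localhost": the catalog expects a full JDBC base URL (protocol, host and port, without a database name). A sketch of the corrected lines, assuming Postgres on the default port 5432:
# base_url must be a JDBC URL without the database name
base_url = "jdbc:postgresql://localhost:5432"
catalog = JdbcCatalog(name, default_database, username, password, base_url)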
Lauri Suurväli
01/02/2023, 6:49 PM
申凯(lordk)
01/03/2023, 1:11 AM
申凯(lordk)
01/03/2023, 9:09 AM