huang hills
03/02/2023, 7:31 AM
Mitchell Jeppson
03/02/2023, 7:36 AM
tableEnv.createTemporaryView(tableName, stream)
I am then joining the two views by querying them, something like this:
val resultTable = streamTableEnvironment.sqlQuery(myQuery)
streamTableEnvironment.toChangelogStream(resultTable)
The issue I am seeing with this is that the entire view is republished each time a new message is consumed and added to the table. I assume this is the expected behaviour, but is there any way to configure it to publish only the new row in the case of an insert, or the updated row in the case of an update?
I can provide more detailed code examples if that would be helpful. Thank you!
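For readers skimming the thread, here is a minimal, self-contained PyFlink sketch of the pattern being described (the toy sources and names such as left_view/right_view are assumptions, not the original code). The detail that matters: each record emitted by toChangelogStream / to_changelog_stream carries a RowKind (+I, -U, +U, -D), so downstream operators can react to the individual inserted or updated row rather than a republished view:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Two toy streams standing in for the real sources.
left = env.from_collection(
    [(1, "a"), (2, "b")], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
right = env.from_collection(
    [(1, "x"), (2, "y")], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

t_env.create_temporary_view("left_view", left)
t_env.create_temporary_view("right_view", right)

result = t_env.sql_query(
    "SELECT l.f0 AS id, l.f1 AS l_val, r.f1 AS r_val "
    "FROM left_view l JOIN right_view r ON l.f0 = r.f0")

# Each changelog record is tagged with its RowKind; sinks can act on the
# single change instead of the whole materialized view.
t_env.to_changelog_stream(result).print()
env.execute("changelog-join-sketch")

If the full view really is re-emitted on every input record, it may be the sink's behaviour rather than the changelog conversion that needs configuring.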
Amresh Venugopal
03/02/2023, 8:19 AM
❯ tree plugins
plugins
├── README.txt
└── s3-fs-presto
└── flink-s3-fs-presto-1.16.1.jar
I'll add my flink-conf.yaml in the thread.
When I run:
flink run some.jar
I see the following exception in the flink-amresh.venugopal-client-ip-172-31-50-66.log. The .jar I am using works if I don't use s3 for checkpointing.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
Adesh Dsilva
03/02/2023, 10:54 AM
Parth
03/02/2023, 11:40 AM
Nick Pocock
03/02/2023, 4:47 PM
dario bonino
03/02/2023, 5:10 PM
Have records for a split that was not registered
However, split addition is performed through the context and ideally the splits should be registered. Has anyone perhaps run into a similar issue? If so, are there any attention points we should consider when trying to debug it?
sharad mishra
03/02/2023, 8:23 PM
James McGuire
03/03/2023, 1:16 AM
JSON_OBJECT component that references a DATE column.
Here's the schema I'm trying to create and the error:
CREATE TABLE test_table (
  some_date DATE,
  object AS JSON_OBJECT(
    KEY 'some_date' VALUE some_date
  )
)
COMMENT ''
WITH (
  'connector' = 'datagen'
)
;
Flink SQL> select * from default_catalog.default_database.test_table ;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.planner.codegen.CodeGenException: Type 'DATE' is not scalar or cannot be converted into JSON.
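A hedged workaround sketch (not from the thread): since the planner refuses to convert DATE to JSON, casting the column to STRING inside JSON_OBJECT usually gets past the codegen error. The 'number-of-rows' option is added here only so the toy query terminates:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE test_table (
        some_date DATE,
        object AS JSON_OBJECT(
            KEY 'some_date' VALUE CAST(some_date AS STRING)
        )
    ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '3'
    )
""")
# The computed column now renders as e.g. {"some_date":"2023-03-03"}.
t_env.execute_sql("SELECT * FROM test_table").print()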
Sandeep R
03/03/2023, 3:21 AM
Raghunadh Nittala
03/03/2023, 4:11 AMmessage EventChanged {
string id = 1;
map<string, google.protobuf.Any> changes = 2;
}
message SampleRule {
string event_type = 1;
EventChanged change_events = 2;
}
Here are my queries:
CREATE TABLE test_changes (
  message_key STRING NOT NULL,
  event_type STRING NOT NULL,
  event_changed ROW<id STRING, changes MAP<STRING, BYTES>>,
  CONSTRAINT pk_test_changes PRIMARY KEY (message_key) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test-changes',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'protobuf',
  'value.protobuf.message-class-name' = 'TestProto$SampleRule',
  'value.fields-include' = 'EXCEPT_KEY'
);
My question is: is the BYTES datatype correct for a protobuf Any field? How can I prepare the data for these types of fields?
I tried the statement `INSERT INTO test_changes(message_key, event_type, event_changed) VALUES('abc123', 'efg', ROW('123', MAP['www', '123']));` but this didn’t work.
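One hedged guess at why the INSERT fails: the MAP values are declared BYTES, but '123' in MAP['www', '123'] is a STRING literal. Flink SQL's ENCODE(string, charset) returns the string's bytes, so a sketch like the following lines the types up. It does not answer whether BYTES is the right mapping for google.protobuf.Any in the protobuf format; that part is left open:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Assumes the test_changes table from the DDL above has been registered in
# this environment and Kafka is reachable; ENCODE('123', 'UTF-8') produces
# the BYTES value the MAP<STRING, BYTES> column expects.
t_env.execute_sql("""
    INSERT INTO test_changes (message_key, event_type, event_changed)
    VALUES ('abc123', 'efg', ROW('123', MAP['www', ENCODE('123', 'UTF-8')]))
""").wait()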
Sudhan Madhavan
03/03/2023, 6:27 AM
Rohan Kumar
03/03/2023, 9:20 AM
Aditya
03/03/2023, 1:21 PM
Richard Noble
03/03/2023, 1:26 PM
Guruguha Marur Sreenivasa
03/03/2023, 2:44 PM
Alex Escobedo
03/03/2023, 3:25 PM
Singh
03/03/2023, 6:40 PM
upgradeMode: savepoint to upgradeMode: stateless? Operator docs mention stateless isn’t recommended for production use; is that a general recommendation, or would stateless not work for this case?
Thanks!
Leon Xu
03/03/2023, 7:02 PM
James Timotiwu
03/03/2023, 9:16 PM
Mali
03/04/2023, 7:45 AM
import os
from pyflink.common import SimpleStringSchema
from pyflink.datastream.connectors.rabbitmq import RMQConnectionConfig, RMQSource
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
# logs configs
RABBITMQ_LOGS_HOST = os.getenv("RABBITMQ_LOGS_HOST")
RABBITMQ_LOGS_USER = os.getenv("RABBITMQ_LOGS_USER")
RABBITMQ_LOGS_PASSWD = os.getenv("RABBITMQ_LOGS_PASSWD")
RABBITMQ_LOGS_PORT = os.getenv("RABBITMQ_LOGS_PORT")
QUEUE_NAME = os.getenv("QUEUE_NAME")
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/lib/flink-connector-rabbitmq-1.16.1.jar","file:///opt/flink/lib/flink-sql-connector-rabbitmq-1.16.0")
env.enable_checkpointing(600000)
rabbitmq_connection_config = RMQConnectionConfig.Builder() \
    .set_host(RABBITMQ_LOGS_HOST) \
    .set_port(RABBITMQ_LOGS_PORT) \
    .set_user_name(RABBITMQ_LOGS_USER) \
    .set_password(RABBITMQ_LOGS_PORT) \
    .set_prefetch_count(30000).build()
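Two details in the builder above look like slips, flagged here as hedged guesses rather than the confirmed root cause of the crash below: set_password is handed RABBITMQ_LOGS_PORT instead of RABBITMQ_LOGS_PASSWD, and set_port most likely expects an int while os.getenv returns a string. A corrected sketch, using the same env vars as above:

from pyflink.datastream.connectors.rabbitmq import RMQConnectionConfig

# Corrected builder: password from the PASSWD variable, port cast to int.
rabbitmq_connection_config = RMQConnectionConfig.Builder() \
    .set_host(RABBITMQ_LOGS_HOST) \
    .set_port(int(RABBITMQ_LOGS_PORT)) \
    .set_user_name(RABBITMQ_LOGS_USER) \
    .set_password(RABBITMQ_LOGS_PASSWD) \
    .set_prefetch_count(30000) \
    .build()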
My deployment YAML is:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: wuxi
  namespace: flink
spec:
  ingress:
    template: "my-private-ingress-url"
    className: "nginx"
    annotations: {}
  image: "my-private-repo"
  imagePullPolicy: "Always"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: pod-template
      spec:
        serviceAccount: flink
        containers:
          # Do not change the main container name
          - name: flink-main-container
            envFrom:
              - secretRef:
                  name: flink-secret
    resource:
      memory: "4096m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 1
    replicas: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/main.py"]
    parallelism: 1
    upgradeMode: stateless
I am getting the following errors:
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1224, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1228, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/tmp/pyflink/cb21431412312412321412231/421312125676768445654642/main.py", line 22, in <module>
    rabbitmq_connection_config = RMQConnectionConfig.Builder() \
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/rabbitmq.py", line 83, in __init__
  File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1547, in __getattr__
py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'Thread-14' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
at java.lang.Class.privateGetPublicMethods(Unknown Source) ~[?:?]
at java.lang.Class.getMethods(Unknown Source) ~[?:?]
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:284) ~[flink-python-1.16.1.jar:1.16.1]
at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.getMember(ReflectionCommand.java:140) ~[flink-python-1.16.1.jar:1.16.1]
at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.execute(ReflectionCommand.java:91) ~[flink-python-1.16.1.jar:1.16.1]
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) ~[flink-python-1.16.1.jar:1.16.1]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory
at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
... 9 more
2023-03-04 07:04:48,953 INFO org.apache.flink.client.python.PythonDriver [] - ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1224, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
Can anyone help solve this issue, please?
Thanks very much for your effort 🙂
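A hedged reading of the two errors above: both "RMQConnectionConfig.Builder does not exist in the JVM" and "NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory" suggest the RabbitMQ classes never reached the JVM classpath. Two things worth checking: the second add_jars path in the code above ends in flink-sql-connector-rabbitmq-1.16.0 with no ".jar" suffix, and the plain flink-connector-rabbitmq jar does not bundle the com.rabbitmq amqp-client dependency, while the sql-connector fat jar normally does. A sketch of the corrected wiring (path and version are assumptions):

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# The fat "sql-connector" jar bundles the RabbitMQ client classes; note the
# ".jar" suffix that was missing from the original second path.
env.add_jars("file:///opt/flink/lib/flink-sql-connector-rabbitmq-1.16.0.jar")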
saqlain pasha
03/04/2023, 11:16 AM
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(8e6c7b9d45fca21399ee867b4db74649, LocalRpcInvocation(requestResourceOverview(Time))) sent to akka.tcp://flink@realtime-data-publisher.realtimeproc:6123/user/rpc/resourcemanager_0 because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Sumit Nekar
03/05/2023, 10:40 AM
Sumit Nekar
03/05/2023, 6:27 PM
2023-03-05 18:22:06,182 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at akka://flink/user/rpc/resourcemanager_0 .
2023-03-05 18:22:06,193 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - Create KubernetesLeaderElector streaming-pipeline-dispatcher-leader with lock identity 5b92b514-7d6a-4203-a50e-c251de97cd74.
2023-03-05 18:22:06,288 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with KubernetesLeaderElectionDriver{configMapName='streaming-pipeline-dispatcher-leader'}.
2023-03-05 18:22:06,288 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
2023-03-05 18:22:06,293 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected a1ce701a-760f-4e87-9e8d-6153f6ee3216 for streaming-pipeline-restserver-leader.
2023-03-05 18:22:06,293 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected a1ce701a-760f-4e87-9e8d-6153f6ee3216 for streaming-pipeline-dispatcher-leader.
2023-03-05 18:22:06,388 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-pipeline-resourcemanager-leader'}.
2023-03-05 18:22:06,491 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-pipeline-dispatcher-leader'}.
2023-03-05 18:22:07,077 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2023-03-05 18:22:07,078 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2023-03-05 18:22:07,078 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - Create KubernetesLeaderElector streaming-pipeline-resourcemanager-leader with lock identity 5b92b514-7d6a-4203-a50e-c251de97cd74.
2023-03-05 18:22:07,089 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected a1ce701a-760f-4e87-9e8d-6153f6ee3216 for streaming-pipeline-resourcemanager-leader.
2023-03-05 18:22:07,117 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with KubernetesLeaderElectionDriver{configMapName='streaming-pipeline-resourcemanager-leader'}.
2023-03-05 18:22:19,497 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2023-03-05 18:22:19,498 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-03-05 18:22:19,499 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2023-03-05 18:22:19,499 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2023-03-05 18:22:19,593 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-03-05 18:22:19,593 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-03-05 18:22:19,595 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-03-05 18:22:19,595 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.6.jar) to method java.nio.DirectByteBuffer.cleaner()
Shaon
03/06/2023, 12:16 AM
Zhiyu Tian
03/06/2023, 2:04 AM
RM
03/06/2023, 6:47 PM
Dima Sheludko
03/06/2023, 7:41 PM
Caused by: com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ; S3 Extended Request ID: ; Proxy: null), S3 Extended Request ID: (Path: s3p://myapp/flink-checkpoints/98d9c76cf8bb4fae909c4bcd0a45c271/chk-27/b89524c6-bf57-425d-82c7-846861f085c3)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$5(PrestoS3FileSystem.java:677)
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:664)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:648)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:398)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130)
at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:76)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:453)
I am working with AWS S3, and here is the list of allowed actions:
"GetObject",
"GetObjectAcl",
"GetObjectVersion",
"DeleteObject",
"DeleteObjects",
"HeadObject",
"ListObjects",
"ListObjectsV2",
"PutObject",
"PutObjectAcl",
"PutObjectRetention",
"UploadPart",
"ListBucket"
Flink successfully created a recovery in S3 but cannot create a checkpoint. It seems that I need to extend the list of allowed actions to be able to store checkpoints, but I cannot determine which actions to allow. Does anybody have any familiarity with this problem or any ideas on how to fix it?
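A hedged observation on the list above: HeadObject, ListObjects, ListObjectsV2, DeleteObjects, and UploadPart are S3 API operations, not IAM action names, so a policy listing them literally grants nothing extra. The failing getS3ObjectMetadata call is a HEAD request, which IAM authorizes via s3:GetObject on the object ARN, and s3:ListBucket is needed on the bucket ARN itself. A small boto3 sketch (bucket and key are assumptions) to reproduce the calls outside Flink:

import boto3

# Reproduce the checkpoint write, HEAD, and delete that Flink performs,
# against an assumed bucket/prefix.
s3 = boto3.client("s3")
s3.put_object(Bucket="myapp", Key="flink-checkpoints/perm-test", Body=b"ok")
s3.head_object(Bucket="myapp", Key="flink-checkpoints/perm-test")
s3.delete_object(Bucket="myapp", Key="flink-checkpoints/perm-test")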
Ryner Menezes
03/06/2023, 8:45 PM
Thijs van de Poll
03/06/2023, 11:06 PM
postgres-cdc connector as source to write to Delta Lake via Trino. However, when using the jdbc connector to write to Trino I get the following error:
Caused by: java.lang.IllegalStateException: Could not find any jdbc dialect factory that can handle url 'jdbc:trino://localhost:8080/deltalake' that implements 'org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory' in the classpath.
What would be the best way to sink the data to Trino? Any suggestions?
Thanks!