Alberto Otero Lorenzo
06/15/2023, 9:58 AM
highAvailability.storageDir to s3://my-minio-host. The problem comes with the credentials. I have tried setting s3.access.key, s3a.access.key, and AWS_ACCESS_KEY_ID with the corresponding secret variables, but none of them has had any effect and I keep getting this error in the jobmanager start log:
java.util.concurrent.CompletionException: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, com.amazonaws.auth.profile.ProfileCredentialsProvider@1f86353d: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@1b597309: Failed to connect to service endpoint
Could anyone help me with this?
Thank you.
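For reference, a minimal flink-conf.yaml sketch of how MinIO credentials are usually wired up through Flink's own s3.* options (the endpoint, port, and key values here are placeholders, not taken from the original setup):
s3.endpoint: http://my-minio-host:9000   # placeholder MinIO endpoint
s3.path.style.access: true               # MinIO generally needs path-style access
s3.access-key: <your-access-key>         # forwarded to the underlying S3 client by the filesystem plugin
s3.secret-key: <your-secret-key>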
Dominic Lindsay
06/15/2023, 12:57 PM
// Representation of a Model Evaluation that was made:
// "This model produced this output when given this input".
message Evaluation {
string id = 1;
Model model = 2;
ModelIO io = 3;
CorrelationData correlation_data = 4;
google.protobuf.Timestamp timestamp = 5;
}
message Model {
string name = 1;
string version = 2;
}
message ModelIO {
google.protobuf.Struct input = 1;
google.protobuf.Struct output = 2;
}
message CorrelationData {
google.protobuf.Struct data = 1;
}
Evaluations store timestamps as google.protobuf.Timestamp typed values.
Evaluations are serialised as protobuf byte arrays and written to an evaluations kafka topic as kafka messages:
{
key: evaluation.id
msg: evaluation
}
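As a hedged sketch of the producer side just described (BOOTSTRAP_SERVER and TOPIC are assumed to be defined as in the consumer script further down; delivery callbacks are omitted):
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": BOOTSTRAP_SERVER})

def publish(evaluation):
    # key = evaluation.id, value = the protobuf-serialised Evaluation bytes
    producer.produce(TOPIC, key=evaluation.id, value=evaluation.SerializeToString())
    producer.flush()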
Consumers can consume from topics. When they do, they receive the kafka message object, whose value is a serialised byte string/array of the structure described above.
When the Flink Java process consumes these messages, it encodes the msg object as a UTF-8 string and passes the encoded string to Python so that PyFlink can consume the message.
We are then able to deserialise the object:
def deserialise(value):
    bytes = value.encode('utf-8')
    evaluation = Evaluation.FromString(bytes)
    return evaluation
However, an error is raised:
*** google.protobuf.message.DecodeError: Error parsing message with type 'evaluation.Evaluation'
This is because the timestamp field is not correctly handled. The byte string looks like this:
b'\n\npxgqdrnbtj\x12\x06\n\x01c\x12\x016\x1a3\n\x17\n\x15\n\x05input\x12\x0c\x1a\npoietemwix\x12\x18\n\x16\n\x06output\x12\x0c\x1a\nhbzualvwbz"\x17\n\x15\n\x13\n\x03ref\x12\x0c\x1a\nnoqqltgkfq*\x0c\x08\xef\xbf\xbd\xd1\xa7\xef\xbf\xbd\x06\x10\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\x03'
The last 26 bytes (everything from and including the *) encode the timestamp field.
If we try to deserialise the object whilst omitting the timestamp field, no error is raised:
(pdb) Evaluation.FromString(bytes[:-26])
id: "pxgqdrnbtj"
model {
name: "c"
version: "6"
}
io {
input {
fields {
key: "input"
value {
string_value: "poietemwix"
}
}
}
output {
fields {
key: "output"
value {
string_value: "hbzualvwbz"
}
}
}
}
correlation_data {
data {
fields {
key: "ref"
value {
string_value: "noqqltgkfq"
}
}
}
}
I created a second script which uses confluent-kafka's Consumer and attempts to deserialise the msg field of a kafka message.
consumer = Consumer({
    "bootstrap.servers": BOOTSTRAP_SERVER,
    "client.id": "sample-producer",
    "group.id": "consumer",
})
consumer.subscribe([TOPIC])

while True:
    try:
        msg = consumer.poll(1.0)
        if not msg:
            continue
        print(msg.value())
        breakpoint()
    except KeyboardInterrupt:
        break
We can copy the byte string produced by this script into the flink program debug context:
(pdb) bs = b'\n\nadtiegqpdd\x12\x06\n\x01c\x12\x015\x1a3\n\x17\n\x15\n\x05input\x12\x0c\x1a\noscqcjntwt\x12\x18\n\x16\n\x06output\x12\x0c\x1a\ngwwkwcizol"\x17\n\x15\n\x13\n\x03ref\x12\x0c\x1a\ncsicpsesyc*\x0c\x08\xd5\xc6\xab\xa4\x06\x10\xe8\x9f\xa9\xba\x03'
(pdb) Evaluation.FromString(bs)
id: "adtiegqpdd"
model {
name: "c"
version: "5"
}
io {
input {
fields {
key: "input"
value {
string_value: "oscqcjntwt"
}
}
}
output {
fields {
key: "output"
value {
string_value: "gwwkwcizol"
}
}
}
}
correlation_data {
data {
fields {
key: "ref"
value {
string_value: "csicpsesyc"
}
}
}
}
timestamp {
seconds: 1686823765
nanos: 927617000
}
This shows the protobuf timestamp type parses correctly from the raw kafka message.
Therefore I believe the issue is related to how PyFlink is reading kafka messages.
PyFlink's kafka consumer is a Java class which is called from a Python process.
- The Python representation initialises a Java object via the Java gateway, configuring its charset to UTF-8.
- The Java backend simply serialises the value field and returns it to the caller (source).
I believe there is an issue with the encoding but I am currently unsure.
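A minimal sketch of why the UTF-8 hand-off is a plausible culprit (assuming the Java side decodes invalid sequences with replacement, which is Java's default behaviour): protobuf bytes are not valid UTF-8, so a decode/encode round trip is lossy, and the replacement character U+FFFD is exactly the b'\xef\xbf\xbd' runs visible in the failing byte string above.
# Timestamp bytes taken from the working dump above (from the '*' marker onwards).
raw = b'*\x0c\x08\xd5\xc6\xab\xa4\x06\x10\xe8\x9f\xa9\xba\x03'

# Model the suspected Java -> Python hand-off: decode as UTF-8 (invalid sequences
# become U+FFFD), then encode back to bytes.
round_tripped = raw.decode('utf-8', errors='replace').encode('utf-8')

assert round_tripped != raw               # the round trip does not preserve the bytes
assert b'\xef\xbf\xbd' in round_tripped   # U+FFFD appears, as in the failing dump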
Has anyone had similar issues when handling protobuf messages from kafka?
Adam Augusta
06/15/2023, 1:47 PM
Slackbot
06/15/2023, 3:49 PM
mralfredgui
06/15/2023, 4:09 PM
key_by and reduce operators when locally running pyflink 1.13. Here is the code:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection(
    collection=[(1, 2), (2, 3), (1, 3), (2, 4)],
    type_info=Types.ROW([Types.INT(), Types.INT()]))
concat_stream = ds.key_by(lambda x: x[0], key_type=Types.INT()).reduce(lambda a, b: (a[0], a[1] + b[1]))
concat_stream.print()
env.execute("tutorial_job")
This is the key error message:
AttributeError: 'tuple' object has no attribute 'get_fields_by_names'
Is there anything wrong with the code?
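One possibility, offered as an untested sketch: the declared type_info is Types.ROW, but the reduce lambda returns a plain tuple, so the Row serializer later looks for Row metadata (get_fields_by_names) that a tuple does not have. Returning a Row keeps the declared and actual types consistent:
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection(
    collection=[(1, 2), (2, 3), (1, 3), (2, 4)],
    type_info=Types.ROW([Types.INT(), Types.INT()]))
# Build a Row, matching the declared Types.ROW type info, instead of a plain tuple.
concat_stream = ds.key_by(lambda x: x[0], key_type=Types.INT()) \
    .reduce(lambda a, b: Row(a[0], a[1] + b[1]))
concat_stream.print()
env.execute("tutorial_job")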
Danila Maksimenko
06/15/2023, 6:46 PM
2023-06-15 18:41:30,374 INFO org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Delegation token receiver s3-hadoop loaded and initialized
2023-06-15 18:41:30,375 ERROR org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Failed to initialize delegation token receiver s3-hadoop
java.lang.IllegalStateException: Delegation token receiver with service name {} has multiple implementations [s3-hadoop]
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_372]
at org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.<init>(DelegationTokenReceiverRepository.java:60) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:245) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:293) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:486) ~[flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530) ~[flink-dist-1.17.1.jar:1.17.1]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_372]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_372]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899) [hadoop-common-3.3.5.jar:?]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:510) [flink-dist-1.17.1.jar:1.17.1]
at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:468) [flink-dist-1.17.1.jar:1.17.1]
My Hadoop version is 3.3.5, Hive 3.1.3, Scala 2.12, Iceberg 1.3.0, and several plugins: S3, Azure Blob, AWS_SDK_VERSION 2.20.18. For local dev we are using MinIO for S3.
(edit) Adding the security.delegation.token.provider.s3-hadoop.enabled: false parameter to flink-conf.yaml has solved the issue of not starting, but I would like a better solution that doesn't require disabling anything.
Clen Moras
06/15/2023, 10:50 PM
id VARCHAR,
name VARCHAR,
type VARCHAR,
hp VARCHAR,
attack VARCHAR,
defense VARCHAR,
weakness VARCHAR,
region VARCHAR
)
WITH (
'connector' = 'hive',
'path' = 's3a://clen-test/',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'true'
);
[INFO] Table has been created.
Flink SQL> select * from pokemon11;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector=hive
csv.field-delimiter=,
csv.ignore-parse-errors=true
format=csv
path=s3a://aiops-datalake-test/
schema.0.data-type=VARCHAR(2147483647) schema.0.name=id
schema.1.data-type=VARCHAR(2147483647) schema.1.name=name
schema.2.data-type=VARCHAR(2147483647) schema.2.name=type
schema.3.data-type=VARCHAR(2147483647) schema.3.name=hp
schema.4.data-type=VARCHAR(2147483647) schema.4.name=attack
schema.5.data-type=VARCHAR(2147483647) schema.5.name=defense
schema.6.data-type=VARCHAR(2147483647) schema.6.name=weakness
schema.7.data-type=VARCHAR(2147483647) schema.7.name=region
The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
I got this error, but I'm not sure what I'm doing wrong. Any suggestions?
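One possible direction, offered as an unverified sketch: 'hive' is not a DDL connector for reading raw files (Hive tables are normally registered through a HiveCatalog), so plain CSV on S3 is usually read with the filesystem connector instead. Shown here through the PyFlink Table API only to keep the example self-contained; the same DDL applies in the SQL client, assuming a reasonably recent Flink with the S3 filesystem plugin on the classpath:
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# Same schema as above, but using the filesystem connector instead of 'hive'.
t_env.execute_sql("""
    CREATE TABLE pokemon11 (
        id VARCHAR, name VARCHAR, type VARCHAR, hp VARCHAR,
        attack VARCHAR, defense VARCHAR, weakness VARCHAR, region VARCHAR
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3a://clen-test/',
        'format' = 'csv',
        'csv.field-delimiter' = ',',
        'csv.ignore-parse-errors' = 'true'
    )
""")
t_env.execute_sql("SELECT * FROM pokemon11").print()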
Sucheth Shivakumar
06/15/2023, 11:51 PM
org.apache.flink.util.FlinkException: Fatal error occurred in TaskExecutor <akka.tcp://flink@100.126.185.4:6122/user/rpc/taskmanager_0>.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager <akka.tcp://flink@100.126.2.182:6123/user/rpc/resourcemanager_0> has been rejected: Rejected TaskExecutor registration at the ResourceManager because: The ResourceManager does not recognize this TaskExecutor.
at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2442) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2386) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist-1.17.0.jar:1.17.0]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Yaroslav Bezruchenko
06/16/2023, 12:30 AM
2023-06-16 00:19:51,850 ERROR org.apache.beam.runners.fnexecution.control.FnApiControlClient [] - FnApiControlClient received an error.
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Status.asRuntimeException(Status.java:526) ~[blob_p-934ed733171eb46b5e06f37d5f96d10cd381ed93-977991e704a3c43c3b629ef06bb93e3c:1.16.2]
...
2023-06-16 00:19:51,852 WARN org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Status.asRuntimeException(Status.java:526) ~[blob_p-934ed733171eb46b5e06f37d5f96d10cd381ed93-977991e704a3c43c3b629ef06bb93e3c:1.16.2]
at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [blob_p-934ed733171eb46b5e06f37d5f96d10cd381ed93-977991e704a3c43c3b629ef06bb93e3c:1.16.2]
...
2023-06-16 00:19:51,850 ERROR org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer [] - Failed to handle for unknown endpoint
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
....
2023-06-16 00:19:51,861 ERROR org.apache.beam.runners.fnexecution.control.FnApiControlClient [] - FnApiControlClient closed, clearing outstanding requests {1=java.util.concurrent.CompletableFuture@73abe63d[Not completed, 1 dependents]}
The next issues claim there is a deserialization issue with the record, but the actual cause is:
Caused by: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
kingsathurthi
06/16/2023, 7:33 AM
Ari Huttunen
06/16/2023, 8:16 AM
2023-06-16 11:13:05,718 WARN org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory [] - Could not close the state stream for <s3p://some-bucket/flink.internal.checkpoints/18c1d780196f5badf2a4a83d7452d300/chk-1/24f92907-26eb-409d-bb73-da00443b4602>.
java.io.IOException: No space left on device
The partition where Flink is installed does not have much space, and neither does /tmp, but we've tried moving everything to a partition that has a lot more storage, like this:
io.tmp.dirs: {{ flink_tmp_dir }}
process.working-dir: {{ flink_tmp_dir }}
web.tmpdir: {{ flink_tmp_dir }}
historyserver.web.tmpdir: {{ flink_tmp_dir }}
jobmanager.archive.fs.dir: {{ flink_tmp_dir }}
jobmanager.web.tmpdir: {{ flink_tmp_dir }}
jobmanager.web.upload.dir: {{ flink_tmp_dir }}
web.upload.dir: {{ flink_tmp_dir }}
env.log.dir: {{ flink_log_dir }}
S3 is not full. Nothing gets full locally as far as I can see with df -h.
Any suggestions?
Edit: I also tried installing Flink to a partition that has lots of space, but it didn't help. That partition is meant for data, so I moved it back.
Sunidhi Tiwari
06/16/2023, 1:24 PM
Slackbot
06/16/2023, 1:28 PM
Lluc
06/16/2023, 2:13 PM
JSON_OBJECT('k' VALUE "another json") to have nested JSONs, but this is escaping the value as a string. Can someone give me a hand?
Also, I want to add that our nested JSON is dynamic (it doesn't have a predefined form) and we are forced to use the Table API only 😕
We are migrating to Flink 1.17
Danny Cranmer
06/16/2023, 5:09 PM
cluster.evenly-spread-out-slots set to true, and I can see that slots are evenly spread across TMs. However, the subtasks are not evenly spread across TMs.
Yaroslav Bezruchenko
06/16/2023, 5:49 PM
kubernetes.operator.job.autoscaler.enabled: "true"
kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
kubernetes.operator.job.autoscaler.metrics.window: "3m"
With this config my deployment restarts every 10 minutes with the following logs:
Scaling vertices:
Vertex ID a267aa7696f616c525ab17d1bd9850c8 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.55
Vertex ID 5570a4732e197e06bd668b2739d98f8c | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.81
Vertex ID 2963852293169ba90d9d1e7d6308db5c | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00
Vertex ID c3c7122991bd9906cb9d1effc69b74a3 | Parallelism 8 -> 6 | Processing capacity 92.21 -> 47.00 | Target data rate 23.36
Vertex ID 456225327bbc50a04e000587e8caa7d1 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.74
Vertex ID c49c06ed19ebbb8d7fd89425e1732dc8 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.78
Vertex ID 6dc0226b15c44c9c2e1f9ea1a65fd400 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 1.84
Vertex ID 5c4ca2fea30dcf09bf3ee40c495fe808 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00
Vertex ID 4952818ce422a2d9bf045a1179b37754 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.40
Vertex ID 47c512bbcb5e27815beb7632e2a43ec4 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.81
Vertex ID 7b1f408ee98c276c042c8126a0421765 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.71
Vertex ID d412772f7356afb1e854a3e17e6fc99e | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.30
Vertex ID 02dc93997fd5aa6594c92b9fccde27e2 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.23
Vertex ID 3b1890aff09d9152377d394a8c3ad1d5 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.31
Vertex ID bf1889e653b6e7d928d2bd2604dca8dd | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 19.88
Vertex ID e20cdc086f87b1f78afa7062ae50c3da | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 19.93
Vertex ID 6cdc5bb954874d922eaee11a8e7b5dd5 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00
Vertex ID de53745574bc42e95b1357827cc9f260 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.77
Vertex ID 1abb23b8bef7c5612f869be2fb3fcf61 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.39
Vertex ID 91ae58460ac994d528a45eda171f987a | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.47
Vertex ID 7cbd5491ae25a563a6d4672f34d0f931 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.45
Vertex ID 01c3449094a3cab3b695255ee333decb | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 20.10
Vertex ID cbc357ccb763df2852fee8c4fc7d55f2 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.50
Vertex ID 7b958a0c17d67bafbb848d71292964cd | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 20.02
Vertex ID adc382fc56e42e5a1c3e97cd60017201 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.50
Vertex ID 1171dea6747ab509fdaefbe74f7195af | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00
Vertex ID 2fa5fc7df17f61cb0e1946288970d799 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00
Vertex ID 79eb943b5d8344895c53163f0bcb07cb | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.77
Vertex ID 790ed4d19311724271d676c43c6e3949 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.73
Vertex ID b1a2a2523a4642215643a6a4e58f0d05 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00
Vertex ID 760b6fbf7de95dce57f3d3f87acab928 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.52
Zhong Chen
06/16/2023, 5:59 PM
Bharathkrishna G M
06/16/2023, 10:25 PM
env.addCheckpointListener(checkPtComplete())
Or is there any alternate way I can listen to checkpoints and run some logic?
Sunidhi Tiwari
06/17/2023, 5:42 PM
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
The above code will assign records to the default one-hour time buckets. I want to write all the parts to the same directory and don't want hourly buckets. Is there a way to do so?
Hygor Knust
06/17/2023, 6:08 PM
*.ignore-parse-errors, but it is not available for Avro.
Dheeraj Panangat
06/18/2023, 7:08 AM
Mishel Liberman
06/18/2023, 9:02 AM
kiran kumar
06/18/2023, 2:19 PM
flink-properties:
checkpoint-interval: 5000
checkpoint-timeout: 15000
checkpoint-pause-between: 500
rest-address: sherlocksvc-jobmanager.sherlocksvc
rest-port: 8082
blob-server-port: 6124
data-port: 6121
jobmanager-rpc-address: sherlocksvc.sherlocksvc.sherlocksvc
jobmanager-rpc-port: 6123
taskmanager-rpc-port: 6122
taskmanager-numberOfTaskSlots: 1
https://blog.knoldus.com/flink-on-kubernetes/
kingsathurthi
06/19/2023, 7:56 AM
NAME JOB STATUS LIFECYCLE STATE
flinkdeployment-uat UPGRADING
Error:
Status:
Cluster Info:
Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"flinkdeployment-uat\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"flinkdeployment-uat\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure executing: POST at: <https://172.30.0.1/apis/apps/v1/namespaces/dev-istio/deployments>. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. deployments.apps \"flinkdeployment-uat\" is forbidden: cannot set blockOwnerDeletion if an ownerReference refers to a resource you can't set finalizers on: , <nil>."}]}
Job Manager Deployment Status: MISSING
Bhavya Soni
06/19/2023, 9:23 AM
apache-flink==1.14.4
apache-flink-libraries==1.14.4
pyflink==1.0
I am facing an error with from pyflink.connector.gcp.pubsub import PubSubSource: ModuleNotFoundError: No module named 'pyflink.connector'.
I want to use the pubsub connector to stream the data to SIEM servers. I came across this documentation https://nightlies.apache.org/flink/flink-docs-release-1.17/api/python/reference/pyflink.datastream/connectors.html and wanted to know whether there is any available implementation of the pubsub connector in Flink for Python?
Ari Huttunen
06/19/2023, 11:45 AM
0ms / 0ms / 19527d 10h 49m 58s.
Is that -1 by any chance?
It also shows up as Busy (max): 100%, but I have some doubts about that as well.
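A quick arithmetic check suggests the duration is being measured from an unset (epoch-zero) timestamp rather than from a literal -1: 19527 days plus 10:49:58 counted from 1970-01-01 lands on the very day this was observed.
from datetime import datetime, timedelta, timezone

epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
print(epoch + timedelta(days=19527, hours=10, minutes=49, seconds=58))
# 2023-06-19 10:49:58+00:00 -- i.e. "now minus zero", pointing at an uninitialised start timestamp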
Jonathan Feinberg
06/19/2023, 2:43 PM
INSERT INTO sink_table
SELECT
`key`,
`journeyRef`,
MAX(`eventTimestamp`) AS `eventTimestamp`,
SUM(`alightingCount`) AS `totalAlighting`
FROM source_table
GROUP BY `key`, `journeyRef`
I am trying to read from a filesystem source connector and write to a filesystem sink connector, and I am getting this error:
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.accjoin_filesystem' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[key, journeyRef], select=[key, journeyRef, MAX(eventTimestamp) AS eventTimestamp, SUM(alightingCount) AS totalAlighting])
However, if I replace the sink with a print connector, it works as expected.
Is this a bug, or am I missing a limitation in how the filesystem connector works?
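One common workaround, sketched here under the assumption that eventTimestamp can be declared as a rowtime attribute (with a watermark) on source_table, and with table_env standing in for whatever TableEnvironment the job already uses: a windowed aggregation emits append-only results, which the filesystem sink accepts, whereas the plain GROUP BY above produces an updating stream, which it does not.
# Sketch only: requires `eventTimestamp` to be a time attribute (watermarked) column.
table_env.execute_sql("""
    INSERT INTO sink_table
    SELECT
      `key`,
      `journeyRef`,
      MAX(`eventTimestamp`) AS `eventTimestamp`,
      SUM(`alightingCount`) AS `totalAlighting`
    FROM TABLE(
      TUMBLE(TABLE source_table, DESCRIPTOR(`eventTimestamp`), INTERVAL '1' HOUR))
    GROUP BY `key`, `journeyRef`, window_start, window_end
""")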
kiran kumar
06/19/2023, 5:40 PM
command: ["/opt/flink/bin/standalone-job.sh"]
But the issue is, we are dockerizing our flink application and deploying it. The start command path "/opt/flink" is not present in our jobmanager pod; we are getting "path unknown". How is the "/opt/flink/" path created, and how do we find the standalone-job.sh file?
Anmol
06/20/2023, 5:35 AM
Yaroslav Bezruchenko
06/20/2023, 9:02 AM