Nithin kharvi
08/22/2022, 9:07 AM
Sylvia Lin
08/22/2022, 5:31 PM
flink.task.numRecordsOut, 19606 tags generated for task_attempt_id in the past hour
Jirawech Siwawut
08/22/2022, 5:45 PM
17:38:15.505 [main] ERROR org.apache.thrift.transport.TSaslTransport - SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed
It seems the Flink SQL Client does not initialize a Kerberos session?
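(For reference, a minimal sketch of the Kerberos settings the SQL Client would normally pick up from flink-conf.yaml; the keytab path, principal, and contexts below are placeholders, not values from this thread:)
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
security.kerberos.login.contexts: Client,KafkaClient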
Hilmi Al Fatih
08/23/2022, 4:10 AM
Jirawech Siwawut
08/23/2022, 5:18 AM
The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'
I checked the code, but it seems the only option is 'all' at the moment:
https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flin[…]in/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
Giannis Polyzos
08/23/2022, 6:12 AM
kubectl get pods -n kube-system | grep cert
cert-manager-cainjector-695476768b-jsk4t 1/1 Running 0 6d14h
cert-manager-controller-95d89d7c8-5sr6g 1/1 Running 0 6d14h
cert-manager-webhook-66697d8b77-l69sv 1/1 Running 0 6d14h
I'm trying to install the flink-operator in a flink namespace, but I'm getting this error:
Error: INSTALLATION FAILED: unable to build kubernetes objects from release manifest: [resource mapping not found for name: "flink-operator-serving-cert" namespace: "flink" from "": no matches for kind "Certificate" in version "cert-manager.io/v1"
ensure CRDs are installed first, resource mapping not found for name: "flink-operator-selfsigned-issuer" namespace: "flink" from "": no matches for kind "Issuer" in version "cert-manager.io/v1"
ensure CRDs are installed first]
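(A note for context: this error usually means the cert-manager CRDs are not installed yet, so the operator's Certificate/Issuer resources cannot be created. The operator quick start installs cert-manager before the Helm chart, roughly like the following; the cert-manager version tag is only an example:)
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator -n flink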
Metehan Yıldırım
08/23/2022, 7:24 AM
Ivan M
08/23/2022, 12:51 PM
Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
error.
Any ideas where I should look for a root cause? 🙂
Pedro Cunha
08/23/2022, 3:15 PM
After upgrading from 1.14.3 to 1.15.1 I started getting errors of Regressing checkpoint IDs. It looks like Flink is trying to use old IDs to create checkpoints, for some reason: Previous checkpointId = 31235, new checkpointId = 504. On some deployments, it fails when the ID is the same. Does anyone have a clue how to fix this? Is there a config I'm missing?
Will Norman
08/23/2022, 3:33 PM
FlinkKinesisConsumer extends RichParallelSourceFunction, whereas I need a Source object, similar to KafkaSource. Does anyone know if it's possible to read from Kinesis as part of a hybrid source?
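(For context, HybridSource.builder only accepts new-style Source implementations, so a SourceFunction-based consumer such as FlinkKinesisConsumer cannot be plugged in directly; I am not aware of a FLIP-27 Kinesis source in the connectors of that era. A minimal sketch of the builder with a FileSource followed by a KafkaSource; the path, bootstrap servers, and topic are placeholder values:)
// env is an existing StreamExecutionEnvironment; FileSource comes from flink-connector-files,
// KafkaSource from flink-connector-kafka, HybridSource from flink-connector-base.
FileSource<String> fileSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://bucket/history/"))
        .build();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("events")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
HybridSource<String> hybridSource = HybridSource.<String>builder(fileSource)
        .addSource(kafkaSource)
        .build();
DataStream<String> stream = env.fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "hybrid-source");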
Hunter Medney
08/23/2022, 7:02 PM
final Configuration jobConfiguration = new Configuration();
jobConfiguration.setString("s3.endpoint", "http://localhost:9000");
jobConfiguration.setString("s3.path.style.access", "true");
jobConfiguration.setString("s3.access-key", "minio");
jobConfiguration.setString("s3.secret-key", "minio123");
jobConfiguration.setString("s3.connection.ssl.enabled", "false");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(jobConfiguration);
DataStreamSource<String> text = env.readTextFile("s3://test/example.csv");
text.print();
env.execute();
and via FLINK_PROPERTIES:
FLINK_PROPERTIES="s3.access-key: minio\ns3.secret-key: minio123\ns3.endpoint: http://localhost:9000"
and it keeps behaving as if no endpoint or credentials are defined:
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, com.amazonaws.auth.profile.ProfileCredentialsProvider@21c90685: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@574cf28b: Failed to connect to service endpoint: ]
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:136)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1257)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:833)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:783)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
at com.amazonaws.services.s3.AmazonS3Client.getBucketRegionViaHeadRequest(AmazonS3Client.java:6220)
at com.amazonaws.services.s3.AmazonS3Client.fetchRegionFromCache(AmazonS3Client.java:6193)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5244)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1360)
at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1334)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$5(PrestoS3FileSystem.java:667)
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:664)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:648)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1690)
at org.apache.flink.fs.s3presto.common.HadoopFileSystem.exists(HadoopFileSystem.java:165)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:215)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
I feel like I'm missing something simple, but I have been wrestling with this for a while. Thank you for any insights!
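(One thing sometimes tried for purely local runs against MinIO, sketched here under the assumption that the job runs in the IDE / mini-cluster and that flink-s3-fs-presto or flink-s3-fs-hadoop is on the job classpath rather than loaded as a plugin: initialize Flink's FileSystem with the s3.* settings explicitly before building the environment, since the S3 filesystem otherwise reads them from the cluster configuration at startup:)
Configuration fsConf = new Configuration();
fsConf.setString("s3.endpoint", "http://localhost:9000");
fsConf.setString("s3.path.style.access", "true");
fsConf.setString("s3.access-key", "minio");
fsConf.setString("s3.secret-key", "minio123");
// Second argument is the plugin manager; null means the filesystem factory must be on the classpath.
FileSystem.initialize(fsConf, null);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();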
Prasanth Kothuri
08/23/2022, 8:47 PM
WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofMinutes(5))
  .withIdleness(Duration.ofMinutes(10))
Rafał Trójczak
08/24/2022, 8:10 AM
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
Is there a way to set configuration options (like taskmanager.memory.network.min) from the command line when running this Flink job?
Of course, I can set configuration options using the Configuration class and createLocalEnvironment, but I don't want to do this because the same job will also run on a cluster.
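(A minimal sketch of one common pattern: parse the program arguments with ParameterTool and hand them to getExecutionEnvironment(Configuration), so a local run can pick up e.g. --taskmanager.memory.network.min 64mb without hard-coding it; the option and value are only examples, and on a real cluster the cluster-level memory settings still come from the cluster configuration:)
// Run locally with program arguments: --taskmanager.memory.network.min 64mb
ParameterTool params = ParameterTool.fromArgs(args);   // org.apache.flink.api.java.utils.ParameterTool
Configuration conf = Configuration.fromMap(params.toMap());
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);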
Felix Angell
08/24/2022, 11:28 AM
Kevin L
08/24/2022, 6:36 PM
kafka_source_query = """
CREATE TABLE if not exists kafka_topic (
`c1` VARCHAR,
`c2` VARCHAR,
`c3` VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'kafka-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
)
"""
result = t_env.execute_sql(kafka_source_query)
tbl = t_env.from_path("kafka_topic")
tbl.select(col("c1"),col("c2"), col("c3")).limit(10).execute().print()
tl;dr: The last statement (i.e. tbl.select(col("c1"))...) never finishes; it just prints out 10 rows to the console and hangs. I would like to run this script so that it exits on its own.
Any thoughts on this? Is this expected, or a bug?
Krish Narukulla
08/24/2022, 7:26 PM
LocalTableEnvironment for debugging purposes. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/dataset/local_execution/
Darin Amos
08/24/2022, 9:39 PM
ContinuousFileReaderOperator
and an S3 source (a custom FileMonitoringFunction). I have not been able to find an answer on Google or in the documentation.
My understanding was that checkpoint barriers cannot overtake stream records and I considered file splits to be like any other type of record. I also understand that aligned checkpoints are not supposed to persist in-flight data. I don’t think my observations perfectly align with this. Currently my aligned checkpoints run every minute and I have several S3 files that take longer than 1 minute to parse and process. However, when a checkpoint is taken while I am processing one of these large files my checkpoint starts and completes while the file is still being processed. I also notice when this happens that parallel reader operators have a mix of “checkpointed data size” values.
I’m curious what is happening behind the scenes here. Is it possible that “transform” operators such as the continuous file reader are treated as a special case where checkpoint barriers are allowed to overtake records (file splits in this case) and the input buffers are persisted in the checkpoint?
My guess would have been that the checkpoint barrier needs to wait for all the file splits to be processed.
Sylvia Lin
08/24/2022, 11:32 PM
KafkaSink? Previously we used FlinkKafkaProducer; it serializes each record as a ProducerRecord, which holds the topic info.
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html
But we know FlinkKafkaProducer will be deprecated soon, so we want to switch to KafkaSink or another similar method.
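(A minimal sketch of how KafkaSink can still route each record to its own topic: the KafkaRecordSerializationSchema builds the ProducerRecord, so the topic can be chosen per element. Here the element is assumed to be a Tuple2 of (topic, payload); the bootstrap servers are a placeholder:)
KafkaSink<Tuple2<String, String>> sink = KafkaSink.<Tuple2<String, String>>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(new KafkaRecordSerializationSchema<Tuple2<String, String>>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(
                    Tuple2<String, String> element, KafkaSinkContext context, Long timestamp) {
                // f0 is the destination topic, f1 the payload, so one sink can write to many topics.
                return new ProducerRecord<>(element.f0, element.f1.getBytes(StandardCharsets.UTF_8));
            }
        })
        .build();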
Krish Narukulla
08/25/2022, 12:18 AM
Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:108)
at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:300)
at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:971)
at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:943)
at com.roku.common.frameworks.flink.execution.ExecutionV1.execute(ExecutionV1.scala:31)
at com.roku.common.frameworks.flink.FlinkApp$.main(FlinkApp.scala:27)
at com.roku.common.frameworks.flink.FlinkApp.main(FlinkApp.scala)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:526)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:105)
... 6 more
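(For reference, this exception usually means no table executor/planner is on the classpath at runtime. A sketch of the dependencies that typically need to accompany the table API bridge in Flink 1.15; the version is an example, not taken from this thread:)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>1.15.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>1.15.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.1</version>
</dependency>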
Carl Choi
08/25/2022, 2:18 AM
Rashmin Patel
08/25/2022, 5:44 AM
Adesh Dsilva
08/25/2022, 10:53 AM
String[] instead of List<String>.
My question is: why doesn't Flink automatically create this type information?
I can understand List<T> can be difficult, but it should be possible for primitive types like List<String> or List<Integer>.
Another approach I see is to use Avro or Protobuf files and generate Java classes from them, but I would like to avoid that if possible.
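(A small sketch of supplying the type explicitly when extraction falls short; the input stream and map function here are hypothetical:)
// `lines` is assumed to be an existing DataStream<String>.
DataStream<List<String>> parts = lines
        .map(line -> Arrays.asList(line.split(",")))
        .returns(Types.LIST(Types.STRING));
// Equivalent alternative using an anonymous TypeHint:
//     .returns(new TypeHint<List<String>>() {})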
Felix Angell
08/25/2022, 11:14 AM
Dhavan
08/26/2022, 4:57 AM
DELETE statements. Though all my queries are INSERT queries with a few LEFT JOINs. So, could it be that my queries are somehow causing records to be deleted?
Prasanth Kothuri
08/26/2022, 7:26 AM
KeyedOneInputStreamOperatorTestHarness, are there any examples that I can look at? Thanks a ton.
The code I want to unit test is like this - https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/windows/#incremental-window-aggr[…]n-with-aggregatefunction
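(A rough, self-contained sketch of the harness pattern — here with a trivial KeyedProcessFunction rather than the windowed aggregate from the linked page, but the open/processElement/extract flow is the same; all names and values are made up. The harness classes come from Flink's test utilities, e.g. the flink-test-utils dependency and the flink-streaming-java test-jar, if I recall the packaging correctly:)
// Function under test: emits the length of each string, keyed by the string itself.
KeyedProcessFunction<String, String, Integer> fn =
        new KeyedProcessFunction<String, String, Integer>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Integer> out) {
                out.collect(value.length());
            }
        };
KeyedOneInputStreamOperatorTestHarness<String, String, Integer> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(fn),
                value -> value,      // key selector
                Types.STRING);       // key type information
harness.open();
harness.processElement("hello", 100L);   // (element, event-time timestamp)
harness.processWatermark(200L);
List<Integer> results = harness.extractOutputValues();  // expect [5]
harness.close();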
chunilal kukreja
08/26/2022, 1:00 PM
Since RocksDB is part of the default Flink distribution, you do not need this dependency if you are not using any RocksDB code in your job and configure the state backend via state.backend and further checkpointing and RocksDB-specific parameters in your flink-conf.yaml.
That means we needn't include the dependency, right?
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>1.15.1</version>
<scope>provided</scope>
</dependency>
But this doesn't work 😞, while if I do this programmatically it works as expected.
Can someone who has done this through the config way look into this or suggest something?
Krish Narukulla
08/26/2022, 6:53 PM
src/test/scala/com/xxx/common/frameworks/flink/Test.scala:16: error: Class org.apache.flink.runtime.testutils.MiniClusterResource not found - continuing with a stub.
val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
^
src/test/scala/com/xxx/common/frameworks/flink/Test.scala:16: error: Class org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration not found - continuing with a stub.
val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
^
src/test/scala/com/xxx/common/frameworks/flink/Test.scala:41: error: could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[Long]
env.fromElements(1L, 21L, 22L)
^
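(For context: MiniClusterWithClientResource and MiniClusterResourceConfiguration usually come in via the flink-test-utils test dependency, and the missing implicit TypeInformation[Long] typically means the Scala API implicits, e.g. import org.apache.flink.streaming.api.scala._, are not in scope in the test. A sketch of the dependency; version and scope are examples:)
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils</artifactId>
    <version>1.15.1</version>
    <scope>test</scope>
</dependency>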
Jeff Levesque
08/27/2022, 1:30 AM
pip package to run. Within my UDAF, I have the following segment to ensure it has the desired xxx package in order to run:
try:
    import xxx
except ModuleNotFoundError:
    import sys, subprocess
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'xxx'])
finally:
    import xxx
However, I don't want to install the pip package xxx as performed above. Instead, I'd like it to be downloaded within the PyFlink application directory and imported via a relative path. This way I could bundle up the build using some CI/CD pipeline. Before I go about this, I wanted to double-check on best practices regarding importing pip packages from a local path (preferably within the PyFlink application directory).
Sigh
08/27/2022, 8:30 AM
Krish Narukulla
08/28/2022, 5:01 AM
LocalEnvironment. Resulting exception:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:526)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:276)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:93)
at com.roku.common.frameworks.flink.Sample$.main(Sample.scala:31)
at com.roku.common.frameworks.flink.Sample.main(Sample.scala)
Code:
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
// environment configuration
val settings = EnvironmentSettings
  .newInstance()
  .inBatchMode()
  .build();
val tEnv = TableEnvironment.create(settings);
// register Orders table in table environment
// ...
// specify table program
val orders = tEnv.from("Orders") // schema (a, b, c, rowtime)
val result = orders
  .groupBy($"a")
  .select($"a", $"b".count as "cnt")
  .toDataStream
  .print()