Justin
09/14/2022, 7:49 PM
postgres.wilbur.nfdc.countries
Full stack trace is attached.
DDL:
CREATE TABLE postgres.wilbur.nfdc.countries
(
country_code STRING,
country_name STRING,
numeric_country_code INT,
batch_date TIMESTAMP(6)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/wilbur',
'table-name' = 'postgres.wilbur.`nfdc.countries`'
);
I created a catalog and I am able to query the table without any issues (e.g., SELECT * FROM postgres.wilbur.nfdc.countries LIMIT 3;). However, it fails each time I try to register the table.
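One hedged note for context: Flink identifiers have at most three parts (catalog.database.object), so the four-part name in the DDL may be exactly what the parser rejects at registration time. A minimal sketch with the dotted Postgres schema name backtick-quoted as a single object name, and 'table-name' pointing at the schema-qualified table on the Postgres side (whether this matches the intended layout is an assumption):

CREATE TABLE postgres.wilbur.`nfdc.countries`
(
country_code STRING,
country_name STRING,
numeric_country_code INT,
batch_date TIMESTAMP(6)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/wilbur',
'table-name' = 'nfdc.countries'
);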
Any help would be appreciated!
Thanks,
Justin
Satya
09/15/2022, 6:46 AM
I am using FileSource.forRecordStreamFormat for reading Avro files in S3 with a build config of .monitorContinuously(Duration.ofMillis(1000)). To create the StreamFormat I have used Avro4s as below:
object AvroStreamFormat extends SimpleStreamFormat[Send] {
override def createReader(config: Configuration, stream: FSDataInputStream): StreamFormat.Reader[Send] = {
val schema = AvroSchema[Send]
val reader: AvroInputStream[Send] = AvroInputStream.data[Send].from(stream).build(schema)
new StreamFormat.Reader[Send] {
override def read(): Send =
reader.iterator.next()
override def close(): Unit = reader.close()
}
}
override def getProducedType: TypeInformation[Send] = TypeInformation.of(classOf[Send])
}
With the above I created a source as below:
val source = FileSource.forRecordStreamFormat(
AvroStreamFormat,
new Path(s3Path))
.monitorContinuously(Duration.ofMillis(1000))
.build()
And then a datastream like below:
val recordStream: DataStream[Send] = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"email-send"
)
When I run the app I am getting java.util.NoSuchElementException. Full stacktrace in a comment.
Questions I have:
1. Can the FileSource API on S3 recursively find Avro files (including any new Avro files created on S3), or do I have to give the exact S3 URI where the Avro file lives?
2. Is avro4s not compatible with the FileSource API?
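On question 1: FileSource's default file enumerator walks the given directory recursively, so pointing it at a parent prefix should also pick up files in newly created sub-directories. On question 2: avro4s itself is probably not the problem; StreamFormat.Reader#read is documented to return null at end of input, whereas next() on an exhausted iterator throws exactly the java.util.NoSuchElementException above (and each call to reader.iterator may hand back a fresh iterator, so it is safer to obtain it once). A minimal Java sketch of a reader honoring that contract; GuardedReader and its iterator field are illustrative stand-ins, not avro4s API:

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.connector.file.src.reader.StreamFormat;

// Sketch: return null at end of input instead of letting an exhausted
// iterator throw NoSuchElementException.
final class GuardedReader<T> implements StreamFormat.Reader<T> {

    private final Iterator<T> records; // obtained once from the underlying Avro reader

    GuardedReader(Iterator<T> records) {
        this.records = records;
    }

    @Override
    public T read() throws IOException {
        // null signals "end of this split" to the FileSource
        return records.hasNext() ? records.next() : null;
    }

    @Override
    public void close() throws IOException {
        // close the underlying Avro input stream in a real implementation
    }
}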
Kwangin Jung
09/15/2022, 9:51 AM
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
// ...
ObjectMapper OBJECT_MAPPER = new ObjectMapper();
StreamingFileSink<String> streamingFileSink = StreamingFileSink
.forRowFormat(
new Path("<s3://output-bucket/>"),
new Encoder<String>() {
@Override
public void encode(String record, OutputStream stream)
throws IOException {
GzipParameters params = new GzipParameters();
params.setCompressionLevel(Deflater.BEST_COMPRESSION);
GzipCompressorOutputStream out = new GzipCompressorOutputStream(stream, params);
OBJECT_MAPPER.writeValue(out, record.getBytes("UTF-8"));
out.finish();
}
}
)
.withBucketAssigner(new DateTimeBucketAssigner("yyyy-MM-dd_HH-mm"))
.build();
but this just shows the following error:
"locationInformation": "org.apache.flink.runtime.taskmanager.Task.transitionState(Task.java:1079)",
"logger": "org.apache.flink.runtime.taskmanager.Task",
"message": "Sink: Trip process sink (1/1)#0 (022614337997d7b7702590819c0d931a) switched from RUNNING to FAILED with failure cause: java.io.IOException: Stream closed.\n\tat org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream.requireOpened(RefCountedFileWithStream.java:73)\n\tat org.apache.flink.fs.s3.common.utils.RefCountedFileWithStream.write(RefCountedFileWithStream.java:53)\n\tat org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:104)\n\tat org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:200)\n\tat org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:168)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.closeForCommit(OutputStreamBasedPartFileWriter.java:62)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:262)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:227)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:220)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:305)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:103)\n\tat org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:492)\n\tat org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)\n\tat org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)\n\tat org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)\n\tat org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)\n\tat org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)\n\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)\n\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)\n\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n",
Is anyone familiar with this kind of issue? 🙏
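A possible cause, offered as an assumption rather than a confirmed diagnosis: ObjectMapper.writeValue closes its target by default (Jackson's JsonGenerator.Feature.AUTO_CLOSE_TARGET), so each call would close the gzip stream and, through it, Flink's S3 part-file stream, which matches the "Stream closed" IOException above. A sketch of an Encoder that compresses without ever closing the sink's stream (GzipStringEncoder is an invented name; if Jackson is required, disabling AUTO_CLOSE_TARGET should have a similar effect):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipParameters;
import org.apache.flink.api.common.serialization.Encoder;

// Sketch: write one gzip member per record and leave Flink's stream open.
public class GzipStringEncoder implements Encoder<String> {

    @Override
    public void encode(String record, OutputStream stream) throws IOException {
        GzipParameters params = new GzipParameters();
        params.setCompressionLevel(Deflater.BEST_COMPRESSION);
        GzipCompressorOutputStream out = new GzipCompressorOutputStream(stream, params);
        out.write(record.getBytes(StandardCharsets.UTF_8));
        // finish() flushes the gzip trailer without closing the underlying
        // part-file stream; closing it here would reproduce "Stream closed".
        out.finish();
    }
}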
Hari Krishna Poturi
09/15/2022, 11:09 AM
Matyas Orhidi
09/15/2022, 12:03 PM
Satya
09/15/2022, 1:12 PM
s3://export-staging/xport/dataexport.stg-07.S3.integration.asdh43kwdhasjrw23423/event_type=users.messages.email.Send/
let's say we have 2 date partitions as below:
date=2022-09-12-10/
date=2022-09-12-11/
With these 2 sets of partitions the Flink FileSource API reads the Avro files fine, but when it gets another partition (example below)
date=2022-09-12-10/
date=2022-09-12-11/
date=2022-09-12-12/
it throws the below error:
Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'data-export'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1848)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
at com.lightricks.sigma.topologies.EntryPoint$.delayedEndpoint$com$lightricks$sigma$topologies$EntryPoint$1(EntryPoint.scala:74)
at com.lightricks.sigma.topologies.EntryPoint$delayedInit$body.apply(EntryPoint.scala:31)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1$adapted(App.scala:80)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.App.main(App.scala:80)
at scala.App.main$(App.scala:78)
at com.lightricks.sigma.topologies.EntryPoint$.main(EntryPoint.scala:31)
at com.lightricks.sigma.topologies.EntryPoint.main(EntryPoint.scala)
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$submitJob$2(PerJobMiniClusterFactory.java:83)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:457)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:89)
at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:144)
... 9 more
Caused by: java.util.concurrent.TimeoutException: Invocation of public default java.util.concurrent.CompletableFuture org.apache.flink.runtime.webmonitor.RestfulGateway.requestJobStatus(org.apache.flink.api.common.JobID,org.apache.flink.api.common.time.Time) timed out.
at com.sun.proxy.$Proxy13.requestJobStatus(Unknown Source)
at org.apache.flink.runtime.minicluster.MiniCluster.lambda$getJobStatus$6(MiniCluster.java:704)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:751)
at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:703)
at org.apache.flink.client.program.PerJobMiniClusterFactory.lambda$null$0(PerJobMiniClusterFactory.java:86)
... 10 more
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_2#248541682]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:874)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:113)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:107)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:872)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
at java.lang.Thread.run(Thread.java:748)
Is it a bug with Flink?
Jirawech Siwawut
09/15/2022, 4:30 PM
{
"name" : "david",
"events" : [
{
"timestamp" : "2022-08-01 00:00:00",
"id" : "1"
},
{
"timestamp" : "2022-08-01 00:00:01",
"id" : "2"
}
]
}
or
{
"name" : "david",
"events" : {
"timestamp: "2022-08-01 00:00:00"
}
}
I would like to create a watermark on the column timestamp.
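For what it's worth, a hedged Flink SQL sketch: the watermark must be declared on a top-level TIMESTAMP column, so for the second (non-array) shape a computed column can lift the nested field; for the array shape the events would first have to be flattened (e.g. with UNNEST) before a watermark can apply. The connector properties below are invented for illustration:

CREATE TABLE user_events (
    name STRING,
    events ROW<`timestamp` STRING>,
    ts AS TO_TIMESTAMP(events.`timestamp`),       -- lift the nested field
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- watermark on the top-level column
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);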
Darin Amos
09/15/2022, 4:41 PM
Rishabh Kedia
09/15/2022, 5:39 PM
We are using RocksDB as the backend for checkpointing and are storing the checkpoints in GCS. After a few mins of the job running, the first and all subsequent checkpoints fail with the error:
asynchronous part of checkpoint 1 could not be completed.
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128) [flink-dist_2.12-1.13.6.jar:1.13.6]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.OutOfMemoryError: Java heap space
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:579) ~[gcs-connector-latest-hadoop2.jar:?]
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:380) ~[gcs-connector-latest-hadoop2.jar:?]
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:308) ~[gcs-connector-latest-hadoop2.jar:?]
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:528) ~[gcs-connector-latest-hadoop2.jar:?]
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455) ~[gcs-connector-latest-hadoop2.jar:?]
at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565) ~[gcs-connector-latest-hadoop2.jar:?]
at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:85) ~[gcs-connector-latest-hadoop2.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
... 3 more
Size of first checkpoint: 150 MB
Size of second checkpoint: 350 MB
…
The task managers have enough memory assigned (4G). As you can see from the image, no memory limits are being reached. Any idea why we are still seeing OOM errors during checkpointing?
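One knob worth checking, as an assumption about the cause: the OOM is raised while the GCS connector builds a resumable-upload chunk (MediaHttpUploader.buildContentChunk), and that buffer lives on the JVM heap per open upload stream, so several concurrent RocksDB file uploads can exhaust the heap even when container limits look fine. A sketch of shrinking the chunk size in core-site.xml; the key comes from the gcs-connector documentation and the value is illustrative (it must be a multiple of 8 MiB):

<!-- core-site.xml: shrink the per-stream heap buffer used for GCS resumable uploads -->
<property>
    <name>fs.gs.outputstream.upload.chunk.size</name>
    <value>8388608</value> <!-- 8 MiB instead of the 64 MiB default -->
</property>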
Jeesmon Jacob
09/15/2022, 5:54 PM
[INFO] +- io.fabric8:kubernetes-client:jar:5.12.2:compile
[INFO] | +- com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:jar:2.13.2:compile
[INFO] | | \- org.yaml:snakeyaml:jar:1.30:compile
Anyone looking at fixing it in v1.1 or in the upcoming v1.2? Thanks.
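Until the operator bumps kubernetes-client (or snakeyaml) itself, a common stopgap, sketched here rather than prescribed, is to pin a patched snakeyaml in your own build via Maven's dependencyManagement:

<!-- pom.xml: override the vulnerable transitive snakeyaml 1.30 -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
            <version>1.31</version> <!-- assumed patched version; verify against the CVE -->
        </dependency>
    </dependencies>
</dependencyManagement>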
Aeden Jameson
09/15/2022, 6:26 PM
100 * sum by (pod) (rate(container_cpu_usage_seconds_total{pod=~".*$flinkdeployment.*", container="taskmanager"}[$__rate_interval]) / on (pod)
kube_pod_container_resource_requests{pod=~".*$flinkdeployment.*", resource="cpu"})
Michael LeGore
09/15/2022, 9:14 PM
Michael LeGore
09/16/2022, 12:25 AM
Huo
09/16/2022, 12:42 AM
Source: KafkaTableSource(event_ts, upload_time, playback_mode, channel_id, account_id, device_id, roku_content_id, profile_type, provider_id, playback_duration, proctime) -> SourceConversion(table=[default_catalog.default_database.ux_channel_events_progress, source: [KafkaTableSource(event_ts, upload_time, playback_mode, channel_id, account_id, device_id, roku_content_id, profile_type, provider_id, playback_duration, proctime)]], fields=[event_ts, upload_time, playback_mode, channel_id, account_id, device_id, roku_content_id, profile_type, provider_id, playback_duration, proctime]) -> Calc(select=[account_id, device_id, roku_content_id, channel_id, profile_type, provider_id, 30 AS streaming_seconds, event_ts], where=[((playback_mode SEARCH Sarg[_UTF-16LE'auto', _UTF-16LE'user']:CHAR(4) CHARACTER SET "UTF-16LE") AND (channel_id SEARCH Sarg[_UTF-16LE'151908':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (account_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (roku_content_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (device_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (profile_type SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (provider_id SEARCH Sarg[(-∞.._UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (() <= 1.0:DECIMAL(2, 1)) AND account_id IS NOT NULL)]) -> SinkConversionToStreamEvent (24/128)
2022-09-14 22:36:47
java.lang.IllegalArgumentException
at sun.reflect.GeneratedConstructorAccessor39.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvokeraw(JavaMirrors.scala:336)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaMethodMirror.jinvoke(JavaMirrors.scala:339)
at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaVanillaMethodMirror.apply(JavaMirrors.scala:355)
at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$$anonfun$lookupConstructor$2.apply(ScalaCaseClassSerializer.scala:114)
at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer$$anonfun$lookupConstructor$2.apply(ScalaCaseClassSerializer.scala:113)
at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.createInstance(ScalaCaseClassSerializer.scala:49)
at org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer.createInstance(ScalaCaseClassSerializer.scala:39)
at org.apache.flink.table.data.util.DataFormatConverters$CaseClassConverter.toExternalImpl(DataFormatConverters.java:1498)
at org.apache.flink.table.data.util.DataFormatConverters$CaseClassConverter.toExternalImpl(DataFormatConverters.java:1470)
at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:417)
at SinkConversion$63.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at StreamExecCalc$61.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at SourceConversion$11.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:761)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Kwangin Jung
09/16/2022, 2:00 AM
With StreamingFileSink,
OutputFileConfig outputConfig = OutputFileConfig
.builder()
.withPartPrefix("myfile")
.withPartSuffix(".txt")
.build();
// SimpleStringEncoder is a row Encoder, so forRowFormat is the matching builder
StreamingFileSink<String> fileSinker = StreamingFileSink
.forRowFormat(
new Path(bucket),
new SimpleStringEncoder<String>())
.withOutputFileConfig(outputConfig)
.build();
dataStream
.map(DataObject::toString)
.addSink(fileSinker);
Is there a way to define the output file name using data inside DataObject?
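As far as I know, part-file names come from OutputFileConfig plus counters that Flink manages, so they cannot be derived from a record; what can depend on the record is the bucket, i.e. the output sub-directory. A sketch of a custom BucketAssigner, where DataObject.getKey() is an invented accessor:

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Sketch: route each record into a directory derived from its own data.
public class KeyBucketAssigner implements BucketAssigner<DataObject, String> {

    @Override
    public String getBucketId(DataObject element, Context context) {
        return element.getKey(); // becomes the sub-directory under the base path
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

Wired in with .withBucketAssigner(new KeyBucketAssigner()) on the builder above.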
Kwangin Jung
09/16/2022, 3:50 AM
What's the difference between FileSink and StreamingFileSink, and what's the history of these two? Looks like FileSink is the more improved one (it also seems to support both batch and streaming), but I just want more detail 🙏
Sumit Nekar
09/16/2022, 7:27 AM
Don Li
09/16/2022, 2:37 PM
Jirawech Siwawut
09/16/2022, 3:36 PM
viewfs://path/mytable
2022-09-16 22:34:29
java.io.IOException: ViewFs: Cannot initialize: Empty Mount table in config for viewfs://path/
at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:337)
at org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:169)
at org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:169)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
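The message says the mount table for the viewfs URI's authority is empty, which usually means the client's core-site.xml carries no fs.viewfs.mounttable entries. A sketch of the shape such an entry takes; the mount-table name must match the viewfs authority ("path" in the redacted URI above) and the target NameNode address is invented:

<!-- core-site.xml: declare a mount point for the 'path' mount table -->
<property>
    <name>fs.viewfs.mounttable.path.link./mytable</name>
    <value>hdfs://namenode-host:8020/real/location/mytable</value>
</property>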
James Timotiwu
09/16/2022, 4:53 PM
Erwin Cabral
09/16/2022, 6:47 PM
Isaac Pohl-Zaretsky
09/16/2022, 7:13 PM
Rashmin Patel
09/17/2022, 4:03 AM
Jirawech Siwawut
09/18/2022, 5:31 PM<viewfs://path/mytable>
2022-09-16 22:34:29
java.io.IOException: ViewFs: Cannot initialize: Empty Mount table in config for <viewfs://path/>
at org.apache.hadoop.fs.viewfs.InodeTree.<init>(InodeTree.java:337)
at org.apache.hadoop.fs.viewfs.ViewFileSystem$1.<init>(ViewFileSystem.java:169)
at org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:169)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
Slackbot
09/19/2022, 10:47 AM
Adesh Dsilva
09/19/2022, 1:31 PM
Martijn Visser
09/19/2022, 2:08 PM
Henrik Feldt
09/19/2022, 2:43 PM
Sergey Postument
09/19/2022, 3:20 PM
Sylvia Lin
09/19/2022, 6:09 PM
2022-09-18 01:30:24,154 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 14733 (type=CHECKPOINT) @ 1663464623841 for job 18447894cc13c2736d1753cedc9a7bb4.
2022-09-18 01:30:24,538 INFO com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream [] - close closed:false s3://instacart-emr/flink-streaming/event-router/checkpoints/18447894cc13c2736d1753cedc9a7bb4/chk-14733/_metadata
2022-09-18 01:30:24,688 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 14733 for job 18447894cc13c2736d1753cedc9a7bb4 (424894 bytes in 781 ms).
2022-09-18 01:31:24,108 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 14734 (type=CHECKPOINT) @ 1663464683841 for job 18447894cc13c2736d1753cedc9a7bb4.
2022-09-18 01:34:24,108 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 14734 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
2022-09-18 01:34:24,489 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 14735 (type=CHECKPOINT) @ 1663464864113 for job 18447894cc13c2736d1753cedc9a7bb4.
2022-09-18 01:37:24,489 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 14735 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
2022-09-18 01:37:24,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 14736 (type=CHECKPOINT) @ 1663465044489 for job 18447894cc13c2736d1753cedc9a7bb4.
2022-09-18 01:40:24,815 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 14736 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
2022-09-18 01:40:25,089 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 14737 (type=CHECKPOINT) @ 1663465224815 for job 18447894cc13c2736d1753cedc9a7bb4.
2022-09-18 01:43:25,089 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 14737 of job 18447894cc13c2736d1753cedc9a7bb4 expired before completing.
2022-09-18 01:43:25,380 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 14738 (type=CHECKPOINT) @ 1663465405089 for job 18447894cc13c2736d1753cedc9a7bb4.
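Reading the log, each checkpoint expires exactly three minutes after being triggered, which points at the configured checkpoint timeout rather than the trigger interval; raising it only helps if the underlying slowdown is transient. A sketch for flink-conf.yaml with an illustrative value:

# flink-conf.yaml: give slow checkpoints more time before they are declared expired
execution.checkpointing.timeout: 10 min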