Shrihari R
09/13/2023, 5:15 AM
Hi all,
I need some help writing a Flink DataStream to Google Cloud Storage with a dynamic path.
I want to keep the output path partitioned by yyyymmdd and hour. The writing part works fine, but I am not able to achieve the partitioning.
GCS writing snippet:
Date currentDate = new Date(System.currentTimeMillis());
String yyyymmdd = yyyyMmDdFormatter.format(currentDate);
String hh = hHFormatter.format(currentDate);
String hhMm = hHMmFormatter.format(currentDate);
String quarterHour = getQuarterHour(hhMm);
dataStream.writeAsText(surgeSignalsSourceTopic + "/yyyymmdd=" + yyyymmdd + "/hh=" + hh + "/quarter_hour=" + quarterHour);
Shrihari R
09/16/2023, 5:44 AM
String path = "gs://temp_bucket";
final FileSink<JsonObject> sink = FileSink
        .forRowFormat(new Path(path),
                new SimpleStringEncoder<JsonObject>("UTF-8"))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .build();
dataStream.sinkTo(sink);
public static class EventTimeBucketAssigner implements BucketAssigner<JsonObject, String> {
    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String getBucketId(JsonObject event, BucketAssigner.Context context) {
        String yyyymmdd = event.get("event_yyyymmdd").getAsString();
        String hh = event.get("event_hh").getAsString();
        String quarterHour = event.get("quarter_hour").getAsString();
        String bucketId = "yyyymmdd=" + yyyymmdd + "/hh=" + hh + "/quarter_hour=" + quarterHour;
        return bucketId;
    }
}
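The assigner above reads `event_yyyymmdd`, `event_hh`, and `quarter_hour` straight from each event. If an event carried only a timestamp, the same bucket ID could probably be computed inside `getBucketId` instead. Below is a minimal sketch in plain Java with no Flink dependency, under two assumptions: timestamps are epoch milliseconds interpreted in UTC, and a quarter hour is the minute divided by 15 (0 to 3). The original `getQuarterHour` is not shown, so that numbering is a guess; `BucketPaths` and `bucketIdFor` are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds the partition path for one event timestamp.
final class BucketPaths {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter HOUR = DateTimeFormatter.ofPattern("HH");

    private BucketPaths() {}

    static String bucketIdFor(long epochMillis) {
        // Assumption: partitions are keyed on UTC, not the local time zone.
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        // Assumption: quarter hours are numbered 0..3 (minutes 0-14 -> 0, 15-29 -> 1, ...).
        int quarterHour = t.getMinute() / 15;
        return "yyyymmdd=" + DAY.format(t)
                + "/hh=" + HOUR.format(t)
                + "/quarter_hour=" + quarterHour;
    }
}
```

Because the value is computed per event, each record lands in the partition matching its own time rather than the time at which the job was built.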