# troubleshooting
a
I see that for the streaming file sink you can
```java
OutputFileConfig config = OutputFileConfig
    .builder()
    .withPartPrefix("prefix")
    .withPartSuffix(".ext")
    .build();
```
but I have written the file sink using SQL syntax. How can I make this generate files with the suffix `.parquet`?
```sql
CREATE TABLE save_polystar_aggregates_core (
    ...
)
PARTITIONED BY (EVENT_DAY, EVENT_HOUR)
WITH (
    'connector' = 'filesystem',
    'path' = 's3://.../core',
    'format' = 'parquet',
    'sink.partition-commit.delay' = '1 h',
    'sink.partition-commit.policy.kind' = 'success-file',
    'sink.partition-commit.trigger' = 'partition-time'
);
```
Also, can I change the directory names from

```
path
└── EVENT_DAY=2019-08-25
```

to the following?

```
path
└── 2019-08-25
```
s
AFAIK it’s not available in the Table API. You need to either use the DataStream API or patch Flink.
d
I'll just mention that it's pretty easy to attach a DataStream sink to a pipeline that's otherwise implemented using the Table API.
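Roughly, a sketch of that approach (the POJO, the query, and the table/class names below are placeholders, not the actual job; the Parquet writer factory is the Avro-reflection one from flink-parquet and is just one option):

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ParquetSuffixSketch {

    // Placeholder POJO standing in for the real aggregate row type.
    public static class Aggregate {
        public String eventDay;
        public Integer eventHour;
        public Double metricValue;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats such as Parquet only finalize part files on checkpoints.
        env.enableCheckpointing(60_000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder query standing in for the real aggregation.
        Table aggregates = tEnv.sqlQuery(
                "SELECT eventDay, eventHour, metricValue FROM some_source");
        DataStream<Aggregate> rows = tEnv.toDataStream(aggregates, Aggregate.class);

        // This is the piece that controls the part-file prefix and suffix.
        OutputFileConfig fileConfig = OutputFileConfig.builder()
                .withPartPrefix("part")
                .withPartSuffix(".parquet")
                .build();

        FileSink<Aggregate> sink = FileSink
                .forBulkFormat(new Path("s3://.../core"),
                        ParquetAvroWriters.forReflectRecord(Aggregate.class))
                .withOutputFileConfig(fileConfig)
                .build();

        rows.sinkTo(sink);
        env.execute("parquet-suffix-sketch");
    }
}
```

Any other `BulkWriter.Factory` that produces Parquet would slot into the same place as the Avro-reflection writer above.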
a
@David Anderson Yes, I thought I’d do that if there’s no direct way.
Overall, Flink works well, but writing code for it can feel like guiding a ball through a labyrinth game, because of inconsistencies in the APIs and/or the documentation.
d
Yes, that’s fair. That’s one outcome of a successful open source project being developed collaboratively across several different organizations — over time it gets pulled in different directions. We should try to improve the docs, but that’s challenging to get done.
a
A couple of places hit my productivity especially badly. I had to use PyFlink due to company policy, I need quantile calculation due to analyst requirements, and there are a few terabytes of data coming in daily through Kafka, so:
• I implemented quantile calculation using the Java version of the t-digest algorithm, i.e. I wrote a couple of classes that interface with the reference implementation.
• I write the actual code with a combination of PyFlink and plain Flink SQL.
Some of the problems:
With the default settings, writing streaming data to S3 as Parquet files produced nothing at all. It started working once we figured out how to configure checkpointing.
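For context, the streaming filesystem sink only commits Parquet files on checkpoints, so a pure Table/SQL job needs a checkpoint interval configured. A minimal sketch (the one-minute interval is just an example value):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) {
        // A streaming TableEnvironment, as used for the SQL filesystem sink.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Without a checkpoint interval, bulk formats like Parquet are never
        // committed, so nothing shows up in S3.
        tEnv.getConfig().getConfiguration()
                .setString("execution.checkpointing.interval", "1 min");
    }
}
```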
Writing the t-digest interface classes was painful, because I had to work out the required type annotations by trial and error.
I first tried Python UDFs, but I believe you can't interface with t-digest that way, since they require the aggregate state to be a data type Flink knows about rather than a raw type.
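For illustration, roughly what such a Java wrapper can look like (a sketch with made-up names, not the actual implementation; it assumes the reference t-digest library, com.tdunning:t-digest, is on the classpath):

```java
import com.tdunning.math.stats.TDigest;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.AggregateFunction;

// Aggregates doubles into a t-digest and returns an example quantile.
public class TDigestP95 extends AggregateFunction<Double, TDigestP95.Acc> {

    // Accumulator holding the digest. The @DataTypeHint("RAW") forces Flink to
    // treat the field as an opaque RAW type instead of trying to map it to a
    // known SQL data type -- the kind of annotation that otherwise has to be
    // discovered by trial and error.
    public static class Acc {
        public @DataTypeHint("RAW") TDigest digest = TDigest.createMergingDigest(100);
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, Double value) {
        if (value != null) {
            acc.digest.add(value);
        }
    }

    @Override
    public Double getValue(Acc acc) {
        // Example: the 95th percentile of everything accumulated so far.
        return acc.digest.quantile(0.95);
    }
}
```

Once packaged in a jar, something like this can be registered from PyFlink with `create_java_temporary_system_function("p95", "com.example.TDigestP95")` or from SQL with `CREATE TEMPORARY SYSTEM FUNCTION ... LANGUAGE JAVA` (names hypothetical), and then used like any built-in aggregate.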
In my opinion, the best help you could provide to new users would be to have full example base projects doing useful tasks, which they could modify to suit their needs.
Ah, I almost forgot to mention that the main documentation never says you can register a Java class as a temporary system function from Python. The feature is described in the Java section, but the Python function needed to do it isn't documented.
d
Small, targeted PRs to fill important holes (and fix mistakes) in the documentation are certainly welcome. Ping me if something needs a review.
m
Hey guys, sorry for barging into the conversation, but do any of you have an example of creating a parquet sink without using avro? I’m having a hard time figuring out how to do that
d
I answered this in its own thread.