# general
n
@Christophe Bornet is the author of PIP-193 and can help here
j
Great. I'm working on using a transform function to transform our protobuf messages to String for saving with the ElasticSearchSink.
I think I have it mostly working, though I think I'm hitting some schema issues between the function and the sink.
I am getting an error in the ElasticSearchSink that says `Record does not carry message information`. However, when I look at the transform function output, it is always producing something. I'm producing my record like this:
```java
return context.newOutputRecordBuilder(Schema.STRING)
        .key(id)
        .value(jsonObject.toString())
        .properties(properties)
        .build();
```
which follows the function example pretty closely. Any help would be appreciated.
c
This is because you have `schemaEnable` set to false in the ES Sink config, so it tries to write the raw bytes of the message. But as you have a transform, there's no serialized message. Is it possible for you to set `schemaEnable` to true? This is a much better option as it uses the record instead of the message. In general, sinks should use the record abstraction, not the message. The default of this config option is false only for backward compatibility reasons.
The error message could probably be improved.
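For reference, the sink config change is just flipping that one option. A minimal sketch (the URL and index name here are placeholders, not from this thread):

```yaml
# Hypothetical ES sink config sketch -- elasticSearchUrl and indexName are placeholders
configs:
  elasticSearchUrl: "http://localhost:9200"
  indexName: "my-index"
  # Use the record abstraction instead of the raw message bytes
  schemaEnable: true
```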
BTW, have you checked https://github.com/datastax/pulsar-transformations ? There's a `cast` operation that could be used to transform to STRING. At the moment, Protobuf is not supported but the pieces are here, just need to add the conversion logic. If you'd like to contribute a PR, I would be very happy to help.
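If I remember the project's README correctly, a `cast` step is declared roughly like this in the function's user config (a sketch; exact field names may differ by version):

```json
{
  "steps": [
    {"type": "cast", "schema-type": "STRING"}
  ]
}
```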
j
Thanks for the help. I can't use schemaEnable because all of our schemas are protobuf. My transform function outputs a JSON object as a String that is properly formatted for our ES indexes.
I can handle it all as bytes very easily, but when I tried that I think I got a different set of errors... maybe it was "schema bytes not supported".
The transform function already uses a byte[] input because it handles multiple types, with a hint in the properties to properly convert to JSON.
c
If you transform your protobuf to schema STRING with your transform function, I think it should work with `schemaEnable`.
j
Ok, thanks! I'll give that a try. I'm out of the office today, but I'll get after that on Monday morning.
I tried turning on schemaEnable this morning with my existing setup (separate proto -> json function (byte[] -> String)), and the ElasticSearchSink wasn't thrilled about it. https://github.com/apache/pulsar/blob/b7f0004313ea4565717cc6d3c0b99aee5c079c6c/pul[…]/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java That seems to show that only JSON or AVRO schemas are really supported in the sink. It seems like it would be an easy lift to support STRING or BYTES (as long as they both contained UTF-8 String data that could be parsed as valid JSON).
```
java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.pulsar.client.api.schema.GenericObject (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.pulsar.client.api.schema.GenericObject is in unnamed module of loader 'app')
	at org.apache.pulsar.io.elasticsearch.ElasticSearchSink.extractIdAndDocument(ElasticSearchSink.java:197) ~[8HdTpyUyRQuezR6JAN4j-g/:?]
	at org.apache.pulsar.io.elasticsearch.ElasticSearchSink.write(ElasticSearchSink.java:113)
```
That is specifically the error I'm getting.
My function is publishing a Record<String>; I'm going to switch it to a key/value and see if that cleans this up.
c
You're right, the ES Sink only supports JSON and AVRO atm...
So it seems the ES Sink could be improved to support more schema types, and records that don't have a message when schemaEnable is false.
j
I have a branch where I've done the basic work. All of the existing tests pass, I'm just working on how to best test my changes within the existing test framework
I've finally found the time to finish testing my changes and have created an issue (https://github.com/apache/pulsar/issues/20619) to track them. I'm going to submit the PR momentarily.