Hello all, I am verifying a consumer (kafka consum...
# pact-js
z
Hello all, I am verifying a consumer (kafka consumer handler) and a producer ( a post API call to create an object). When I create the object tvia a post request, an event is triggered and my consumer consumes a Kafka message. I have used my project (SUT) existing handler in my consumer test :
describe("receive transfer event", () => {
it("accepts a transfer", () => {
return messagePact
.given("a new transfer")
.expectsToReceive(
"Action handlers consumes a new message in Kafka topic",
) // test description is the contract identifier and should match on cosumer side
.withContent(transferBodyExpectation)
.verify(asynchronousBodyHandler(actionHandler)); //existing SUT handler function
});
});
});
Here are my questions please : 1-Is my test strategy correct/acceptable according to contract testing? 2-When running my tests I have noticed this non blocking error, I am not sure what this error means :
ERROR (29508): pact@12.4.0: no handler found for message Action handlers consumes a new message in Kafka topic
3-Finally, my provider test is failing with this message :
1.1) has a matching body
$ -> Actual map is missing the following keys: body
I am not sure why it's complaining about key body when I have set it in expected body :
const transferBodyExpectation = {
from: like("***********"),
body: {
id: like("************"),
originator: { "*************") },
beneficiary: { "*************") },
asset: like("*************"),
amountSubunits: like("*************"),
The message I am expecting from Kafka is this one : Note that I am ignoring some attributes such as to, type because they are not added by action handler map function but added by another protocol that I am not testing for now.
{
"id": "***********",
"from": "***********",
"to": [
"***********"
],
"type": "***********",
"body": {
m
The error seems clear to me. What is your provider sending? It looks like you haven't mapped the message producer in your verification code. What does that code look like?
z
Hey Matt! the mapping is based on the test description right?
here is my code :
consumer :
describe("receive transfer event", () => {
it("accepts a transfer", () => {
return messagePact
.given("a new transfer")
.expectsToReceive("a request for a new transfer") // test description is the contract identifier and should match on ion cosumer side
.withContent(transferBodyExpectation)
.verify(asynchronousBodyHandler(actionHandler));
});
});
});
z
Producer :
describe("a request for a new transfer", () => {
// test description is the contract identifier and should match on ion cosumer side
it("sends a valid transfer", () => {
return p.verify();
});
});
});
m
Yeah you haven't mapped the scenarios.
See the examples / docs for how to map the event producers to each scenario
1
That's why you got an error above about "no handler for ..."
z
My bad, the reason was that pacts where not generated in the right folder. The mapping was fine 🙂
👍 1
m
Ah ok, the mapping wasn't present in your code snippet above. Glad you're back on track!
z
Thanks, I am still a bit confused though on the final step : "verifying the provider". As explained earlier on consumer side I am using the SUT existing handler function this function consumes a message from Kafka by reading a stream. and this steps passes as expected. On the provider side though, I am creating an object through a POST request call. Once the object is created then an event is triggered then a message is added to Kafka topic then should be consumer by my consumer app. My provider test is failing on matching body check. I am wondering if this error is simply cause by my test code or my whole setup is incorrect and maybe I should not use an API call as my provider? As the response I get from a post request does not match the message to be consumed by Kafka. The message to be consumed is transformed in between.
Here is the error I am getting when running provider test :
) Verifying a pact between MyActionHandlerConsumer and MyTransferProvider Given a new transfer - a request for a new transfer
1.1) has a matching body
$ -> Actual map is missing the following keys: agents, amountSubunits, asset, beneficiary, originator
-{
"agents": [
{
"@id": "***********************",
"for": "***********************",
"role": "***********************"
}
],
"amountSubunits": "***********************",
"asset": "***********************",
"beneficiary": {
"***********************"
},
"originator": {
"***********************"
}
}
+{}
I have put on Log level to trace and found out that the actual message is empty :
actual message = AsynchronousMessage { id: None, key: None, description: "Asynchronous/Message Interaction", provider_states: [], contents: MessageContents { contents: Present(b"{}", Some(ContentType { main_type: "application", sub_type: "json", attributes: {"charset": "utf-8"}, suffix: None }), None), metadata: {"contentType": String("application/json; charset=utf-8")}, matching_rules: MatchingRules { rules: {} }, generators: Generators { categories: {} } }, comments: {}, pending: false, plugin_config: {}, interaction_markup: InteractionMarkup { markup: "", markup_type: "" }, transport: None }
I can see the message created in Kafka UI topic. still debugging but if someone can help please get in touch.
m
I’m a bit confused how your provider setup is happening. Can you please share your provider verification code?
I am wondering if this error is simply cause by my test code or my whole setup is incorrect and maybe I should not use an API call as my provider? As the response I get from a post request does not match the message to be consumed by Kafka. The message to be consumed is transformed in between.
you should be registering a producer function in a message test, the verifier won’t be talking to an HTTP endpoint for this
z
Hello Matt, here is my provider setup :
Copy code
describe("Transfers provider tests", () => {
  const p = new MessageProviderPact({
    messageProviders: {
      "a request for a new transfer": providerWithMetadata(
        //should match expect to receive on consumer side
        () => createTransfer("af7625bb-b272-41c8-988a-5372d100346d"), // note this is a post REST API call
        { queue: "" },
      ),
    },
    stateHandlers: {
      "some state": () => {
        // TODO: prepare system useful in order to create a transfer
        console.log('State handler: setting up "some state" for interaction');
        return Promise.resolve(`state set to create a transfer`);
        // we can add VASP creation here
      },
    },
    logLevel: LOG_LEVEL as LogLevel,
    provider: "MyTransferProvider",
    providerVersion: versionFromGitTag(),
    //Broker validation
    pactBrokerUrl: "<https://notabene.pactflow.io/>",
    pactBrokerToken: process.env.PACT_BROKER_TOKEN || "************",
    providerVersionBranch: process.env.GIT_BRANCH || "pact-integration", // use an env var later for the brancg name
    publishVerificationResult: true,

    // Find _all_ pacts that match the current provider branch
    consumerVersionSelectors: [
      {
        matchingBranch: true,
      },
    ],
  });

  describe("send a create transfer request", () => {
    it("creates a new transfer", () => {
      return p.verify();
    });
  });
});
as you mentioned was trying ti set up a kafka message handler from consumer side and REST API from the producer side. If I understand well what you said this won't work?
I should replace the REST API with the producer function writing into the topic?
for now I have created a separate handlers function to validate messages, I guess this is not ideal. Meanwhile I will check with devs in my team on how to integrate the producer function. By looking at the code I see that the protocol and the handler function are not separated.
m
Thanks
() => createTransfer(“af7625bb-b272-41c8-988a-5372d100346d”), // note this is a post REST API call
so
createTransfer(…)
needs to return a JSON blob that matchers whatever is in the consumer contract. If it’s not, then that’s why it will fail
I should replace the REST API with the producer function writing into the topic? (edited)
yes, that’s correct
you shouldn’t need to talk to Kafka at all in the tests (either side)
z
Thanks Matt, got you. Our SUT uses : https://www.npmjs.com/package/kafka-streams to create factories to run streams in order to read and write. The current code is not fitted to be called inside a synchronousBodyHandler on consumer side neither on providerWithMetadata provider side. We are currently looking on how to refactor our code to make it PACT compatible. Any advices/suggestions will be welcomed.
Hello again, we have worked on refactoring our code but currently I am facing an issue related to the function signature of my handler. I am wondering if i can find a PACT solution as I cannot modify my SUT handler further. as mentioned already we are using kafak-streams. js library. My current function is the following :
Copy code
export async function actionHandler(
  actions: Stream<object>,
  events: (s: Stream<any>) => Stream<any>,
  outbox: (s: Stream<any>) => Stream<any>,
) {
............;
.............
.............;
return mergeArray([actionsToEvents, actionsToOutbox]);
This function returns the following type :
(alias) mergeArray<Stream<any>[]>(streams: Stream<any>[]): Stream<any>
import mergeArray
_@param_ streams — array of stream to merge
@returns
stream containing events from all input observables in time order. If two events are simultaneous they will be merged in arbitrary order.
Now when calling the function in asynchronousBodyHandler obviously I have a signature error as the asynchronousBodyHandler expects a
(body: AnyJson | Buffer): Promise<unknown>'
and not a
'Promise<Stream<any>>'
So in this case is there a way to still use asynchronousBodyHandler to verify that my consumer can consume the provided message, or is there another function available by PACT that takes streams as a param instead? Or should I implement my own function? (asking that to no implement my own function in case an alternative already exists).
m
there is no Pact function that takes a stream. I think you’ll need to create your own function
z
Hello Matt I have successfully completed the consumer test, moving to the provider. My provider starts by sending an API post request ti create an object. That API call will then trigger a function to publish the created message into a kafka-topic. My question here is should I got through the API in my test provider or should I directly target the function that publishes the message in KAFKA? I am asking since in the message example in pact js it seems that it's : https://github.com/pact-foundation/pact-js/blob/master/examples/messages/provider/message-provider.spec.ts going through the API
m
This is example doesn’t test an API - it directly registers a function (e.g. the “directly target the function”): https://github.com/pact-foundation/pact-js/blob/master/examples/messages/provider/message-provider.spec.ts#L14 It’s OK if you have an API if that’s the easiest way to make it work, but ideally you hit the fn directly
z
Thanks Matt, in my case the provider function would be the one that write a Stream to a specific kafka topic 🙂 that's what I am trying to implement now.
in that was do I still need to mock kafka also in provider test? or is it okay to use real Kafak and write to real topic?
s my understanding is that when we test the provider we use real services not a mock
unless what you mean is that we target the function itself and therefor we still mock kafka, that way no message would be written at all
☝️ 1
1. From pact docs : The provider should stub out its dependencies during a Pact test, to ensure tests are fast and more deterministic.
1
I guess I should stub Kafka and any other dependency and stick with provider target function. Please confirm when you have time.
👍 1