Kafka integration

cornichon-kafka offers a support for Kafka v2.0.0

The kafka client is shared at the feature level by all the scenarios and is configured with a fixed group-id to ‘cornichon-groupId’ and is set with offset-reset to ‘earliest’.

Due to the architecture of kafka and the handling of consumers offsets, the default execution of scenarios is sequential.

  • Comprehensive example
import com.github.agourlay.cornichon.CornichonFeature
import com.github.agourlay.cornichon.kafka.KafkaDsl

class KafkaExample extends CornichonFeature with KafkaDsl {

  override lazy val kafkaBootstrapServersHost: String = "localhost"
  override lazy val kafkaBootstrapServersPort: Int = 9092

  def feature = Feature("Kafka DSL") {

    Scenario("Can write and read arbitrary Strings to/from topic") {
      Given I put_topic(
        topic = "cornichon",
        key = "success",
        message = "I am a plain string"
      )

      When I read_from_topic(
        topic = "cornichon",
        timeoutMs = 200,
        atLeastAmount = 1
      )

      Then assert kafka("cornichon").topic_is("cornichon")
      Then assert kafka("cornichon").key_is("success")
      Then assert kafka("cornichon").message_value.is("I am a plain string")
    }
    
    Scenario("Can use cornichon jsonAssertions on the message value") {
      Given I put_topic(
        topic = "cornichon",
        key = "json",
        message = """{ "coffee": "black", "cornichon": "green"}"""
      )

      When I read_from_topic("cornichon")
      Then assert kafka("cornichon").topic_is("json")
      Then assert kafka("cornichon").message_value.ignoring("coffee").is("""
        {
          "cornichon": "green"
        }
        """
       )
    }

  }
}

Note that this dsl always return the latest amount of messages found on the topic. The consumer polls timeoutMs until it does not find any new messages anymore