Derivatives of KafkaConsumerService

Derivatives of KafkaConsumerService facilitate receiving messages from Kafka topics. As of release 12.1.2, Content Integration Framework uses the Kafka consumer service to receive domain object events and subsequently processes those events based on the object attribute mappings.

Object event interpreter service (Kafka listener)

Just as Webhook support facilitates event receipt over HTTP, Kafka listeners receive events via topics. Regardless of the event channel (HTTP or Kafka), the subsequent event processing is always based on the object attribute mapping. The following specialized classes and interfaces are available for implementing Kafka based event receivers.
  • com.hcl.unica.system.integration.service.object.event.AbstractKafkaEventInterpreterService<K, V extends IdentifiableObject>

    The K & V type parameters of AbstractKafkaEventInterpreterService represent the types of the Key & Value components of the message received from the Kafka topic. As of release 12.1.2, only keys of type String are supported, whereas the value type must be a subtype of IdentifiableObject. IdentifiableObject’s contract helps Content Integration Framework identify the type/category and the identifier of the domain object corresponding to the event. If the getObjectType() implementation returns null, Content Integration Framework uses {service-name}.request as the category of the corresponding domain object. Here, {service-name} is the actual name of the service declared in the custom-plugin-services.yml file for the respective event receiver/interpreter service. There is no standard service name for Kafka based event receiver/interpreter services. The service declaration should also include the eventProfile standard parameter:
    {service-name}
      params:
        eventProfile:
          label: Credit Card Transaction
    

The label parameter serves as the title of the respective domain object whenever it is shown on the object mapping screen.

Refer to the com.example.service.kafka.events.consumer.ExampleKafkaEventInterpreterService class in the asset-integration-starter project for additional clarity.
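The fallback rule for the object category described above can be sketched as follows. This is only an illustration under stated assumptions: IdentifiableObjectSketch is a hypothetical stand-in for the framework's IdentifiableObject (only getObjectType() is named in this document), and ObjectCategoryResolver is an invented helper, not a framework class.

```java
// Hypothetical stand-in for the framework's IdentifiableObject contract.
// Only getObjectType() is taken from the documentation; the real interface
// also exposes the object identifier.
interface IdentifiableObjectSketch {
    String getObjectType();
}

// Invented helper that mirrors the documented fallback rule.
final class ObjectCategoryResolver {
    private ObjectCategoryResolver() {}

    // Returns the object's own type, or "{service-name}.request" when
    // getObjectType() returns null.
    static String resolveCategory(String serviceName, IdentifiableObjectSketch event) {
        String type = event.getObjectType();
        return type != null ? type : serviceName + ".request";
    }
}
```

With a service named transaction-event-receiver, an event whose getObjectType() returns null would therefore be categorized as transaction-event-receiver.request.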

AbstractKafkaEventInterpreterService declares the following methods for the service implementation (interpret is mandatory, accept is optional):
  1. Optional<ObjectEventDetails> interpret(ExecutionContext<KafkaMessage<String, DomainEntity>> executionContext)

     This method should interpret the event received via Kafka and, where possible, provide the event details by means of ObjectEventDetails. The semantics of this method are exactly the same as those of the interpret method required for Webhooks.

  2. boolean accept(ExecutionContext<KafkaMessage<String, DomainEntity>> executionContext)

     This is an optional method. If implemented, it determines the acceptance or rejection of the received event. By default, Content Integration Framework processes all the events successfully interpreted by the interpret method. The accept method is called prior to the interpretation. If a certain event is not suitable for further consideration, this method can return false, thereby skipping its interpretation as well as its processing.
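The call order of the two methods above (accept first, interpret only for accepted events) can be pictured with the following minimal sketch. It does not use the framework's classes: MessageSketch is a hypothetical stand-in for KafkaMessage<K, V>, and AcceptThenInterpret is an invented helper that only demonstrates the ordering, not how Content Integration Framework is implemented.

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for KafkaMessage<K, V>: a key and a value.
final class MessageSketch<K, V> {
    final K key;
    final V value;

    MessageSketch(K key, V value) {
        this.key = key;
        this.value = value;
    }
}

// Invented helper showing the documented ordering of accept and interpret.
final class AcceptThenInterpret {
    private AcceptThenInterpret() {}

    static <K, V, D> Optional<D> handle(MessageSketch<K, V> message,
                                        Predicate<MessageSketch<K, V>> accept,
                                        Function<MessageSketch<K, V>, Optional<D>> interpret) {
        if (!accept.test(message)) {
            // Rejected events are neither interpreted nor processed.
            return Optional.empty();
        }
        // Accepted events are interpreted; an empty result means there is nothing to process.
        return interpret.apply(message);
    }
}
```

In a real service, the two lambdas would correspond to the accept and interpret overrides of a class extending AbstractKafkaEventInterpreterService.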

  • com.hcl.unica.system.integration.service.kafka.KafkaMessage<K, V>

    KafkaMessage is supplied as the request object to the previously mentioned methods of AbstractKafkaEventInterpreterService. It encapsulates the key & value of the message received via the Kafka topic.

  • com.hcl.unica.system.integration.service.object.event.GenericKafkaEventInterpreterService

    Kafka based object event receipt & interpretation can also be done without creating an explicit service using AbstractKafkaEventInterpreterService. Content Integration Framework provides GenericKafkaEventInterpreterService for receiving and interpreting object events based on certain default assumptions. The service declaration in the custom-plugin-services.yml file can point to GenericKafkaEventInterpreterService, and the fully qualified Java type of the associated event object can be declared as shown below:
    systemId: Foo
    serviceName: transaction-event-receiver
    factoryClass: c.h.u.s.i.s.object.event.GenericKafkaEventInterpreterService
    params:
      eventProfile:
        class: com.example.service.kafka.events.consumer.DomainEntity
        label: Credit Card Transaction
    

Here, class under the eventProfile service parameter should contain the fully qualified name of the class representing the domain object and/or event received via Kafka. The class specified here must implement IdentifiableObject. Presently, this approach offers no way to determine the acceptance or rejection of events.

Platform configuration

Each Kafka consumer service needs certain additional properties to be set in the Platform configuration of the corresponding system. These properties, listed below, must be added to the “Additional properties” section of the respective system configuration. Note that Kafka connectivity details are obtained from the Kafka configuration settings made for the respective system.
Parameter                                              Description
{service-name}.kafka.topics                            Comma-separated list of source topic names
{service-name}.kafka.topics.{topic-name}.value.format  Expected message format. Supported formats are Avro & Json.
{service-name}.kafka.max-consumers                     Number of concurrent consumers required for the given service (inclusive of all the topics the service listens to)
{service-name}.kafka.fetch.max.wait.ms                 Same as the standard fetch.max.wait.ms property used for Apache Kafka consumers
{service-name}.kafka.fetch.min.bytes                   Same as the standard fetch.min.bytes property used for Apache Kafka consumers
{service-name}.kafka.fetch.max.bytes                   Same as the standard fetch.max.bytes property used for Apache Kafka consumers
{service-name}.kafka.max.partition.fetch.bytes         Same as the standard max.partition.fetch.bytes property used for Apache Kafka consumers
{service-name}.kafka.max.poll.records                  Same as the standard max.poll.records property used for Apache Kafka consumers
{service-name}.kafka.heartbeat.interval.ms             Same as the standard heartbeat.interval.ms property used for Apache Kafka consumers
{service-name}.kafka.session.timeout.ms                Same as the standard session.timeout.ms property used for Apache Kafka consumers
Note: Refer to https://kafka.apache.org/documentation/#consumerconfigs for standard Kafka consumer configurations.

{service-name} represents the name of the service declared in the custom-plugin-services.yml file.

For example,

transaction-event-receiver.kafka.topics: topic1, topic2

transaction-event-receiver.kafka.topics.topic1.value.format: Avro

transaction-event-receiver.kafka.topics.topic2.value.format: Json

transaction-event-receiver.kafka.max-consumers: 20

transaction-event-receiver.kafka.fetch.max.wait.ms: 500

transaction-event-receiver.kafka.fetch.min.bytes: 26124

transaction-event-receiver.kafka.fetch.max.bytes: 31240

transaction-event-receiver.kafka.max.partition.fetch.bytes: 51124

transaction-event-receiver.kafka.max.poll.records: 200

transaction-event-receiver.kafka.heartbeat.interval.ms: 2000

transaction-event-receiver.kafka.session.timeout.ms: 9000
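To make the key structure of these properties concrete, the sketch below reads the topic list and the per-topic value format from a plain map of properties. It is only an illustration of how the {service-name} and {topic-name} placeholders combine: KafkaTopicFormats is a hypothetical helper, and trimming whitespace around topic names is an assumption, not documented framework behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Invented helper; not part of Content Integration Framework.
final class KafkaTopicFormats {
    private KafkaTopicFormats() {}

    // Maps each topic listed under "{service-name}.kafka.topics" to the value of
    // "{service-name}.kafka.topics.{topic-name}.value.format".
    static Map<String, String> topicFormats(String serviceName, Map<String, String> props) {
        Map<String, String> formats = new LinkedHashMap<>();
        String topics = props.getOrDefault(serviceName + ".kafka.topics", "");
        for (String raw : topics.split(",")) {
            String topic = raw.trim(); // assumption: surrounding whitespace is ignored
            if (topic.isEmpty()) {
                continue;
            }
            String formatKey = serviceName + ".kafka.topics." + topic + ".value.format";
            formats.put(topic, props.get(formatKey)); // null when no format is configured
        }
        return formats;
    }
}
```

For the example properties above, calling topicFormats with transaction-event-receiver as the service name yields topic1 mapped to Avro and topic2 mapped to Json.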

SASL prerequisites

For SASL connectivity with Kafka, grant appropriate authorization to the consumer groups. Content Integration Framework creates a consumer group for each configuration of the respective system and assigns it an identifier in the following format:

ci-consumer-{Unica-product-platform-config-node}-{partition-name}-{system-id}-{service-name}

For example, for the event-receiver service in system Foo, the following consumer groups will be created if Foo is configured in partition1 of Unica Centralized Offer Management (Offer) as well as in Unica Journey (Journey):

ci-consumer-offer-partition1-foo-event-receiver (for system configured under Affinium|Offer|partitions|partition1|assetPicker|dataSources|Foo node in Platform configuration)

ci-consumer-journey-foo-event-receiver (for system configured under Affinium|Journey|assetPicker|dataSources|Foo node in Platform configuration)

Note: Spaces in the system identifier are replaced with a dash (-) when composing the Kafka consumer group ID.
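When preparing SASL authorizations, it can help to compute the expected group IDs up front. The sketch below composes an ID from its parts following the format above. ConsumerGroupIds is a hypothetical helper, and two of its behaviours are assumptions inferred only from the examples in this section: the ID is lower-cased (Foo becomes foo), and the partition segment is omitted when the product node has no partition (as in the Journey example).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Invented helper; not part of Content Integration Framework.
final class ConsumerGroupIds {
    private ConsumerGroupIds() {}

    // Builds ci-consumer-{product-node}-{partition-name}-{system-id}-{service-name}.
    static String compose(String productNode, String partitionName,
                          String systemId, String serviceName) {
        List<String> parts = new ArrayList<>();
        parts.add("ci-consumer");
        parts.add(productNode);
        if (partitionName != null && !partitionName.isEmpty()) {
            parts.add(partitionName); // assumption: skipped for products without partitions
        }
        parts.add(systemId.replace(' ', '-')); // documented: spaces become dashes
        parts.add(serviceName);
        // Assumption: lower-casing is inferred from the examples, not stated explicitly.
        return String.join("-", parts).toLowerCase(Locale.ROOT);
    }
}
```

Under these assumptions, compose("Offer", "partition1", "Foo", "event-receiver") gives ci-consumer-offer-partition1-foo-event-receiver, matching the first example above.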