Derivatives of KafkaConsumerService
Derivatives of KafkaConsumerService facilitate receiving messages from Kafka topics. As of release 12.1.2, Content Integration Framework leverages the Kafka consumer service to receive domain object events & subsequently process those events based on the object attribute mappings.
Object event interpreter service (Kafka listener)

com.hcl.unica.system.integration.service.object.event.AbstractKafkaEventInterpreterService<K, V extends IdentifiableObject>

The K & V type parameters to AbstractKafkaEventInterpreterService represent the types of the Key & Value components of the message received from the Kafka topic. As of release 12.1.2, only String type keys are supported, whereas the value type must be a subtype of IdentifiableObject. IdentifiableObject's contract helps Content Integration Framework identify the type/category & identifier of the domain object corresponding to the event. If null is returned from the getObjectType() implementation, then Content Integration Framework uses {service-name}.request as the category of the corresponding domain object, wherein {service-name} is the actual name of the service declared in the custom-plugin-services.yml file for the respective event receiver/interpreter service. There is no standard service name for Kafka-based event receiver/interpreter services. The service declaration should also include the eventProfile standard parameter:

{service-name}
  params:
    eventProfile:
      label: Credit Card Transaction

The label parameter works as the title for the respective domain object whenever it is shown on the object mapping screen.
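The IdentifiableObject contract and the {service-name}.request fallback described above can be sketched as follows. This is an illustrative, self-contained sketch: the inline IdentifiableObject interface, the getObjectId() accessor, the CreditCardTransaction class, and the effectiveObjectType() helper are stand-ins assumed for the example; the real interface ships with Content Integration Framework and its exact signatures may differ.

```java
import java.util.Optional;

public class IdentifiableObjectSketch {

    // Stand-in for the framework's IdentifiableObject contract (hypothetical
    // method names; the real interface is provided by the framework).
    interface IdentifiableObject {
        String getObjectType(); // type/category of the domain object; may return null
        String getObjectId();   // identifier of the domain object
    }

    // Example domain object received as the value component of a Kafka message
    static class CreditCardTransaction implements IdentifiableObject {
        private final String id;

        CreditCardTransaction(String id) {
            this.id = id;
        }

        @Override
        public String getObjectType() {
            return null; // null triggers the documented {service-name}.request fallback
        }

        @Override
        public String getObjectId() {
            return id;
        }
    }

    // Mirrors the documented fallback: a null object type resolves to
    // "{service-name}.request" for the declaring service
    static String effectiveObjectType(IdentifiableObject object, String serviceName) {
        return Optional.ofNullable(object.getObjectType())
                       .orElse(serviceName + ".request");
    }
}
```

Returning a non-null value from getObjectType() bypasses the fallback, so distinct event categories can share one service declaration.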
Refer to the com.example.service.kafka.events.consumer.ExampleKafkaEventInterpreterService class from the asset-integration-starter project for additional clarity.
AbstractKafkaEventInterpreterService declares the following methods for the service implementation:

Optional<ObjectEventDetails> interpret(ExecutionContext<KafkaMessage<String, DomainEntity>> executionContext)

This method must be defined. It should interpret the event received via Kafka and possibly provide event details by means of ObjectEventDetails. The semantics of this method are exactly the same as the semantics of the interpret method required for Webhooks.

boolean accept(ExecutionContext<KafkaMessage<String, DomainEntity>> executionContext)

This is an optional method. If implemented, this method can determine the acceptance or rejection of the received event. By default, Content Integration Framework processes all the events successfully interpreted by the interpret method. The accept method is called prior to the interpretation. If a certain event is not suitable for further consideration, this method can return false, thereby skipping its interpretation as well as processing.
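The interplay of accept and interpret can be sketched as below. This is a self-contained illustration, not the framework's actual code: KafkaMessage, ExecutionContext, ObjectEventDetails, and DomainEntity are simplified stand-ins for the framework types, and the listener is modeled on, not derived from, AbstractKafkaEventInterpreterService.

```java
import java.util.Optional;

public class EventInterpreterSketch {

    // Simplified stand-ins for the framework types (hypothetical shapes)
    record KafkaMessage<K, V>(K key, V value) {}
    record ExecutionContext<T>(T request) {}
    record ObjectEventDetails(String objectType, String objectId) {}

    // Example domain object carried as the Kafka message value
    record DomainEntity(String id, double amount) {
        String getObjectType() { return "creditCardTransaction"; }
    }

    // Modeled on AbstractKafkaEventInterpreterService<String, DomainEntity>
    static class ExampleKafkaEventInterpreterService {

        // Called before interpretation; returning false skips the event entirely.
        // Here: ignore zero-amount transactions (illustrative business rule).
        boolean accept(ExecutionContext<KafkaMessage<String, DomainEntity>> ctx) {
            return ctx.request().value().amount() > 0;
        }

        // Interprets an accepted event into ObjectEventDetails
        Optional<ObjectEventDetails> interpret(ExecutionContext<KafkaMessage<String, DomainEntity>> ctx) {
            DomainEntity entity = ctx.request().value();
            return Optional.of(new ObjectEventDetails(entity.getObjectType(), entity.id()));
        }
    }
}
```

An empty Optional from interpret would likewise cause the event to be dropped, so both methods act as filters at different stages.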
com.hcl.unica.system.integration.service.kafka.KafkaMessage<K, V>

KafkaMessage is supplied as the request object to the previously mentioned methods in AbstractKafkaEventInterpreterService. It encapsulates the key & value corresponding to the message received via the Kafka topic.

com.hcl.unica.system.integration.service.object.event.GenericKafkaEventInterpreterService
Kafka-based object event receipt & interpretation can also be done without creating any explicit service using AbstractKafkaEventInterpreterService. Content Integration Framework provides GenericKafkaEventInterpreterService for receiving and interpreting object events based on certain default assumptions. The service declaration in the custom-plugin-services.yml file can be pointed to GenericKafkaEventInterpreterService, and the fully qualified Java type of the associated event object can be declared as shown below:

systemId: Foo
serviceName: transaction-event-receiver
factoryClass: c.h.u.s.i.s.object.event.GenericKafkaEventInterpreterService
params:
  eventProfile:
    class: com.example.service.kafka.events.consumer.DomainEntity
    label: Credit Card Transaction

Wherein, the class entry under the eventProfile service parameter should contain the fully qualified name of the class representing the domain object and/or event received via Kafka. The classes specified herein must implement IdentifiableObject. Note, however, that there is presently no way to determine acceptance or rejection of events using this approach.
Platform configuration
Parameter | Description
--- | ---
{service-name}.kafka.topics | Comma-separated list of source topic names
{service-name}.kafka.topics.{topic-name}.value.format | Expected message format. Supported formats are Avro & Json.
{service-name}.kafka.max-consumers | Number of required concurrent consumers for the given service (inclusive of all the topics the service would listen to)
{service-name}.kafka.fetch.max.wait.ms | Same as the standard fetch.max.wait.ms property used for Apache Kafka consumers
{service-name}.kafka.fetch.min.bytes | Same as the standard fetch.min.bytes property used for Apache Kafka consumers
{service-name}.kafka.fetch.max.bytes | Same as the standard fetch.max.bytes property used for Apache Kafka consumers
{service-name}.kafka.max.partition.fetch.bytes | Same as the standard max.partition.fetch.bytes property used for Apache Kafka consumers
{service-name}.kafka.max.poll.records | Same as the standard max.poll.records property used for Apache Kafka consumers
{service-name}.kafka.max.poll.interval.ms | Same as the standard max.poll.interval.ms property used for Apache Kafka consumers
{service-name}.kafka.heartbeat.interval.ms | Same as the standard heartbeat.interval.ms property used for Apache Kafka consumers
{service-name}.kafka.session.timeout.ms | Same as the standard session.timeout.ms property used for Apache Kafka consumers
{service-name} represents the name of the service declared in the custom-plugin-services.yml file.
For example,
transaction-event-receiver.kafka.topics: topic1, topic2
transaction-event-receiver.kafka.topics.topic1.value.format: Avro
transaction-event-receiver.kafka.topics.topic2.value.format: Json
transaction-event-receiver.kafka.max-consumers: 20
transaction-event-receiver.kafka.fetch.max.wait.ms: 500
transaction-event-receiver.kafka.fetch.min.bytes: 26124
transaction-event-receiver.kafka.fetch.max.bytes: 31240
transaction-event-receiver.kafka.max.partition.fetch.bytes: 51124
transaction-event-receiver.kafka.max.poll.records: 200
transaction-event-receiver.kafka.heartbeat.interval.ms: 2000
transaction-event-receiver.kafka.session.timeout.ms: 9000
SASL prerequisites
For SASL connectivity with Kafka, provide appropriate authorization to consumer groups. Content Integration Framework creates a consumer group for each configuration of the respective system & assigns an identifier to it as per the below format:

ci-consumer-{Unica-product-platform-config-node}-{partition-name}-{system-id}-{service-name}

For example, for the event-receiver service in system Foo, the following consumer groups will be created if Foo is configured in partition1 of Unica Centralized Offer Management (Offer) as well as in Unica Journey (Journey).
ci-consumer-offer-partition1-foo-event-receiver (for the system configured under the Affinium|Offer|partitions|partition1|assetPicker|dataSources|Foo node in Platform configuration)

ci-consumer-journey-foo-event-receiver (for the system configured under the Affinium|Journey|assetPicker|dataSources|Foo node in Platform configuration)
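As an illustration of the authorization prerequisite, the consumer group can be granted Read access with Kafka's standard kafka-acls.sh tool. This is a hedged sketch only: the principal name (User:ci-user), the broker address, the topic name, and the client-sasl.properties file are placeholders for your environment, and your ACL scheme (or a non-CLI authorizer) may differ.

```
# Allow the Content Integration principal to consume as the generated group
kafka-acls.sh --bootstrap-server broker:9093 \
  --command-config client-sasl.properties \
  --add --allow-principal User:ci-user \
  --operation Read \
  --group ci-consumer-offer-partition1-foo-event-receiver

# Consumers also need Read on each source topic listed in
# {service-name}.kafka.topics
kafka-acls.sh --bootstrap-server broker:9093 \
  --command-config client-sasl.properties \
  --add --allow-principal User:ci-user \
  --operation Read \
  --topic topic1
```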