Kafka plug-in structure

Configuring a Kafka event source
A Generic event source that receives KafkaEvent type events can be configured in the following way:
Bootstrap Server
  • Type: string
  • Multiple filters
  • Required
  • Single predicate
The URL of the server where Kafka is running.
Topic
  • Type: string
  • Multiple filters
  • Required
  • Single predicate
The Kafka topic you want to communicate with.
Security protocol
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
  • Values:
    • PLAINTEXT
    • SASL_PLAINTEXT
    • SSL
    • SASL_SSL
  • Default: if no security protocol is specified, the default value is PLAINTEXT
The security protocol used to connect to the Kafka server.
identificationAlgo
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
The algorithm used to verify the hostname.
saslMechanism
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
  • Values:
    • PLAIN
    • GSSAPI
    • SCRAM-SHA-256
    • OAUTHBEARER
    • SCRAM-SHA-512
  • Default: if no SASL mechanism is specified, the default value is PLAIN
The mechanism used for simple authentication and security layer.
Username
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
The username used for authentication.
Password
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
The password used for authentication.
pollInMS
  • Type: nonnegativeinteger
  • Multiple filters
  • Single predicate
  • Default: if no value is specified for the interval, the default value is 100
You can define the interval between message polling instances.
kerberosPrincipal
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
The Kerberos principal name used for authentication.
kerberosKeytab
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
The Kerberos keytab file used for authentication.
kerberosService
  • Type: string
  • Multiple filters
  • Required*
  • Single predicate
The Kerberos service name used for authentication.
groupID
  • Type: string
  • Multiple filters
  • Single predicate
  • Default: if no group ID is specified, the default value is the name of the event source -consumer
The ID of the consumer groups.
Note:
  • If the securityProtocol is set to SSL or SASL_SSL, the identificationAlgo is used to enable the hostname verification.
  • If the securityProtocol is set to SASL_PLAINTEXT or SASL_SSL, the saslMechanism is optional.
  • If the saslMechanism is set to PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512, the username and password are required.
  • If the saslMechanism is set to GSSAPI, the kerberosPrincipal, kerberosKeytab and kerberosService are required.
The following is an example of a generic type event source configured to receive KafkaEvents type events:
$eventsource

EVENT_SOURCE /Kafka1
DESCRIPTION "Kafka testing"
PLUGIN Kafka
TYPE Generic
CONFIGURATION {

      "bootstrapServer": "bootstrapservervalue",
      "topic": "topicvalue",
      "securityProtocol": "SASL_SSL",
      "identificationAlgo": "identificationAlgovalue",
      "saslMechanism": "GSSAPI",
      "kerberosPrincipal": "kerberosPrincipalvalue",
      "kerberosKeytab": "kerberosKeytabvalue",
      "kerberosService": "kerberosServicevalue",
      "pollInMs": "30"

         }
END
Kafka event condition
KafkaEvent type event conditions can be configured in job streams specifying the following required properties:
  • Name
  • Type
  • Event source
You can also specify any number of porperties that filter the events received from the event source:
Message
  • Type: string
  • Operator: EQ("=") or NE("!=")
  • Wildcard allowed
  • Single predicate
Specify the Kafka message body to enable filtering of events that either match or do not match the specified input.
Key
  • Type: string
  • Operator: EQ("=") or NE("!=")
  • Wildcard allowed
  • Single predicate
Specify the key of the Kafka record to enable filtering of events that either match or do not match the specified input.
Partition
  • Type: string
  • Operator: EQ("=") or NE("!=")
  • Wildcard allowed
  • Single predicate
Specify the partition of the Kafka record to enable filtering of events that either match or do not match the specified input.
The following is an example of a job stream with two Kafka event conditions:
TRIGGER MY_TRIGGER DESCRIPTION "my event trigger" TIMEOUT 1000 TRIGGER_ON_TIMEOUT CORRELATIONS A, A1
    CONDITIONS IN ORDER
    NAME messagebody  TYPE plugin/Kafka    SOURCE MY_EVENT_SOURCE FILTERS MESSAGE = test message
    NAME KafkaKey     TYPE plugin/Kafka    SOURCE MY_EVENT_SOURCE FILTERS KEY = mockKafkaKey