Kafka plug-in structure

Configuring a Kafka event source
To configure a Generic event source that receives KafkaEvent type events, set the following properties:
Bootstrap Server
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required
The URL of the server where Kafka is running.
Topic
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required
The Kafka topic you want to communicate with.
Security protocol
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
  • Values:
    • PLAINTEXT
    • SASL_PLAINTEXT
    • SSL
    • SASL_SSL
  • Default: if no security protocol is specified, the default value is PLAINTEXT
The security protocol used to connect to the Kafka server.
identificationAlgo
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
The algorithm used to verify the hostname.
saslMechanism
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
  • Values:
    • PLAIN
    • GSSAPI
    • SCRAM-SHA-256
    • OAUTHBEARER
    • SCRAM-SHA-512
  • Default: if no SASL mechanism is specified, the default value is PLAIN
The mechanism used for simple authentication and security layer.
Username
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
The username used for authentication.
Password
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
  • isPassword
The password used for authentication.
pollInMS
  • Type: nonnegativeinteger
  • minlength: 1
  • maxlength: 1000
  • Default: if no value is specified for the interval, the default value is 100
The interval, in milliseconds, between successive polls for messages.
kerberosPrincipal
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
The Kerberos principal name used for authentication.
kerberosKeytab
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
The Kerberos keytab file used for authentication.
kerberosService
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Required*
The Kerberos service name used for authentication.
groupID
  • Type: string
  • minlength: 1
  • maxlength: 1000
  • Default: if no group ID is specified, the default value is the name of the event source consumer
The ID of the consumer group.
Note:
  • If the securityProtocol is set to SSL or SASL_SSL, the identificationAlgo is used to enable hostname verification.
  • If the securityProtocol is set to SASL_PLAINTEXT or SASL_SSL, the saslMechanism is optional.
  • If the saslMechanism is set to PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512, the username and password are required.
  • If the saslMechanism is set to GSSAPI, the kerberosPrincipal, kerberosKeytab and kerberosService are required.
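The conditional requirements in the note can be sketched as a small check. This is only an illustration, not the plug-in's own validation: the function name missing_properties is hypothetical, the "username" and "password" key names are assumed from the camelCase style of the other keys, and the sketch assumes that the SASL mechanism matters only when the security protocol is SASL_PLAINTEXT or SASL_SSL. It does not treat identificationAlgo as mandatory, because the note says only that it enables hostname verification.

```python
# Protocols and mechanisms taken from the property descriptions above.
SASL_PROTOCOLS = {"SASL_PLAINTEXT", "SASL_SSL"}
CREDENTIAL_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def missing_properties(config):
    """Return the sorted names of required properties that are absent or empty.

    `config` is a dict shaped like the CONFIGURATION block of an event source.
    Key names such as "username" and "password" are assumptions.
    """
    required = {"bootstrapServer", "topic"}
    # Documented default: PLAINTEXT when no security protocol is given.
    protocol = config.get("securityProtocol", "PLAINTEXT")
    if protocol in SASL_PROTOCOLS:
        # Documented default: PLAIN when no SASL mechanism is given.
        mechanism = config.get("saslMechanism", "PLAIN")
        if mechanism in CREDENTIAL_MECHANISMS:
            required |= {"username", "password"}
        elif mechanism == "GSSAPI":
            required |= {"kerberosPrincipal", "kerberosKeytab", "kerberosService"}
    return sorted(name for name in required if not config.get(name))
```

For example, a configuration that sets securityProtocol to SASL_SSL without a saslMechanism falls back to PLAIN under this sketch, so the username and password would be reported as missing.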
The following is an example of a Generic event source configured to receive KafkaEvent type events:
$eventsource

EVENT_SOURCE /Kafka1
DESCRIPTION "Kafka testing"
PLUGIN Kafka
TYPE Generic
CONFIGURATION {

      "bootstrapServer": "bootstrapservervalue",
      "topic": "topicvalue",
      "securityProtocol": "SASL_SSL",
      "identificationAlgo": "identificationAlgovalue",
      "saslMechanism": "GSSAPI",
      "kerberosPrincipal": "kerberosPrincipalvalue",
      "kerberosKeytab": "kerberosKeytabvalue",
      "kerberosService": "kerberosServicevalue",
      "pollInMs": "30"

         }
END
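To relate the configuration above to a standard Kafka client, the following sketch resolves the documented defaults and maps the properties to Apache Kafka consumer configuration names. The mapping is an assumption made for illustration and is not confirmed by this document as the plug-in's internal behavior. The function name to_consumer_properties and its event_source_name parameter are hypothetical, with the event source name standing in for the default group ID. The "pollInMs" key follows the spelling used in the example.

```python
import json


def to_consumer_properties(configuration_json, event_source_name):
    """Return (topic, poll interval in ms, Kafka consumer properties).

    The property mapping is an assumed illustration, not the plug-in's code.
    """
    config = json.loads(configuration_json)
    props = {
        "bootstrap.servers": config["bootstrapServer"],
        # Documented default: PLAINTEXT.
        "security.protocol": config.get("securityProtocol", "PLAINTEXT"),
        # Documented default: the event source consumer name (assumed here
        # to be the event source name passed in by the caller).
        "group.id": config.get("groupID", event_source_name),
    }
    if props["security.protocol"].startswith("SASL_"):
        # Documented default: PLAIN.
        props["sasl.mechanism"] = config.get("saslMechanism", "PLAIN")
    if props["security.protocol"] in ("SSL", "SASL_SSL") and "identificationAlgo" in config:
        props["ssl.endpoint.identification.algorithm"] = config["identificationAlgo"]
    if "kerberosService" in config:
        props["sasl.kerberos.service.name"] = config["kerberosService"]
    # Documented default: 100. The example passes the value as a string.
    poll_ms = int(config.get("pollInMs", 100))
    return config["topic"], poll_ms, props
```

Under this sketch, a configuration without groupID yields a group.id equal to the name passed as event_source_name, and a configuration without pollInMs yields a poll interval of 100.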
Kafka event condition
KafkaEvent type event conditions can be configured in job streams by specifying the following required properties:
  • Name
  • Type
  • Event source
Note: When defining event conditions, all the related fields are case insensitive.
You can also specify any number of properties that filter the events received from the event source:
Message
  • Type: string
  • Operator: EQ("=") or NE("!=")
  • minlength: 1
  • maxlength: 1000
  • Wildcard allowed: true
  • Multiple filters: false
  • Single predicate: true
  • Case sensitive: false
Specify the Kafka message body to enable filtering of events that either match or do not match the specified input.
Key
  • Type: string
  • Operator: EQ("=") or NE("!=")
  • minlength: 1
  • maxlength: 1000
  • Wildcard allowed: true
  • Multiple filters: false
  • Single predicate: true
  • Case sensitive: false
Specify the key of the Kafka record to enable filtering of events that either match or do not match the specified input.
Partition
  • Type: string
  • Operator: EQ("=") or NE("!=")
  • minlength: 1
  • maxlength: 1000
  • Wildcard allowed: true
  • Multiple filters: false
  • Single predicate: true
  • Case sensitive: false
Specify the partition of the Kafka record to enable filtering of events that either match or do not match the specified input.
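The filter attributes listed above (EQ or NE operator, wildcards allowed, case insensitive) can be sketched as follows. This is a minimal illustration under stated assumptions, not the product's matching logic: the function name filter_matches is hypothetical, and the wildcard syntax is assumed to be shell style, where * matches any sequence of characters and ? matches a single character. Python's fnmatch also gives square brackets a special meaning, which may not apply to the plug-in.

```python
from fnmatch import fnmatchcase


def filter_matches(operator, pattern, value):
    """Evaluate one filter against one event field.

    operator: "=" (EQ) or "!=" (NE), as listed for each filter property.
    pattern:  the filter text, possibly containing * or ? (assumed syntax).
    value:    the message, key, or partition of the received record.
    """
    # Lowercase both sides because the filters are documented as
    # case insensitive.
    matched = fnmatchcase(value.lower(), pattern.lower())
    if operator == "=":
        return matched
    if operator == "!=":
        return not matched
    raise ValueError(f"unsupported operator: {operator}")
```

With these assumptions, filter_matches("=", "Test*", "test message") is true, while filter_matches("=", "test", "test message") is false because the pattern has no wildcard and must match the whole value.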
The following is an example of a job stream with two Kafka event conditions:
$jobstream

JOBSTREAM /WS_AGT_1#/JS_EVT_TRIGGER_KAFKA
  TRIGGER TRIGGER_KAFKA1 DESCRIPTION "Sample event trigger" TIMEOUT PT1M CONDITIONS
    NAME messagebody TYPE "Kafka/KafkaEvent" SOURCE /KAFKA1 FILTERS message = "test message"
    NAME KafkaKey TYPE "Kafka/KafkaEvent" SOURCE /KAFKA1 FILTERS key = mockKafkaKey
  ENDTRIGGER
:
END