Adapter commands for consumers

Kafka adapter commands for consumers are valid for input data sources. For additional details, see the consumer configuration information in the Apache Kafka documentation.

Group ID

Specifies the unique string that identifies the consumer group that a consumer belongs to. This property enables Kafka group management, which is used when the -TP command specifies only topic names without topic partitions. The default is a null string. The corresponding adapter command is -GID groupID (or -GROUPID groupID).

Related Kafka consumer configuration: group.id

Offset Commit Strategy

Specifies the policy that the adapter uses to commit an offset when it consumes a message. The corresponding adapter command is -OCS (or -OFFSETCOMMITSTRATEGY).

The policy is one of the following:
  • Manual: The Kafka adapter commits the offset when it commits the transaction, as defined by the Transaction > Scope setting on the input card of the map. This is the default.
  • Auto: The Kafka cluster periodically commits the offset.
  • Never: Neither the Kafka cluster nor the Kafka adapter commits the offset.

Offset Commit Retry Count

Specifies the number of times to ignore offset commit failures and continue consuming messages. The corresponding adapter command is -OCRC (or -OFFSETCOMMITRETRYCOUNT).

When the offset commit operation fails and the specified count has not been reached, the adapter increments its internal counter of failed commits and continues to consume messages as if the commit had succeeded. If the error topic is specified and the value for this command is not 0, the adapter sends an error message with the topic, partition, and offset information of the failed commit operation to the error topic. The adapter tries to commit the pending offsets again the next time it is scheduled to perform a commit operation. The default value for this command is 0, which instructs the adapter to treat offset commit failures as fatal errors and not to attempt retries. The special value S specifies an unlimited number of retries.
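The counting rule described above can be modeled in a few lines of Python. This is only a sketch of the documented behavior, not the adapter's implementation: the names CommitRetryTracker and CommitFailedFatal are hypothetical, None stands in for the special value S, and the sketch assumes that the counter is never reset, which the text does not state.

```python
from typing import Optional


class CommitFailedFatal(Exception):
    """Hypothetical error: a commit failure that must be treated as fatal."""


class CommitRetryTracker:
    """Illustrative model of the -OCRC counter (not the adapter's code)."""

    def __init__(self, retry_count: Optional[int] = 0) -> None:
        # retry_count=None stands in for the special value S (unlimited).
        self.retry_count = retry_count
        self.failed_commits = 0

    def on_commit_failure(self) -> None:
        """Tolerate the failure if the count is not reached, else raise."""
        limit_reached = (
            self.retry_count is not None
            and self.failed_commits >= self.retry_count
        )
        if limit_reached:
            raise CommitFailedFatal("offset commit failed; retry count reached")
        # Assumption: the counter only grows; the pending offsets are
        # simply committed again at the next scheduled commit.
        self.failed_commits += 1
```

With the default of 0, the first failure is already fatal. With a count of 2, two failures are tolerated and the third one raises.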

Skip Duplicates

Instructs the adapter to keep track of all messages that it consumed in the current process, and to ignore messages that it has previously consumed. The corresponding adapter command is -SD (-SKIPDUPLICATES).

If the error topic is configured, the adapter sends error messages that identify each detected duplicate message (its topic, partition, and offset) to the error topic.
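Duplicate tracking of this kind can be sketched as a set of message identifiers. The following Python example is illustrative only: the function name split_duplicates is hypothetical, and it assumes that a message is identified by its topic, partition, and offset, as in the error messages described above.

```python
from typing import Iterable, List, Set, Tuple

# Assumed identity of a message: (topic, partition, offset).
MessageId = Tuple[str, int, int]


def split_duplicates(
    ids: Iterable[MessageId], seen: Set[MessageId]
) -> Tuple[List[MessageId], List[MessageId]]:
    """Return (fresh, duplicates) and remember every fresh id in `seen`."""
    fresh: List[MessageId] = []
    duplicates: List[MessageId] = []
    for message_id in ids:
        if message_id in seen:
            # Already consumed in this process: skip it and report it.
            duplicates.append(message_id)
        else:
            seen.add(message_id)
            fresh.append(message_id)
    return fresh, duplicates
```

The `seen` set lives only as long as the process does, which matches the "in the current process" scope of the command. The duplicates list is what would be reported to the error topic.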

-OFFSETCOMMITTRETRYCONSTRAINT value [value] -OCC value [value]

Specifies constraints the adapter must meet when committing pending offsets. One of the following two values can be specified, or both can be specified, separated by a space:

apo: Stands for Assigned Partitions Only and means that the adapter consumer should commit only those pending offsets that belong to the partitions currently assigned to it. The offsets that belong to a partition that the consumer owned at some earlier point but that has subsequently been assigned to another consumer are skipped.

ssc: Stands for Skip Stale Commits and means that the adapter should not commit pending offsets on a partition that are older than the currently committed offsets on the same partition for the same consumer group. Such old offsets are considered stale. They can occur if the consumer loses a partition due to partition rebalancing, and another consumer in the same consumer group is assigned the partition and then consumes and commits offsets past the last offset consumed by the original consumer. If the original consumer later regains ownership of the partition, the offsets that it still considers pending for that partition are stale. If this constraint is specified and the consumer detects stale offsets, it does not commit them and it discards (forgets) them.
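The two constraints can be read as filters over the set of pending offsets. The Python sketch below illustrates that reading under stated assumptions: the function name select_offsets_to_commit and its parameters are hypothetical, and "older" is interpreted as a pending offset that is strictly less than the committed offset.

```python
from typing import Dict, Set, Tuple

# (topic, partition)
TopicPartition = Tuple[str, int]


def select_offsets_to_commit(
    pending: Dict[TopicPartition, int],
    assigned: Set[TopicPartition],
    committed: Dict[TopicPartition, int],
    apo: bool = False,
    ssc: bool = False,
) -> Dict[TopicPartition, int]:
    """Return the pending offsets that pass the requested constraints."""
    selected: Dict[TopicPartition, int] = {}
    for tp, offset in pending.items():
        if apo and tp not in assigned:
            # apo: the partition now belongs to another consumer.
            continue
        if ssc and tp in committed and offset < committed[tp]:
            # ssc: the group has already committed past this offset.
            continue
        selected[tp] = offset
    return selected
```

When both values are specified, an offset must pass both checks to be committed.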

Error Message Format Version

Specifies the format of messages sent to the error topic. The corresponding adapter command is -EMFV version (or -ERRORMESSAGEFORMATVERSION version).

The supported values are:
  • 1: The default. Each message is in the format t:p-o, where t, p, and o are the topic, partition, and offset of the consumed message for which processing failed and for which the error record is produced to the error topic.
  • 2: Each message is in JSON format: {"topic":"t","partition":"p","offset":"o","code":"c"}, where t, p, and o are the topic, partition, and offset of the consumed message for which processing failed, and c is the error code that indicates the reason for the failure. The error code is one of the following:
      • DUPLICATE: The message was skipped because it was already consumed earlier for the same consumer group in the current process, and the -SKIPDUPLICATES (-SD) adapter command was specified.
      • COMMIT_FAILED: The message was processed successfully, but the offset commit operation for it failed.
      • ROLLBACK: An error was reported while processing the message, for example, when performing a database write operation in one of the map outputs in the same transaction scope in which the message was consumed.
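An application that reads the error topic needs to handle both formats. The following Python sketch parses either one; the function name parse_error_message is hypothetical, and the code assumes that topic names contain no colon and that the partition and offset values convert cleanly to integers, whether or not they are quoted in the JSON.

```python
import json
from typing import Any, Dict


def parse_error_message(raw: str) -> Dict[str, Any]:
    """Parse a version 1 (t:p-o) or version 2 (JSON) error message."""
    text = raw.strip()
    if text.startswith("{"):
        # Version 2: JSON object with topic, partition, offset, and code.
        record = json.loads(text)
        return {
            "topic": record["topic"],
            "partition": int(record["partition"]),
            "offset": int(record["offset"]),
            "code": record.get("code"),
        }
    # Version 1: split from the right, because topic names may contain "-".
    topic, _, position = text.rpartition(":")
    partition, _, offset = position.rpartition("-")
    return {
        "topic": topic,
        "partition": int(partition),
        "offset": int(offset),
        "code": None,  # version 1 carries no error code
    }
```

For example, the version 1 message my-topic:3-42 yields topic my-topic, partition 3, and offset 42.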

Fetch Minimum Bytes

Specifies the minimum amount of data, in bytes, that must accumulate before the server returns the data to the consumer. The default data size is one byte, which means the server responds to a data request as soon as any data is available to return. The corresponding adapter command is -FMIB size (or -FETCHMINBYTES size).

Related Kafka consumer configuration: fetch.min.bytes

Fetch Maximum Bytes

Specifies the maximum amount of data, in bytes, that the server should return for a fetch request. The corresponding adapter command is -FMAB size (or -FETCHMAXBYTES size).

Related Kafka consumer configuration: fetch.max.bytes

Auto Offset Reset

The offset position to automatically use when no previous offset exists. The corresponding adapter command is -AOR (-AUTOOFFSETRESET).

The offset is one of the following:
  • Latest: Starts consuming new messages from the most recent offset. This is the default.
  • Earliest: Starts consuming from the oldest available record by automatically resetting the offset to the earliest offset.
  • None: Reports an error.

Related Kafka consumer configuration: auto.offset.reset

Synchronized

Specifies that the flow engine listener thread is to operate synchronously. The listener waits to notify a map of an event (a message added to a topic) until the map acknowledges that it processed the previous event. The corresponding adapter command is -SYNC (or -SYNCHRONIZED).

If the map fails, the listener does not report subsequent events unless the -ETP command is specified. With the -ETP command, when the flow engine listener records the failed event on the error topic, the listener is unblocked and proceeds to report new events.

Error Topic

Specifies the name of the error topic where messages that result in an error are recorded. The corresponding adapter command is -ETP topic (or -ERRORTOPIC topic).
  • The Command Server records the error at the time of transaction rollback. In a map, the input card Transaction > Scope setting controls when a transaction failure is processed by the adapter.
  • The flow engine records the error when the listener thread is blocked because it's waiting for the status of the event processing, as specified by the -SYNC command.

After recording the error, the listener resumes processing new events.

Isolation Level

Controls the visibility of messages that are part of a transaction. The corresponding adapter command is -IL (or -ISOLATIONLEVEL).
  • read_committed: Only messages from transactions that are committed (and messages that were not part of a transaction) are visible to consumers. This is the default.
  • read_uncommitted: All messages are visible to consumers, even if they were part of an aborted transaction.

Related Kafka consumer configuration: isolation.level
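The "Related Kafka consumer configuration" notes in this section pair five adapter commands with Kafka consumer properties. The Python sketch below collects those pairs in one lookup table. The function name to_consumer_config is hypothetical, and lowercasing the -AOR value to match the Kafka values (latest, earliest, none) is an assumption about how the adapter passes the value through.

```python
from typing import Any, Dict

# Pairs taken from the "Related Kafka consumer configuration" notes.
KAFKA_PROPERTY_BY_COMMAND: Dict[str, str] = {
    "-GID": "group.id",
    "-FMIB": "fetch.min.bytes",
    "-FMAB": "fetch.max.bytes",
    "-AOR": "auto.offset.reset",
    "-IL": "isolation.level",
}


def to_consumer_config(options: Dict[str, Any]) -> Dict[str, str]:
    """Translate short adapter commands into Kafka consumer properties."""
    config: Dict[str, str] = {}
    for command, value in options.items():
        prop = KAFKA_PROPERTY_BY_COMMAND.get(command)
        if prop is None:
            # Commands such as -QTY have no Kafka property listed here.
            continue
        text = str(value)
        if command == "-AOR":
            # Assumption: Latest/Earliest/None map to lowercase values.
            text = text.lower()
        config[prop] = text
    return config
```

A table like this is useful mainly as a cross-reference when you look up a setting in the Apache Kafka documentation.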

Quantity

Specifies the number of messages to consume, or S to consume all available messages. The corresponding adapter command is -QTY (or -QUANTITY).

The default value is 1. If you specify a value other than 1, you must set the FetchAs input card setting to Burst and the FetchUnit input card setting to 1 in the map.

Listen

Specifies the wait time for messages. The corresponding adapter command is -LSN (or -LISTEN). The value is one of the following:
  • Seconds: The number of seconds that the consumer waits for messages to arrive.
  • S: Wait for an unlimited time for messages to arrive. This is the default.
  • 0: Consume all available messages and do not wait for new messages to arrive.

Logical Message Count

Specifies the number of Kafka messages to concatenate and return as a single logical message from the adapter. By default, the adapter returns each Kafka message as a separate logical message. To concatenate all available messages, specify 0. The corresponding adapter command is -LMC count (or -LOGICALMESSAGECOUNT count).

This command is valid only in logical message mode (-LMM command). Logical message mode is not valid in a flow engine scenario.

Logical Message Bytes

Specifies the logical message buffer-size limit, in bytes. The adapter buffers messages until the buffered data exceeds this limit, then returns all buffered messages as a single logical message. The corresponding adapter command is -LMS bytes (or -LOGICALMESSAGESIZE bytes).

This command is valid only in logical message mode (-LMM command). Logical message mode is not valid in a flow engine scenario.
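The count and size limits can be pictured as two triggers that flush a buffer. The following Python sketch shows one way to read them. It is not the adapter's implementation: the function name group_logical_messages is hypothetical, and the way the two limits combine (whichever triggers first) is an assumption.

```python
from typing import Iterable, Iterator, List, Optional


def group_logical_messages(
    messages: Iterable[bytes],
    count: int = 1,
    max_bytes: Optional[int] = None,
) -> Iterator[bytes]:
    """Concatenate Kafka messages into logical messages."""
    buffer: List[bytes] = []
    size = 0
    for message in messages:
        buffer.append(message)
        size += len(message)
        # count=0 means "concatenate all available messages".
        count_reached = count > 0 and len(buffer) >= count
        # The buffer is flushed once it exceeds the byte limit.
        bytes_exceeded = max_bytes is not None and size > max_bytes
        if count_reached or bytes_exceeded:
            yield b"".join(buffer)
            buffer, size = [], 0
    if buffer:
        # Whatever remains forms the last logical message.
        yield b"".join(buffer)
```

With the default count of 1, every Kafka message is returned as its own logical message, which matches the default behavior described for -LMC.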

Avro JSON Conversion Mode

Specifies the mode for converting Avro records to their JSON representation. Mode names are case-insensitive. The default mode, Simple, produces the default simple JSON. The other mode, Strict, produces JSON that complies with the Avro schema; an Avro schema must be specified when Strict mode is used. In most cases, the two modes produce equivalent JSON, but in some cases, such as when the Avro schema contains elements of union types, the JSON produced in Strict mode is more verbose. The more verbose form is less convenient to process, but it can be converted back to Avro in compliance with the same Avro schema.

The corresponding adapter command is -AJCM mode (or -AVROJSONCONVERSIONMODE mode).

Avro JSON Encoder Class Name

Specifies the Java class to use for converting Avro to its JSON representation. This command is optional. If it is not specified, the adapter internally uses the default class org.apache.avro.io.JsonEncoder from the Apache Avro library. When specified, the value must be the fully qualified name of a Java class that implements a public constructor that takes two arguments: org.apache.avro.Schema, which represents the Avro schema to use for the conversion, and java.io.OutputStream, which represents the stream to which the JSON content is written. The jar file that implements this class must be included in the classpath that the adapter uses.

The corresponding adapter command is -AJECN name (or -AVROJSONENCODERCLASSNAME name).