Updating NiFi process groups, processors, and controller services using the Ingest connector descriptor

NiFi processors are the basic building blocks of the Version 9.1 dataflow pipeline. Processors perform specific tasks within the pipeline such as listening for incoming data, pulling data from external sources, and routing, transforming, or extracting information from flow files. Processors are grouped into process groups. You can use a simple API to update existing processors, groups, and their associated services.

The REST endpoint for this API is:

  • PUT /connectors

  • BODY connectorEntity JSON
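As a minimal sketch of how a client might prepare this call, the following Python builds (but does not send) a PUT request to /connectors using only the standard library. The host `ingest.example.com`, the port `30800`, and the helper name `build_connector_update` are assumptions for illustration; substitute the values for your own Ingest service.

```python
import json
import urllib.request


def build_connector_update(ingest_base_url, connector_entity):
    """Prepare a PUT /connectors request that carries the descriptor as JSON."""
    body = json.dumps(connector_entity).encode("utf-8")
    return urllib.request.Request(
        url=ingest_base_url.rstrip("/") + "/connectors",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical host and port; the descriptor is deliberately minimal.
request = build_connector_update(
    "http://ingest.example.com:30800",
    {"name": "auth.reindex", "pipes": []},
)
# To actually send the request: urllib.request.urlopen(request)
```

Building the request separately from sending it makes it easy to inspect the URL, method, and body before anything reaches the Ingest service.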

Processors

You can update processor configurations, processor properties, connections, and connection configurations to suit your business requirements. To specify processor changes, include the following property: pipe entries > pipe properties > scope > type = PROCESSOR.

The customization occurs in the processor that is specified in pipe properties > scope > name = ProcessGroupName.Processor, where ProcessGroupName is the name of the process group that contains the processor itself.

This API can also customize all processors that are nested in the specified process group if "*" is used in the scope name. For example: "ProcessGroupName*".
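The scope naming rules can be sketched as a small helper. This is an illustration only: the function name `processor_scope` is hypothetical, and it assumes the wildcard is written as `ProcessGroupName.*`, which is the form used in the "SCROLL SQL.*" JSON example on this page.

```python
def processor_scope(process_group, processor=None):
    """Return a scope that targets one processor, or all processors in a group."""
    if processor is None:
        # Wildcard form (assumed), matching the "SCROLL SQL.*" example.
        name = f"{process_group}.*"
    else:
        # ProcessGroupName.Processor form.
        name = f"{process_group}.{processor}"
    return {"name": name, "type": "PROCESSOR"}


single = processor_scope("SCROLL SQL", "Execute SQL - Find Parent Category")
everything = processor_scope("SCROLL SQL")
```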

Update processor configurations:

You can update or change the following processor configuration items:

| Configuration item name | Corresponding configuration item name in JSON | Description | Accepted value |
|---|---|---|---|
| Concurrent Tasks | ConcurrentlySchedulableTaskCount | The number of tasks that should be concurrently scheduled for the specified processor. | n, where n is an integer |
| Run Schedule | SchedulingPeriod | The minimum number of seconds that should elapse between task executions. | n sec, where n is an integer |
| Execution | ExecutionNode | The nodes that this processor will be scheduled to run on. | All or PRIMARY |
| Penalty Duration | PenaltyDuration | The amount of time used when this processor penalizes a FlowFile. | n sec, where n is an integer |
| Yield Duration | YieldDuration | When a processor yields, it will not be rescheduled again until this amount of time has elapsed. | n sec, where n is an integer |
| Bulletin Level | BulletinLevel | The level at which this processor will generate bulletins. | NONE, DEBUG, INFO, WARN, or ERROR |
| Scheduling Strategy | SchedulingStrategy | The strategy used to schedule this processor. | TIMER_DRIVEN, EVENT_DRIVEN, or CRON_DRIVEN |
| Comments | Comments | | String |
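Before sending an update, it can help to check values against the accepted formats listed above. The following is a minimal sketch that mirrors the table literally; the names `ACCEPTED` and `is_accepted` are hypothetical, and the Ingest service itself may accept additional forms that this check rejects.

```python
import re

INTEGER = re.compile(r"\d+")        # "n", where n is an integer
DURATION = re.compile(r"\d+ sec")   # "n sec", where n is an integer

ACCEPTED = {
    "ConcurrentlySchedulableTaskCount": INTEGER,
    "SchedulingPeriod": DURATION,
    "PenaltyDuration": DURATION,
    "YieldDuration": DURATION,
    "ExecutionNode": {"All", "PRIMARY"},
    "BulletinLevel": {"NONE", "DEBUG", "INFO", "WARN", "ERROR"},
    "SchedulingStrategy": {"TIMER_DRIVEN", "EVENT_DRIVEN", "CRON_DRIVEN"},
}


def is_accepted(name, value):
    """Return True if value matches the accepted format for a configuration item."""
    rule = ACCEPTED.get(name)
    if rule is None:
        # Comments (any string) and names not in the table are not checked.
        return True
    if isinstance(rule, set):
        return value in rule
    return rule.fullmatch(value) is not None
```

For example, `is_accepted("ConcurrentlySchedulableTaskCount", "5")` passes, while `is_accepted("BulletinLevel", "TRACE")` does not.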
Examples
  • Update the concurrent tasks value for the processor "Transform Document - Find Attributes From Database" in the process group "Find Attributes from Database" under the DatabaseProductStage1c pipe.
    {
        "name": "auth.reindex",
        "pipes": [
            {
                "name": "DatabaseProductStage1c",
                "properties": [
                    {
                        "name": "ConcurrentlySchedulableTaskCount",
                        "value": "5",
                        "scope": {
                            "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
                            "type": "PROCESSOR"
                        }
                    }
                ]
            }
        ]
    }
  • Update the concurrent tasks value for all processors in "SCROLL SQL" process group under the DatabaseProductStage1c pipe.
    {
        "name": "auth.reindex",
        "pipes": [
            {
                "name": "DatabaseProductStage1c",
                "properties": [
                    {
                        "name": "ConcurrentlySchedulableTaskCount",
                        "value": "3",
                        "scope": {
                            "name": "SCROLL SQL.*",
                            "type": "PROCESSOR"
                        }
                    }
                ]
            }
        ]
    }

Update processor properties:

In addition to the processor configurations listed above, you can change processor properties. Use the following NiFi API to get the list of properties that you can change for a processor:
http://nifiHost:nifiPort/nifi-api/processors/{processorId}

The properties that you can change are listed in the JSON response under component > config > properties.

The JSON body for updating properties has the same structure as the body for updating configurations. The only difference is the property name, which takes the following form:
"name": "properties.{property name to change}"

Example:

Update the "Max Wait Time" property for "Execute SQL - Find Parent Category" processor.
{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "properties.Max Wait Time",
                    "value": "25 seconds",
                    "scope": {
                        "name": "SCROLL SQL.Execute SQL - Find Parent Category",
                        "type": "PROCESSOR"
                    }
                }
            ]
        }
    ]
}
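To illustrate how the property names might be derived from the NiFi response, here is a minimal Python sketch. The `sample_response` below is a trimmed, hypothetical stand-in for the JSON returned by the processors endpoint; only the component > config > properties path comes from this page, and the property values are invented for the example.

```python
import json

# Trimmed, hypothetical response from /nifi-api/processors/{processorId}.
sample_response = json.loads("""
{
  "component": {
    "name": "Execute SQL - Find Parent Category",
    "config": {
      "properties": {
        "Max Wait Time": "0 seconds",
        "Database Connection Pooling Service": "example-service-id"
      }
    }
  }
}
""")


def updatable_property_names(processor_entity):
    """List names in the "properties.{property name}" form used in the descriptor."""
    props = processor_entity["component"]["config"]["properties"]
    return sorted(f"properties.{key}" for key in props)


names = updatable_property_names(sample_response)
```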

Connection

The new version of the API also supports updating connections and connection configurations. The body for a connection update looks different from the body for a processor or process group update. Unlike processors and process groups, connections do not have a uniquely identifiable name, so it is not possible to look up a connection ID by connection name. To resolve this, new fields have been added to the scope:
  • sourceName: The name of the processor or process group that the connection comes from (the component sending the flow files).
  • destinationName: The name of the processor or process group that the connection goes to (the component receiving the flow files).
The name in the scope is the name of the immediate process group that contains the connection.

Example:

{
    "name": "BackPressureObjectThreshold",
    "value": "600",
    "scope": {
        "name": "SCROLL SQL",
        "sourceName": "Execute SQL - Find Parent Category",
        "destinationName": "Analyze Successful SQL Response",
        "type": "CONNECTION"
    }
}
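Because a connection is identified only by its containing group, source, and destination, a client may want to refuse to build an entry when any of the three is missing. The helper below is a sketch under that assumption; the name `connection_property` is hypothetical and is not part of the Ingest API.

```python
def connection_property(name, value, group, source_name, destination_name):
    """Build one pipe property entry that targets a connection."""
    required = (
        ("scope name", group),
        ("sourceName", source_name),
        ("destinationName", destination_name),
    )
    for label, text in required:
        if not text:
            raise ValueError(f"{label} is required to identify a connection")
    return {
        "name": name,
        "value": str(value),  # values in the examples are JSON strings
        "scope": {
            "name": group,
            "sourceName": source_name,
            "destinationName": destination_name,
            "type": "CONNECTION",
        },
    }


entry = connection_property(
    "BackPressureObjectThreshold",
    600,
    "SCROLL SQL",
    "Execute SQL - Find Parent Category",
    "Analyze Successful SQL Response",
)
```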

Update connection configurations:

You can update or change the following connection configuration items:

| Configuration item name | Corresponding configuration item name in JSON | Description | Accepted value |
|---|---|---|---|
| Back Pressure Object Threshold | BackPressureObjectThreshold | The maximum number of objects that can be queued before backpressure is applied. | n, where n is an integer |
| Size Threshold | BackPressureDataSizeThreshold | The maximum data size of objects that can be queued before backpressure is applied. | n GB, where n is an integer |
| FlowFile Expiration | FlowFileExpiration | The maximum amount of time an object may be in the flow before it is aged out of the flow. | n sec, where n is an integer |

Example:

Update a connection between "Analyze Successful SQL Response" processor and "SUCCESS" output port in the "SCROLL SQL" group under "DatabaseProductStage1c".

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "BackPressureObjectThreshold",
                    "value": "200",
                    "scope": {
                        "name": "SCROLL SQL",
                        "sourceName": "Analyze Successful SQL Response",
                        "destinationName": "SUCCESS",
                        "type": "CONNECTION"
                    }
                }
            ]
        }
    ]
}

Process groups

Customize your process groups by including the following property in pipe entries > pipe properties > scope > type = PROCESS_GROUP.

This customizes variables at the process group level, and only on the specified process group. To customize multiple process groups, give each one its own pipe properties entry.

Example:

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "matchmaker.proximity",
                    "value": "0.1",
                    "scope": {
                        "name": "Rollup Attributes",
                        "type": "PROCESS_GROUP"
                    }
                },
                {
                    "name": "matchmaker.proximity",
                    "value": "0.4",
                    "scope": {
                        "name": "Find Attributes",
                        "type": "PROCESS_GROUP"
                    }
                }
            ]
        }
    ]
}
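Since each process group needs its own entry, the entries can be generated from a mapping of group name to value. This is a sketch only; `process_group_entries` is a hypothetical helper name, and the values are taken from the example above.

```python
def process_group_entries(variable, value_by_group):
    """Create one PROCESS_GROUP property entry per process group."""
    return [
        {
            "name": variable,
            "value": str(value),
            "scope": {"name": group, "type": "PROCESS_GROUP"},
        }
        for group, value in value_by_group.items()
    ]


entries = process_group_entries(
    "matchmaker.proximity",
    {"Rollup Attributes": 0.1, "Find Attributes": 0.4},
)
```

The resulting list can be used as the "properties" array of a pipe entry.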

Controller services

Customize controller services by including the following property in pipe entries > pipe properties > scope > type = CONTROLLER_SERVICE.

This property permits customization of variables at the main pipe level, and only on the specified controller service. To customize multiple controller services, give each one its own pipe properties entry.

Example:

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "Max Total Connections",
                    "value": "20",
                    "scope": {
                        "name": "Database Connection Pool",
                        "type": "CONTROLLER_SERVICE"
                    }
                }
            ]
        }
    ]
}

Multiple pipe customizations

This API supports customizing multiple pipes in one connector. To do so, include multiple pipe entries.

Example:

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "concurrent.tasks",
                    "value": "1",
                    "scope": {
                        "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
                        "type": "PROCESSOR"
                    }
                },
                {
                    "name": "matchmaker.proximity",
                    "value": "0.6",
                    "scope": {
                        "name": "Rollup Attributes",
                        "type": "PROCESS_GROUP"
                    }
                },
                {
                    "name": "Max Total Connections",
                    "value": "20",
                    "scope": {
                        "name": "Database Connection Pool",
                        "type": "CONTROLLER_SERVICE"
                    }
                }
            ]
        },
        {
            "name": "WaitLink",
            "label": "WaitLink - Product Stage 1 - 3",
            "properties": [
                {
                    "name": "connector.stage.name",
                    "value": "TESTING",
                    "scope": {
                        "name": "Wait for Completion",
                        "type": "PROCESS_GROUP"
                    }
                },
                {
                    "name": "concurrent.tasks",
                    "value": "5",
                    "scope": {
                        "name": "Wait for Completion.Get Status",
                        "type": "PROCESSOR"
                    }
                }
            ]
        }
    ]
}
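When customizations are collected from several places, they can be grouped by pipe before the descriptor is assembled. The following sketch assumes a flat list of (pipe name, property entry) pairs; `build_connector` is a hypothetical helper, and it omits optional pipe fields such as "label".

```python
def build_connector(connector_name, customizations):
    """Group (pipe name, property entry) pairs into one connector descriptor."""
    pipes = {}
    for pipe_name, entry in customizations:
        pipe = pipes.setdefault(pipe_name, {"name": pipe_name, "properties": []})
        pipe["properties"].append(entry)
    # Dicts keep insertion order, so pipes appear in first-seen order.
    return {"name": connector_name, "pipes": list(pipes.values())}


connector = build_connector("auth.reindex", [
    ("DatabaseProductStage1c", {
        "name": "matchmaker.proximity", "value": "0.6",
        "scope": {"name": "Rollup Attributes", "type": "PROCESS_GROUP"},
    }),
    ("WaitLink", {
        "name": "connector.stage.name", "value": "TESTING",
        "scope": {"name": "Wait for Completion", "type": "PROCESS_GROUP"},
    }),
    ("DatabaseProductStage1c", {
        "name": "Max Total Connections", "value": "20",
        "scope": {"name": "Database Connection Pool", "type": "CONTROLLER_SERVICE"},
    }),
])
```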
HCL Commerce Version 9.1.5.0 or later
Important:
  • Customizations made with this API are saved in Zookeeper. Consequently, even if the connector is deleted and created again, you can retrieve the customized connector descriptor by using the request GET http://{IngestHost}:{IngestPort}/connectors/{connectorName}.

  • You can import a customized connector by using the request POST http://{IngestHost}:{IngestPort}/connectors/{connectorName}. You can make this request in either of the following ways:
    • With an empty body, if Zookeeper contains the customized connector descriptor

      OR

    • With the extracted customized connector descriptor as the body
  • Save the customized connector descriptor, because data in Zookeeper can be overwritten by Ingest. A saved descriptor also makes it easier to move between versions, because it can be manually merged with the default connector descriptor.
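The export, save, and import steps might be scripted along the following lines. This is a sketch under stated assumptions: the host and port are hypothetical, the helper names are invented for illustration, the requests are built but not sent, and the Content-Type header is an assumption rather than a documented requirement.

```python
import json
import tempfile
import urllib.request
from pathlib import Path


def export_request(ingest_base_url, connector_name):
    """GET the customized connector descriptor."""
    return urllib.request.Request(
        f"{ingest_base_url}/connectors/{connector_name}", method="GET")


def import_request(ingest_base_url, connector_name, descriptor=None):
    """POST with an empty body (use the copy in Zookeeper) or with a saved descriptor."""
    data = b"" if descriptor is None else json.dumps(descriptor).encode("utf-8")
    return urllib.request.Request(
        f"{ingest_base_url}/connectors/{connector_name}",
        data=data,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def save_descriptor(descriptor, path):
    """Keep a local copy, since the data in Zookeeper can be overwritten."""
    Path(path).write_text(json.dumps(descriptor, indent=2), encoding="utf-8")


base = "http://ingest.example.com:30800"          # hypothetical host and port
descriptor = {"name": "auth.reindex", "pipes": []}

with tempfile.TemporaryDirectory() as tmp:
    saved = Path(tmp) / "auth.reindex.json"
    save_descriptor(descriptor, saved)
    restored = json.loads(saved.read_text(encoding="utf-8"))

get_req = export_request(base, "auth.reindex")
post_req = import_request(base, "auth.reindex", restored)
```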