Managing connectors in the Ingest service

This topic describes how to manage your descriptors and connectors to optimize your search indexing for Elasticsearch.

Connectors are used to ingest data into the HCL Commerce Search service. You use a connector descriptor to tell NiFi how to combine the various pipes to make the connector. The relationship is akin to the concepts of ‘class’ and ‘object’ in Java. In this case, a connector is like an object and a descriptor is like a class.

For information on calling the Ingest service, see Search Ingest Service API.

Viewing all connectors

The GET: /connectors endpoint can be called to get the descriptors of all existing connectors in NiFi.
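As a minimal sketch of calling this endpoint from a script, the following Python example builds and sends the request with the standard library only. The base URL (host name and port) is a placeholder assumption, and the helper names are hypothetical; substitute the address of your own Ingest service.

```python
import json
import urllib.request

# Assumption: placeholder address; replace with your Ingest service host and port.
INGEST_BASE_URL = "http://ingest.example.com:30800"


def build_list_connectors_request(base_url: str = INGEST_BASE_URL) -> urllib.request.Request:
    """Build (but do not send) a GET request for the descriptors of all connectors."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        headers={"Accept": "application/json"},
        method="GET",
    )


def fetch_connectors(base_url: str = INGEST_BASE_URL):
    """Send the request and decode the JSON response body."""
    request = build_list_connectors_request(base_url)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))
```

Keeping the request construction separate from the network call makes it possible to inspect the URL and method before anything is sent.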

Viewing a specific connector

The GET: /connectors/{id} endpoint can be used to get the descriptor of the specified connector using its connector ID.

If the connector ID is not known, the GET: /connectors endpoint can be called to return the descriptors of all connectors. You can then look through this list to find the ID of the specific connector you are looking for.
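Searching the returned list can be scripted. The sketch below assumes that the response has been parsed into a list of descriptor objects, each with the "id", "name", and "description" fields seen in the auth.price descriptor later in this topic; the exact response shape is an assumption, and the function name is hypothetical.

```python
from typing import Any, Dict, List


def find_connector_ids(descriptors: List[Dict[str, Any]], text: str) -> List[str]:
    """Return the IDs of descriptors whose id, name, or description contains text."""
    needle = text.lower()
    matches = []
    for descriptor in descriptors:
        # Search the three human-readable fields, ignoring case.
        haystack = " ".join(
            str(descriptor.get(key, "")) for key in ("id", "name", "description")
        ).lower()
        if needle in haystack and "id" in descriptor:
            matches.append(descriptor["id"])
    return matches


# Sample data for illustration only (assumed response shape).
sample = [
    {"id": "auth.reindex", "name": "auth.reindex"},
    {"id": "auth.price", "name": "auth.price", "description": "contract price indexing"},
]
print(find_connector_ids(sample, "price"))
```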

Creating a connector

For detailed instructions on creating connectors, including examples and illustrations, see Creating a NiFi service connector.

Enabling a connector

The POST: /connectors/{id}/enable endpoint can be used to start all the pipes that are contained in the specified connector.

Disabling a connector

The POST: /connectors/{id}/disable endpoint can be used to stop all the pipes that are contained in the specified connector.
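The enable and disable calls differ only in the last path segment, so one helper can build either request. This is a sketch under assumptions: the base URL is a placeholder, the helper name is hypothetical, and the request is sent without a body because the topic does not describe one.

```python
import urllib.request
from urllib.parse import quote

# Assumption: placeholder address; replace with your Ingest service host and port.
INGEST_BASE_URL = "http://ingest.example.com:30800"


def build_connector_action_request(
    connector_id: str, action: str, base_url: str = INGEST_BASE_URL
) -> urllib.request.Request:
    """Build a POST request that enables or disables the given connector."""
    if action not in ("enable", "disable"):
        raise ValueError(f"unsupported action: {action!r}")
    # Percent-encode the ID so unusual characters cannot alter the path.
    path_id = quote(connector_id, safe="")
    url = f"{base_url.rstrip('/')}/connectors/{path_id}/{action}"
    return urllib.request.Request(url=url, method="POST")


# Sending is left to the caller, for example:
# urllib.request.urlopen(build_connector_action_request("auth.reindex", "enable"))
```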

Upgrading a connector

The POST: /connectors/{id}/upgrade endpoint can be used to upgrade individual connectors.

If the ID of the connector to be upgraded is not known, the GET: /connectors endpoint can be used to get all descriptors. You can then find the desired connector and identify its ID.

Warning: All Ingest service processing in NiFi should be completed before you call the Upgrade API to upgrade pipeline process groups (dataflows). NiFi should show no queued data, indicated by "0 / 0 bytes" in the NiFi UI Status Bar. Ingest service processes can be allowed to complete or can be cancelled using the Ingest connectors "cancel" endpoint, as described in Cancelling a connector.

If the Upgrade API is executed while data is queued or Ingest service processes are running, and the NiFi UI Status Bar indicates invalid components, the upgrade process fails. To recover, clear the queue: locate the Process Groups in the pipeline that have queued flowfile data, and stop those Process Groups. For more information, see Stopping a component.

After the Process Group stops, right-click the Process Group and click Empty all queues. After the queues are cleared for the pipeline, rerun the Upgrade API for the connector associated with the pipeline. For example:
/connectors/auth.reindex/upgrade
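The "0 / 0 bytes" check described in the warning can be expressed as a small guard that runs before the Upgrade API is called. The following is a sketch only: it assumes you have already obtained the queued-data text shown in the NiFi UI Status Bar, and that non-empty values follow the same "count / size" pattern, possibly with thousands separators. Both function names are hypothetical.

```python
def is_nifi_queue_empty(queued_text: str) -> bool:
    """Return True when text such as "0 / 0 bytes" reports no queued flowfiles."""
    count_text, separator, _size_text = queued_text.partition("/")
    if not separator:
        raise ValueError(f"unexpected queued text: {queued_text!r}")
    # Assumption: the count before the slash may contain thousands separators.
    count = int(count_text.strip().replace(",", ""))
    return count == 0


def upgrade_path(connector_id: str) -> str:
    """Return the Upgrade API path for a connector, as in the example above."""
    return f"/connectors/{connector_id}/upgrade"


status = "0 / 0 bytes"
if is_nifi_queue_empty(status):
    print("Safe to call POST:", upgrade_path("auth.reindex"))
else:
    print("Queued data found; clear the queues before upgrading.")
```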

Upgrade the current connector with the newest version of the pipes

There may be cases where a connector is already defined, but the pipes in that connector have changed: the connector has the same pipes, but the contents of those pipes have been updated in the Registry.

In this case, the POST: /connectors/{id}/upgrade endpoint can be called to update the current connector with the newest version of the pipes.

Recreate the connectors based on the new registry

You can use the upgrade endpoint to recreate connectors based on a new registry. If you have a persistent volume enabled for NiFi, NiFi stores the connectors in a flow.xml file, with the registryId and flowId linked to a specific version of the registry. When you change versions, a new registry is pulled and deployed. When NiFi starts up, it creates the connectors based on flow.xml. If the registryId and flowId of the pipes within the connectors do not match the new registry, NiFi cannot run the index properly. To avoid this problem, the pipes must be properly linked to the registry so that the upgrade API can rebuild the connectors.

Customize connectors

You can use the upgrade endpoint for customization. When the upgrade API is called with the specified connector ID or name (for example, auth.reindex), it deletes the existing connector in NiFi and recreates it using either the ConnectorDescriptorJson provided in the request body or the ConnectorDescriptorJson that is stored in ZooKeeper. If the body of the request is empty, the API uses the ConnectorDescriptorJson that is stored in ZooKeeper. If a body is provided, the API uses it to create the connector and then stores it in ZooKeeper. This enables you to customize the default ConnectorDescriptorJson by adding or removing pipes and changing the structure of the connector. This approach is recommended if you want to keep your customizations across versions. For more information, see Extending the Ingest service.

Example:

Adding 2 "UploadPriceStage1" pipes to the existing auth.price connector.

Existing JSON:
{
  "id": "auth.price",
  "name": "auth.price",
  "description": "This is the connector for the contract price indexing pipeline to perform incremental updates. This operation involves only the Product indexing pipeline.",
  "created": "2020-11-17T12:55:36.949",
  "modified": "2020-11-17T12:55:36.949",
  "pipes": [
    {
      "name": "NRTLink",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "NRTLink",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "Terminal",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "Terminal",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    }
  ]
}

Upgrade API Body:
{
  "id": "auth.price",
  "name": "auth.price",
  "description": "This is the connector for the contract price indexing pipeline to perform incremental updates. This operation involves only the Product indexing pipeline.",
  "pipes": [
    {
      "name": "NRTLink",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "NRTLink",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "Terminal",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "Terminal",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    }
  ]
}
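Writing the repeated pipe entries by hand is error-prone, so the request body can also be derived from the existing descriptor. The sketch below assumes that the descriptor has been parsed into a Python dictionary with the structure shown above. The function name is hypothetical, and the "created" and "modified" fields are dropped only because the example body omits them.

```python
import copy
import json
from typing import Any, Dict


def add_pipe_copies(descriptor: Dict[str, Any], pipe_name: str, extra_copies: int) -> Dict[str, Any]:
    """Return a new descriptor with extra copies of a pipe placed after the original."""
    body = copy.deepcopy(descriptor)  # leave the caller's descriptor untouched
    # The example Upgrade API body does not carry these two fields.
    body.pop("created", None)
    body.pop("modified", None)

    pipes = body["pipes"]
    indexes = [i for i, pipe in enumerate(pipes) if pipe["name"] == pipe_name]
    if not indexes:
        raise ValueError(f"pipe {pipe_name!r} not found in connector {body.get('id')!r}")

    last = indexes[-1]
    template = pipes[last]
    for _ in range(extra_copies):
        # Insert each copy directly after the existing pipe, before later pipes.
        pipes.insert(last + 1, copy.deepcopy(template))
    return body


# Abbreviated descriptor for illustration; properties are omitted here.
existing = {
    "id": "auth.price",
    "name": "auth.price",
    "created": "2020-11-17T12:55:36.949",
    "modified": "2020-11-17T12:55:36.949",
    "pipes": [{"name": "NRTLink"}, {"name": "UploadPriceStage1"}, {"name": "Terminal"}],
}
print(json.dumps(add_pipe_copies(existing, "UploadPriceStage1", 2), indent=2))
```

The resulting dictionary, serialized with json.dumps, has the same pipe order as the Upgrade API body in the example: NRTLink, three UploadPriceStage1 pipes, and Terminal.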

Upgrading batch version for pipes

To use the latest version of your Ingest components, the preferred approach is to upgrade your pipelines using the Version context menu in the NiFi editor. If you want to update all of your connectors, see Upgrading batch version for pipes.

Cancelling a connector

After an Ingest operation starts, all of its current processes can be cancelled before completion by calling the POST: /connectors/{id}/cancel endpoint. This clears out all Ingest operations that are currently in progress. The endpoint is also useful when an index run gets stuck in the middle of the connector and you want to restart it.

The endpoint stops the routing service and the specified connector, which is identified by the provided ID (the connector name, for example auth.reindex). It then looks through each pipe in the connector and inspects all of its connections. When it finds a connection with queued data, it calls a drop request API to clear the queue in that connection.
Note: If an ingest operation is queued up, those specific ingest requests may not be cancelled.
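The scan that the cancel endpoint performs can be pictured with a small model. This is an illustration of the described logic only, not the service's actual code: the dictionary layout ("pipes", "connections", "queued_count") and the function name are hypothetical.

```python
from typing import Any, Dict, List


def connections_to_drop(connector: Dict[str, Any]) -> List[str]:
    """Return the IDs of connections that hold queued data, in pipe order."""
    to_drop = []
    for pipe in connector.get("pipes", []):
        for connection in pipe.get("connections", []):
            # A drop request would be issued only where data is still queued.
            if connection.get("queued_count", 0) > 0:
                to_drop.append(connection["id"])
    return to_drop


# Hypothetical snapshot of a stopped connector.
snapshot = {
    "id": "auth.reindex",
    "pipes": [
        {"name": "PipeA", "connections": [{"id": "c1", "queued_count": 0}]},
        {"name": "PipeB", "connections": [{"id": "c2", "queued_count": 42},
                                          {"id": "c3", "queued_count": 7}]},
    ],
}
print(connections_to_drop(snapshot))
```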

Deleting a connector

You can delete a connector by calling the DELETE: /connectors/{id} endpoint. If you do not know the ID of the connector to be deleted, you can call the GET: /connectors endpoint to retrieve all descriptors. You can then locate the desired connector and read its ID.
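A delete call can be prepared in the same style as the other endpoints. In this sketch the base URL is a placeholder assumption and the helper name is hypothetical. The guard against an empty ID is a defensive choice that prevents the request from being addressed to the /connectors collection by mistake.

```python
import urllib.request
from urllib.parse import quote


def build_delete_connector_request(connector_id: str, base_url: str) -> urllib.request.Request:
    """Build (but do not send) a DELETE request for a single connector."""
    if not connector_id.strip():
        raise ValueError("connector_id must not be empty")
    # Percent-encode the ID so it is always treated as one path segment.
    path_id = quote(connector_id, safe="")
    url = f"{base_url.rstrip('/')}/connectors/{path_id}"
    return urllib.request.Request(url=url, method="DELETE")


# Assumption: placeholder address; replace with your Ingest service host and port.
request = build_delete_connector_request("auth.price", "http://ingest.example.com:30800")
print(request.get_method(), request.full_url)
```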