Extending Ingest connectors

HCL Commerce provides default Ingest connectors that ingest and transform data to prepare it for the search index. You can use these connectors as is, or customize them to meet your business requirements by using the Search Ingest Service API. This topic describes the customizations you can make, how to manage them, and how to move them across versions.

Customizations

You can make the following two types of customizations to the default Ingest connectors:
Customize the structure of the default Ingest connectors (upgrade)

Make structural changes to the default Ingest connectors. A structural change adds pipes (process groups) to, or removes them from, the default connectors. For example, you can add profit margin pipes to the default auth.reindex connector.

To make a structural change:
  1. Get the full connector descriptor JSON of the specific connector using the endpoint GET /connectors/{connectorID}
    Note: You can also retrieve the full connector descriptor JSON of the specific connector from the Ingest container, at profile/apps/search-ingest.ear/search-ingest.war/WEB-INF/classes/deployments
  2. After retrieving the full JSON, copy the full descriptor into a new file and edit the structure to meet your requirements.
  3. Call the Upgrade API with the full connector descriptor (ConnectorDescriptorJson) that contains the structural change.
Example
The following example shows the full flow of a structural change to the auth.reindex connector, in which a new connector descriptor section is inserted for a custom pipe.
  1. GET http://INGEST_HOST:INGEST_PORT/connectors/auth.reindex
    {
      "id": "auth.reindex",
      "name": "auth.reindex",
      "description": "This is the connector to perform a full re-indexing.  This operation involves multiple indexing pipelines executed in a sequential fashion.",
      "created": "2022-01-18T22:56:06.657",
      "modified": "2022-01-18T22:56:06.657",
      "pipes": [
        {
          "name": "ResetLink",
          "label": "ResetLink",
          "properties": [
            {
              "name": "flow.database.listagg",
              "value": "true",
              "scope": {
                "name": "Assign Database Setting",
                "type": "PROCESS_GROUP"
              }
            },
    .
    .
    .
    
        {
          "name": "Terminal",
          "label": "Terminal",
          "properties": [
            {
              "name": "connector.name",
              "value": "auth.reindex",
              "scope": {
                "name": "Terminal",
                "type": "PROCESS_GROUP"
              }
            },
            {
              "name": "pipe.name",
              "value": "Terminal",
              "scope": {
                "name": "Terminal",
                "type": "PROCESS_GROUP"
              }
            }
          ]
        }
      ]
    }
  2. Edit the JSON above to add the connector descriptor information for your custom pipe at the appropriate location in the auth.reindex connector. For more information, see the Elasticsearch Profit Margin Tutorial: Customize connector descriptor of the existing connector with profit margin customization.
    {
      "name": "ProfitMarginSchemaUpdateConnector"
    },
    {
      "name": "ProfitMarginDatabaseConnectorPipe",
      "properties": [
        {
          "name": "Database Driver Location(s)",
          "value": "${AUTH_JDBC_DRIVER_LOCATION}",
          "scope": {
            "name": "Database Connection Pool",
            "type": "CONTROLLER_SERVICE"
          }
        },
        {
          "name": "Database Driver Class Name",
          "value": "${AUTH_JDBC_DRIVER_CLASSNAME}",
          "scope": {
            "name": "Database Connection Pool",
            "type": "CONTROLLER_SERVICE"
          }
        },
        {
          "name": "Database Connection URL",
          "value": "${AUTH_JDBC_URL}",
          "scope": {
            "name": "Database Connection Pool",
            "type": "CONTROLLER_SERVICE"
          }
        },
        {
          "name": "Database User",
          "value": "${AUTH_JDBC_USER_NAME}",
          "scope": {
            "name": "Database Connection Pool",
            "type": "CONTROLLER_SERVICE"
          }
        },
        {
          "name": "Password",
          "value": "${AUTH_JDBC_USER_PASSWORD}",
          "scope": {
            "name": "Database Connection Pool",
            "type": "CONTROLLER_SERVICE"
          }
        }
      ]
    }
  3. POST http://INGEST_HOST:INGEST_PORT/connectors/auth.reindex/upgrade with the new JSON as the request body to update the connector with your customization.
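The three steps of the example can be sketched in Python as follows. This is a minimal illustration, not the product's own tooling: the helper names (insert_pipes, fetch_descriptor, upgrade_connector) are hypothetical, placing the custom pipes directly before the Terminal pipe is only one possible "appropriate location", and the JSON content type on the POST is an assumption.

```python
import copy
import json
import urllib.request


def insert_pipes(descriptor, new_pipes, before="Terminal"):
    """Return a copy of the descriptor with new_pipes placed ahead of the named pipe.

    If no pipe is called `before`, the new pipes are appended at the end.
    """
    updated = copy.deepcopy(descriptor)
    pipes = updated["pipes"]
    # Locate the pipe that the custom pipes should precede.
    index = next(
        (i for i, pipe in enumerate(pipes) if pipe.get("name") == before),
        len(pipes),
    )
    pipes[index:index] = copy.deepcopy(new_pipes)
    return updated


def fetch_descriptor(base_url, connector_id):
    """Step 1: GET the full connector descriptor JSON."""
    with urllib.request.urlopen(f"{base_url}/connectors/{connector_id}") as resp:
        return json.load(resp)


def upgrade_connector(base_url, connector_id, descriptor):
    """Step 3: POST the edited descriptor to the upgrade endpoint."""
    request = urllib.request.Request(
        f"{base_url}/connectors/{connector_id}/upgrade",
        data=json.dumps(descriptor).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed content type
        method="POST",
    )
    with urllib.request.urlopen(request) as resp:
        return resp.status
```

A typical call sequence would be fetch_descriptor("http://INGEST_HOST:INGEST_PORT", "auth.reindex"), then insert_pipes with the custom pipe sections from step 2, then upgrade_connector with the result. Keeping insert_pipes free of network calls lets you review or save the edited descriptor before sending it.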
Customize property and configuration of the default/existing components (update)
Make property and configuration changes to the process groups, processors, connections, and controller services. To make these changes, place the update descriptor JSON file inside the container in the folder /profile/apps/search-ingest.ear/search-ingest.war/WEB-INF/classes/deployments/customization. The Search Ingest Service API then automatically runs the Update API with this update descriptor JSON to customize the properties and configuration of the default Ingest connectors.
Note: For more information about creating an update descriptor JSON, refer to Updating NiFi process group, processor, controller service using Ingest connector descriptor.

Manage customizations

To keep your customizations and deploy them to other environments, do the following:
  • Keep the Upgrade JSON body or the Update JSON body, depending on the customizations you made (structural change, or configuration and property change).
    Note: The Upgrade API can also perform Update API functionality.
  • Before you use the Update API, ensure that the pipe and the property exist in the process group, processor, connection, and controller service.
  • Before you use the Upgrade API, ensure that the pipe is added to Registry.
Warning: All Ingest Service processing in NiFi should be completed prior to initiating the Upgrade API to upgrade pipeline process groups (dataflows). NiFi should show no queued data, indicated by "0 / 0 bytes" in the NiFi UI Status Bar. Ingest service processes can be allowed to complete or can be cancelled using the Ingest connectors "cancel" endpoint, as described in Managing connectors in the Ingest service.

If the Upgrade API is executed while data is queued or Ingest Service processes are running, and the NiFi UI Status Bar indicates invalid components, the Upgrade process fails. To clear the queue, locate the Process Groups in the pipeline that have queued flowfile data and stop them. For more information, see Stopping a component.

After the Process Group stops, right-click the Process Group and click Empty all queues. After the queues are cleared for the pipeline, rerun the Upgrade API for the connector associated with the pipeline. For example:
/connectors/auth.reindex/upgrade
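
The "no queued data" condition from the warning above can be checked in a script before the Upgrade API is called. The following sketch only parses a queue status string of the form shown in the NiFi UI Status Bar ("0 / 0 bytes"). How that string is obtained from NiFi is left out, the function names are hypothetical, and the format assumed for a non-empty queue (a flowfile count with thousands separators before the slash) is an assumption.

```python
import re


def queued_flowfile_count(status_text):
    """Extract the flowfile count from a status string such as "0 / 0 bytes"."""
    # The count is the number before the slash; it may contain commas.
    match = re.match(r"\s*([\d,]+)\s*/", status_text)
    if match is None:
        raise ValueError(f"unrecognized queue status: {status_text!r}")
    return int(match.group(1).replace(",", ""))


def safe_to_upgrade(status_text):
    """Return True only when no flowfiles are queued."""
    return queued_flowfile_count(status_text) == 0
```

A deployment script could refuse to call the upgrade endpoint unless safe_to_upgrade returns True, which avoids the failed upgrade and manual queue cleanup described above.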

Move customizations across versions

Kubernetes Deployment

The deployment script checks the version of the NiFi images. If the version matches, the NiFi PV is not deleted. If it does not match, the NiFi PV is cleared out. When the PV is cleared out, all customizations made on the previous version are also cleared out.

When all customizations made on the previous version are cleared out:
  • Any customizations related to structure must be reapplied manually by following the process described in the Customizations section.
  • Any customizations related to property and configuration can be reapplied automatically using Ingest:
    1. Start the Ingest Docker service.
    2. Move the update descriptor JSON into the container, under /profile/apps/search-ingest.ear/search-ingest.war/WEB-INF/classes/deployments/customization
    3. Commit the container to create a custom image.
    4. Use the normal deployment script with the custom Ingest image tag.
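
Steps 2 and 3 above can be scripted. The sketch below assumes the Docker CLI is used for both steps (docker cp to place the file, docker commit to create the image). The function names are hypothetical, and the container name, descriptor file name, and image tag used when calling them are placeholders you supply.

```python
import subprocess

# Customization folder inside the Ingest container, as given in step 2.
CUSTOMIZATION_DIR = (
    "/profile/apps/search-ingest.ear/search-ingest.war"
    "/WEB-INF/classes/deployments/customization"
)


def build_image_commands(container, descriptor_file, image_tag):
    """Build the Docker CLI commands for steps 2 and 3 without running them."""
    return [
        # Step 2: copy the update descriptor JSON into the running container.
        ["docker", "cp", descriptor_file, f"{container}:{CUSTOMIZATION_DIR}/"],
        # Step 3: commit the container as a custom image.
        ["docker", "commit", container, image_tag],
    ]


def run_commands(commands):
    """Run each command in order, stopping at the first failure."""
    for argv in commands:
        subprocess.run(argv, check=True)
```

Separating command construction from execution makes it possible to print and review the commands first. The resulting image tag is the one to pass to the normal deployment script in step 4.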