Gestionar conectores en el servicio Ingestion

Cómo administrar sus descriptores y conectores para optimizar su indexación de búsqueda para Elasticsearch.

Los conectores se utilizan para recopilar datos en el servicio HCL Commerce Search. Utilice un descriptor de conector para indicar a NiFi cómo combinar los diversos conductos para crear el conector. La relación es parecida a los conceptos de 'Class' y 'Object' en Java. En este caso, un conector es como un objeto y un descriptor es como una clase.

Para obtener información sobre cómo llamar al servicio Ingest, consulte API Search Ingest Service.

Visualizar todos los conectores

Se puede llamar al punto final de GET: /connectors para obtener los descriptores de todos los conectores existentes en NiFi.

Visualizar un conector específico

El punto final de GET: /connectors/{id} se puede utilizar para obtener el descriptor del conector especificado utilizando su ID de conector.

Si no se conoce el ID de conector, se puede llamar al punto final de GET:/connectors para que devuelva los descriptores de todos los conectores. A continuación, puede examinar esta lista para encontrar el ID del conector específico que está buscando.

Crear un conector

Para obtener instrucciones detalladas sobre la creación de conectores, incluidos ejemplos e ilustraciones, consulte Crear un conector de servicio NiFi.

Habilitación de un conector

El punto final POST /connectors/{id}/enable se puede utilizar para iniciar todas las canalizaciones contenidas en el conector especificado.

Inhabilitación de un conector

El punto final POST /connectors/{id}/disable se puede utilizar para detener todas las canalizaciones que están contenidas en el conector especificado.

Actualizar un conector

El punto final POST: /connectors/{id}/upgrade se puede utilizar para actualizar conectores individuales.

Si no se conoce el ID del conector que se va a actualizar, el punto final de GET: /connectors se puede utilizar para obtener todos los descriptores. A continuación, puede encontrar el conector deseado para identificar su ID.

Warning: Todo el procesamiento del servicio de Ingesta en NiFi debe completarse antes de iniciar la API de actualización para actualizar los grupos de procesos de canalización (flujos de datos). NiFi debería mostrar que no hay datos en cola, indicados por "0 / 0 bytes" en la barra de estado de la interfaz de usuario de NiFi. Se puede permitir que los procesos del Servicio de ingesta se completen o se pueden cancelar utilizando el punto final "cancelar" de los Conectores de ingesta, como se describe en Gestionar conectores en el servicio Ingestion.

Si la API de actualización se ejecutó con datos en cola o con procesos de servicio de Ingesta en marcha y la barra de estado de NiFi UI indica componentes no válidos, el proceso de actualización fallará. La cola debe despejarse localizando los Grupos de Proceso en la tubería que ha puesto en cola los datos del archivo de flujo. Detener los grupos de procesos que tienen datos en cola. Para obtener más información, consulte Detención de un componente.

Después de que el grupo de procesos se detenga, haga clic con el botón derecho en el grupo de procesos y pulse Vaciar todas las colas. Una vez que se han borrado las colas para la canalización, vuelva a ejecutar la API de actualización para el conector asociado con la canalización. Por ejemplo,
/connectors/auth.reindex/upgrade

Actualice el conector actual con la versión más reciente de las canalizaciones

Puede haber casos en los que ya se haya definido un conector, pero las canalizaciones de dicho conector se hayan cambiado (el conector tiene las mismas canalizaciones, pero los conductos del registro han actualizado su contenido).

En este caso, se puede llamar al punto final de POST: /connectors/{id}/upgrade para actualizar el conector actual con la versión más reciente de las canalizaciones.

Vuelva a crear los conectores basándose en el nuevo registro

Puede utilizar este punto final para volver a crear conectores basándose en el nuevo registro. Si tiene habilitado el volumen permanente para NiFi, almacena los conectores en un archivo flow.xml con registryId y flowId enlazados a esa versión del registro. Al cambiar versiones, se extrae y despliega un nuevo registro para volver a crear los conectores basándose en el nuevo registro. Las canalizaciones deben estar enlazadas correctamente al registro para que los conectores se puedan volver a crear. Cuando se inicia Nifi, crea los conectores basados en flow.xml. Si las canalizaciones dentro de los conectores no tienen el registryId y flowId correctos que coincidan con el nuevo registro, causará problemas porque los ID no coinciden. NiFi no podrá ejecutar el índice correctamente. Para evitarlo, las canalizaciones tienen que estar enlazadas correctamente al registro para que la API de actualización pueda volver a crear el conector.

Personalizar conectores

Puede utilizar este punto final para la personalización. Cuando se llama a la API de actualización con el ID/nombre de conector especificado (por ejemplo, auth.reindex), suprime el conector existente en NiFi y lo vuelve a crear utilizando el ConnectorDescriptorJson proporcionado o el ConnectorDescriptorJson que se almacena en Zookeeper. Si el cuerpo de la solicitud está vacío, se utiliza el connectorDescriptorJson que está almacenado en Zookeeper. Cuando se proporciona un cuerpo con la API, lo utiliza para crear el conector y, a continuación, almacenarlo en Zookeeper. Esto le permite realizar las personalizaciones en ConnectorDescriptorJson predeterminado añadiendo o eliminando conductos y cambiando la estructura del conector. Se recomienda mantener las personalizaciones en toda la versión. Para obtener más información, consulte Ampliación del servicio Ingest.

Ejemplo:

Añadir 2 conexiones "UploadPriceStage1" al conector auth.price existente.

JSON existente:
{
  "id": "auth.price",
  "name": "auth.price",
  "description": "This is the connector for the contract price indexing pipeline to perform incremental updates. This operation involves only the Product indexing pipeline.",
  "created": "2020-11-17T12:55:36.949",
  "modified": "2020-11-17T12:55:36.949",
  "pipes": [
    {
      "name": "NRTLink",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "NRTLink",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "Terminal",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "Terminal",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    }
  ]
}
Cuerpo de la API de actualización:
{
  "id": "auth.price",
  "name": "auth.price",
  "description": "This is the connector for the contract price indexing pipeline to perform incremental updates. This operation involves only the Product indexing pipeline.",
  "pipes": [
    {
      "name": "NRTLink",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "NRTLink",
          "scope": {
            "name": "NRTLink",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "UploadPriceStage1",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "UploadPriceStage1",
          "scope": {
            "name": "UploadPriceStage1",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    },
    {
      "name": "Terminal",
      "properties": [
        {
          "name": "connector.name",
          "value": "auth.price",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        },
        {
          "name": "pipe.name",
          "value": "Terminal",
          "scope": {
            "name": "Terminal",
            "type": "PROCESS_GROUP"
          }
        }
      ]
    }
  ]
}

Actualización de la versión por lotes para canalizaciones

Para utilizar la última versión de los componentes integrados, el método preferido es actualizar las canalizaciones utilizando el menú contextual Versión en el editor NiFi. Si desea actualizar todos los conectores, consulte Actualización de la versión por lotes para canalizaciones.

Cancelar un conector

Después de iniciar una operación de introducción, todos los procesos actuales también se pueden cancelar antes de completarse. Esto se puede llevar a cabo llamando al punto final de POST: /connectors/{id}/cancel. Esto borra todas las operaciones de introducción que están actualmente en curso. Este punto final también es útil cuando la ejecución del índice se atasca en medio del conector y desea reiniciar. Este punto final detiene el conector especificado y el servicio de direccionamiento. Detiene el conector utilizando el ID proporcionado (nombre de conector). Por ejemplo, auth.reindex. A continuación, busca en cada conducto del conector y busca en todas las conexiones. Una vez que encuentra una conexión con datos en cola, se invoca una API de solicitud de soltar para borrar la cola en esa conexión.
Note: Si se pone en cola una operación de introducción, es posible que no se cancelen las solicitudes de introducción específicas.

eliminar un conector

Puede eliminar un conector llamando al punto final de DELETE: /connectors/{id}. Si no conoce el ID del conector que se va a eliminar, puede llamar al punto final de GET: /connectors para conocer todos los descriptores. A continuación, puede localizar el conector deseado y leer su ID.