Actualización del grupo de procesos NiFi, del procesador y del servicio del responsable mediante el descriptor del conector Ingest

Los procesadores NiFi son los bloques de construcción esenciales de la canalización de flujo de datos versión 9.1. Los procesadores realizan tareas específicas dentro de la canalización como, por ejemplo, escuchar datos entrantes, extraer datos de fuentes externas y direccionamiento, transformar o extraer información de archivos de flujo. Los procesadores se agrupan en grupos de procesos. Puede utilizar una API simple para actualizar procesadores, grupos y sus servicios asociados existentes.

El punto final REST para esta API es:

  • PUT/Conectores

  • BODY connectorEntity Json

Procesadores

Puede actualizar o cambiar las configuraciones de procesador, las propiedades de procesador, la conexión y las configuraciones de conexión según los requisitos de la empresa. Usted especifica los cambios del procesador al incluir la siguiente propiedad en el alcance de las entradas de canalización > propiedades de canalización > alcance > tipo = PROCESSOR

La personalización se produce en el procesador que se especifica en canalización propiedades > alcance > nombre = ProcessGroupName.Processor, donde ProcessGroupName es el nombre del grupo de procesos que contiene el procesador mismo.

Esta API también puede personalizar todos los procesadores que están anidados en el grupo de procesos especificado si se utiliza ''*" en el nombre de alcance. Por ejemplo: "ProcessGroupName*".

Actualizar configuraciones de procesador:

Puede actualizar o cambiar los siguientes elementos de configuración de procesador:

Nombre del elemento de configuración Nombre de elemento de configuración correspondiente en JSON Descripción Valor aceptado
Tareas simultáneas Recuento de tareas planificables simultáneamente El número de tareas que deben planificarse simultáneamente para el procesador especificado. n
Note: n es un entero.
Ejecutar planificación SchedulingPeriod El número mínimo de segundos que deben transcurrir entre la ejecución de la tarea n seg.
Note: n es un entero.
Ejecución ExecutionNode Los nodos en los que se planificará que se ejecute este procesador. Cualquiera de los valores siguientes:
  • Todos
  • PRIMARIA
Duración de la penalización PenaltyDuration La cantidad de tiempo que se utiliza cuando este procesador utiliza un flowFile. n seg.
Note: n es un entero.
Duración del rendimiento YieldDuration Cuando se produce un procesador, no se volverá a programar hasta que transcurra esta cantidad de tiempo. n seg.
Note: n es un entero.
Nivel de boletín Nivel de boletín El nivel en el que este procesador generará boletines. Cualquiera de los valores siguientes:
  • NINGUNO
  • DEBUG
  • INFO
  • AVISO
  • ERROR
Estrategia de planificación SchedulingStrategy La estrategia utilizada para planificar este procesador. Cualquiera de los valores siguientes:
  • TIMER_DRIVEN
  • EVENT_DRIVEN
  • CRON_DRIVEN
Comentarios Comentarios Serie
Ejemplos
  • Actualice el valor de tareas simultáneas para el procesador "Documento de transformación - Buscar atributos de base de datos" en el grupo de procesos "Buscar atributos de la base de datos" bajo el conducto DatabaseProductStage1c.
    {
        "name": "auth.reindex",
        "pipes": [
            {
                "name": "DatabaseProductStage1c",
                "properties": [
                    {
                        "name": "ConcurrentlySchedulableTaskCount",
                        "value": "5",
                        "scope": {
                            "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
                            "type": "PROCESSOR"
                        }
                    }
                ]
            }
        ]
    }
    }
  • Actualice el valor de tareas simultáneas para todos los procesadores en el grupo de procesos "SCROLL SQL" bajo el conducto DatabaseProductStage1c.
    {
        "name": "auth.reindex",
        "pipes": [
            {
                "name": "DatabaseProductStage1c",
                "properties": [
                        {
                        "name": "ConcurrentlySchedulableTaskCount",
                        "value": "3",
                        "scope": {
                            "name": "SCROLL SQL.*",
                            "type": "PROCESSOR"
                        }
                    }
                ]
            }
        ]
    }

Actualización de propiedades de procesador:

Además de las configuraciones de procesador mencionadas anteriormente, también puede cambiar las propiedades del procesador. Utilice la siguiente API para obtener la lista de propiedades que puede cambiar para el procesador:
http://nifiHost:nifiPort/nifi-api/processors/{processorId}

Puede ver en la respuesta JSON ( propiedades de configuración > de > componente)las propiedades que se pueden cambiar.

El cuerpo json utilizado para actualizar las propiedades es el mismo que la actualización de la configuración, excepto la forma en que especifica el nombre de propiedad. Se cambia como:
"name": "properties.{property name to change}"

Ejemplo:

Actualice la propiedad "Tiempo de espera máximo" para el procesador "Ejecutar SQL - Buscar categoría principal".
{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "properties.Max Wait Time",
                    "value": "25 seconds",
                    "scope": {
                        "name": "SCROLL SQL.Execute SQL - Find Parent Category",
                        "type": "PROCESSOR"
                    }
                }
            ]
        }
    ]
}

Conexión

La nueva versión de la API también admite a la actualización de conexiones y configuraciones de conexión. El cuerpo de una actualización de conexiones presenta un aspecto distinto al de una actualización de procesador o de grupo de procesos. A diferencia de los procesadores y los grupos de procesos, las conexiones no tienen un nombre de identificación exclusivo y, por lo tanto, no es posible obtener un ID de conexión basado en el nombre de conexión. Para resolverlo, se han añadido estos nuevos campos en el alcance:
  • source.name El nombre del procesador/grupo de procesos desde el que va la conexión (que recibe los archivos de flujo).
  • destinationName: El nombre del procesador/grupo de procesos al que se va a conectar (enviando los archivos de flujo).
El nombre en el ámbito es el nombre del grupo de procesos inmediato que contiene la conexión.

Ejemplo:

{
    "name": "BackPressureObjectThreshold",
    "value": "600",
    "scope": {
        "name": "SCROLL SQL",
        "sourceName": "Execute SQL - Find Parent Category",
        "destinationName": "Analyze Successful SQL Response",
        "type": "CONNECTION"
    }
}

Actualización de configuración de conexión:

Puede actualizar o cambiar los siguientes elementos de configuración de conexión:

Nombre del elemento de configuración Nombre de elemento de configuración correspondiente en JSON Descripción Valor aceptado
Umbral de objeto de contrapresión BackPressureObjectThreshold El número máximo de objetos que se pueden poner en cola antes de aplicar la copia de seguridad. n
Note: n es un entero.
Umbral de tamaño BackPressureDataSizeThreshold El tamaño máximo de datos de los objetos que se pueden poner en cola antes de aplicar la copia de seguridad. n GB
Note: n es un entero.
Caducidad del archivo de flujo FlowFileExpiration

La cantidad máxima de tiempo que un objeto puede estar en el flujo antes de que expire y salga del flujo.

.
n seg.
Note: n es un entero.

Ejemplo:

Actualice una conexión entre el procesador "Analyze Successful SQL Response" y el puerto de salida "SUCCESS" en el grupo "SCROLL SQL" bajo "DatabaseProductStage1c".

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "BackPressureObjectThreshold",
                    "value": "200",
                    "scope": {
                        "name": "SCROLL SQL",
                        "sourceName": "Analyze Successful SQL Response",
                        "destinationName": "SUCCESS",
                        "type": "CONNECTION"
                    }
                }
            ]
        }
    ]
}

Grupos de procesos

Personalice sus grupos de procesos incluyendo la siguiente propiedad en entradas de canalización > propiedades de canalización > alcance > tipo = PROCESS_GROUP.

Esto personaliza variables a nivel de grupo de procesos y solo en el grupo de procesos especificado. Para personalizar varios grupos de procesos, cada uno necesitará su propia entrada de propiedades de barra vertical.

Ejemplo:

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                    {
                        "name": "matchmaker.proximity",
                        "value": "0.1",
                        "scope": {
                            "name": "Rollup Attributes",
                            "type": "PROCESS_GROUP"
                        }
                    },
                    {
                        "name": "matchmaker.proximity",
                        "value": "0.4",
                        "scope": {
                            "name": "Find Attributes",
                            "type": "PROCESS_GROUP"
                        }
                    }
            ]
        }
    ]
}

Servicios de controlador

Personalice los servicios de controlador personalizados incluyendo la siguiente propiedad en entradas de canalización > propiedades de canalización > alcance > tipo = CONTROLLER_SERVICE.

Esta propiedad permite la personalización de variables en el nivel de barra vertical principal y solo en el servicio de controlador especificado. Para personalizar varios servicios de controlador, cada uno necesitará su propia entrada de propiedades de barra vertical.

Ejemplo:

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "Max Total Connections",
                    "value": "20",
                    "scope": {
                        "name": "Database Connection Pool",
                        "type": "CONTROLLER_SERVICE"
                    }
                }
            ]
        }
    ]
}

Varias personalizaciones de canalización

Esta API sí admite la personalización de varios conductos en un conector. Realice esta personalización incluyendo varias entradas de barra vertical.

Ejemplo:

{
    "name": "auth.reindex",
    "pipes": [
        {
            "name": "DatabaseProductStage1c",
            "properties": [
                {
                    "name": "concurrent.tasks",
                    "value": "1",
                    "scope": {
                        "name": "Find Attributes from Database.Transform Document - Find Attributes From Database",
                        "type": "PROCESSOR"
                    }
                },
                {
                    "name": "matchmaker.proximity",
                    "value": "0.6",
                    "scope": {
                        "name": "Rollup Attributes",
                        "type": "PROCESS_GROUP"
                    }
                },
                {
                    "name": "Max Total Connections",
                    "value": "20",
                    "scope": {
                        "name": "Database Connection Pool",
                        "type": "CONTROLLER_SERVICE"
                    }
                }
            ]
        },
        {
            "name": "WaitLink",
            "label": "WaitLink - Product Stage 1 - 3",
            "properties": [
                {
                    "name": "connector.stage.name",
                    "value": "TESTING",
                    "scope": {
                        "name": "Wait for Completion",
                        "type": "PROCESS_GROUP"
                    }
                },
                {
                    "name": "concurrent.tasks",
                    "value": "5",
                    "scope": {
                        "name": "Wait for Completion.Get Status",
                        "type": "PROCESSOR"
                    }
                }
            ]
        }
    ]
}
HCL Commerce Version 9.1.5.0 or later
Important:
  • Las personalizaciones realizadas con esta API se guardan en Zookeeper. En consecuencia, si el conector se borra y se crea de nuevo, entonces puede recuperar el descriptor del conector personalizado utilizando la solicitud GET http://{IngestHost}:{IngestPort}/connectors/{connectorName}.

  • Puede importar el conector personalizado utilizando la solicitud POST http://{IngestHost}:{IngestPort}/connectors/{connectorName}. Mediante esta solicitud, puede importar el conector personalizado de cualquiera de las dos formas siguientes:
    • con un cuerpo vacío si Zookeeper contiene el descriptor del conector personalizado

      O

    • Con el descriptor del conector personalizado extraído
  • Guarde el descriptor del conector personalizado, ya que los datos en Zookeeper pueden sobrescribirse desde Ingest. Guardar el descriptor del conector personalizado le ofrece la flexibilidad de moverse fácilmente entre diferentes versiones, ya que el descriptor personalizado guardado puede fusionarse manualmente con el descriptor del conector por defecto.