Actualización del grupo de procesos NiFi, del procesador y del servicio del responsable mediante el descriptor del conector Ingest
Los procesadores NiFi son los bloques de construcción esenciales de la canalización de flujo de datos versión 9.1. Los procesadores realizan tareas específicas dentro de la canalización como, por ejemplo, escuchar datos entrantes, extraer datos de fuentes externas y direccionamiento, transformar o extraer información de archivos de flujo. Los procesadores se agrupan en grupos de procesos. Puede utilizar una API simple para actualizar procesadores, grupos y sus servicios asociados existentes.
El punto final REST para esta API es:
-
PUT/Conectores
-
BODY connectorEntity Json
Procesadores
Puede actualizar o cambiar las configuraciones de procesador, las propiedades de procesador, la conexión y las configuraciones de conexión según los requisitos de la empresa. Usted especifica los cambios del procesador al incluir la siguiente propiedad en el alcance de las
La personalización se produce en el procesador que se especifica en canalización
, donde ProcessGroupName es el nombre del grupo de procesos que contiene el procesador mismo.Esta API también puede personalizar todos los procesadores que están anidados en el grupo de procesos especificado si se utiliza ''*" en el nombre de alcance. Por ejemplo: "ProcessGroupName*".
Actualizar configuraciones de procesador:
Puede actualizar o cambiar los siguientes elementos de configuración de procesador:
Nombre del elemento de configuración | Nombre de elemento de configuración correspondiente en JSON | Descripción | Valor aceptado |
---|---|---|---|
Tareas simultáneas | Recuento de tareas planificables simultáneamente | El número de tareas que deben planificarse simultáneamente para el procesador especificado. | n Note: n es un entero. |
Ejecutar planificación | SchedulingPeriod | El número mínimo de segundos que deben transcurrir entre la ejecución de la tarea | n seg. Note: n es un entero. |
Ejecución | ExecutionNode | Los nodos en los que se planificará que se ejecute este procesador. | Cualquiera de los valores siguientes:
|
Duración de la penalización | PenaltyDuration | La cantidad de tiempo que se utiliza cuando este procesador utiliza un flowFile. | n seg. Note: n es un entero. |
Duración del rendimiento | YieldDuration | Cuando se produce un procesador, no se volverá a programar hasta que transcurra esta cantidad de tiempo. | n seg. Note: n es un entero. |
Nivel de boletín | Nivel de boletín | El nivel en el que este procesador generará boletines. | Cualquiera de los valores siguientes:
|
Estrategia de planificación | SchedulingStrategy | La estrategia utilizada para planificar este procesador. | Cualquiera de los valores siguientes:
|
Comentarios | Comentarios | Serie |
- Actualice el valor de tareas simultáneas para el procesador "Documento de transformación - Buscar atributos de base de datos" en el grupo de procesos "Buscar atributos de la base de datos" bajo el conducto DatabaseProductStage1c.
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseProductStage1c", "properties": [ { "name": "ConcurrentlySchedulableTaskCount", "value": "5", "scope": { "name": "Find Attributes from Database.Transform Document - Find Attributes From Database", "type": "PROCESSOR" } } ] } ] } }
- Actualice el valor de tareas simultáneas para todos los procesadores en el grupo de procesos "SCROLL SQL" bajo el conducto DatabaseProductStage1c.
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseProductStage1c", "properties": [ { "name": "ConcurrentlySchedulableTaskCount", "value": "3", "scope": { "name": "SCROLL SQL.*", "type": "PROCESSOR" } } ] } ] }
Actualización de propiedades de procesador:
http://nifiHost:nifiPort/nifi-api/processors/{processorId}
Puede ver en la respuesta JSON (
las propiedades que se pueden cambiar."name": "properties.{property name to change}"
Ejemplo:
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "properties.Max Wait Time",
"value": "25 seconds",
"scope": {
"name": "SCROLL SQL.Execute SQL - Find Parent Category",
"type": "PROCESSOR"
}
}
]
}
]
}
Conexión
- source.name El nombre del procesador/grupo de procesos desde el que va la conexión (que recibe los archivos de flujo).
- destinationName: El nombre del procesador/grupo de procesos al que se va a conectar (enviando los archivos de flujo).
Ejemplo:
{
"name": "BackPressureObjectThreshold",
"value": "600",
"scope": {
"name": "SCROLL SQL",
"sourceName": "Execute SQL - Find Parent Category",
"destinationName": "Analyze Successful SQL Response",
"type": "CONNECTION"
}
}
Actualización de configuración de conexión:
Puede actualizar o cambiar los siguientes elementos de configuración de conexión:
Nombre del elemento de configuración | Nombre de elemento de configuración correspondiente en JSON | Descripción | Valor aceptado |
---|---|---|---|
Umbral de objeto de contrapresión | BackPressureObjectThreshold | El número máximo de objetos que se pueden poner en cola antes de aplicar la copia de seguridad. | n Note: n es un entero. |
Umbral de tamaño | BackPressureDataSizeThreshold | El tamaño máximo de datos de los objetos que se pueden poner en cola antes de aplicar la copia de seguridad. | n GB Note: n es un entero. |
Caducidad del archivo de flujo | FlowFileExpiration | La cantidad máxima de tiempo que un objeto puede estar en el flujo antes de que expire y salga del flujo. . |
n seg. Note: n es un entero. |
Ejemplo:
Actualice una conexión entre el procesador "Analyze Successful SQL Response" y el puerto de salida "SUCCESS" en el grupo "SCROLL SQL" bajo "DatabaseProductStage1c".
{
"name": "auth.reindex",
"pipes": [
{
"name": "DatabaseProductStage1c",
"properties": [
{
"name": "BackPressureObjectThreshold",
"value": "200",
"scope": {
"name": "SCROLL SQL",
"sourceName": "Analyze Successful SQL Response",
"destinationName": "SUCCESS",
"type": "CONNECTION"
}
}
]
}
]
}
Grupos de procesos
Personalice sus grupos de procesos incluyendo la siguiente propiedad en
.Esto personaliza variables a nivel de grupo de procesos y solo en el grupo de procesos especificado. Para personalizar varios grupos de procesos, cada uno necesitará su propia entrada de propiedades de barra vertical.
Ejemplo:
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseProductStage1c", "properties": [ { "name": "matchmaker.proximity", "value": "0.1", "scope": { "name": "Rollup Attributes", "type": "PROCESS_GROUP" } }, { "name": "matchmaker.proximity", "value": "0.4", "scope": { "name": "Find Attributes", "type": "PROCESS_GROUP" } } ] } ] }
Servicios de controlador
Personalice los servicios de controlador personalizados incluyendo la siguiente propiedad en
.Esta propiedad permite la personalización de variables en el nivel de barra vertical principal y solo en el servicio de controlador especificado. Para personalizar varios servicios de controlador, cada uno necesitará su propia entrada de propiedades de barra vertical.
Ejemplo:
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseProductStage1c", "properties": [ { "name": "Max Total Connections", "value": "20", "scope": { "name": "Database Connection Pool", "type": "CONTROLLER_SERVICE" } } ] } ] }
Varias personalizaciones de canalización
Esta API sí admite la personalización de varios conductos en un conector. Realice esta personalización incluyendo varias entradas de barra vertical.
Ejemplo:
{ "name": "auth.reindex", "pipes": [ { "name": "DatabaseProductStage1c", "properties": [ { "name": "concurrent.tasks", "value": "1", "scope": { "name": "Find Attributes from Database.Transform Document - Find Attributes From Database", "type": "PROCESSOR" } }, { "name": "matchmaker.proximity", "value": "0.6", "scope": { "name": "Rollup Attributes", "type": "PROCESS_GROUP" } }, { "name": "Max Total Connections", "value": "20", "scope": { "name": "Database Connection Pool", "type": "CONTROLLER_SERVICE" } } ] }, { "name": "WaitLink", "label": "WaitLink - Product Stage 1 - 3", "properties": [ { "name": "connector.stage.name", "value": "TESTING", "scope": { "name": "Wait for Completion", "type": "PROCESS_GROUP" } }, { "name": "concurrent.tasks", "value": "5", "scope": { "name": "Wait for Completion.Get Status", "type": "PROCESSOR" } } ] } ] }
![HCL Commerce Version 9.1.5.0 or later](../../base/images/9150plus.png)
-
Las personalizaciones realizadas con esta API se guardan en Zookeeper. En consecuencia, si el conector se borra y se crea de nuevo, entonces puede recuperar el descriptor del conector personalizado utilizando la solicitud
GET http://{IngestHost}:{IngestPort}/connectors/{connectorName}
. -
Puede importar el conector personalizado utilizando la solicitud
POST http://{IngestHost}:{IngestPort}/connectors/{connectorName}
. Mediante esta solicitud, puede importar el conector personalizado de cualquiera de las dos formas siguientes:- con un cuerpo vacío si Zookeeper contiene el descriptor del conector personalizado
O
- Con el descriptor del conector personalizado extraído
- con un cuerpo vacío si Zookeeper contiene el descriptor del conector personalizado
- Guarde el descriptor del conector personalizado, ya que los datos en Zookeeper pueden sobrescribirse desde Ingest. Guardar el descriptor del conector personalizado le ofrece la flexibilidad de moverse fácilmente entre diferentes versiones, ya que el descriptor personalizado guardado puede fusionarse manualmente con el descriptor del conector por defecto.