HCL Commerce Version 9.1.8.0 or later

Creating a custom NiFi processor

Use the following steps to create a custom NiFi processor.

Procedure

  1. Create the custom processor classes and test cases.
    • Processor classes are created within the /src/main/java/ directory.
    • The corresponding processor test cases are created within the /src/test/java/ directory.
    1. Create a class under src/main/java that extends org.apache.nifi.processor.AbstractProcessor. Alternatively, to build on the default classes, extend com.hcl.software.data.ingest.processors.AbstractCommerceBaseProcessor.
    2. Provide appropriate tags and attributes using the following annotations.
      @Tags({"example"})
      @CapabilityDescription("Provide a description")
      @SeeAlso({})
      @ReadsAttributes({@ReadsAttribute(attribute="", description="")})
      @WritesAttributes({@WritesAttribute(attribute="", description="")})
      
    3. To organize imports, right-click the Java file and select Source > Organize Imports.
    4. Implement the missing inherited method.
      @Override
      public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException 
      {
      // TODO Auto-generated method stub
      }
      
      You are now ready to start writing the logic for your custom processor.
  2. Create the custom logic for your NiFi processor. The following steps show how the Java code that you write determines the behavior of the processor and what is displayed in the NiFi user interface.

    Refer to the NiFi Developer’s Guide for further information on how to create a custom NiFi processor, and for a thorough explanation of the APIs that are used to develop extensions.

    1. Declare property fields of type org.apache.nifi.components.PropertyDescriptor. Each PropertyDescriptor that you declare appears on the PROPERTIES tab of the custom processor.

      For example:

      public static final PropertyDescriptor SCROLL_DURATION = new PropertyDescriptor.Builder()
              .name("Scroll Duration")
              .description("The scroll duration is how long each search context is kept in memory")
              .required(true)
              .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
              .expressionLanguageSupported(true)
              .build();
    2. Declare relationship fields of type org.apache.nifi.processor.Relationship. Each Relationship that you declare appears on the SETTINGS tab of the custom processor.

      For example:

      public static final Relationship RELATIONSHIP_SUCCESS = new Relationship.Builder().name("success")
              .description("The flow file with the specified content was successfully transferred").build();

      public static final Relationship RELATIONSHIP_FAILURE = new Relationship.Builder().name("failure")
              .description("The flow file with the specified content has encountered an error during the transfer").build();

      public static final Relationship RELATIONSHIP_NEXT = new Relationship.Builder().name("next")
              .description("The flow file with the specified content for the next iteration").build();
    3. Add the declared properties and relationships within the init() method.
      @Override
      protected void init(final ProcessorInitializationContext context) {
          // Assumes that the base class returns mutable collections from these getters.
          getSupportedPropertyDescriptors().add(SCROLL_DURATION);
          getRelationships().add(RELATIONSHIP_SUCCESS);
          getRelationships().add(RELATIONSHIP_FAILURE);
          getRelationships().add(RELATIONSHIP_NEXT);
      }
    4. Override the onTrigger() method to perform the following operations.
      @Override
      public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

          final String duration = context.getProperty(SCROLL_DURATION).evaluateAttributeExpressions().getValue();

          try {
      1. Get the FlowFile and its contents from the ProcessSession.
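        final FlowFile flowFile = session.get();
        if (flowFile == null) {
            // No FlowFile is queued, so there is nothing to process.
            return;
        }

        // Copy the FlowFile content into memory and decode it as UTF-8 text.
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        session.exportTo(flowFile, bytes);
        bytes.close();
        final String content = new String(bytes.toByteArray(), StandardCharsets.UTF_8);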
      2. Perform the operations that the processor is supposed to do with the content.
        // FIELD_HITS is a String constant that is defined elsewhere in the processor class.
        final Map<String, Object> json = (Map<String, Object>) new JsonSlurper().parseText(content);
        final String scroll = (String) json.get("_scroll_id");
        final Map<String, Object> hits = (Map<String, Object>) json.get(FIELD_HITS);
        final List<Object> docs = (List<Object>) hits.get(FIELD_HITS);
        if (!docs.isEmpty()) {
            // More documents remain, so prepare a FlowFile that requests the next scroll page.
            final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", duration, scroll);
            FlowFile nextFlowFile = session.create(flowFile);
            session.putAttribute(nextFlowFile, "index.scroll.uri", "_search/scroll");
      3. Write the result back into the FlowFile.
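        // Write to the new FlowFile, not to the original one.
        nextFlowFile = session.write(nextFlowFile, new OutputStreamCallback() {
          @Override
          public void process(final OutputStream outputStream) throws IOException {
            outputStream.write(scrollBody.getBytes(StandardCharsets.UTF_8));
            outputStream.flush();
          }
        });
        session.getProvenanceReporter().modifyContent(nextFlowFile);
      4. Transfer the FlowFile to the desired relationship.
            // Assumed routing: the follow-up scroll request goes to the "next" relationship.
            session.transfer(nextFlowFile, RELATIONSHIP_NEXT);
        }
        session.getProvenanceReporter().route(flowFile, RELATIONSHIP_SUCCESS);
        session.transfer(flowFile, RELATIONSHIP_SUCCESS);
          } catch (final Exception e) {
              // Assumed error handling; the original snippet does not show the catch block.
              throw new ProcessException("Failed to process the FlowFile content", e);
          }
      }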
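
The decision logic in sub-steps 2 and 3 can be exercised outside of NiFi. The following is a minimal sketch that uses only the Java standard library. The class name ScrollRequestSketch and its method names are hypothetical, and the value "hits" for FIELD_HITS is an assumption, because the constant is not shown in this procedure. The sketch checks whether a parsed response still contains documents and, if so, builds the body of the next scroll request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class; not part of NiFi or HCL Commerce.
class ScrollRequestSketch {

    // Assumed field name; the FIELD_HITS constant is not shown in the procedure.
    static final String FIELD_HITS = "hits";

    /** Returns true when the parsed response still contains documents. */
    @SuppressWarnings("unchecked")
    static boolean hasMoreDocuments(final Map<String, Object> response) {
        final Object outer = response.get(FIELD_HITS);
        if (!(outer instanceof Map)) {
            return false;
        }
        final Object inner = ((Map<String, Object>) outer).get(FIELD_HITS);
        return inner instanceof List && !((List<Object>) inner).isEmpty();
    }

    /** Builds the JSON body for the follow-up scroll request. */
    static String buildScrollBody(final String duration, final String scrollId) {
        return String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", duration, scrollId);
    }

    public static void main(final String[] args) {
        // Build a small response map by hand instead of parsing JSON.
        final List<Object> docs = new ArrayList<>();
        docs.add("doc-1");
        final Map<String, Object> hits = new HashMap<>();
        hits.put(FIELD_HITS, docs);
        final Map<String, Object> response = new HashMap<>();
        response.put("_scroll_id", "abc123");
        response.put(FIELD_HITS, hits);

        if (hasMoreDocuments(response)) {
            // Prints: { "scroll": "1m", "scroll_id": "abc123" }
            System.out.println(buildScrollBody("1m", (String) response.get("_scroll_id")));
        }
    }
}
```

Keeping this logic in plain static methods is one possible design choice: it lets you unit test the paging decision without a NiFi test runner, while onTrigger() stays responsible for session handling only.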
  3. In the org.apache.nifi.processor.Processor file, create an entry that contains the fully qualified class name of your newly created custom processor. If you created a custom controller service, create an entry in the org.apache.nifi.controller.ControllerService file.
    For example, in the Project Explorer, expand commerce-custom-search-bundles > commerce-custom-search-processors > src/main/resources > META-INF > services and click org.apache.nifi.processor.Processor. Enter your class name in the text entry pane on the right side of the window.
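
A mistyped class name in the services file is easy to overlook. The following is a minimal sketch, assuming that the file follows the standard Java ServiceLoader provider-configuration format (one fully qualified class name per line, with # starting a comment). The class ServiceEntryCheck and the entry com.example.nifi.MyCustomProcessor are hypothetical. The sketch reports every entry that cannot be loaded from the current classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class; not part of NiFi or HCL Commerce.
class ServiceEntryCheck {

    /** Returns the entries that cannot be loaded as classes. */
    static List<String> findUnresolvable(final List<String> lines) {
        final List<String> missing = new ArrayList<>();
        for (final String line : lines) {
            // Strip '#' comments and surrounding whitespace.
            final int hash = line.indexOf('#');
            final String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (name.isEmpty()) {
                continue;
            }
            try {
                // Load without initializing the class; only existence matters here.
                Class.forName(name, false, ServiceEntryCheck.class.getClassLoader());
            } catch (final ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(final String[] args) {
        final List<String> lines = Arrays.asList(
                "# custom processors",
                "java.util.ArrayList",                 // stands in for a class that exists
                "com.example.nifi.MyCustomProcessor"); // hypothetical entry, not on the classpath
        System.out.println(findUnresolvable(lines));
    }
}
```

In a real project, such a check is only meaningful when it runs with the compiled processor classes on the classpath, for example from a unit test in the processors module.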

Results

Your custom processor is created.