Ingest Inventory index pipeline

For information about the Product index schema design, see Ingest Product index schema. For information on calling the Ingest service, see Search Ingest Service API. For a complete listing of Elasticsearch index fields and parameters, see Elasticsearch index field types.

Inventory field mapping from database

The following sequence of steps illustrates the Inventory indexing pipeline implemented in Apache NiFi. The flow consists mainly of two stages:

Stage 1: Associate parent inventories to Product document
This stage describes how the Inventory data can be transformed and loaded into the Product index.
It starts with running the following SQL to retrieve inventory data from the Commerce database:
SELECT A.CATENTRY_ID_PARENT,
       LISTAGG(A.FFMCENTER_ID, ', ') WITHIN GROUP (ORDER BY A.FFMCENTER_ID) FFMCENTER_ID,
       LISTAGG(TO_CHAR(A.QUANTITY), ', ') WITHIN GROUP (ORDER BY A.FFMCENTER_ID) QUANTITY,
       SUM(A.QUANTITY) TOTAL
  FROM (SELECT M.CATENTRY_ID_PARENT, M.FFMCENTER_ID, SUM(M.QUANTITY) QUANTITY
          FROM (SELECT L.CATENTRY_ID_PARENT, L.CATENTRY_ID_CHILD, I.FFMCENTER_ID, I.QUANTITY
                  FROM CATENTREL L, CATGPENREL R, CATENTRY C, INVENTORY I
                 WHERE R.CATALOG_ID = ${param.catalogId}
                   AND R.CATALOG_ID IN
                       (SELECT CATALOG_ID FROM STORECAT
                         WHERE STOREENT_ID IN
                               (SELECT RELATEDSTORE_ID FROM STOREREL WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
                   AND R.CATENTRY_ID = C.CATENTRY_ID AND C.MARKFORDELETE = 0 AND C.CATENTRY_ID = L.CATENTRY_ID_PARENT
                   AND C.CATENTTYPE_ID NOT IN ('ItemBean')
                   AND L.CATRELTYPE_ID NOT IN ('BUNDLE_COMPONENT', 'PACKAGE_COMPONENT')
                   AND L.CATENTRY_ID_CHILD = I.CATENTRY_ID AND I.STORE_ID = ${param.storeId}
                UNION
                SELECT CATENTRY_ID_PARENT, KITCOMP.CATENTRY_ID_CHILD, KITCOMP.FFMCENTER_ID, KITCOMP.QUANTITY
                  FROM (SELECT L.CATENTRY_ID_PARENT, L.CATENTRY_ID_CHILD, I.FFMCENTER_ID, I.QUANTITY
                          FROM CATENTREL L, CATGPENREL R, CATENTRY C, INVENTORY I
                         WHERE R.CATALOG_ID = ${param.catalogId}
                           AND R.CATALOG_ID IN
                               (SELECT CATALOG_ID FROM STORECAT
                                 WHERE STOREENT_ID IN
                                       (SELECT RELATEDSTORE_ID FROM STOREREL
                                         WHERE STATE = 1 AND STRELTYP_ID = -4 AND STORE_ID = ${param.storeId}))
                           AND R.CATENTRY_ID = C.CATENTRY_ID AND C.MARKFORDELETE = 0 AND C.CATENTRY_ID = L.CATENTRY_ID_PARENT
                           AND CATRELTYPE_ID IN ('BUNDLE_COMPONENT', 'PACKAGE_COMPONENT')
                           AND L.CATENTRY_ID_CHILD = I.CATENTRY_ID AND I.STORE_ID = ${param.storeId}) KITCOMP) M
         GROUP BY M.CATENTRY_ID_PARENT, M.FFMCENTER_ID) A
 GROUP BY A.CATENTRY_ID_PARENT
 ORDER BY A.CATENTRY_ID_PARENT
OFFSET ${param.offset} ROWS FETCH NEXT ${param.pageSize} ROWS ONLY
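The two levels of grouping in this query can be hard to follow at a glance. The Python sketch below mimics them on in-memory rows: the UNION removes duplicate rows, the inner GROUP BY sums child quantities per parent and fulfillment center, and the outer GROUP BY produces one record per parent with comma-separated lists. The function name and the child rows in the usage note are hypothetical and only illustrate the shape of the data; this is not the code that the pipeline runs.

```python
from collections import defaultdict


def _fmt(quantity):
    """Render 606.0 as '606', roughly like TO_CHAR on a whole number."""
    return str(int(quantity)) if quantity == int(quantity) else str(quantity)


def roll_up_parent_inventory(rows):
    """rows: iterable of (parent_id, child_id, ffmcenter_id, quantity) tuples."""
    # UNION (without ALL) discards duplicate rows.
    unique_rows = set(rows)

    # Inner GROUP BY: sum child quantities per (parent, fulfillment center).
    per_center = defaultdict(float)
    for parent_id, _child_id, ffm_id, quantity in unique_rows:
        per_center[(parent_id, ffm_id)] += quantity

    # Outer GROUP BY: collect the centers of each parent, ordered by center id.
    by_parent = defaultdict(list)
    for (parent_id, ffm_id), quantity in sorted(per_center.items()):
        by_parent[parent_id].append((ffm_id, quantity))

    result = []
    for parent_id in sorted(by_parent):  # ORDER BY CATENTRY_ID_PARENT
        centers = by_parent[parent_id]
        result.append({
            "CATENTRY_ID_PARENT": parent_id,
            "FFMCENTER_ID": ", ".join(str(ffm_id) for ffm_id, _ in centers),
            "QUANTITY": ", ".join(_fmt(quantity) for _, quantity in centers),
            "TOTAL": sum(quantity for _, quantity in centers),
        })
    return result
```

For example, with two hypothetical child items 13699 and 13700 under parent 13698, the rows `(13698, 13699, 10501, 300.0)`, `(13698, 13700, 10501, 306.0)` and `(13698, 13699, 11501, 600.0)` roll up to one record with `FFMCENTER_ID` `"10501, 11501"`, `QUANTITY` `"606, 600"` and `TOTAL` `1206.0`.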
Next, the result set is passed to the FindParentInventoriesFromDatabase processor for transformation. The following table maps each database field returned by the SQL above to an index field in the Product index:

| Index Field Name | Index Field Type | Description |
|---|---|---|
| Properties | | |
| inventories/<fulfillment>/quantity | float | Quantity of the current inventory; mapped to INVENTORY.QUANTITY |
| inventories/<fulfillment>/id | id_string | Fulfillment center ID of the current inventory; mapped to INVENTORY.FFMCENTER_ID |
| inventories/total | float | Sum of INVENTORY.QUANTITY across all fulfillment centers for the catalog entry |
For example code, see Stage 1 samples.
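The mapping in the table can be sketched in a few lines of Python. The function below is a hypothetical illustration, not the processor's actual implementation: it splits the comma-separated FFMCENTER_ID and QUANTITY columns and builds one entry per fulfillment center. It assumes the output shape shown in the Stage 1 samples, where the total is stored under `total.quantity`.

```python
def to_inventories(row):
    """Build the 'inventories' object from one aggregated SQL result row."""
    ids = [part.strip() for part in row["FFMCENTER_ID"].split(",")]
    quantities = [float(part) for part in row["QUANTITY"].split(",")]
    if len(ids) != len(quantities):
        raise ValueError("FFMCENTER_ID and QUANTITY lists differ in length")

    # One entry per fulfillment center, keyed by the fulfillment center id.
    inventories = {
        ffm_id: {"quantity": quantity, "id": ffm_id}
        for ffm_id, quantity in zip(ids, quantities)
    }
    # The SUM(...) TOTAL column becomes the 'total' entry.
    inventories["total"] = {"quantity": float(row["TOTAL"])}
    return inventories
```

Both lists are ordered by FFMCENTER_ID in the SQL, which is what allows the positional pairing with `zip`.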
Stage 2: Associate child inventories to Product document
This stage describes how the child inventory data can be transformed and loaded into the Product index. It starts with running the following SQL to retrieve inventory data from the Commerce database:
SELECT CATENTRY.CATENTRY_ID,
       LISTAGG(INVENTORY.FFMCENTER_ID, ', ') WITHIN GROUP (ORDER BY INVENTORY.FFMCENTER_ID) FFMCENTER_ID,
       LISTAGG(TO_CHAR(INVENTORY.QUANTITY), ', ') WITHIN GROUP (ORDER BY INVENTORY.FFMCENTER_ID) QUANTITY,
       SUM(INVENTORY.QUANTITY) TOTAL
  FROM INVENTORY, CATENTRY, CATGPENREL
 WHERE CATGPENREL.CATALOG_ID = ${param.catalogId}
   AND CATENTRY.CATENTRY_ID = CATGPENREL.CATENTRY_ID
   AND CATENTRY.CATENTTYPE_ID = 'ItemBean'
   AND CATENTRY.MARKFORDELETE = 0
   AND CATENTRY.CATENTRY_ID = INVENTORY.CATENTRY_ID
   AND INVENTORY.STORE_ID = ${param.storeId}
 GROUP BY CATENTRY.CATENTRY_ID
 ORDER BY CATENTRY.CATENTRY_ID
OFFSET ${param.offset} ROWS FETCH NEXT ${param.pageSize} ROWS ONLY
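Both queries page through their results with ${param.offset} and ${param.pageSize}, and the ORDER BY clause keeps the page boundaries stable between calls. How the NiFi flow advances the offset is not described here; the Python sketch below only shows one common way to drive such a query, with `fetch_page` as a hypothetical callable that runs the SQL for a given offset and page size.

```python
def fetch_all_pages(fetch_page, page_size):
    """Yield every row by calling fetch_page(offset, page_size) repeatedly."""
    offset = 0
    while True:
        rows = fetch_page(offset, page_size)
        if not rows:
            break  # an empty page means the result set is exhausted
        yield from rows
        if len(rows) < page_size:
            break  # a short page is the last page
        offset += page_size
```

Stopping on a short page saves one round trip compared with always waiting for an empty page.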
Next, the result set is passed to the FindChildInventoriesFromDatabase processor for transformation. The following table maps each database field returned by the SQL above to an index field in the Product index:

| Index Field Name | Index Field Type | Description |
|---|---|---|
| Properties | | |
| inventories/<fulfillment>/quantity | float | Quantity of the current inventory; mapped to INVENTORY.QUANTITY |
| inventories/<fulfillment>/id | id_string | Fulfillment center ID of the current inventory; mapped to INVENTORY.FFMCENTER_ID |
| inventories/total | float | Sum of INVENTORY.QUANTITY across all fulfillment centers for the catalog entry |
For example code, see Stage 2 samples.
Stage 1 samples
The following code is an example of the input data for the FindParentInventoriesFromDatabase processor:
{
  "CATENTRY_ID_PARENT": 13698,
  "FFMCENTER_ID": "10501, 11501",
  "QUANTITY": "606, 600",
  "TOTAL": 1206.0
}
The FindParentInventoriesFromDatabase processor transforms the input data into the following output data:
{ "update": { "_id": "1--1-10001-13698", "_index": ".auth.1.product.202006160325", "retry_on_conflict": 5, "_source": false } }
{ "doc": {"inventories":{"total":{"quantity":1206.0},"10501":{"quantity":606.0,"id":"10501"},"11501":{"quantity":600.0,"id":"11501"}},"__meta":{"modified":"2020-07-27T13:49:59.403Z"}} }
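The output above has the shape of an Elasticsearch bulk request: an `update` action line followed by a partial `doc` line. The Python sketch below shows how such a pair of lines could be assembled. The function name and its parameters are hypothetical, and because the format of the document `_id` and the index name is not explained in this topic, both are passed in unchanged rather than derived.

```python
import json
from datetime import datetime, timezone


def build_bulk_update(doc_id, index_name, inventories, modified=None):
    """Return the two newline-delimited JSON lines for one partial update."""
    if modified is None:
        # UTC timestamp with millisecond precision, e.g. 2020-07-27T13:49:59.403Z
        now = datetime.now(timezone.utc)
        modified = now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"

    action = {
        "update": {
            "_id": doc_id,
            "_index": index_name,
            "retry_on_conflict": 5,
            "_source": False,
        }
    }
    body = {"doc": {"inventories": inventories, "__meta": {"modified": modified}}}
    # The bulk format expects one JSON object per line, each ending in a newline.
    return json.dumps(action) + "\n" + json.dumps(body) + "\n"
```

Because the body uses `doc`, only the `inventories` and `__meta` fields of the existing Product document are merged; the other fields of the document are left untouched.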
Stage 2 samples
The following code is an example of the input data for the FindChildInventoriesFromDatabase processor:
{
  "CATENTRY_ID": 11778,
  "FFMCENTER_ID": "10501, 11501, 11502, 11503, 11504, 11505, 11506, 11507, 11508, 11509, 11510, 11511, 11512, 11513, 11514, 11515, 11516, 11517, 11518, 11519, 11520, 11521, 11522, 11523, 11524, 11525, 11526, 11527, 11528, 11529, 11530, 11531, 11532, 11533, 11534, 11535, 11536, 11537, 11538, 11539, 11540, 11541, 11542, 11543, 11544, 11545, 11546, 11547, 11548, 11549, 11550, 11551, 11552, 11553, 11554, 11555",
  "QUANTITY": "101, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100",
  "TOTAL": 5601.0
}
The FindChildInventoriesFromDatabase processor transforms the input data into the following output data:
{ "update": { "_id": "1--1-10001-11778", "_index": ".auth.1.product.202006160325", "retry_on_conflict": 5, "_source": false } }
{
  "doc": {
    "inventories": {
      "10501": {
        "quantity": 101,
        "id": "10501"
      },
      "11501": {
        "quantity": 100,
        "id": "11501"
      },
      "11502": {
        "quantity": 100,
        "id": "11502"
      },
      "11503": {
        "quantity": 100,
        "id": "11503"
      },
      "11504": {
        "quantity": 100,
        "id": "11504"
      },
      "11505": {
        "quantity": 100,
        "id": "11505"
      },
      "11506": {
        "quantity": 100,
        "id": "11506"
      },
      "11507": {
        "quantity": 100,
        "id": "11507"
      },
      "11508": {
        "quantity": 100,
        "id": "11508"
      },
      "11509": {
        "quantity": 100,
        "id": "11509"
      },
      "11510": {
        "quantity": 100,
        "id": "11510"
      },
      "11511": {
        "quantity": 100,
        "id": "11511"
      },
      "11512": {
        "quantity": 100,
        "id": "11512"
      },
      "11513": {
        "quantity": 100,
        "id": "11513"
      },
      "11514": {
        "quantity": 100,
        "id": "11514"
      },
      "11515": {
        "quantity": 100,
        "id": "11515"
      },
      "11516": {
        "quantity": 100,
        "id": "11516"
      },
      "11517": {
        "quantity": 100,
        "id": "11517"
      },
      "11518": {
        "quantity": 100,
        "id": "11518"
      },
      "11519": {
        "quantity": 100,
        "id": "11519"
      },
      "11520": {
        "quantity": 100,
        "id": "11520"
      },
      "11521": {
        "quantity": 100,
        "id": "11521"
      },
      "11522": {
        "quantity": 100,
        "id": "11522"
      },
      "11523": {
        "quantity": 100,
        "id": "11523"
      },
      "11524": {
        "quantity": 100,
        "id": "11524"
      },
      "11525": {
        "quantity": 100,
        "id": "11525"
      },
      "11526": {
        "quantity": 100,
        "id": "11526"
      },
      "11527": {
        "quantity": 100,
        "id": "11527"
      },
      "11528": {
        "quantity": 100,
        "id": "11528"
      },
      "11529": {
        "quantity": 100,
        "id": "11529"
      },
      "11530": {
        "quantity": 100,
        "id": "11530"
      },
      "11531": {
        "quantity": 100,
        "id": "11531"
      },
      "11532": {
        "quantity": 100,
        "id": "11532"
      },
      "11533": {
        "quantity": 100,
        "id": "11533"
      },
      "11534": {
        "quantity": 100,
        "id": "11534"
      },
      "11535": {
        "quantity": 100,
        "id": "11535"
      },
      "11536": {
        "quantity": 100,
        "id": "11536"
      },
      "11537": {
        "quantity": 100,
        "id": "11537"
      },
      "11538": {
        "quantity": 100,
        "id": "11538"
      },
      "11539": {
        "quantity": 100,
        "id": "11539"
      },
      "11540": {
        "quantity": 100,
        "id": "11540"
      },
      "11541": {
        "quantity": 100,
        "id": "11541"
      },
      "11542": {
        "quantity": 100,
        "id": "11542"
      },
      "11543": {
        "quantity": 100,
        "id": "11543"
      },
      "11544": {
        "quantity": 100,
        "id": "11544"
      },
      "11545": {
        "quantity": 100,
        "id": "11545"
      },
      "11546": {
        "quantity": 100,
        "id": "11546"
      },
      "11547": {
        "quantity": 100,
        "id": "11547"
      },
      "11548": {
        "quantity": 100,
        "id": "11548"
      },
      "11549": {
        "quantity": 100,
        "id": "11549"
      },
      "11550": {
        "quantity": 100,
        "id": "11550"
      },
      "11551": {
        "quantity": 100,
        "id": "11551"
      },
      "11552": {
        "quantity": 100,
        "id": "11552"
      },
      "11553": {
        "quantity": 100,
        "id": "11553"
      },
      "11554": {
        "quantity": 100,
        "id": "11554"
      },
      "11555": {
        "quantity": 100,
        "id": "11555"
      },
      "total": {
        "quantity": 5601
      }
    },
    "__meta": {
      "modified": "2020-07-27T15:12:15.138Z"
    }
  }
}
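With this many fulfillment centers, it can help to confirm that the per-center quantities add up to the reported total. The Python sketch below is a hypothetical validation helper, not part of the pipeline; it assumes the `inventories` object has the shape shown in the output above.

```python
def check_inventories(inventories, tolerance=1e-6):
    """Return True if each entry's id matches its key and the quantities sum to the total."""
    total = inventories["total"]["quantity"]
    summed = 0.0
    for key, entry in inventories.items():
        if key == "total":
            continue
        if entry["id"] != key:
            return False  # each entry is keyed by its own fulfillment center id
        summed += entry["quantity"]
    return abs(summed - total) <= tolerance
```

For the Stage 2 sample, one center holds 101 units and the remaining 55 centers hold 100 units each, so the sum is 101 + 55 × 100 = 5601, which matches `total.quantity`.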