Solution Source Code

A solution is a domain-specific configuration of Drive that is customized for a particular customer in that domain. Different solutions can target different domains (e.g., telco, banking, retail, healthcare) and can be tailored to different customers within a domain (e.g., different data sources, communication services, and fulfillment services).

The source code for custom solutions built to a customer's requirements resides here. All development of feed applications, their parsers, and event consumers happens here. Enrichment helper logic is also written here, after the helpers are declared in the enrichment_functions.json file. Let's dive into each topic.
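As an illustration of that declare-then-implement split, the sketch below pairs a hypothetical enrichment_functions.json entry with its helper. The field names in the JSON and the helper's name and signature are assumptions for illustration only; the actual declaration format is defined by the Drive release you are on.

# enrichment_functions.json (hypothetical declaration format):
#   {"enrichmentFunctions": [
#       {"name": "mask_card_number", "module": "solution.enrichment_helpers"}]}

def mask_card_number(card_number):
    """Illustrative enrichment helper: keep only the last four digits."""
    return "*" * (len(card_number) - 4) + card_number[-4:]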

Feed Applications

A feed application consists of a flow with a defined set of operators in a certain structure. Operators are individual components, each designed to perform a certain set of tasks.

A typical feed application has the following components:

  • A Source: receives the input data from an external data source
  • A Parser: parses the input data into tuple format
  • An Enricher: enriches the tuple data with profile information stored in PinPoint and aggregate data stored in FastPast
  • An Aggregator: updates the aggregate data stored in FastPast
  • A Trigger Evaluator: detects trigger conditions of interest
  • One or more Sinks: send trigger results to the campaign executor via a Kafka queue
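Conceptually, these components chain into a single flow via the >> composition operator shown in the feed application example below. The sketch that follows is illustrative only: the operator variable names are assumptions, and in practice compose_flow wires the standard tail (enricher, aggregator, trigger evaluator, sinks) for you.

# Illustrative end-to-end flow; normally only source >> parser is hand-built
# and compose_flow appends the rest.
flow = source >> parser >> enricher >> aggregator >> trigger_evaluator >> kafka_sink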

A feed application is typically configured via parameters specified in drive.json. A typical entry would look something like this:

{"drive":{..."feedApplications":[{"logLevel":"INFO","name":"CC Usage"}]...}}

The source data for a feed application commonly comes from files or Kafka queues. Custom source operators can be written to connect to other kinds of sources.
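As a sketch of what such a custom source might look like: everything here is hypothetical (the SourceOperator base class, the create_output_tuple helper, and the emit call), since the actual source-operator API is framework-specific.

class RestPollingSource(SourceOperator):  # SourceOperator: hypothetical base class
    """Illustrative custom source that polls a REST endpoint for records."""

    def process(self):
        for record in self._poll_endpoint():        # hypothetical fetch helper
            out_tuple = self.create_output_tuple()  # hypothetical tuple factory
            out_tuple.payload = record
            self.emit(out_tuple)                    # hypothetical emit API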

An example of a simple feed application that scans a directory for CSV files every 10 seconds and ingests the data looks like the code below:

class CCUsageFeedApplication(FeedApplication):
    """Implements the Goliath feed application"""

    def create_flow(self):
        """Creates the Credit Card Usage flow"""
        scan_schema = Schema(
            {"filename": String,
             "modTime": Int64},
            name="scan_schema")
        parser_schema = Schema(
            {drive_constants.CA_EPOCH_ATTRIBUTE: Int64,
             drive_constants.CA_TIMESTAMP_ATTRIBUTE: Int64,
             "departmentName": String,
             "description": String,
             "cardHolderName": String,
             "customerID": String,
             "merchant": String,
             "category": String,
             "mcc": String,
             "transactionAmount": Double,
             "transactionType": String},
            name="parser_schema")
        # Scan the feed directory for CSV files every 10 seconds.
        scanner = self.instantiate_directory_scanner(
            file_name_filter=PatternBasedFileNameFilter(r".*\.csv"),
            output_schema=scan_schema,
            period_in_seconds=10)
        # Parse each discovered file into tuples matching parser_schema.
        parser = self.instantiate_operator(
            class_=CCUsageReader,
            input_schema=scan_schema,
            move_path=self.feed_data_move_dir(),
            name=drive_constants.DRIVE_APPLICATION_PARSER_OPERATOR_NAME,
            output_schema=parser_schema)
        return self.compose_flow(ingress_flow=scanner >> parser)
  • The class CCUsageFeedApplication inherits from the FeedApplication base class.
  • create_flow is an abstract method of the FeedApplication class, implemented here to define the flow of operators for the feed application.
  • Every operator requires three parameters by default: a name, an input schema, and an output schema.
  • A Schema defines the names and data types of the attributes that an operator receives or sends.
  • The input schema describes the tuples entering the operator.
  • The output schema describes the tuples the operator sends onward.
  • scan_schema and parser_schema are the two schemas defined in the example above. scan_schema contains the attributes sent by the directory scanner operator: the file name and the last-modified timestamp of the file (see the sketch after this list). parser_schema contains the attributes populated after reading the input file from the source directory.
  • The CCUsageReader operator class parses the contents of the file, raises errors if any, populates the output tuple, and emits it to the next operator.
  • compose_flow appends the standard predefined operators, such as the enricher, aggregator, and Kafka sink, after the ingress flow.
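To make the scan_schema bullet concrete, the snippet below shows the two attributes a directory-scanner tuple carries; the values are illustrative.

# Illustrative values for a tuple conforming to scan_schema:
scan_tuple = {"filename": "/data/feeds/cc_usage/2024-01-15.csv",  # discovered file
              "modTime": 1705312800}                              # last-modified epoch seconds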
Parsers

Parser operators parse data from the input source and set it into tuple format: the data read from the source is converted to a form that is easier for the feed application to consume and process. A sample parser operator class looks like the snippet below:

class CCUsageReader(CSVParser):
    """A sample operator that generates cc_usage related data"""

    CURRENCY_SIGN = "$"

    @oxygen_operator()
    def __init__(self, batch_size=None, delimiter=",", move_path=None):
        super(CCUsageReader, self).__init__(
            batch_size=batch_size,
            date_format=None,
            delimiter=delimiter,
            epoch_attribute=CA_EPOCH_ATTRIBUTE,
            move_path=move_path,
            row_processor=self._parse_row)

    def set_up(self):
        super(CCUsageReader, self).set_up()

    def _parse_row(self, fields, out_tuple):
        """Parses the fields in a row"""
        out_tuple.departmentName = fields[3].upper()
        out_tuple.cardHolderName = fields[4].upper()
        out_tuple.customerID = fields[2]
        out_tuple.description = ""
        out_tuple.merchant = fields[5].upper()
        out_tuple.category = fields[6].upper()
        # Convert the raw timestamp string into epoch time.
        setattr(out_tuple, CA_TIMESTAMP_ATTRIBUTE,
                int(self._datetime_parser.utime(fields[7])))
        # Drop the leading currency sign (e.g. "$125.50" -> 125.50).
        out_tuple.transactionAmount = float(
            fields[8].split(CCUsageReader.CURRENCY_SIGN)[1])
        out_tuple.transactionType = fields[9]
  • The class CCUsageReader inherits from the CSVParser base class.
  • The delimiter passed to the constructor is used to split each line in the file; the resulting list is then sent to the _parse_row method.
  • The method _parse_row is passed as the row_processor to the constructor of the CSVParser class.
  • The method receives 2 parameters: fields is the list of raw values from a single line read from the file, created by splitting on the delimiter; out_tuple is the output tuple for the operator, based on the output schema set in the feed application class. A worked example follows this list.
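To make the field mapping concrete, here is a hypothetical input line and the attributes _parse_row would populate from it. The column layout is an assumption inferred from the indices used above; columns 0 and 1 are not read by _parse_row.

# Hypothetical CSV line (columns 0-9):
line = "1001,2024,C-123,finance,jane doe,acme corp,travel,2024-01-15 10:30:00,$125.50,debit"
fields = line.split(",")
# _parse_row would populate the output tuple roughly as:
#   customerID        = "C-123"      # fields[2]
#   departmentName    = "FINANCE"    # fields[3].upper()
#   cardHolderName    = "JANE DOE"   # fields[4].upper()
#   merchant          = "ACME CORP"  # fields[5].upper()
#   category          = "TRAVEL"     # fields[6].upper()
#   transactionAmount = 125.5        # fields[8] split on the "$" sign
#   transactionType   = "debit"      # fields[9]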