The TSIncLoad procedure example

The TSIncLoad procedure loads data into a database that contains a time series of corporate bond prices.

The TSIncLoad procedure loads time-variant data from a file into a table that contains time series. It assumes that the table is already populated with the time-invariant data. If the table already has time series data, the new data overwrites the old data or is appended to the existing time series, depending on the time stamps.

To set up the TSIncLoad example, create the procedure, the row subtype, and the database table as shown in the following example. The code for this example is in the $ONEDB_HOME/extend/TimeSeries.version/examples directory.
create procedure if not exists TSIncLoad( table_name lvarchar,
                              file_name lvarchar,
                              calendar_name lvarchar,
                              origin datetime year to fraction(5),
                              threshold integer,
                              regular boolean,
                              container_name lvarchar,
                              nelems integer)
external name '/tmp/Loader.bld(TSIncLoad)'
language C;

create row type day_info (
        ValueDate          datetime year to fraction(5),
        carryover          char(1),
        spread            integer,
        pricing_bmk_id    integer,
        price              float,
        yield              float,
        priority          char(1) );

create table corporates (
        Secid              integer UNIQUE,
        series            TimeSeries(day_info));

create index if not exists corporatesIdx on corporates( Secid);

execute procedure TSContainerCreate('ctnr_daily', 'rootdbs',
'day_info', 0, 0);


insert into corporates values ( 25000006, 'container(ctnr_daily), 
                               origin(2011-01-03 00:00:00.00000), 
                               calendar(daycal), threshold(0)');
execute procedure TSIncLoad('corporates',
        '/tmp/daily.dat',
        'daycal',
        '2011-01-03 00:00:00.00000',
        0,
        't',
        'ctnr_daily',
        1);

Any name can be used for the corporates table. The corporates table can have any number of columns in addition to the Secid and series columns.

Each line of the data file has the following format:
Secid year-mon-day carryover spread pricing_bmk_id price yield priority
For example:
25000006 2010-1-7 m 2 12 2.2000000000 22.2 6
You can run the TSIncLoad procedure with an SQL statement like:
execute procedure TSIncLoad( 'corporates',
                             'data_file_name',
                             'cal_name',
                             '2010-1-1',
                              20,
                              't',
                              'container-name',
                              1);
#include <ctype.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "datetime.h"
#include "mi.h"
#include "tseries.h"

#define DAY_INFO_TYPE_NAME "day_info"
#define DAILY_COL_COUNT 7

typedef struct 
    {
    mi_integer   fd;
    mi_unsigned_integer flags;
#define LDBUF_LAST_CHAR_EOL 0x1

    mi_integer  buf_index;
    mi_integer  buf_len;
    mi_integer  line_no;
    mi_lvarchar *file_name;
    mi_string  data[2048];
    }
FILE_BUF;

#define STREAM_EOF (-1)

typedef struct sec_entry_s 
    {
    mi_integer  sec_id;
    ts_tsdesc  *tsdesc;
    int  in_row; /* Indicates whether the time series is stored in
row. */
    struct  sec_entry_s *next;
    }
sec_entry_t;

typedef struct 
    {
    mi_lvarchar *table_name;
    MI_TYPEID ts_typeid; /* The type id of timeseries(day_info) */
    mi_string *calendar_name;
    mi_datetime *origin;
    mi_integer threshold;
    mi_boolean regular;
    mi_string *container_name;
    mi_integer nelems; /* For created time series. */
    
    mi_integer hash_size;
    MI_CONNECTION *conn;
    sec_entry_t **hash;
    /* Value buffers -- only allocated once. */
    MI_DATUM col_data[ DAILY_COL_COUNT];
    mi_boolean col_is_null[ DAILY_COL_COUNT];
    char *carryover;
    char *priority;
    mi_double_precision price, yield;

    mi_integer instances_created;
    /* A count of the number of tsinstancetable entries added.  Used
to 
     * decide when to update statistics on this table.
     */
    MI_SAVE_SET *save_set;
    }
loader_context_t;


/*
 ***************************************************************************
 * name:     init_context
 *
 * purpose:  Initialize the loader context structure.
 *
 * notes:    
 ***************************************************************************
 */
static void
init_context( mi_lvarchar *table_name,
         mi_lvarchar *calendar_name,
         mi_datetime *origin,
         mi_integer threshold,
         mi_boolean regular,
         mi_lvarchar *container_name,
         mi_integer nelems,
         loader_context_t *context_ptr)
{
    mi_string buf[256];
    mi_integer table_name_len = mi_get_varlen( table_name);
    MI_ROW *row = NULL;
    MI_DATUM retbuf = 0;
    mi_integer retlen = 0;
    mi_lvarchar *typename = NULL;
    MI_TYPEID *typeid = NULL;
    mi_integer err = 0;
    
    
    if( table_name_len > IDENTSIZE)
   mi_db_error_raise( NULL, MI_EXCEPTION, "The table name is too long");
    
    memset( context_ptr, 0, sizeof( *context_ptr));
    context_ptr->conn = mi_open( NULL, NULL, NULL);

    typename = mi_string_to_lvarchar
               ( "timeseries(" DAY_INFO_TYPE_NAME ")");
    typeid = mi_typename_to_id( context_ptr->conn, typename);
    mi_var_free( typename);
    if( NULL == typeid)
   mi_db_error_raise( NULL, MI_EXCEPTION,
            "Type timeseries(" DAY_INFO_TYPE_NAME ") not defined.");
    context_ptr->ts_typeid = *typeid;
    
    context_ptr->table_name = table_name;
    
    context_ptr->calendar_name = mi_lvarchar_to_string( calendar_name);
    context_ptr->origin = origin;
    context_ptr->threshold = threshold;
    context_ptr->regular = regular;
    context_ptr->container_name = mi_lvarchar_to_string( container_name);
    context_ptr->nelems = nelems;

    /* Use the size (count) of the table as the hash table size. */
    sprintf( buf, "select count(*) from %.*s;",
        table_name_len,
        mi_get_vardata( table_name));
    if( MI_OK != mi_exec( context_ptr->conn, buf, MI_QUERY_BINARY))
   mi_db_error_raise( NULL, MI_EXCEPTION, "mi_exec failed");
    if( MI_ROWS != mi_get_result( context_ptr->conn))
   {
   sprintf( buf, "Could not get size of %.*s table.",
        table_name_len,
        mi_get_vardata( table_name));
   mi_db_error_raise( NULL, MI_EXCEPTION, buf);
   }
    if( NULL == (row = mi_next_row( context_ptr->conn, &err)))
   mi_db_error_raise( NULL, MI_EXCEPTION, "mi_next_row failed");
    if( MI_NORMAL_VALUE != mi_value( row, 0, &retbuf, &retlen)
   || 0 != dectoint( (mi_decimal *) retbuf, &context_ptr->hash_size))
   context_ptr->hash_size = 256;
    (void) mi_query_finish( context_ptr->conn);
    context_ptr->hash
   = mi_zalloc( context_ptr->hash_size*sizeof( *context_ptr->hash));

    context_ptr->col_data[1] = (MI_DATUM) mi_new_var(1); /* carryover
*/
    context_ptr->col_data[6] = (MI_DATUM) mi_new_var(1); /* priority
*/
    
    if( NULL == context_ptr->hash
   || NULL ==  context_ptr->col_data[1]
   || NULL ==  context_ptr->col_data[6])
   mi_db_error_raise( NULL, MI_EXCEPTION, "Not enough memory.");

    context_ptr->carryover
   = mi_get_vardata( (mi_lvarchar *) context_ptr->col_data[1]);
    context_ptr->col_data[4] = (MI_DATUM) &context_ptr->price;
    context_ptr->col_data[5] = (MI_DATUM) &context_ptr->yield;
    context_ptr->priority
   = mi_get_vardata( (mi_lvarchar *) context_ptr->col_data[6]);

    context_ptr->save_set = mi_save_set_create( context_ptr->conn);
} /* End of init_context. */


/*
 ***************************************************************************
 * name:     close_context
 *
 * purpose:  Close the context structure.  Free up all allocated memory.
 *
 ***************************************************************************
 */
static void
close_context( loader_context_t *context_ptr)
{
    mi_free( context_ptr->hash);
    context_ptr->hash = NULL;
    context_ptr->hash_size = 0;

    mi_var_free( (mi_lvarchar *) context_ptr->col_data[1]);
    mi_var_free( (mi_lvarchar *) context_ptr->col_data[6]);
    context_ptr->col_data[1] = context_ptr->col_data[6] = 0;
    context_ptr->carryover = context_ptr->priority = NULL;

    (void) mi_save_set_destroy( context_ptr->save_set);
    context_ptr->save_set = NULL;
    
    (void) mi_close( context_ptr->conn);

    mi_free( context_ptr->calendar_name);
    context_ptr->calendar_name = NULL;
    mi_free( context_ptr->container_name);
    context_ptr->container_name = NULL;
    
    context_ptr->conn = NULL;
} /* End of close_context. */


/*
 ***************************************************************************
 * name:     update_series
 *
 * purpose:  Update all the time series back into the table.
 *
 * returns:  
 *
 * notes:    
 ***************************************************************************
 */
static void
update_series( loader_context_t *context_ptr)
{
    mi_integer i = 0;
    register struct sec_entry_s *entry_ptr = NULL;
    struct sec_entry_s *next_entry_ptr = NULL;
    MI_STATEMENT *statement = NULL;
    char buf[256];
    mi_integer rc = 0;
    MI_DATUM values[2] = {0, 0};
    mi_integer lengths[2] = {-1, sizeof( mi_integer)};
    static const mi_integer nulls[2] = {0, 0};
    static const mi_string const *types[2]
   = {"timeseries(day_info)", "integer"};
    mi_unsigned_integer yield_count = 0;
    

    sprintf( buf, "update %.*s set series = ? where Secid = ?;",
        mi_get_varlen( context_ptr->table_name),
        mi_get_vardata( context_ptr->table_name));
    statement = mi_prepare( context_ptr->conn, buf, NULL);
    if( NULL == statement)
   mi_db_error_raise( NULL, MI_EXCEPTION, "mi_prepare failed");

    /* Look at all the entries in the hash table. */
    for( i = context_ptr->hash_size - 1; 0 <= i; i--)
   {
   for( entry_ptr = context_ptr->hash[i];
        NULL != entry_ptr;
        entry_ptr = next_entry_ptr)
       {
       if( NULL != entry_ptr->tsdesc)
      {
      yield_count++;
      if( 0 == (yield_count & 0x3f))
          {
          if( mi_interrupt_check())
         mi_db_error_raise( NULL, MI_EXCEPTION, "Load aborted.");
          mi_yield();
          }
      
      values[0] = ts_get_ts( entry_ptr->tsdesc);
      values[1] = (MI_DATUM) entry_ptr->sec_id;
      lengths[0] = mi_get_varlen( ts_get_ts( entry_ptr->tsdesc));
      
      if( mi_exec_prepared_statement( statement,
                  MI_BINARY,
                  1,
                  2,
                  values,
                  lengths,
                  (int *) nulls,
                  (char **) types,
                  0,
                  NULL)
          != MI_OK)
          mi_db_error_raise( NULL, MI_EXCEPTION,
                   "mi_exec_prepared_statement(update) failed");
      ts_close( entry_ptr->tsdesc);
      }
       next_entry_ptr = entry_ptr->next;
       mi_free( entry_ptr);
       }
   context_ptr->hash[i] = NULL;
   }
} /* End of update_series. */


/*
 ***************************************************************************
 * name:     open_buf
 *
 * purpose:  Open a file for reading and attach it to a buffer.
 *
 ***************************************************************************
 */
static void
open_buf( mi_lvarchar *file_name,
     FILE_BUF *buf_ptr)
{
    mi_string *file_name_str = mi_lvarchar_to_string( file_name);

    memset( buf_ptr, 0, sizeof( *buf_ptr));
    buf_ptr->fd = mi_file_open( file_name_str, O_RDONLY, 0);
    mi_free( file_name_str);
    buf_ptr->file_name = file_name;
    
    if( MI_ERROR == buf_ptr->fd)
   {
   char buf[356];
   mi_integer name_len = (256 < mi_get_varlen( file_name))
       ? 256 : mi_get_varlen( file_name);

   sprintf( buf, "mi_file_open(%.*s) failed",
       name_len, mi_get_vardata( file_name));
   
   mi_db_error_raise( NULL, MI_EXCEPTION, buf);
   }
    buf_ptr->buf_index = 0;
    buf_ptr->buf_len = 0;
    buf_ptr->line_no = 1;
} /* End of open_buf. */


/*
 ***************************************************************************
 * name:     get_char
 *
 * purpose:  Get the next character from a buffered file.
 *
 * returns:  The character or STREAM_EOF
 *
 ***************************************************************************
 */
static mi_integer
get_char( FILE_BUF *buf_ptr)
{
    register mi_integer c = STREAM_EOF;

    if( buf_ptr->buf_index >= buf_ptr->buf_len)
   {
   buf_ptr->buf_index = 0;
   buf_ptr->buf_len = mi_file_read( buf_ptr->fd,
                buf_ptr->data,
                sizeof( buf_ptr->data));
   if( MI_ERROR == buf_ptr->buf_len)
       {
       char buf[356];
       mi_integer name_len = (256 < mi_get_varlen( buf_ptr->file_name))
      ? 256 : mi_get_varlen( buf_ptr->file_name);

       sprintf( buf, "mi_file_read(%.*s) failed",
           name_len, mi_get_vardata(buf_ptr->file_name));
   
       mi_db_error_raise( NULL, MI_EXCEPTION, buf);
       }
   if( 0 == buf_ptr->buf_len)
       return( STREAM_EOF);
   }

    /* Increment buf_ptr->line_no until we have started on the next
line, 
     * not when the newline character is seen.
     */
    if( buf_ptr->flags & LDBUF_LAST_CHAR_EOL)
   {
   buf_ptr->line_no++;
   buf_ptr->flags &= ~LDBUF_LAST_CHAR_EOL;
   }
    
    c = buf_ptr->data[ buf_ptr->buf_index++];
    if( '\n' == c)
   buf_ptr->flags |= LDBUF_LAST_CHAR_EOL;
    return( c);
} /* End of get_char. */


/*
 ***************************************************************************
 * name:     close_buf
 *
 * purpose:  Close a file attached to a buffer.
 *
 * notes:    
 ***************************************************************************
 */
static void
close_buf( FILE_BUF *buf_ptr)
{
    mi_file_close( buf_ptr->fd);
    buf_ptr->fd = MI_ERROR;
    buf_ptr->buf_index = 0;
    buf_ptr->buf_len = 0;
    buf_ptr->file_name = NULL;
} /* End of close_buf. */


/*
 ***************************************************************************
 * name:     get_token
 *
 * purpose:  Get the next token from an input stream.
 *
 * returns:  The token in a buffer and the next character after the
buffer.
 *
 * notes:    Assumes that the tokens are separated by white space.
 ***************************************************************************
 */
static mi_integer
get_token( FILE_BUF *buf_ptr,
      mi_string *token,
      size_t token_buf_len)
{
    register mi_integer c = get_char( buf_ptr);
    register mi_integer i = 0;

    while( STREAM_EOF != c && isspace( c))
   c = get_char( buf_ptr);

    for( ;STREAM_EOF != c && ! isspace( c); c = get_char(
buf_ptr))
   {
   if( i >= token_buf_len - 1)
       {
       char err_buf[128];

       sprintf( err_buf, "Word is too long on line %d.", buf_ptr->line_no);
       mi_db_error_raise( NULL, MI_EXCEPTION, err_buf);
       }
   token[i++] = c;
   }
    token[i] = 0;
    
    return( c);
} /* End of get_token. */


/*
 ***************************************************************************
 * name:     increment_instances_created
 *
 * purpose:  Increment the instances_created field and update statistics
 *           when it crosses a threshold.  If the statistics for the
 *           time series instance table were never updated then the
server
 *           would not use the index on the instance table, and time
series
 *           opens would be very slow.
 *
 * returns:  nothing
 *
 * notes:    
 ***************************************************************************
 */
static void
increment_instances_created( loader_context_t *context_ptr)
{
    context_ptr->instances_created++;
    if( 50 != context_ptr->instances_created)
   return;

    (void) mi_exec( context_ptr->conn,
          "update statistics high for table tsinstancetable( id);",
          MI_QUERY_BINARY);
} /* End of increment_instances_created. */


/*
 ***************************************************************************
 * name:     get_sec_entry
 *
 * purpose:  Get the security entry for a security ID
 *
 * returns:  A pointer to security entry
 *
 * notes:    If the entry is not found in the hash table then the
security
 *           is looked up in the table and a new entry made in the
hash 
 *           table. A warning message will be emitted if the security
ID 
 *           cannot be found.  In this case the security entry will
have 
 *           a NULL tsdesc.
 ***************************************************************************
 */
static sec_entry_t *
get_sec_entry( loader_context_t *context_ptr,
          mi_integer sec_id,
          mi_integer line_no)
{
    mi_unsigned_integer i
   = ((mi_unsigned_integer) sec_id) % context_ptr->hash_size;
    sec_entry_t *entry_ptr = context_ptr->hash[i];
    mi_string buf[256];
    mi_integer rc = 0;

    /* Look the security ID up in the hash table. */
    for( ; NULL != entry_ptr; entry_ptr = entry_ptr->next)
   {
   if( sec_id == entry_ptr->sec_id)
       return( entry_ptr);
   }
    /* This is the first time this security ID has been seen. */
    entry_ptr = mi_zalloc( sizeof( *entry_ptr));
    entry_ptr->sec_id = sec_id;
    entry_ptr->next = context_ptr->hash[i];
    context_ptr->hash[i] = entry_ptr;

    /* Look up the security ID in the database table. */
    sprintf( buf,
        "select series from %.*s where Secid = %d;",
        mi_get_varlen( context_ptr->table_name),
        mi_get_vardata( context_ptr->table_name),
        sec_id);
    if( MI_OK != mi_exec( context_ptr->conn, buf, MI_QUERY_BINARY))
   mi_db_error_raise( NULL, MI_EXCEPTION, "mi_exec failed.");

    rc = mi_get_result( context_ptr->conn);
    if( MI_NO_MORE_RESULTS == rc)
   {
   sprintf( buf, "Security %d (line %d) not in %.*s.",
       sec_id, line_no,
       mi_get_varlen( context_ptr->table_name),
       mi_get_vardata( context_ptr->table_name));
   mi_db_error_raise( NULL, MI_MESSAGE, buf);
   /* Mi_db_error_raise returns after raising messages of type MI_MESSAGE.
    */
   }
    else if( MI_ROWS != rc)
   mi_db_error_raise( NULL, MI_EXCEPTION, "mi_get_result failed.");
    else 
   {
   mi_integer err = 0;
   MI_ROW *row = mi_next_row( context_ptr->conn, &err);
   MI_DATUM ts_datum = 0;
   mi_integer retlen = 0;

   /* Save the row so that the time series column will not be erased
when
    * the query is finished.
    */
   if( NULL != row
       && MI_NORMAL_VALUE == mi_value( row, 0, &ts_datum, &retlen))
       {
       if( NULL == (row = mi_save_set_insert( context_ptr->save_set,
                     row)))
      mi_db_error_raise( NULL, MI_EXCEPTION,
               "mi_save_set_insert failed");
       }

   if( NULL != row)
       rc = mi_value( row, 0, &ts_datum, &retlen);
   else
       rc = MI_ERROR;
   if( MI_NORMAL_VALUE != rc && MI_NULL_VALUE != rc)
       {
       if( 0 != err)
      {
      sprintf( buf, "Look up of security ID %d in %.*s failed.",
          sec_id,
          mi_get_varlen( context_ptr->table_name),
          mi_get_vardata( context_ptr->table_name));
      mi_db_error_raise( NULL, MI_EXCEPTION, buf);
      }
       else
      {
      sprintf( buf, "Security %d (line %d) not in %.*s.",
          sec_id, line_no,
          mi_get_varlen( context_ptr->table_name),
          mi_get_vardata( context_ptr->table_name));
      mi_db_error_raise( NULL, MI_MESSAGE, buf);
      return( entry_ptr);
      }
       }
   if( MI_NULL_VALUE != rc)
       entry_ptr->in_row = (TS_IS_INCONTAINER( (ts_timeseries *) ts_datum)
             != 0);
   else
       {
       /* No time series has been created for this security yet.
        * Start one.
        */
       ts_datum = ts_create( context_ptr->conn,
              context_ptr->calendar_name,
              context_ptr->origin,
              context_ptr->threshold,
              context_ptr->regular ? 0 : TS_CREATE_IRR,
              &context_ptr->ts_typeid,
              context_ptr->nelems,
              context_ptr->container_name);
       entry_ptr->in_row = (TS_IS_INCONTAINER( (ts_timeseries *) ts_datum)
             == 0);
       if( entry_ptr->in_row)
      increment_instances_created( context_ptr);
       }
   entry_ptr->tsdesc = ts_open( context_ptr->conn,
                 ts_datum,
                 &context_ptr->ts_typeid,
                 0);
   }
    return( entry_ptr);
} /* End of get_sec_entry. */


/*
 ***************************************************************************
 * name:     is_null
 *
 * purpose:  Determine whether a token represents a null value.
 *
 * returns:  1 if so, 0 if not
 *
 ***************************************************************************
 */
static int
is_null( register mi_string *token)
{
    return( ('N' == token[0] || 'n' == token[0])
       && ('U' == token[1] || 'u' == token[1])
       && ('L' == token[2] || 'l' == token[2])
       && ('L' == token[3] || 'l' == token[3])
       && 0 == token[4]);
} /* End of is_null. */


/*
 ***************************************************************************
 * name:     read_day_data
 *
 * purpose:  Read in the daily data for one security.
 *
 * returns:  Fills in the timestamp structure, the col_data and col_is_null
 *           arrays.
 *
 * notes:    Assumes that the col_is_null array is initialized to
all TRUE.
 ***************************************************************************
 */
static void
read_day_data( loader_context_t *context_ptr,
          FILE_BUF *buf_ptr,
          mi_string *token,
          size_t token_buf_len,
          mi_datetime *tstamp_ptr)
{
    register mi_integer i = 0;
    register mi_integer c;
    
    /* ValueDate  DATETIME year to day*/
    c = get_token( buf_ptr, token, token_buf_len);
    if( STREAM_EOF== c && 0 == strlen( token)
   || '\n' == c)
   return;
    tstamp_ptr->dt_qual = TU_DTENCODE( TU_YEAR, TU_DAY);
    if( is_null( token))
   tstamp_ptr->dt_dec.dec_pos = DECPOSNULL;
    else
   {
   if( 0 == dtcvasc( token, tstamp_ptr))
       {
       context_ptr->col_is_null[0] = MI_FALSE;
       context_ptr->col_data[0] = (MI_DATUM) tstamp_ptr;
       }
   else
       {
       mi_string err_buf[128];

       sprintf( err_buf, "Illegal date on line %d", buf_ptr->line_no);
       mi_db_error_raise( NULL, MI_MESSAGE, err_buf);
       }
   }

    /* carryover char(1) */
    c = get_token( buf_ptr, token, token_buf_len);
    if( STREAM_EOF== c && 0 == strlen( token) || '\n' == c)
   return;
    if( ! is_null( token))
   {
   *(context_ptr->carryover) = token[0];
   context_ptr->col_is_null[1] = MI_FALSE;
   }

    /* spread integer,
     * pricing_bmk_id integer
     */
    for( i = 2; i < 4; i++)
   {
   c = get_token( buf_ptr, token, token_buf_len);
   if( STREAM_EOF== c && 0 == strlen( token)
       || '\n' == c)
       return;
   if( ! is_null( token))
       {
       context_ptr->col_data[i] = (MI_DATUM) atoi( token);
       context_ptr->col_is_null[i] = MI_FALSE;
       }
   }

    /* price float,
     * yield float
     */
    for( i = 4; i < 6; i++)
   {
   c = get_token( buf_ptr, token, token_buf_len);
   if( STREAM_EOF== c && 0 == strlen( token)
       || '\n' == c)
       return;
   if( ! is_null( token))
       {
       *((double *) context_ptr->col_data[i]) = atof( token);
       context_ptr->col_is_null[i] = MI_FALSE;
       }
   }

    /* priority char(1) */
    c = get_token( buf_ptr, token, token_buf_len);
    if( (STREAM_EOF == c || '\n' == c) && 0 == strlen( token))
   return;
    if( ! is_null( token))
   {
   *(context_ptr->priority) = token[0];
   context_ptr->col_is_null[6] = MI_FALSE;
   }
} /* End of read_day_data. */


/*
 ***************************************************************************
 * name:     read_line
 *
 * purpose:  Read a line from the file, fetch the time series descriptor
 *           corresponding to the Secid, create a time series element
for
 *           the line, and convert the date into an mi_datetime structure.
 *
 * returns:  1 if there was more data in the file,
 *           0 if the end of the file was found.
 *
 * notes:    Creates a new time series if the series column for the
Secid is
 *           NULL.
 ***************************************************************************
 */
int
read_line( loader_context_t *context_ptr,
      FILE_BUF *buf_ptr,
      ts_tsdesc **tsdesc_ptr,
      ts_tselem *day_elem_ptr,
      int *null_line,
      mi_datetime *tstamp_ptr,
      sec_entry_t **sec_entry_ptr_ptr)
{
    mi_integer sec_id = -1;
    sec_entry_t *sec_entry_ptr = NULL;    
    mi_string token[256];
    mi_integer c = 0; /* Next character from file. */
    mi_integer i = 0;
    

    *sec_entry_ptr_ptr = NULL;
    *null_line = 1;
    for( i = 0; i < DAILY_COL_COUNT; i++)
   context_ptr->col_is_null[ i] = MI_TRUE;
    
    c = get_token( buf_ptr, token, sizeof( token));
    if( STREAM_EOF== c && 0 == strlen( token))
   return( 0);

    sec_id = atoi( token);

    *sec_entry_ptr_ptr = sec_entry_ptr
   = get_sec_entry( context_ptr, sec_id, buf_ptr->line_no);

    read_day_data( context_ptr,
         buf_ptr,
         token,
         sizeof( token),
         tstamp_ptr);

    *tsdesc_ptr = sec_entry_ptr->tsdesc;
    if( NULL == sec_entry_ptr->tsdesc)
   /* An invalid security ID. */
   return( 1);
    
    if( context_ptr->col_is_null[0]
   && TS_IS_IRREGULAR( ts_get_ts( sec_entry_ptr->tsdesc)))
   {
   mi_string err_buf[128];

   sprintf( err_buf, "Missing date on line %d.", buf_ptr->line_no);
   mi_db_error_raise( NULL, MI_MESSAGE, err_buf);
   return(1);
   }
    *null_line = 0;
    *day_elem_ptr = ts_make_elem_with_buf( sec_entry_ptr->tsdesc,
                  context_ptr->col_data,
                  context_ptr->col_is_null,
                  NULL,
                  *day_elem_ptr);
    return(1);
} /* End of read_line. */


/*
 ***************************************************************************
 * name:     TSIncLoad
 *
 * purpose:  UDR for incremental loading of timeseries from a file.
 *
 ***************************************************************************
 */
void
TSIncLoad( mi_lvarchar *table_name, /* the table that holds the time
series. */
      mi_lvarchar *file_name,
      /* The name of the file containing the data.  It must be accessible
       * on the server machine.
       */
      /*
       * The following parameters are only used to create new time
       * series.
       */
      mi_lvarchar *calendar_name,
      mi_datetime *origin,
      mi_integer threshold,
      mi_boolean regular,
      mi_lvarchar *container_name,
      mi_integer nelems,
      MI_FPARAM *fParamPtr)
{
    FILE_BUF buf = {0};
    ts_tselem day_elem = NULL;
    ts_tsdesc *tsdesc = NULL;
    ts_timeseries *ts = NULL;
    mi_datetime tstamp = {0};
    loader_context_t context = {0};
    mi_unsigned_integer yield_count = 0;
    sec_entry_t *sec_entry_ptr = NULL;
    int null_line = 0;
    

    init_context( table_name,
        calendar_name,
        origin,
        threshold,
        regular,
        container_name,
        nelems,
        &context);
    
    open_buf( file_name, &buf);

    while( read_line( &context,
            &buf,
            &tsdesc,
            &day_elem,
            &null_line,
            &tstamp,
            &sec_entry_ptr))
   {
   yield_count++;
   /* Periodically (once every 64 input lines) check for interrupts
and
    * yield the processor to other threads.
    */
   if( 0 == (yield_count & 0x3f))
       {
       if( mi_interrupt_check())
      mi_db_error_raise( NULL, MI_EXCEPTION, "Load aborted.");
       mi_yield();
       }
   
   if( null_line)
       continue;

   ts = ts_put_elem_no_dups( tsdesc, day_elem, &tstamp);
   if( sec_entry_ptr->in_row && TS_IS_INCONTAINER( ts))
       {
       sec_entry_ptr->in_row = 0;
       increment_instances_created( &context);
       }
   }

    if( NULL != day_elem)
   ts_free_elem( tsdesc, day_elem);
    
    close_buf( &buf);
    update_series( &context);
    close_context( &context);
} /* End of TSIncLoad. */