Source listing

The main thread starts a connection that is named con1 and declares a cursor on table t1. It then opens the cursor and makes connection con1 dormant. The main thread then starts six threads (six instances of the thread_all() function) and waits for the threads to complete their work with the pthread_join() DCE call.

Each thread uses the connection con1 and the opened cursor to perform a fetch operation. After the fetch operation, the thread makes the connection dormant. Threads use connection con1 in a sequential manner because only one thread can use the connection at a time. Each thread reads the next record from the t1 table.
/* **************************************************************
* Program Name: thread_safe()
*
* Purpose     : If a server connection is initiated with the WITH
*               CONCURRENT TRANSACTION clause, an ongoing transaction
*               can be shared across threads that subsequently
*               connect to that server.
*               In addition, if an open cursor is associated with such
*               a connection, the cursor remains open when the
*               connection is made dormant. Therefore, multiple
*               threads can share a cursor.
*
* Methods     : - Create database db_con221 and table t1.
*               - Insert 6 rows into table t1, i.e. values 1 through 6.
*               - Connect to db_con221 as con1 WITH CONCURRENT
*                 TRANSACTION.
*                 The CONCURRENT TRANSACTION option is required since
*                 all threads use the cursor through the same
*                 connection.
*               - Declare c1 cursor for
*                 "select sales from t1 order by sales".
*               - Open the cursor.
*               - Start 6 threads. Use DCE pthread_join() to determine
*                 whether all threads have completed. All threads do
*                 the same thing, as follows.
*                 For thread_1, thread_2, ..., thread_6:
*                        o SET CONNECTION con1
*                        o FETCH a record and display it.
*                        o SET CONNECTION con1 DORMANT
*               - Disconnect all connections.
**************************************************************
*/

#include <pthread.h>
#include <dce/dce_error.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

/* global definitions */
#define num_threads     6

/*
 * Status codes and the empty connection-name string used throughout this
 * file.  They are only defined here when no earlier header has supplied
 * them.  FUNCFAIL must be zero because callers test failure with
 * "!checksql(...)"; FUNCSUCC only needs to be non-zero.
 * NOTE(review): confirm these match the project's usual values if a
 * shared header also defines them.
 */
#ifndef FUNCSUCC
#define FUNCSUCC        1
#endif
#ifndef FUNCFAIL
#define FUNCFAIL        0
#endif
#ifndef EMPTYSTR
#define EMPTYSTR        ""
#endif

/* Function declarations */
static  void thread_all();
static  long  dr_dbs();
static int checksql(char *str, long expected_err, char *con_name);
static void dce_err();
/* Declared before main() uses them, so that no implicit declaration
 * clashes with the later definitions. */
static int populate_tab();
int start_threads();

/* Host variables declaration */
EXEC SQL BEGIN DECLARE SECTION;
   char   con1[] = "con1";   /* name of the connection shared by all threads */
EXEC SQL END DECLARE SECTION;

/* ****************************************************
* Main Thread
*
* Creates database db_con221 with table t1 and fills the table, then
* connects as con1 WITH CONCURRENT TRANSACTION, opens cursor c1 and makes
* the connection dormant.  It then runs the worker threads through
* start_threads() and finally closes the cursor, disconnects and drops
* the database.
*
* Returns 0 at the end of the run; exits with status 1 when creating the
* database, creating or populating the table, or connecting fails.
******************************************************/
int main()
{
long status;   /* SQLCODE saved before dr_dbs() overwrites it */

/* create database */

EXEC SQL create database db_con221 with log;
if (! checksql("create database", 0, EMPTYSTR))
   {
   printf("MAIN:: create database returned status {%ld}\n", (long) SQLCODE);
   exit(1);
   }

EXEC SQL create table t1( sales int);
if (! checksql( "create_table", 0, EMPTYSTR))
   {
   /* dr_dbs() runs SQL statements of its own, so capture the failing
    * status first and report that one. */
   status = (long) SQLCODE;
   dr_dbs("db_con221");
   printf("MAIN:: create table returned status {%ld}\n", status);
   exit(1);
   }

if ( populate_tab() != FUNCSUCC)
   {
   status = (long) SQLCODE;
   dr_dbs("db_con221");
   printf("MAIN:: returned status {%ld}\n", status);
   exit(1);
   }

EXEC SQL close database;
checksql("[main] <close database>", 0, EMPTYSTR);

/* Establish connection 'con1' */
EXEC SQL connect to 'db_con221' as 'con1' WITH CONCURRENT TRANSACTION;
if (! checksql("MAIN:: <connect to db_con221 as con1>", 0, EMPTYSTR))
    {
    dr_dbs("db_con221");
    exit(1);
    }

/* Declare cursor c1 associated with the connection con1 */
EXEC SQL prepare tabid from "select sales from t1 order by sales";
checksql("MAIN:: <prepare>", 0, EMPTYSTR);

EXEC SQL declare c1 cursor for tabid;
checksql("MAIN:: <declare c1 cursor for>", 0, EMPTYSTR);

/* Open cursor c1 and make the connection dormant so that the worker
 * threads can take the connection over one at a time */
EXEC SQL open c1;
checksql("MAIN:: <open c1>", 0, EMPTYSTR);
EXEC SQL set connection :con1 dormant;
checksql("MAIN:: <set connection con1 dormant>", 0, EMPTYSTR);

/* Start threads; start_threads() returns after joining all of them */
start_threads();

/* Close cursor and drop database */
EXEC SQL set connection :con1;
checksql("MAIN:: set connection", 0, EMPTYSTR);
EXEC SQL close c1;
checksql("MAIN:: <close cursor>", 0, EMPTYSTR);
EXEC SQL free c1;
checksql("MAIN:: <free cursor>", 0, EMPTYSTR);

EXEC SQL disconnect all;
checksql("MAIN:: disconnect all", 0, EMPTYSTR);
dr_dbs("db_con221");

return 0;
} /* end of Main Thread */
 
/**********************************************************************
* Function: thread_all()
 * Purpose : Uses connection con1 and fetches a row from table t1 using * 
      cursor c1.
 * Returns : Nothing
**********************************************************************/
static void thread_all(thread_num)
int *thread_num;
{
EXEC SQL BEGIN DECLARE SECTION;
   int    val;
EXEC SQL END DECLARE SECTION;

/* Wait for the connection to become available */
do {
   EXEC SQL set connection :con1;
   } while (SQLCODE == -1802);
 
checksql("thread_all: set connection", 0, con1);
 
/* Fetch a row */
EXEC SQL fetch c1 into :val;
checksql("thread_all: fetch c1 into :val", 0, con1);
 
/* Free connection con1 */
EXEC SQL set connection :con1 dormant;
checksql("thread_all: set connection con1 dormant", 0, EMPTYSTR);
printf("Thread id %d fetched value %d from t1\n", *thread_num, val);
} /* thread_all() */

/**********************************************************************
* Function: start_threads()
 * purpose : Create num_threads and passes a thread id number to each 
 *              thread
**********************************************************************/
  
start_threads()  
{
 
   int          thread_num[num_threads];
   pthread_t         thread_id[num_threads];
   int         i, ret, return_value;

for(i=0; i< num_threads; i++)
   {
   thread_num[i] = i;
   if ((pthread_create(&thread_id[i], pthread_attr_default
         (pthread_startroutine_t ) thread_all, &thread_num[i])) == -1)
      {
      dce_err(__FILE__, "pthread_create failed", (unsigned long)-1);
      dr_dbs("db_con221");
      exit(1);
      }
   }
 
 /* Wait for all the threads to complete their work */
for(i=0; i< num_threads; i++)
   {
   ret = pthread_join(thread_id[i], (pthread_addr_t *) &return_value);
   if(ret == -1)
      {
      dce_err(__FILE__, "pthread_join", (unsigned long)-1);
      dr_dbs("db_con221");
      exit(1);
      }
   }
} /* start_threads() */

/**********************************************************************
 * Function: populate_tab()
 * Purpose : Inserts the values 1 through num_threads into table t1
 *           inside a single transaction.
 * Returns : FUNCSUCC on success; FUNCFAIL as soon as begin work, an
 *           insert, or commit work does not return the expected status.
 *           SQLCODE still holds the status of the failing statement
 *           when FUNCFAIL is returned.
 **********************************************************************/
static int
populate_tab()
{
EXEC SQL BEGIN DECLARE SECTION;
   int row_val;   /* value inserted by the current iteration */
EXEC SQL END DECLARE SECTION;

EXEC SQL begin work;
if (!checksql("begin work", 0,EMPTYSTR))
   {
   return FUNCFAIL;
   }

/* one row per worker thread */
row_val = 1;
while (row_val <= num_threads)
   {
   EXEC SQL insert into t1 values (:row_val);
   if (!checksql("insert", 0,EMPTYSTR))
      {
      return FUNCFAIL;
      }
   row_val++;
   }

EXEC SQL commit work;
if (!checksql("commit work", 0,EMPTYSTR))
   {
   return FUNCFAIL;
   }

return FUNCSUCC;
} /* populate_tab() */

/**********************************************************************
 * Function: dr_dbs()
 * Purpose : drops the database.
**********************************************************************/
long dr_dbs(db_name)
EXEC SQL BEGIN DECLARE SECTION;
   char   *db_name;
EXEC SQL END DECLARE SECTION;

{
EXEC SQL connect to DEFAULT;
checksql("dr_dbs: connect",  0, "DEFAULT");

EXEC SQL drop database :db_name;
checksql("dr_dbs: drop database", 0, EMPTYSTR);

EXEC SQL disconnect all;
checksql("dr_dbs: disconnect all", 0, EMPTYSTR);
} /*dr_dbs() */

/**********************************************************************
 * Function: checksql()
 * Purpose : To check the SQLCODE against the expected error
 *           (or the expected SQLCODE) and issue a proper message.
 * Params  : str          - label of the statement that was just run;
 *                          printed at the start of the message.
 *           expected_err - the SQLCODE value the caller expects.
 *           con_name     - connection name printed after the label
 *                          (callers pass EMPTYSTR when there is none).
 * Returns : FUNCSUCC when SQLCODE equals expected_err; otherwise prints
 *           a message to stdout and returns FUNCFAIL.
 **********************************************************************/
int checksql(str, expected_err, con_name)
char *str;
long expected_err;
char *con_name;
{
if (SQLCODE != expected_err)
   {
   /* expected_err is a long, so print both codes with %ld; the cast
    * keeps the SQLCODE argument correct whatever its integer width. */
   printf( "%s %s Returned {%ld}, Expected {%ld}\n", str, con_name,
      (long) SQLCODE, expected_err);
   return(FUNCFAIL);
   }
return (FUNCSUCC);
} /* checksql() */

/**********************************************************************
 * Function: dce_err()
 * Purpose : Prints an error from the DCE library routines to stderr.
 * Params  : program - program (source file) name shown in the message.
 *           routine - name of the routine that failed.
 *           status  - status to report.  The value (unsigned long)-1
 *                     means "report errno"; any other value is passed
 *                     to dce_error_inq_text() to obtain a message text.
 * Returns : Nothing.
 **********************************************************************/

void dce_err(program, routine, status)
char *program, *routine;
unsigned long status;
{
int  inq_status;                          /* zero when msg_text is usable */
char msg_text[dce_c_error_string_len+1];

if (status != (unsigned long)-1)
   {
   /* Let the DCE library describe the status code. */
   dce_error_inq_text(status, (unsigned char *)msg_text, &inq_status);
   }
else
   {
   /* Generic failure: build the text from errno instead. */
   inq_status = 0;
   sprintf(msg_text, "returned FAILURE (errno is %d)", errno);
   }

if (inq_status)
   {
   /* No usable text; print the bare status number. */
   fprintf(stderr, "%s: error in %s: %lu\n", program, routine, status);
   return;
   }

fprintf(stderr, "%s: error in %s:\n   ==> %s (%lu) <==\n",
        program, routine, msg_text, status);
} /* dce_err() */