/*
* $Id: dbase.c 1633 2007-02-14 09:52:55Z bogdan_iancu $
*
* POSTGRES module, portions of this code were templated using
* the mysql module, thus its similarity.
*
* Copyright (C) 2003 August.Net Services, LLC
* Copyright (C) 2006 Norman Brandinger
*
* This file is part of openser, a free SIP server.
*
* openser is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* openser is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* ---
*
* History
* -------
* 2003-04-06 initial code written (Greg Fausak/Andy Fullford)
* 2006-07-28 within pg_get_result(): added check to immediately return if no
* result set was returned; added check to only execute
* convert_result() if PGRES_TUPLES_OK; added safety check to avoid
* double pg_free_result() (norm)
* 2006-08-07 Rewrote pg_get_result().
* Additional debugging lines have been placed through out the code.
* Added Asynchronous Command Processing (PQsendQuery/PQgetResult)
* instead of PQexec. this was done in preparation of adding FETCH
* support. Note that PQexec returns a result pointer while
* PQsendQuery does not. The result set pointer is obtained from
* a call (or multiple calls) to PQgetResult.
* Removed transaction processing calls (BEGIN/COMMIT/ROLLBACK) as
* they added unneeded overhead. Klaus' testing showed in excess of
* 1ms gain by removing each command. In addition, OpenSER only
* issues single queries and is not, at this time transaction aware.
* The transaction processing routines have been left in place
* should this support be needed in the future.
* Updated logic in pg_query / pg_raw_query to accept a (0) result
* set (_r) parameter. In this case, control is returned
* immediately after submitting the query and no call to
* pg_get_results() is performed. This is a requirement for
* FETCH support. (norm)
* 2006-10-27 Added fetch support (norm)
* Removed dependency on aug_* memory routines (norm)
* Added connection pooling support (norm)
* Standardized API routines to pg_* names (norm)
* 2006-11-01 Updated pg_insert(), pg_delete(), pg_update() and
* pg_get_result() to handle failed queries. Detailed warnings
* along with the text of the failed query is now displayed in the
* log. Callers of these routines can now assume that a non-zero
* rc indicates the query failed and that remedial action may need
* to be taken. (norm)
*/
#define MAXCOLUMNS 512
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "../../dprint.h"
#include "../../mem/mem.h"
#include "../../db/db.h"
#include "db_utils.h"
#include "defs.h"
#include "dbase.h"
#include "pg_con.h"
/*
 * NOTE(review): hand-written declaration of getpid(). In this file the only
 * calls are inside the disabled (#if 0) connect_db()/disconnect_db() code.
 * Presumably it stands in for including <unistd.h> -- confirm before
 * re-enabling that code.
 */
long getpid();
/*
 * Scratch buffer in which pg_query(), pg_insert(), pg_delete() and
 * pg_update() assemble their SQL text. It is a single file-wide buffer, so
 * each of those calls overwrites the statement built by the previous one.
 */
static char _s[SQL_BUF_LEN];
/* Forward declarations of file-local helpers defined further down. */
static int submit_query(db_con_t* _con, const char* _s);
//static int connect_db(db_con_t* _con);
//static int disconnect_db(db_con_t* _con);
static int free_query(db_con_t* _con);
/*
 * pg_use_table: remember the table that later calls on this connection
 * operate on.
 *
 *   _con  connection handle
 *   _t    table name; only the pointer is stored, the string is not copied
 *
 * Returns 0 on success. When built with PARANOID, returns -1 if either
 * argument is NULL.
 */
int pg_use_table(db_con_t* _con, const char* _t)
{
#ifdef PARANOID
    if (_con == NULL) {
        LOG(L_ERR, "PG[pg_use_table]: db_con_t parameter cannot be NULL\n");
        return -1;
    }

    if (_t == NULL) {
        LOG(L_ERR, "PG[pg_use_table]: _t parameter cannot be NULL\n");
        return -1;
    }
#endif

    CON_TABLE(_con) = _t;

    return 0;
}
#if 0
/*
 * connect_db: open the PostgreSQL connection described by the handle's
 * SQL URL. (Currently compiled out by the surrounding #if 0.)
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *
 * Returns:
 *    0  on success; CON_CONNECTED is set to 1 and CON_PID records getpid()
 *   -1  the handle carries no SQL URL (or _con is NULL when built PARANOID)
 *   -3  the SQL URL could not be parsed
 *   -4  libpq could not establish the connection
 *
 * Notes:
 *   If the handle is marked connected, disconnect_db() is called first.
 *   disconnect_db() only closes a connection opened by the current process,
 *   so a connection owned by another process is left untouched and simply
 *   replaced in the handle.
 */
static int connect_db(db_con_t* _con)
{
    char *user, *password, *host, *port, *database;
    char urlbuf[SQLURL_LEN];

#ifdef PARANOID
    if (!_con) {
        LOG(L_ERR, "PG[connect_db]: db_con_t parameter cannot be NULL\n");
        return(-1);
    }
#endif

    if (CON_CONNECTED(_con)) {
        DLOG("connect_db", "disconnect first!");
        disconnect_db(_con);
    }

    if (!CON_SQLURL(_con)) {
        PLOG("connect_db","FATAL ERROR: no sql url!");
        return(-1);
    }

    /*
     * Work on a scratch copy of the URL: parse_sql_url() is handed urlbuf
     * and returns pointers for the individual components.
     */
    snprintf(urlbuf, SQLURL_LEN, "%s", CON_SQLURL(_con));

    /*
     * The URL looks like: postgres://username:userpass@dbhost:dbport/dbname
     *   username/userpass : name and password for the database
     *   dbhost            : host name or ip address hosting the database
     *   dbport            : port to connect to the database on
     *   dbname            : name of the database
     */
    if (parse_sql_url(urlbuf, &user, &password, &host, &port, &database) < 0) {
        char buf[SQL_BUF_LEN];

        snprintf(buf, SQL_BUF_LEN, "Error while parsing %s", CON_SQLURL(_con));
        PLOG("connect_db", buf);
        return -3;
    }

    /* finally, actually connect to the database */
    CON_CONNECTION(_con) =
        PQsetdbLogin(host, port, NULL, NULL, database, user, password);

    if (CON_CONNECTION(_con) == 0
            || PQstatus(CON_CONNECTION(_con)) != CONNECTION_OK) {
        PLOG("connect_db", PQerrorMessage(CON_CONNECTION(_con)));
        PQfinish(CON_CONNECTION(_con));
        /* PQfinish() released the object: do not keep a dangling pointer */
        CON_CONNECTION(_con) = 0;
        return -4;
    }

    CON_PID(_con) = getpid();

    /* all is well, database was connected, queries can now be submitted */
    CON_CONNECTED(_con) = 1;
    return 0;
}
/*
 * disconnect_db: close the database connection held by the handle.
 * (Currently compiled out by the surrounding #if 0.)
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *
 * Returns:
 *    0  always, except
 *   -1  when built PARANOID and _con is NULL
 *
 * Notes:
 *   Nothing happens unless CON_CONNECTED is 1. The connection is closed
 *   only when it was opened by the calling process (CON_PID matches
 *   getpid()); otherwise the request is logged and ignored and the handle
 *   stays marked as connected.
 */
static int disconnect_db(db_con_t* _con)
{
#ifdef PARANOID
    if (!_con) {
        LOG(L_ERR, "PG[disconnect_db]: db_con_t parameter cannot be NULL\n");
        return(-1);
    }
#endif

    /* ignore if there is no current connection */
    if (CON_CONNECTED(_con) != 1) {
        DLOG("disconnect_db", "not connected, ignored!\n");
        return 0;
    }

    /* only the process that opened the connection may close it */
    if (CON_PID(_con) != getpid()) {
        DLOG("disconnect_db",
            "attempt to release connection not owned, ignored!\n");
        return 0;
    }

    PQfinish(CON_CONNECTION(_con));
    /* PQfinish() released the object: do not keep a dangling pointer */
    CON_CONNECTION(_con) = 0;
    CON_CONNECTED(_con) = 0;
    return 0;
}
#endif
/*
** pg_init initialize database for future queries
**
** Arguments :
** char *_url; sql database to open
**
** Returns :
** db_con_t * NULL upon error
** db_con_t * if successful
**
** Notes :
** pg_init must be called prior to any database functions.
*/
db_con_t *pg_init(const char* _url)
{
struct db_id* id;
struct pg_con* _con;
db_con_t* _res;
int con_size = sizeof(db_con_t) + sizeof(struct pg_con*);
if (strlen(_url)>(SQLURL_LEN-1))
{
LOG(L_ERR, "PG[pg_init]: ERROR sql url too long\n");
return 0;
}
/*
** this is the root memory for this database connection.
*/
_res = (db_con_t*)pkg_malloc(con_size);
if (!_res) {
LOG(L_ERR, "PG[pg_init]: Failed trying to allocate %d bytes for database connection\n", con_size);
return 0;
}
LOG(L_DBG, "PG[pg_init]: %p=pkg_malloc(%d) for database connection\n", (db_con_t*)_res, con_size);
memset(_res, 0, con_size);
id = new_db_id(_url);
if (!id) {
LOG(L_ERR, "PG[pg_init]: Error: Cannot parse URL '%s'\n", _url);
goto err;
}
/* Find the connection in the pool */
_con = (struct pg_con*)pool_get(id);
if (!_con) {
/*
* The LOG below exposes the username/password of the database connection
* by default, it is commented.
* LOG(L_DBG, "PG[pg_init]: Connection '%s' not found in pool\n", _url);
*
*/
LOG(L_DBG, "PG[pg_init]: Connection %p not found in pool\n", id);
_con = pg_new_conn(id);
if (!_con) {
LOG(L_ERR, "PG[pg_init]: Error: pg_new_con failed to add connection to pool\n");
goto err;
}
pool_insert((struct pool_con*)_con);
} else {
/*
* The LOG below exposes the username/password of the database connection
* by default, it is commented.
* LOG(L_DBG, "PG[pg_init]: Connection '%s' found in pool\n", _url);
*
*/
LOG(L_DBG, "PG[pg_init]: Connection %p found in pool\n", id);
}
_res->tail = (unsigned long)_con;
return _res;
err:
if (id) free_db_id(id);
if (_res) {
LOG(L_ERR, "PG[pg_init]: Error: Cleaning up: %p=pkg_free()\n", _res);
pkg_free(_res);
}
return 0;
}
/*
 * pg_close: last function to call when the handle is no longer needed.
 *
 * Arguments:
 *   _con  the handle to shut down, as supplied by pg_init(); a NULL
 *         handle is logged and ignored
 *
 * Returns:
 *   (void)
 *
 * Notes:
 *   The connection stored in the handle's 'tail' member is removed from
 *   the pool; when pool_remove() returns non-zero it is released with
 *   pg_free_conn() (presumably meaning no other handle still uses it --
 *   confirm against the pool implementation). The handle itself is then
 *   freed with pkg_free().
 */
void pg_close(db_con_t* _con)
{
    struct pool_con* con;

    if (!_con) {
        LOG(L_ERR, "PG[pg_close]: db_con_t parameter cannot be NULL\n");
        return;
    }

    con = (struct pool_con*)_con->tail;
    if (pool_remove(con) != 0) {
        pg_free_conn((struct pg_con*)con);
    }

    LOG(L_DBG, "PG[pg_close]: %p=pkg_free() _con\n", _con);
    pkg_free(_con);
}
/*
 * submit_query: hand a query to the server without waiting for its result.
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *   _s    the text of the query to run
 *
 * Returns:
 *    0  the query was accepted by PQsendQuery()
 *   -1  the connection is in an unusable state or PQsendQuery() failed
 *       (also when built PARANOID and _con is NULL)
 *
 * Notes:
 *   Any result still attached to the handle is released before the new
 *   query is sent. The result of the new query has to be collected
 *   afterwards with PQgetResult().
 */
static int submit_query(db_con_t* _con, const char* _s)
{
#ifdef PARANOID
    if (_con == NULL) {
        LOG(L_ERR, "PG[submit_query]: db_con_t parameter cannot be NULL\n");
        return(-1);
    }
#endif

    /* make sure the connection is usable, resetting it if it went bad */
    switch (PQstatus(CON_CONNECTION(_con))) {
    case CONNECTION_OK:
        break;

    case CONNECTION_BAD:
        LOG(L_DBG, "PG[submit_query]: connection reset\n");
        PQreset(CON_CONNECTION(_con));
        break;

    default:
        /* every other state (started, made, awaiting response, ...) */
        LOG(L_ERR, "PG[submit_query]: %p PQstatus(%s) invalid: %s\n", _con, PQerrorMessage(CON_CONNECTION(_con)), _s);
        return -1;
    }

    /* drop any previous result; free_query() is a no-op without one */
    free_query(_con);

    if (!PQsendQuery(CON_CONNECTION(_con), _s)) {
        LOG(L_ERR, "PG[submit_query]: %p PQsendQuery Error: %s Query: %s\n", _con, PQerrorMessage(CON_CONNECTION(_con)), _s);
        return -1;
    }

    LOG(L_DBG, "PG[submit_query]: %p PQsendQuery(%s)\n", _con, _s);
    return 0;
}
/*
 * pg_fetch_result: convert the next batch of rows of a previously
 * submitted query.
 *
 * Arguments:
 *   _con   connection handle the query was submitted on
 *   _res   in/out result structure; *_res must be NULL on the first call
 *          for a query and is passed back unchanged on later calls
 *   nrows  maximum number of rows to convert in this call; 0 is a no-op
 *
 * Returns:
 *    0  success. This includes: nothing left to fetch, a command that
 *       returned no data, and a query the server rejected (in which case
 *       *_res is freed and set to NULL)
 *   -1  the result structure could not be allocated (or, when built
 *       PARANOID, an invalid argument was passed)
 *   -2  the column names could not be retrieved
 *   -3  the rows could not be converted; *_res is freed and set to NULL
 *
 * Notes:
 *   On the first call the pending results are read from the connection
 *   and the last one is kept in CON_RESULT. On later calls the rows
 *   converted by the previous call are released before the next batch is
 *   converted.
 */
int pg_fetch_result(db_con_t* _con, db_res_t** _res, int nrows)
{
    int rows;
    PGresult *res = NULL;
    ExecStatusType pqresult;

#ifdef PARANOID
    if (!_con) {
        LOG(L_ERR, "PG[fetch_result]: db_con_t parameter cannot be NULL\n");
        return -1;
    }
    if (!_res) {
        LOG(L_ERR, "PG[fetch_result]: db_res_t parameter cannot be NULL\n");
        return -1;
    }
    if (nrows < 0) {
        LOG(L_ERR, "PG[fetch_result]: nrows parameter cannot be less than zero\n");
        return -1;
    }
#endif

    /* exit if the fetch count is zero */
    if (nrows == 0) {
        return 0;
    }

    if (*_res == NULL) {
        /*
         * Get the result of the previous query. The connection is drained
         * before anything can fail so that it is ready for the next query.
         * NOTE(review): only the last result is kept; if the server ever
         * returns several results the earlier ones are overwritten without
         * PQclear() -- confirm whether multi-statement queries can get here.
         */
        while ((res = PQgetResult(CON_CONNECTION(_con))) != NULL) {
            CON_RESULT(_con) = res;
        }

        pqresult = PQresultStatus(CON_RESULT(_con));
        LOG(L_DBG, "PG[fetch_result]: %p PQresultStatus(%s) PQgetResult(%p)\n", _con, PQresStatus(pqresult), CON_RESULT(_con));

        /* Allocate a new result structure */
        *_res = pg_new_result();
        if (*_res == NULL) {
            LOG(L_ERR, "PG[fetch_result]: Failed to allocate a result structure\n");
            return -1;
        }

        switch (pqresult) {
        case PGRES_COMMAND_OK:
            /* Successful completion of a command returning no data (such as INSERT or UPDATE). */
            return 0;

        case PGRES_TUPLES_OK:
            /* Successful completion of a command returning data (such as a SELECT or SHOW). */
            if (pg_get_columns(_con, *_res) < 0) {
                LOG(L_ERR, "PG[fetch_result]: Error while getting column names\n");
                return -2;
            }
            break;

        case PGRES_EMPTY_QUERY:
        case PGRES_COPY_OUT:
        case PGRES_COPY_IN:
        case PGRES_BAD_RESPONSE:
        case PGRES_NONFATAL_ERROR:
        case PGRES_FATAL_ERROR:
            LOG(L_WARN, "PG[fetch_result]: %p Warning: probable invalid query\n", _con);
            /* fall through: the generic warning and cleanup apply as well */
        default:
            LOG(L_WARN, "PG[fetch_result]: %p Warning: PQresultStatus(%s)\n", _con, PQresStatus(pqresult));
            pg_free_result(*_res);
            *_res = 0;
            return 0;
        }
    } else {
        /* release the rows handed out by the previous call */
        if (RES_ROWS(*_res) != NULL) {
            pg_free_rows(*_res);
        }
        RES_ROW_N(*_res) = 0;
    }

    /* determine the number of rows remaining to be processed */
    rows = RES_NUM_ROWS(*_res) - RES_LAST_ROW(*_res);

    /* If there aren't any more rows left to process, exit */
    if (rows <= 0)
        return 0;

    /* never convert more rows than the caller asked for */
    if (nrows < rows)
        rows = nrows;

    RES_ROW_N(*_res) = rows;

    LOG(L_DBG, "PG[fetch_result]: Converting row %d of %d count %d\n", RES_LAST_ROW(*_res), RES_NUM_ROWS(*_res), RES_ROW_N(*_res));

    if (pg_convert_rows(_con, *_res, RES_LAST_ROW(*_res), RES_ROW_N(*_res)) < 0) {
        LOG(L_ERR, "PG[fetch_result]: Error while converting rows\n");
        pg_free_columns(*_res);
        pg_free_result(*_res);
        *_res = 0;
        return -3;
    }

    /* update the total number of rows processed */
    RES_LAST_ROW(*_res) += rows;
    return 0;
}
/*
 * free_query: release the libpq result attached to the handle, if any.
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *
 * Returns:
 *   0 always
 *
 * Notes:
 *   CON_RESULT is reset to 0 after PQclear(), so calling this repeatedly
 *   is harmless.
 */
static int free_query(db_con_t* _con)
{
    if (!CON_RESULT(_con))
        return 0;

    LOG(L_DBG, "PG[free_query]: PQclear(%p) result set\n", CON_RESULT(_con));
    PQclear(CON_RESULT(_con));
    CON_RESULT(_con) = 0;

    return 0;
}
/*
 * pg_free_query: release the libpq result held by the connection and the
 * converted result structure.
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *   _r    the result of a query; may be NULL
 *
 * Returns:
 *   0 always
 *
 * Notes:
 *   _r is passed by value, so the caller's own pointer is not reset and
 *   must not be used after this call.
 */
int pg_free_query(db_con_t* _con, db_res_t* _r)
{
    free_query(_con);

    if (_r != NULL) {
        pg_free_result(_r);
    }

    return 0;
}
#if 0
/*
 * begin_transaction: issue BEGIN, reconnecting once if the connection
 * turns out to be unusable. (Currently compiled out by the surrounding
 * #if 0.)
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *   _s    text of the query about to be run; only used in the error
 *         message as a debugging hint
 *
 * Returns:
 *    0  the transaction was started
 *   -1  _con is NULL, or BEGIN failed even after reconnecting
 *   other non-zero values are passed through from connect_db()
 *
 * Notes:
 *   This function may be called with a broken communication channel
 *   (the original comments attribute this to the parent process forking),
 *   so most of it deals with detecting that and reconnecting.
 *   Every PGresult returned by PQexec() is released with PQclear(), also
 *   on the failure paths; PQclear() accepts a NULL pointer.
 */
static int begin_transaction(db_con_t * _con, char *_s)
{
    PGresult *mr;
    int rv;

    if (!_con) {
        LOG(L_DBG, "PG[begin_transaction]: must call pg_init first!\n");
        return(-1);
    }

    if (CON_CONNECTED(_con)) {
        mr = PQexec(CON_CONNECTION(_con), "BEGIN");
        if (mr && PQresultStatus(mr) == PGRES_COMMAND_OK) {
            /* this is the normal way out: the transaction started fine */
            LOG(L_DBG, "PG[begin_transaction]: %p PQclear(%p) result set\n", _con, mr);
            PQclear(mr);
            return(0);
        }

        /* the connection to the db is corrupt */
        LOG(L_ERR,"PG[begin_transaction]: corrupt connection\n");
        PQclear(mr);
        CON_CONNECTED(_con) = 0;
    } else {
        LOG(L_DBG, "PG[begin_transaction]: called before pg_init\n");
    }

    /*
     * The connection is missing or corrupt, but the handle itself is
     * valid: attempt to open the database.
     */
    if ((rv = connect_db(_con)) != 0) {
        /* our attempt to fix the connection failed */
        char buf[SQL_BUF_LEN];

        snprintf(buf, SQL_BUF_LEN, "no connection, FATAL %d!", rv);
        LOG(L_DBG, "PG[begin_transaction]: %s\n", buf);
        return(rv);
    }
    LOG(L_DBG, "PG[begin_transaction]: successfully reconnected\n");

    /* second and last attempt, on the fresh connection */
    mr = PQexec(CON_CONNECTION(_con), "BEGIN");
    if (!mr || PQresultStatus(mr) != PGRES_COMMAND_OK) {
        char buf[SQL_BUF_LEN];

        snprintf(buf, SQL_BUF_LEN, "FATAL %s, '%s'!\n",
            PQerrorMessage(CON_CONNECTION(_con)), _s);
        LOG(L_DBG, "PG[begin_transaction]: %s\n", buf);
        PQclear(mr);
        return(-1);
    }

    LOG(L_DBG, "PG[begin_transaction]: db channel reset successful\n");
    LOG(L_DBG, "PG[begin_transaction]: %p PQclear(%p) result set\n", _con, mr);
    PQclear(mr);
    return(0);
}
/*
 * commit_transaction: issue COMMIT; every successful begin_transaction
 * must be terminated with this. (Currently compiled out by the
 * surrounding #if 0.)
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *
 * Returns:
 *    0  upon success
 *   -1  upon failure
 *
 * Notes:
 *   The PGresult is released on both paths; PQclear() accepts NULL.
 */
static int commit_transaction(db_con_t * _con)
{
    PGresult *mr;

    mr = PQexec(CON_CONNECTION(_con), "COMMIT");
    if (!mr || PQresultStatus(mr) != PGRES_COMMAND_OK) {
        LOG(L_ERR, "PG[commit_transaction]: error");
        PQclear(mr);
        return -1;
    }

    LOG(L_DBG, "PG[commit_transaction]: %p PQclear(%p) result set\n", _con, mr);
    PQclear(mr);
    return(0);
}
/*
 * rollback_transaction: issue ROLLBACK; a transaction whose query failed
 * must be terminated with this. (Currently compiled out by the
 * surrounding #if 0.)
 *
 * Arguments:
 *   _con  connection handle as previously supplied by pg_init()
 *
 * Returns:
 *    0  upon success
 *   -1  upon failure
 *
 * Notes:
 *   The PGresult is released on both paths; PQclear() accepts NULL.
 */
static int rollback_transaction(db_con_t * _con)
{
    PGresult *mr;

    mr = PQexec(CON_CONNECTION(_con), "ROLLBACK");
    if (!mr || PQresultStatus(mr) != PGRES_COMMAND_OK) {
        LOG(L_ERR, "PG[rollback_transaction]: error");
        PQclear(mr);
        return -1;
    }

    LOG(L_DBG, "PG[rollback_transaction]: %p PQclear(%p) result set\n", _con, mr);
    PQclear(mr);
    return(0);
}
#endif
/*
 * print_columns: print a list of column names separated by commas.
 *
 *   _b  destination buffer
 *   _l  size of the destination buffer in bytes
 *   _c  column names
 *   _n  number of column names
 *
 * The last name is followed by a single space instead of a comma.
 *
 * Returns the number of characters written (the terminating NUL is not
 * counted). If the list does not fit into the buffer, nothing is kept:
 * the buffer is reset to an empty string and 0 is returned, so the offset
 * of the caller never moves past the end of its buffer.
 */
static int print_columns(char* _b, int _l, db_key_t* _c, int _n)
{
    int i;
    int n;
    int res = 0;

    if (!_b || _l <= 0)
        return 0;
    _b[0] = '\0';

    for (i = 0; i < _n; i++) {
        n = snprintf(_b + res, _l - res, "%s%c", _c[i],
                (i == (_n - 1)) ? ' ' : ',');
        /* snprintf() returns the length it wanted to write: detect truncation */
        if (n < 0 || n >= _l - res) {
            _b[0] = '\0';
            return 0;
        }
        res += n;
    }
    return res;
}
/*
 * print_values: print a list of values separated by commas.
 *
 *   _con  connection handle, passed through to val2str()
 *   _b    destination buffer
 *   _l    size of the destination buffer in bytes
 *   _v    values to print
 *   _n    number of values
 *
 * Returns the number of characters written (the terminating NUL is not
 * counted). Returns 0 when a value cannot be converted or when the list
 * does not fit into the buffer; in both cases the buffer is reset to an
 * empty string.
 */
static int print_values(db_con_t* _con, char* _b, int _l, db_val_t* _v, int _n)
{
    int i, res = 0, l;

    if (!_b || _l <= 0)
        return 0;
    _b[0] = '\0';

    for (i = 0; i < _n; i++) {
        /* val2str() takes the room left in 'l' and returns the length used */
        l = _l - res;
        if (val2str(_con, _v + i, _b + res, &l) < 0) {
            LOG(L_ERR, "PG[print_values]: Error converting value to string\n");
            _b[0] = '\0';
            return 0;
        }
        /* the reported length must leave room for the terminating NUL */
        if (l < 0 || l >= _l - res) {
            LOG(L_ERR, "PG[print_values]: Error: values do not fit into the buffer\n");
            _b[0] = '\0';
            return 0;
        }
        res += l;

        if (i != (_n - 1)) {
            /* room is needed for the comma and the terminating NUL */
            if (res + 1 >= _l) {
                LOG(L_ERR, "PG[print_values]: Error: values do not fit into the buffer\n");
                _b[0] = '\0';
                return 0;
            }
            *(_b + res) = ',';
            res++;
        }
    }

    _b[res] = '\0';
    return res;
}
/*
 * print_where: print the conditions of the where clause of an SQL
 * statement, joined with " AND ".
 *
 *   _con  connection handle, passed through to val2str()
 *   _b    destination buffer
 *   _l    size of the destination buffer in bytes
 *   _k    key (column) names
 *   _o    operators, one per key; when NULL every condition uses "="
 *   _v    values the keys are compared with
 *   _n    number of conditions
 *
 * Returns the number of characters written (the terminating NUL is not
 * counted). If a value cannot be converted or the conditions do not fit
 * into the buffer, the buffer is reset to an empty string and 0 is
 * returned. Failing closed matters here: a partially written clause could
 * otherwise match more rows than the caller asked for.
 */
static int print_where(db_con_t* _con, char* _b, int _l, db_key_t* _k,
        db_op_t* _o, db_val_t* _v, int _n)
{
    int i;
    int res = 0;
    int l;
    int n;

    if (!_b || _l <= 0)
        return 0;
    _b[0] = '\0';

    for (i = 0; i < _n; i++) {
        if (_o) {
            n = snprintf(_b + res, _l - res, "%s%s", _k[i], _o[i]);
        } else {
            n = snprintf(_b + res, _l - res, "%s=", _k[i]);
        }
        /* snprintf() returns the length it wanted to write: detect truncation */
        if (n < 0 || n >= _l - res)
            goto fail;
        res += n;

        /* val2str() takes the room left in 'l' and returns the length used */
        l = _l - res;
        if (val2str(_con, &(_v[i]), _b + res, &l) < 0)
            goto fail;
        if (l < 0 || l >= _l - res)
            goto fail;
        res += l;

        if (i != (_n - 1)) {
            n = snprintf(_b + res, _l - res, " AND ");
            if (n < 0 || n >= _l - res)
                goto fail;
            res += n;
        }
    }

    _b[res] = '\0';
    return res;

fail:
    _b[0] = '\0';
    return 0;
}
/*
 * print_set: print the "column=value" assignments of the set clause of an
 * update SQL statement, separated by commas.
 *
 *   _con  connection handle, passed through to val2str()
 *   _b    destination buffer
 *   _l    size of the destination buffer in bytes
 *   _k    names of the columns to update
 *   _v    new values of the columns
 *   _n    number of assignments
 *
 * Returns the number of characters written (the terminating NUL is not
 * counted). If a value cannot be converted or the assignments do not fit
 * into the buffer, the buffer is reset to an empty string and 0 is
 * returned.
 */
static int print_set(db_con_t* _con, char* _b, int _l, db_key_t* _k,
        db_val_t* _v, int _n)
{
    int i;
    int res = 0;
    int l;
    int n;

    if (!_b || _l <= 0)
        return 0;
    _b[0] = '\0';

    for (i = 0; i < _n; i++) {
        n = snprintf(_b + res, _l - res, "%s=", _k[i]);
        /* snprintf() returns the length it wanted to write: detect truncation */
        if (n < 0 || n >= _l - res)
            goto fail;
        res += n;

        /* val2str() takes the room left in 'l' and returns the length used */
        l = _l - res;
        if (val2str(_con, &(_v[i]), _b + res, &l) < 0)
            goto fail;
        if (l < 0 || l >= _l - res)
            goto fail;
        res += l;

        if (i != (_n - 1)) {
            /* room is needed for the comma and the terminating NUL */
            if (res + 1 >= _l)
                goto fail;
            *(_b + res++) = ',';
        }
    }

    _b[res] = '\0';
    return res;

fail:
    _b[0] = '\0';
    return 0;
}
/*
 * pg_query: query the current table for the specified rows.
 *
 *   _con  structure representing the database connection
 *   _k    key names
 *   _op   operators (NULL means "=" for every key)
 *   _v    values of the keys that must match
 *   _c    column names to return (NULL means all columns)
 *   _n    number of key=value pairs to compare
 *   _nc   number of columns to return
 *   _o    order by the specified column (NULL for no ordering)
 *   _r    where to store the result; when NULL the query is only
 *         submitted and its result is left to be collected later
 *         (see pg_fetch_result())
 *
 * Returns:
 *    0  success
 *   -1  the statement could not be built (a value could not be converted
 *       or the statement does not fit into the SQL buffer)
 *   -2  the query could not be submitted
 *   otherwise the return code of pg_get_result()
 *
 * The statement is assembled in the file-wide buffer _s. Every piece is
 * checked against the space left so that the write offset never moves
 * past the end of the buffer.
 */
int pg_query(db_con_t* _con, db_key_t* _k, db_op_t* _op,
        db_val_t* _v, db_key_t* _c, int _n, int _nc,
        db_key_t _o, db_res_t** _r)
{
    int off, n;

    if (!_c) {
        off = snprintf(_s, SQL_BUF_LEN,
                "select * from %s ", CON_TABLE(_con));
        if (off < 0 || off >= SQL_BUF_LEN)
            goto build_err;
    } else {
        off = snprintf(_s, SQL_BUF_LEN, "select ");
        if (off < 0 || off >= SQL_BUF_LEN)
            goto build_err;

        n = print_columns(_s + off, SQL_BUF_LEN - off, _c, _nc);
        /* with columns requested, an empty list means the helper failed */
        if (n < 0 || (_nc > 0 && n == 0) || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;

        n = snprintf(_s + off, SQL_BUF_LEN - off,
                "from %s ", CON_TABLE(_con));
        if (n < 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;
    }

    if (_n) {
        n = snprintf(_s + off, SQL_BUF_LEN - off, "where ");
        if (n < 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;

        n = print_where(_con, _s + off, SQL_BUF_LEN - off,
                _k, _op, _v, _n);
        if (n <= 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;
    }

    if (_o) {
        /*
         * The leading space keeps the keyword apart from the last value
         * of the where clause ("a=1 order by x", not "a=1order by x").
         */
        n = snprintf(_s + off, SQL_BUF_LEN - off,
                " order by %s", _o);
        if (n < 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;
    }

    LOG (L_DBG, "PG[pg_query]: %p %p %s\n", _con, _r, _s);

    if (submit_query(_con, _s) < 0) {
        LOG(L_ERR, "PG[pg_query]: Error while submitting query\n");
        return -2;
    }

    /* without a result pointer the caller collects the result later */
    if (!_r)
        return 0;

    return pg_get_result(_con, _r);

build_err:
    LOG(L_ERR, "PG[pg_query]: Error while building query (conversion failed or SQL buffer too small)\n");
    return -1;
}
/*
 * pg_raw_query: execute a raw SQL query.
 *
 *   _con  structure representing the database connection
 *   _s    the complete text of the query
 *   _r    where to store the result; when NULL the query is only
 *         submitted and its result is not collected here
 *
 * Returns -2 if the query could not be submitted, 0 if it was submitted
 * and no result was requested, otherwise the return code of
 * pg_get_result().
 */
int pg_raw_query(db_con_t* _con, char* _s, db_res_t** _r)
{
    LOG (L_DBG, "PG[pg_raw_query]: %p %p %s\n", _con, _r, _s);

    if (submit_query(_con, _s) < 0) {
        LOG(L_ERR, "PG[pg_raw_query]: Error while submitting query\n");
        return -2;
    }

    return _r ? pg_get_result(_con, _r) : 0;
}
/*
 * pg_get_result: retrieve the result set of the query submitted last.
 *
 * Input:
 *   db_con_t*  _con  structure representing the database connection
 *   db_res_t** _r    receives a pointer to the result structure
 *
 * Output:
 *   return 0:    the command completed without returning data, or it
 *                returned data and the conversion succeeded.
 *   return -1:   data was returned but no result structure could be
 *                allocated for it.
 *   return -4:   data was returned but its conversion failed.
 *   otherwise:   the (non-zero) libpq status of a command that did not
 *                complete successfully.
 *
 * Notes:
 *   A new result structure is allocated on every call to this routine.
 *
 *   If this routine returns 0, it is the caller's responsibility to free
 *   the result structure. If it returns non-zero, the result structure
 *   has been freed and *_r is NULL.
 *
 *   The libpq result is always released before returning.
 */
int pg_get_result(db_con_t* _con, db_res_t** _r)
{
    PGresult *res = NULL;
    ExecStatusType pqresult;
    int rc = 0;

    *_r = pg_new_result();

    /*
     * Read until PQgetResult() returns NULL so the connection is ready
     * for the next query. Only the last result is kept; an earlier one is
     * released first so that it does not leak.
     */
    while ((res = PQgetResult(CON_CONNECTION(_con))) != NULL) {
        free_query(_con);
        CON_RESULT(_con) = res;
    }

    pqresult = PQresultStatus(CON_RESULT(_con));
    LOG(L_DBG, "PG[get_result]: %p PQresultStatus(%s) PQgetResult(%p)\n", _con, PQresStatus(pqresult), CON_RESULT(_con));

    switch (pqresult) {
    case PGRES_COMMAND_OK:
        /* Successful completion of a command returning no data (such as INSERT or UPDATE). */
        rc = 0;
        break;

    case PGRES_TUPLES_OK:
        /* Successful completion of a command returning data (such as a SELECT or SHOW). */
        if (*_r == NULL) {
            LOG(L_ERR, "PG[get_result]: %p Failed to allocate a result structure\n", _con);
            rc = -1;
        } else if (pg_convert_result(_con, *_r) < 0) {
            LOG(L_ERR, "PG[get_result]: %p Error returned from convert_result()\n", _con);
            pg_free_result(*_r);
            *_r = 0;
            /* report the failure; it must not be reset to 0 afterwards */
            rc = -4;
        } else {
            rc = 0;
        }
        break;

    case PGRES_EMPTY_QUERY:
    case PGRES_COPY_OUT:
    case PGRES_COPY_IN:
    case PGRES_BAD_RESPONSE:
    case PGRES_NONFATAL_ERROR:
    case PGRES_FATAL_ERROR:
        LOG(L_WARN, "PG[get_result]: %p Warning: Probable invalid query\n", _con);
        /* fall through: the generic warnings and cleanup apply as well */
    default:
        LOG(L_WARN, "PG[get_result]: %p Warning: %s\n", _con, PQresStatus(pqresult));
        LOG(L_WARN, "PG[get_result]: %p Warning: %s\n", _con, PQresultErrorMessage(CON_RESULT(_con)));
        if (*_r) pg_free_result(*_r);
        *_r = 0;
        rc = (int)pqresult;
        break;
    }

    free_query(_con);
    return (rc);
}
/*
 * pg_insert: insert a row into the current table.
 *
 *   _con  structure representing the database connection
 *   _k    key (column) names
 *   _v    values of the keys
 *   _n    number of key=value pairs
 *
 * Returns:
 *    0  success
 *   -1  the statement could not be built (a value could not be converted
 *       or the statement does not fit into the SQL buffer)
 *   -2  the query could not be submitted
 *   otherwise the non-zero return code of pg_get_result(), meaning the
 *   query failed
 *
 * The statement is assembled in the file-wide buffer _s. Every piece is
 * checked against the space left so that the write offset never moves
 * past the end of the buffer.
 */
int pg_insert(db_con_t* _con, db_key_t* _k, db_val_t* _v, int _n)
{
    db_res_t* _r = NULL;
    int off, n;
    int rv = 0;

    off = snprintf(_s, SQL_BUF_LEN, "insert into %s (", CON_TABLE(_con));
    if (off < 0 || off >= SQL_BUF_LEN)
        goto build_err;

    n = print_columns(_s + off, SQL_BUF_LEN - off, _k, _n);
    /* with columns given, an empty list means the helper failed */
    if (n < 0 || (_n > 0 && n == 0) || n >= SQL_BUF_LEN - off)
        goto build_err;
    off += n;

    n = snprintf(_s + off, SQL_BUF_LEN - off, ") values (");
    if (n < 0 || n >= SQL_BUF_LEN - off)
        goto build_err;
    off += n;

    n = print_values(_con, _s + off, SQL_BUF_LEN - off, _v, _n);
    /* keep two bytes in reserve for the closing ')' and the NUL */
    if (n < 0 || (_n > 0 && n == 0) || n >= SQL_BUF_LEN - off - 1)
        goto build_err;
    off += n;

    *(_s + off++) = ')';
    *(_s + off) = '\0';

    LOG(L_DBG, "PG[insert]: %p %s\n", _con, _s);

    if (submit_query(_con, _s) < 0) {
        LOG(L_ERR, "PG[insert]: Error while inserting\n");
        return -2;
    }

    rv = pg_get_result(_con, &_r);
    if (rv != 0) {
        LOG(L_WARN, "PG[insert]: Warning: %p Query: %s\n", _con, _s);
    }
    if (_r)
        pg_free_result(_r);

    return(rv);

build_err:
    LOG(L_ERR, "PG[insert]: Error while building query (conversion failed or SQL buffer too small)\n");
    return -1;
}
/*
 * pg_delete: delete rows from the current table.
 *
 *   _con  structure representing the database connection
 *   _k    key names
 *   _o    operators (NULL means "=" for every key)
 *   _v    values of the keys that must match
 *   _n    number of key=value pairs; 0 deletes without a where clause
 *
 * Returns:
 *    0  success
 *   -1  the statement could not be built (a value could not be converted
 *       or the statement does not fit into the SQL buffer); nothing is
 *       sent to the server in that case
 *   -2  the query could not be submitted
 *   otherwise the non-zero return code of pg_get_result(), meaning the
 *   query failed
 *
 * The statement is assembled in the file-wide buffer _s. Every piece is
 * checked against the space left so that the write offset never moves
 * past the end of the buffer and no shortened where clause is submitted.
 */
int pg_delete(db_con_t* _con, db_key_t* _k, db_op_t* _o, db_val_t* _v, int _n)
{
    db_res_t* _r = NULL;
    int off, n;
    int rv = 0;

    off = snprintf(_s, SQL_BUF_LEN, "delete from %s", CON_TABLE(_con));
    if (off < 0 || off >= SQL_BUF_LEN)
        goto build_err;

    if (_n) {
        n = snprintf(_s + off, SQL_BUF_LEN - off, " where ");
        if (n < 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;

        n = print_where(_con, _s + off, SQL_BUF_LEN - off, _k,
                _o, _v, _n);
        /* conditions were requested, so an empty clause is a failure */
        if (n <= 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;
    }

    LOG(L_DBG, "pg_delete: %p %s\n", _con, _s);

    if (submit_query(_con, _s) < 0) {
        LOG(L_ERR, "PG[delete]: Error while deleting\n");
        return -2;
    }

    rv = pg_get_result(_con, &_r);
    if (rv != 0) {
        LOG(L_WARN, "PG[delete]: Warning: %p Query: %s\n", _con, _s);
    }
    if (_r)
        pg_free_result(_r);

    return(rv);

build_err:
    LOG(L_ERR, "PG[delete]: Error while building query (conversion failed or SQL buffer too small)\n");
    return -1;
}
/*
 * pg_update: update some rows in the current table.
 *
 *   _con  structure representing the database connection
 *   _k    key names
 *   _o    operators (NULL means "=" for every key)
 *   _v    values of the keys that must match
 *   _uk   updated columns
 *   _uv   updated values of the columns
 *   _n    number of key=value pairs; 0 updates without a where clause
 *   _un   number of columns to update
 *
 * Returns:
 *    0  success
 *   -1  the statement could not be built (a value could not be converted
 *       or the statement does not fit into the SQL buffer); nothing is
 *       sent to the server in that case
 *   -2  the query could not be submitted
 *   otherwise the non-zero return code of pg_get_result(), meaning the
 *   query failed
 *
 * The statement is assembled in the file-wide buffer _s. Every piece is
 * checked against the space left so that the write offset never moves
 * past the end of the buffer and no shortened where clause is submitted.
 */
int pg_update(db_con_t* _con, db_key_t* _k, db_op_t* _o, db_val_t* _v,
        db_key_t* _uk, db_val_t* _uv, int _n, int _un)
{
    db_res_t* _r = NULL;
    int off, n;
    int rv = 0;

    off = snprintf(_s, SQL_BUF_LEN, "update %s set ", CON_TABLE(_con));
    if (off < 0 || off >= SQL_BUF_LEN)
        goto build_err;

    n = print_set(_con, _s + off, SQL_BUF_LEN - off, _uk, _uv, _un);
    /* with columns to update, an empty set clause means the helper failed */
    if (n < 0 || (_un > 0 && n == 0) || n >= SQL_BUF_LEN - off)
        goto build_err;
    off += n;

    if (_n) {
        n = snprintf(_s + off, SQL_BUF_LEN - off, " where ");
        if (n < 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;

        n = print_where(_con, _s + off, SQL_BUF_LEN - off, _k,
                _o, _v, _n);
        /* conditions were requested, so an empty clause is a failure */
        if (n <= 0 || n >= SQL_BUF_LEN - off)
            goto build_err;
        off += n;
    }

    /* terminate the statement whether or not a where clause was added */
    *(_s + off) = '\0';

    LOG(L_DBG, "PG[update]: %p %s\n", _con, _s);

    if (submit_query(_con, _s) < 0) {
        LOG(L_ERR, "PG[update]: Error while updating\n");
        return -2;
    }

    rv = pg_get_result(_con, &_r);
    if (rv != 0) {
        LOG(L_WARN, "PG[update]: Warning: %p Query: %s\n", _con, _s);
    }
    if (_r)
        pg_free_result(_r);

    return(rv);

build_err:
    LOG(L_ERR, "PG[update]: Error while building query (conversion failed or SQL buffer too small)\n");
    return -1;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */