/*
* $Id: pua.c 1919 2007-03-27 16:15:46Z anca_vamanu $
*
* pua module - presence user agent module
*
* Copyright (C) 2006 Voice Sistem S.R.L.
*
* This file is part of openser, a free SIP server.
*
* openser is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version
*
* openser is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* History:
* --------
* 2006-11-29 initial version (anca)
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libxml/parser.h>
#include <time.h>
#include "../../sr_module.h"
#include "../../parser/parse_expires.h"
#include "../../dprint.h"
#include "../../mem/shm_mem.h"
#include "../../parser/msg_parser.h"
#include "../../str.h"
#include "../../mem/mem.h"
#include "../../pt.h"
#include "../../db/db.h"
#include "../tm/tm_load.h"
#include "pua.h"
#include "send_publish.h"
#include "send_subscribe.h"
#include "pua_bind.h"
MODULE_VERSION
struct tm_binds tmb;
htable_t* HashT= NULL;
int HASH_SIZE=4;
extern int bind_pua(pua_api_t* api);
int min_expires= 0;
int default_expires=3600;
str db_url = {0, 0};
char* db_table= "pua";
int update_period= 100;
int startup_time = 0;
/* database connection */
db_con_t *pua_db = NULL;
db_func_t pua_dbf;
/* module functions */
static int mod_init(void);
static int child_init(int);
static void destroy(void);
int send_subscribe(subs_info_t*);
int send_publish(publ_info_t*);
int update_pua(ua_pres_t* p);
int db_store();
int db_restore();
void db_update(unsigned int ticks,void *param);
void hashT_clean(unsigned int ticks,void *param);
static cmd_export_t cmds[]=
{
{"bind_pua", (cmd_function)bind_pua, 1, 0, 0},
{"send_publish", (cmd_function)send_publish, 1, 0, 0},
{"send_subscribe", (cmd_function)send_subscribe, 1, 0, 0},
{0, 0, 0, 0, 0}
};
static param_export_t params[]={
{"hash_size" , INT_PARAM, &HASH_SIZE },
{"db_url" , STR_PARAM, &db_url.s },
{"db_table" , STR_PARAM, &db_table },
{"min_expires", INT_PARAM, &min_expires },
{"default_expires", INT_PARAM, &default_expires },
{"update_period", INT_PARAM, &update_period },
{0, 0, 0 }
};
/** module exports */
struct module_exports exports= {
"pua", /* module name */
DEFAULT_DLFLAGS, /* dlopen flags */
cmds, /* exported functions */
params, /* exported parameters */
0, /* exported statistics */
0, /* exported MI functions */
0, /* exported pseudo-variables */
mod_init, /* module initialization function */
(response_function) 0, /* response handling function */
destroy, /* destroy function */
child_init /* per-child init function */
};
/**#include "../../db/db.h"
* init module function
*/
static int mod_init(void)
{
load_tm_f load_tm;
DBG("PUA: initializing module ...\n");
if(min_expires< 0)
min_expires= 0;
if(default_expires< 600)
default_expires= 3600;
/* import the TM auto-loading function */
if((load_tm=(load_tm_f)find_export("load_tm", 0, 0))==NULL)
{
LOG(L_ERR, "PUA:mod_init:ERROR:can't import load_tm\n");
return -1;
}
/* let the auto-loading function load all TM stuff */
if(load_tm(&tmb)==-1)
{
LOG(L_ERR, "PUA:mod_init:ERROR can't load tm functions\n");
return -1;
}
db_url.len = db_url.s ? strlen(db_url.s) : 0;
DBG("PUA:mod_init: db_url=%s/%d/%p\n", ZSW(db_url.s), db_url.len, db_url.s);
/* binding to mysql module */
if (bind_dbmod(db_url.s, &pua_dbf))
{
DBG("PUA:mod_init: ERROR: Database module not found\n");
return -1;
}
if (!DB_CAPABILITY(pua_dbf, DB_CAP_ALL)) {
LOG(L_ERR,"PUA:mod_init: ERROR Database module does not implement "
"all functions needed by the module\n");
return -1;
}
pua_db = pua_dbf.init(db_url.s);
if (!pua_db)
{
LOG(L_ERR,"PUA:mod_init: Error while connecting database\n");
return -1;
}
if(HASH_SIZE<=1)
HASH_SIZE= 512;
else
HASH_SIZE = 1<<HASH_SIZE;
HashT= new_htable();
if(HashT== NULL)
{
LOG(L_ERR, "PUA:mod_init: ERROR while creating new hash table\n");
return -1;
}
if(db_restore()< 0)
{
LOG(L_ERR, "PUA:mod_init: ERROR while restoring hash_table\n");
return -1;
}
if(update_period<=0)
{
DBG("PUA: ERROR: mod_init: wrong clean_period \n");
return -1;
}
startup_time = (int) time(NULL);
register_timer(hashT_clean, 0, update_period);
register_timer(db_update, 0, update_period);
if(pua_db)
pua_dbf.close(pua_db);
pua_db = NULL;
return 0;
}
static int child_init(int rank)
{
DBG("PUA: init_child [%d] pid [%d]\n", rank, getpid());
if (pua_dbf.init==0)
{
LOG(L_CRIT, "Pua:child_init: database not bound\n");
return -1;
}
pua_db = pua_dbf.init(db_url.s);
if (!pua_db)
{
LOG(L_ERR,"Pua:child %d: Error while connecting database\n",
rank);
return -1;
}
else
{
if (pua_dbf.use_table(pua_db, db_table) < 0)
{
LOG(L_ERR, "Pua:child %d: Error in use_table pua\n", rank);
return -1;
}
DBG("Pua:child %d: Database connection opened successfully\n", rank);
}
return 0;
}
static void destroy(void)
{
DBG("PUA: destroying module ...\n");
db_update(0,0);
if(HashT)
destroy_htable();
if(pua_db)
pua_dbf.close(pua_db);
return ;
}
int db_restore()
{
ua_pres_t* p= NULL;
db_key_t result_cols[12];
db_res_t *res= NULL;
db_row_t *row = NULL;
db_val_t *row_vals= NULL;
str pres_uri, pres_id;
str etag, tuple_id;
str watcher_uri, call_id;
str to_tag, from_tag;
int size= 0, i;
result_cols[0] ="pres_uri";
result_cols[1] ="pres_id";
result_cols[2] ="expires";
result_cols[3] ="flag";
result_cols[4] ="etag";
result_cols[5] ="tuple_id";
result_cols[6] ="watcher_uri";
result_cols[7] ="call_id";
result_cols[8] ="to_tag";
result_cols[9] ="from_tag";
result_cols[10]="cseq";
if(!pua_db)
{
LOG(L_ERR,"PUA: db_restore: ERROR null database connection\n");
return -1;
}
if(pua_dbf.use_table(pua_db, db_table)< 0)
{
LOG(L_ERR, "PUA: db_restore:ERROR in use table\n");
return -1;
}
if(pua_dbf.query(pua_db,0, 0, 0, result_cols,0, 11, 0,&res)< 0)
{
LOG(L_ERR, "PUA: db_restore:ERROR while querrying table\n");
if(res)
{
pua_dbf.free_result(pua_db, res);
res = NULL;
}
return -1;
}
if(res== NULL)
return -1;
if(res->n<=0)
{
LOG(L_INFO, "PUA: db_restore:the query returned no result\n");
pua_dbf.free_result(pua_db, res);
res = NULL;
return 0;
}
DBG("PUA: db_restore: found %d db entries\n", res->n);
for(i =0 ; i< res->n ; i++)
{
row = &res->rows[i];
row_vals = ROW_VALUES(row);
pres_uri.s= row_vals[0].val.str_val.s;
pres_uri.len = strlen(pres_uri.s);
pres_id.s= row_vals[1].val.str_val.s;
pres_id.len = strlen(pres_id.s);
memset(&etag, 0, sizeof(str));
memset(&tuple_id, 0, sizeof(str));
memset(&watcher_uri, 0, sizeof(str));
memset(&call_id, 0, sizeof(str));
memset(&to_tag, 0, sizeof(str));
memset(&from_tag, 0, sizeof(str));
if(row_vals[4].val.str_val.s)
{
etag.s= row_vals[4].val.str_val.s;
etag.len = strlen(etag.s);
tuple_id.s= row_vals[5].val.str_val.s;
tuple_id.len = strlen(tuple_id.s);
}
if(row_vals[6].val.str_val.s)
{
watcher_uri.s= row_vals[6].val.str_val.s;
watcher_uri.len = strlen(watcher_uri.s);
call_id.s= row_vals[7].val.str_val.s;
call_id.len = strlen(call_id.s);
to_tag.s= row_vals[8].val.str_val.s;
to_tag.len = strlen(to_tag.s);
from_tag.s= row_vals[9].val.str_val.s;
from_tag.len = strlen(from_tag.s);
}
size= sizeof(ua_pres_t)+ sizeof(str)+ pres_uri.len+ pres_id.len;
if(etag.len)
size+= etag.len+ tuple_id.len;
else
size+= sizeof(str)+ watcher_uri.len+ call_id.len+ to_tag.len+
from_tag.len;
p= (ua_pres_t*)shm_malloc(size);
if(p== NULL)
{
LOG(L_ERR, "PUA: db_restore: Error no more share memmory");
goto error;
}
memset(p, 0, size);
size= sizeof(ua_pres_t);
p->pres_uri= (str*)((char*)p+ size);
size+= sizeof(str);
p->pres_uri->s= (char*)p + size;
memcpy(p->pres_uri->s, pres_uri.s, pres_uri.len);
p->pres_uri->len= pres_uri.len;
size+= pres_uri.len;
p->id.s= (char*)p + size;
memcpy(p->id.s, pres_id.s, pres_id.len);
p->id.len= pres_id.len;
size+= pres_id.len;
p->expires= row_vals[2].val.int_val;
p->flag|= row_vals[3].val.int_val;
p->db_flag|= INSERTDB_FLAG;
if(etag.len)
{
p->etag.s= (char*)p+ size;
memcpy(p->etag.s, etag.s, etag.len);
p->etag.len= etag.len;
size+= etag.len;
p->tuple_id.s= (char*)p + size;
memcpy(p->tuple_id.s, tuple_id.s, tuple_id.len);
p->tuple_id.len= tuple_id.len;
size+= tuple_id.len;
}
else
{
p->watcher_uri= (str*)((char*)p+ size);
size+= sizeof(str);
p->watcher_uri->s= (char*)p+ size;
memcpy(p->watcher_uri->s, watcher_uri.s, watcher_uri.len);
p->watcher_uri->len= watcher_uri.len;
size+= watcher_uri.len;
p->to_tag.s= (char*)p+ size;
memcpy(p->to_tag.s, to_tag.s, to_tag.len);
p->to_tag.len= to_tag.len;
size+= to_tag.len;
p->from_tag.s= (char*)p+ size;
memcpy(p->from_tag.s, from_tag.s, from_tag.len);
p->from_tag.len= from_tag.len;
size+= from_tag.len;
p->call_id.s= (char*)p + size;
memcpy(p->call_id.s, call_id.s, call_id.len);
p->call_id.len= call_id.len;
size+= call_id.len;
p->cseq= row_vals[10].val.int_val;
}
insert_htable(p);
}
if(res)
{
pua_dbf.free_result(pua_db, res);
res = NULL;
}
if(pua_dbf.delete(pua_db, 0, 0 , 0, 0) < 0)
{
LOG(L_ERR,"pua:db_restore:ERROR while deleting information from db\n");
goto error;
}
return 0;
error:
if(res)
pua_dbf.free_result(pua_db, res);
if(p)
shm_free(p);
return -1;
}
void hashT_clean(unsigned int ticks,void *param)
{
int i;
ua_pres_t* p= NULL, *q= NULL;
DBG("PUA: hashT_clean ..\n");
for(i= 0;i< HASH_SIZE; i++)
{
lock_get(&HashT->p_records[i].lock);
p= HashT->p_records[i].entity->next;
while(p)
{
if(p->expires< (int)(time)(NULL)+10)
{
if(p->desired_expires> p->expires+ 10 || (p->desired_expires && p->watcher_uri))
{
if(update_pua(p)< 0)
{
LOG(L_ERR, "PUA: hashT_clean: Error while updating\n");
lock_release(&HashT->p_records[i].lock);
return;
}
}
p= p->next;
}
else
if(p->expires< (int)(time)(NULL))
{
q= p->next;
delete_htable(p);
p= q;
}
else
p= p->next;
}
lock_release(&HashT->p_records[i].lock);
}
}
int update_pua(ua_pres_t* p)
{
int size= 0;
hentity_t* hentity= NULL;
str* str_hdr= NULL;
size= sizeof(hentity_t)+ sizeof(str)+ (p->pres_uri->len+ 1)*sizeof(char);
if(p->watcher_uri)
size+= sizeof(str)+ p->watcher_uri->len;
else
size+= p->id.len;
hentity= (hentity_t*)shm_malloc(size);
if(hentity== NULL)
{
LOG(L_ERR, "PUA: update_pua: ERROR no more share memory\n");
goto error;
}
memset(hentity, 0, size);
size = sizeof(hentity_t);
hentity->pres_uri = (str*)((char*)hentity + size);
size+= sizeof(str);
hentity->pres_uri->s = (char*)hentity+ size;
memcpy(hentity->pres_uri->s, p->pres_uri->s ,
p->pres_uri->len ) ;
hentity->pres_uri->len= p->pres_uri->len;
size+= p->pres_uri->len;
if(p->watcher_uri)
{
hentity->watcher_uri=(str*)((char*)hentity+ size);
size+= sizeof(str);
hentity->watcher_uri->s= (char*)hentity+ size;
memcpy(hentity->watcher_uri->s, p->watcher_uri->s,p->watcher_uri->len);
hentity->watcher_uri->len= p->watcher_uri->len;
size+= p->watcher_uri->len;
}
else
{
hentity->id.s = ((char*)hentity+ size);
memcpy(hentity->id.s, p->id.s, p->id.len);
hentity->id.len= p->id.len;
size+= p->id.len;
}
hentity->flag|= p->flag;
if(p->watcher_uri== NULL)
{
str met= {"PUBLISH", 7};
str_hdr = publ_build_hdr(p->desired_expires- (int)time(NULL), &p->etag, 0);
if(str_hdr == NULL)
{
LOG(L_ERR, "PUA: update_pua: ERROR while building extra_headers\n");
goto error;
}
DBG("PUA: update_pua: str_hdr:\n%.*s\n ", str_hdr->len, str_hdr->s);
tmb.t_request(&met, /* Type of the message */
p->pres_uri, /* Request-URI */
p->pres_uri, /* To */
p->pres_uri, /* From */
str_hdr, /* Optional headers */
0, /* Message body */
publ_cback_func, /* Callback function */
(void*)hentity /* Callback parameter */
);
}
else
{
int expires;
str met= {"SUBSCRIBE", 9};
dlg_t* td= NULL;
td= pua_build_dlg_t(p);
if(td== NULL)
{
LOG(L_ERR, "PUA:update_pua: Error while building tm dlg_t"
"structure");
goto error;
}
if(p->desired_expires== 0)
expires= 3600;
else
expires= p->desired_expires- (int)time(NULL);
str_hdr= subs_build_hdr(p->watcher_uri, expires, p->event);
if(str_hdr== NULL || str_hdr->s== NULL)
{
LOG(L_ERR, "PUA:send_subscribe: Error while building extra headers\n");
return -1;
}
tmb.t_request_within
(&met,
str_hdr,
0,
td,
subs_cback_func,
(void*)hentity
);
pkg_free(td);
td= NULL;
}
pkg_free(str_hdr);
return 0;
error:
if(str_hdr)
pkg_free(str_hdr);
if(hentity)
pkg_free(hentity);
return -1;
}
void db_update(unsigned int ticks,void *param)
{
ua_pres_t* p= NULL;
db_key_t q_cols[13];
db_res_t *res= NULL;
db_key_t db_cols[3];
db_val_t q_vals[13], db_vals[3];
db_op_t db_ops[1] ;
int n_query_cols= 0;
int n_update_cols= 0;
int i;
/* cols and values used for insert */
q_cols[0] ="pres_uri";
q_vals[0].type = DB_STR;
q_vals[0].nul = 0;
q_cols[1] ="pres_id";
q_vals[1].type = DB_STR;
q_vals[1].nul = 0;
q_cols[2] ="flag";
q_vals[2].type = DB_INT;
q_vals[2].nul = 0;
q_cols[3] ="watcher_uri";
q_vals[3].type = DB_STR;
q_vals[3].nul = 0;
q_cols[4] ="tuple_id";
q_vals[4].type = DB_STR;
q_vals[4].nul = 0;
q_cols[5] ="etag";
q_vals[5].type = DB_STR;
q_vals[5].nul = 0;
q_cols[6] ="call_id";
q_vals[6].type = DB_STR;
q_vals[6].nul = 0;
q_cols[7] ="to_tag";
q_vals[7].type = DB_STR;
q_vals[7].nul = 0;
q_cols[8] ="from_tag";
q_vals[8].type = DB_STR;
q_vals[8].nul = 0;
q_cols[9]="cseq";
q_vals[9].type = DB_INT;
q_vals[9].nul = 0;
q_cols[10] ="expires";
q_vals[10].type = DB_INT;
q_vals[10].nul = 0;
/* cols and values used for update */
db_cols[0]= "expires";
db_vals[0].type = DB_INT;
db_vals[0].nul = 0;
db_cols[1]= "cseq";
db_vals[1].type = DB_INT;
db_vals[1].nul = 0;
DBG("PUA:db_update ...\n");
if(pua_db== NULL)
{
LOG(L_ERR,"PUA: db_update: ERROR null database connection\n");
return;
}
if(pua_dbf.use_table(pua_db, db_table)< 0)
{
LOG(L_ERR, "PUA: db_update:ERROR in use table\n");
return ;
}
for(i=0; i<HASH_SIZE; i++)
{
lock_get(&HashT->p_records[i].lock);
p = HashT->p_records[i].entity->next;
while(p)
{
if(p->expires - (int)time(NULL)< 0)
{
p= p->next;
continue;
}
switch(p->db_flag)
{
case NO_UPDATEDB_FLAG:
{
DBG("PUA: db_update: NO_UPDATEDB_FLAG\n");
break;
}
case UPDATEDB_FLAG:
{
DBG("PUA: db_update: UPDATEDB_FLAG\n ");
n_update_cols= 1;
n_query_cols= 3;
q_vals[0].val.str_val = *(p->pres_uri);
q_vals[1].val.str_val = p->id;
q_vals[2].val.int_val = p->flag;
db_vals[0].val.int_val= p->expires;
if(p->watcher_uri) /* for subscribe */
{
q_vals[n_query_cols].val.str_val = *(p->watcher_uri);
n_query_cols= 4;
db_vals[1].val.int_val= p->cseq ;
n_update_cols= 2;
}
DBG("PUA: db_update: Updating ..n_query_cols= %d\t"
" n_update_cols= %d\n", n_query_cols, n_update_cols);
if(pua_dbf.query(pua_db, q_cols, 0, q_vals,
0, n_query_cols, 0, 0, &res)< 0)
{
LOG(L_ERR, "PUA: db_update:ERROR while querying"
" database");
lock_release(&HashT->p_records[i].lock);
if(res)
pua_dbf.free_result(pua_db, res);
return ;
}
if(res && res->n> 0)
{
if(pua_dbf.update(pua_db, q_cols, 0, q_vals, db_cols,
db_vals, n_query_cols, n_update_cols)<0)
{
LOG(L_ERR, "PUA: db_update: ERROR while updating"
" in database");
lock_release(&HashT->p_records[i].lock);
pua_dbf.free_result(pua_db, res);
res= NULL;
return ;
}
pua_dbf.free_result(pua_db, res);
res= NULL;
}
else
{
if(res)
{
pua_dbf.free_result(pua_db, res);
res= NULL;
}
DBG("PUA:db_update: UPDATEDB_FLAG and no record"
" found\n");
}
break;
}
case INSERTDB_FLAG:
{
DBG("PUA: db_update: INSERTDB_FLAG\n ");
q_vals[0].val.str_val = *(p->pres_uri);
q_vals[1].val.str_val = p->id;
q_vals[2].val.int_val = p->flag;
if((p->watcher_uri))
q_vals[3].val.str_val = *(p->watcher_uri);
else
memset(& q_vals[3].val.str_val ,0, sizeof(str));
q_vals[4].val.str_val = p->tuple_id;
q_vals[5].val.str_val = p->etag;
q_vals[6].val.str_val = p->call_id;
q_vals[7].val.str_val = p->to_tag;
q_vals[8].val.str_val = p->from_tag;
q_vals[9].val.int_val= p->cseq;
q_vals[10].val.int_val = p->expires;
if(pua_dbf.insert(pua_db, q_cols, q_vals, 11)<0)
{
LOG(L_ERR, "PUA: db_update: ERROR while inserting"
" into table pua\n");
lock_release(&HashT->p_records[i].lock);
return ;
}
break;
}
}
if(!(p->db_flag & NO_UPDATEDB_FLAG))
{
p->db_flag= NO_UPDATEDB_FLAG;
}
p= p->next;
}
lock_release(&HashT->p_records[i].lock);
}
db_vals[0].val.int_val= (int)time(NULL);
db_ops[0]= OP_LT;
if(pua_dbf.delete(pua_db, db_cols, db_ops, db_vals, 1) < 0)
{
LOG(L_ERR,"PUA: db_update: ERROR cleaning expired"
" information\n");
}
return ;
}
syntax highlighted by Code2HTML, v. 0.9.1