/* * Sockets-based client/server class library * sockobj.cc Copyright (C) 1995, 96, 97 Alex Belits * * This source/code is public free; you can distribute it and/or modify * it under terms of the GNU General Library Public License (published * by the Free Software Foundation) either version two of this License, * or any later version. * */ #include #include #include #include #include #include #include #include #include #ifdef AIX4 #include #else #include #endif #ifdef SOLARIS extern "C"{ void bzero(void *s, size_t n); int gethostname(char *name, int namelen); } #endif #ifdef IRIX #ifdef SIGARGS #undef SIGARGS #endif #define SIGARGS int sig #ifdef SIGACTARGS #undef SIGACTARGS #endif #define SIGACTARGS int sig #endif #include #include #include #include #include #include /* for ALIGN macro needed by CMSG_DATA */ #include #include #include #include #include #include #include #ifdef PSEUDOPOLL #include "pseudopoll.h" #else #ifdef DIGITAL #include #else #include #endif #endif #include "sockobj.h" #ifndef SIGARGS #define SIGARGS int sig #endif #ifndef SIGACTARGS #define SIGACTARGS int sig #endif #define MY_PROTOCOL 0 /* TCP/IP */ /**************************************** *** errno definition ****************************************/ #ifndef ERRNO_DEFINED extern int sys_nerr; extern char *sys_errlist[]; extern int errno; #endif void cleanupserverdriver(SIGARGS); /**************************************** *** common daemon global variables ****************************************/ int logfd=-1; /* log file descriptor for daemon */ int tmplogfd=-1; /* temporary additional log fd */ pid_t g_pid=-1; /* daemon pid */ int maxhandles=256; /* max handles per process */ int global_nconnections=0; /* number of connections */ int global_npipes=0; /* number of pipes and sockets to processes */ /**************************************** *** fatal error message (stderr may be *** directed to /dev/null after daemon *** installation, so it will be useless) 
****************************************/ void fatal(char *fmt,...){ int err; va_list ap; va_start(ap,fmt); err=errno; vfprintf(stderr,fmt,ap); if(!err){ fprintf(stderr," (Error information not available)"); }else{ if(err=0) write(tmplogfd,buffer,strlen(buffer)); va_end(ap); } /**************************************** *** log message ****************************************/ void log(char *fmt,...){ char buffer[10240]; int l; va_list ap; time_t t; va_start(ap,fmt); if(logfd<0) return; time(&t); memcpy(buffer,ctime(&t),27); sprintf(buffer+24,"[%d] ",getpid()); l=strlen(buffer); #ifdef NO_SNPRINTF vsprintf(buffer+l,fmt,ap); #else vsnprintf(buffer+l,10238-l,fmt,ap); #endif buffer[10238]=0; strcat(buffer,"\n"); write(logfd,buffer,strlen(buffer)); if(tmplogfd>=0) write(tmplogfd,buffer,strlen(buffer)); va_end(ap); } /**************************************** *** daemon cleanup function, executed *** after any kind of killing except *** SIGKILL ****************************************/ void cleanup(SIGARGS){ int i; log("Exiting"); for(i=0;igr_gid; strcpy(filename,"/tmp/"); }else{ gid=getgid(); filename[0]=0; if(seteuid(uid)<0){ fatal("%s: Can\'t set effective user ID in non-root mode",cmd); return -1; } } if(!logfilename){ filename[255]=0; strncat(filename,cmd,255-strlen(filename)); strncat(filename,".log",255-strlen(filename)); logfd=open(filename,O_WRONLY|O_APPEND|O_CREAT,S_IRUSR|S_IWUSR); }else{ logfd=open(logfilename,O_WRONLY|O_APPEND|O_CREAT,S_IRUSR|S_IWUSR); } if(logfd<0){ fatal("%s: Can\'t create log file",cmd); return -1; } logfd=fdge3(logfd); tmplogfd=fdge3(tmplogfd); fl=fcntl(logfd,F_GETFD,0); if(fl<0||fcntl(logfd,F_SETFD,fl|FD_CLOEXEC)<0){ logfd=-1; fatal("%s: Can\'t set log file flags",cmd); close(logfd); return -1; } if(fstat(logfd,&sbuf)<0){ fatal("%s: Can\'t get log file ownership",cmd); close(logfd); return -1; } if(sbuf.st_uid||sbuf.st_gid!=gid){ if(fchown(logfd,uid,gid)<0){ if(!uid){ fatal("%s: Can\'t set log file ownership",cmd); close(logfd); return -1; 
}else{ errlog("Warning: Can\'t set log file ownership"); } } } #ifdef AIX maxhandles=OPEN_MAX; #else if(getrlimit(RLIMIT_NOFILE,&rl)<0){ fatal("%s: Can\'t get file limit",cmd); close(logfd); return -1; } if(rl.rlim_max==RLIM_INFINITY) maxhandles=1024; else maxhandles=rl.rlim_max; #endif struct sigaction tmpsigaction; bzero((char*)&tmpsigaction,sizeof(struct sigaction)); tmpsigaction.sa_handler=mywait; sigaddset(&tmpsigaction.sa_mask,SIGCHLD); tmpsigaction.sa_flags=SA_RESTART; sigaction(SIGCHLD,&tmpsigaction,NULL); tmpsigaction.sa_handler=droppipes; sigemptyset(&tmpsigaction.sa_mask); sigaddset(&tmpsigaction.sa_mask,SIGPIPE); tmpsigaction.sa_flags=SA_RESTART; sigaction(SIGPIPE,&tmpsigaction,NULL); if((pid=fork())<0){ fatal("%s: Can\'t fork()",cmd); close(logfd); return -1; } if(pid){ if(tmplogfd>=0) close(tmplogfd); if(doexit) doexit(); _exit(0); } setsid(); if((pid=fork())<0){ fatal("%s: Can\'t fork",cmd); close(logfd); return -1; } if(pid){ _exit(0); } g_pid=getpid(); signal(SIGHUP,SIG_IGN); signal(SIGUSR1,SIG_IGN); signal(SIGUSR2,SIG_IGN); signal(SIGINT,cleanup); signal(SIGQUIT,cleanupabort); //signal(SIGSEGV,cleanupabort); //signal(SIGILL,cleanupabort); //signal(SIGTRAP,cleanupabort); //signal(SIGFPE,cleanupabort); //signal(SIGBUS,cleanupabort); signal(SIGTERM,cleanup); signal(SIGCONT,SIG_DFL); signal(SIGTTIN,SIG_IGN); signal(SIGTTOU,SIG_IGN); log("%s started, process %d",cmd,g_pid); for(i=0;ih_addr))->s_addr); #ifndef DOMAINNAME_ABUSE strncpy(hostname,host->h_name,255); hostname[255]=0; #endif return 0; } ServerConnectionApp::ServerConnectionApp(ServerConnection *xconnection){ connection=xconnection; } ServerConnectionApp::~ServerConnectionApp(void){ } int _bcounter=0; // global buffers counter Wheel *glob_wheel=NULL; // global wheel pointer /**************************************** *** virtual functions ****************************************/ void ServerSocket::emptyfn(ServerConnection *client){ } void ServerSocket::connectfn(ServerConnection *client){ } 
void ServerSocket::disconnectfn(ServerConnection *client){ } void ServerSocket::splitfn(ServerConnection *client,ServerProcess *process){ } void ServerSocket::splitparentfn(ServerConnection *client,ServerProcess *process){ } void ServerSocket::unsplitfn(ServerConnection *client,ServerProcess *process){ } void ServerSocket::afterunsplitfn(ServerConnection *client,ServerProcess *process){ } int ServerSocket::prioritydatafn(ServerConnection *client){ return -1; } int ServerSocket::datafn(ServerConnection *client){ return 0; } int ServerSocket::pipeprioritydatafn(ServerConnection *client,ServerProcess *process){ return 0; } int ServerSocket::pipedatafn(ServerConnection *client,ServerProcess *process){ return 0; } ServerSocket::ServerSocket(char *xname,int xport,int xmaxclients,__s32 xlocaladdr){ if(!this) return; maxclients=xmaxclients; localaddr=xlocaladdr; if(localaddr==0) localaddr=INADDR_ANY; name=(char*)malloc(strlen(xname)+1); if(!name){ h=-1; return; } wheel=NULL; indexforwheel=-1; handleindexforwheel=-1; strcpy(name,xname); port=xport; bzero((char*)&myaddress,sizeof(myaddress)); myaddress.sin_family=AF_INET; myaddress.sin_addr.s_addr=htonl(localaddr); myaddress.sin_port=htons(port); if(xmaxclients>0){ status=0; h=socket(AF_INET,SOCK_STREAM,MY_PROTOCOL); if(h<0){ errlog("ServerSocket::ServerSocket: Can\'t create socket at port %d",port); free(name); name=NULL; h=-1; return; } setsockopt(h,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse)); if(bind(h,(struct sockaddr*)&myaddress,sizeof(myaddress))){ errlog("ServerSocket::ServerSocket: Can\'t bind socket at port %d",port); close(h); free(name); name=NULL; h=-1; return; } if(listen(h,maxclients)){ errlog("ServerSocket::ServerSocket: Can\'t listen socket at port %d",port); close(h); free(name); name=NULL; h=-1; return; } fcntl(h,F_SETFL,FNDELAY); // to avoid blocking on failed accept() #ifdef DEBUG log("ServerSocket::ServerSocket: created, handle %d",h); #endif }else{ status=1; h=-1; } } 
ServerSocket::~ServerSocket(void){ #ifdef DEBUG log("ServerSocket::~ServerSocket: deleting"); #endif if(name) free(name); if(h>=0) close(h); if(wheel&&indexforwheel>=0) wheel->removeserversocket(indexforwheel); } #ifndef BUFFERSPERCONNECTION #define BUFFERSPERCONNECTION 64 #endif ServerConnection::ServerConnection(ServerSocket *xsocket,ClientConnection *xclient){ char s[128]; if(!this) return; passingwritebuffers=NULL; npassingwritebuffers=0; destr=0; app=NULL; wheel=NULL; indexforwheel=-1; socket=xsocket; global_nconnections++; if(xsocket->status==0){ hisaddresslength=128; h=accept(xsocket->gethandle(),(struct sockaddr*)s,(socklen_t*)&hisaddresslength); if(h<0) return; hisaddress=(struct sockaddr*)malloc(hisaddresslength); if(hisaddress) memcpy(hisaddress,s,hisaddresslength); else{ close(h); h=-1; return; } #ifdef DEBUG log("ServerConnection::ServerConnection: connected"); #endif #ifdef SOCKOPTS_NEEDED setsockopt(h,SOL_SOCKET,SO_KEEPALIVE,(char*)&alive,sizeof(alive)); setsockopt(h,SOL_SOCKET,SO_LINGER,(char*)&llinger,sizeof(struct linger)); #endif }else{ if(!xclient){ #ifdef DEBUG log("ServerConnection::ServerConnection: no client connection"); #endif hisaddress=NULL; h=-1; return; } h=dup(xclient->h); if(h>=0){ hisaddresslength=sizeof(struct sockaddr_in); hisaddress=(struct sockaddr*)malloc(hisaddresslength); if(hisaddress) memcpy(hisaddress,&(xclient->address),hisaddresslength); else{ #ifdef DEBUG log("ServerConnection::ServerConnection: insufficient memory"); #endif close(h); h=-1; return; } #ifdef SOCKOPTS_NEEDED setsockopt(h,SOL_SOCKET,SO_KEEPALIVE,(char*)&alive,sizeof(alive)); setsockopt(h,SOL_SOCKET,SO_LINGER,(char*)&llinger,sizeof(struct linger)); #endif }else{ #ifdef DEBUG log("ServerConnection::ServerConnection: client connection not opened"); #endif h=-1; hisaddress=NULL; } /*attach client as new server here*/ } if(h>=0){ npassingwritebuffers=BUFFERSPERCONNECTION; 
passingwritebuffers=(ServerConnection***)malloc(npassingwritebuffers*sizeof(ServerConnection**)); if(passingwritebuffers) bzero((char*)passingwritebuffers,npassingwritebuffers*sizeof(ServerConnection**)); else npassingwritebuffers=0; } } /**************************************** *** constructor for existing connections ****************************************/ ServerConnection::ServerConnection(int xh,struct sockaddr *xhisaddress,int xhisaddresslength,ServerSocket *xsocket){ if(!this) return; passingwritebuffers=NULL; npassingwritebuffers=0; destr=0; app=NULL; h=xh; wheel=NULL; indexforwheel=-1; socket=xsocket; global_nconnections++; if(xhisaddress){ hisaddress=(struct sockaddr*)malloc(xhisaddresslength); if(!hisaddress){ h=-1; return; } memcpy(hisaddress,xhisaddress,xhisaddresslength); hisaddresslength=xhisaddresslength; } npassingwritebuffers=BUFFERSPERCONNECTION; passingwritebuffers=(ServerConnection***)malloc(npassingwritebuffers*sizeof(ServerConnection**)); if(passingwritebuffers) bzero((char*)passingwritebuffers,npassingwritebuffers*sizeof(ServerConnection**)); else npassingwritebuffers=0; #ifdef DEBUG log("ServerConnection::ServerConnection: address copied"); #endif } ServerConnection::~ServerConnection(void){ int i; ServerProcess *p; #ifdef DEBUG log("ServerConnection::~ServerConnection: called"); #endif if(destr){ log("Error: ServerConnection::~ServerConnection: destructor recursion"); _exit(1); } destr=1; if(app) delete app; if(socket) socket->disconnectfn(this); if(h>=0){ #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); } if(hisaddress) free(hisaddress); if(wheel){ #ifdef DEBUG log("ServerConnection::~ServerConnection: Client disconnected, line %d",indexforwheel); #endif for(i=0;ifdarraysize;i++){ if((p=wheel->getprocess(i))){ #ifdef DEBUG log("ServerConnection::~ServerConnection: process exists, %ld",p); #endif if(p->clientconnection==this){ if(!(p->destr)){ #ifdef DEBUG log("ServerConnection::~ServerConnection: deleting process, %ld line 
%d",p,i); #endif delete p; }else{ #ifdef DEBUG log("ServerConnection::~ServerConnection: not deleting process, %ld line %d",p,i); #endif } } } } if(wheel->gethandle(indexforwheel)>=0){ if(wheel->getbufferbynumber(indexforwheel)) delete wheel->getbufferbynumber(indexforwheel); wheel->setbufferbynumber(indexforwheel,NULL); if(wheel->gethandle(indexforwheel)!=h){ log("Error: Connection handle mismatch"); #ifdef SHUTDOWN_NEEDED shutdown(wheel->gethandle(indexforwheel),2); #endif close(wheel->gethandle(indexforwheel)); } wheel->sethandle(indexforwheel,-1); wheel->setevents(indexforwheel,0); } wheel->setclient(indexforwheel,NULL); } #ifdef DEBUG log("ServerConnection::~ServerConnection: connection killed"); #endif if(npassingwritebuffers){ for(i=0;imaxexternalbuffers){ tmpbuf=realloc(externalbuffers,sizeof(struct iovec)*i); if(!tmpbuf){ return -1; } externalbuffers=(struct iovec*)tmpbuf; tmpbuf=realloc(freebuffers,sizeof(char*)*i); if(!tmpbuf){ return -1; } freebuffers=(char**)tmpbuf; } } externalbuffers[nexternalbuffers].iov_base=extbuffer; externalbuffers[nexternalbuffers].iov_len=extbuffersize; freebuffers[nexternalbuffers]=freebuffer; totalextbufferssize+=extbuffersize; nexternalbuffers++; if(nexternalbuffers>maxexternalbuffers) maxexternalbuffers=nexternalbuffers; return extbuffersize; } #ifndef MAX_CONNECTIONS_IN_ONE_SEND #define MAX_CONNECTIONS_IN_ONE_SEND 24 /* R. Stevens, TCP/IP Illustrated, Volume 3, Addison-Wesley 1994, p. 
272 */ #endif /**************************************** *** send data from buffer ****************************************/ int WriteBuffer::writeout(int h){ int l; struct iovec iov[2]; struct msghdr msg; #ifdef BSD43_MSGHDR char fdtmpbuffer[MAX_CONNECTIONS_IN_ONE_SEND*sizeof(int)]; #else struct cmsghdr *cm; char fdtmpbuffer[sizeof(struct cmsghdr)+MAX_CONNECTIONS_IN_ONE_SEND*sizeof(int)]; cm=(struct cmsghdr*)fdtmpbuffer; cm->cmsg_level=SOL_SOCKET; cm->cmsg_type=SCM_RIGHTS; #endif int maxsendconnections; // no, it's always initialized when used, //disregard the compiler warning int i,tmptail; errno=0; if(nbytes){ if(rptr-buff+nbytes>size){ iov[0].iov_base=rptr; iov[0].iov_len=size-(rptr-buff); iov[1].iov_base=buff; iov[1].iov_len=nbytes-iov[0].iov_len; #ifdef DEBUG log("sending %d",nbytes); #endif if(connections){ msg.msg_iov=iov; msg.msg_iovlen=2; msg.msg_name=(caddr_t)0; msg.msg_namelen=0; #ifdef BSD43_MSGHDR msg.msg_accrights=NULL; msg.msg_accrightslen=0; #else msg.msg_control=NULL; msg.msg_controllen=0; msg.msg_flags=0; #endif if(connectionsinbuffer){ maxsendconnections=connectionsinbuffer; if(maxsendconnections>MAX_CONNECTIONS_IN_ONE_SEND){ maxsendconnections=MAX_CONNECTIONS_IN_ONE_SEND; } /* // Don't pass more bytes than connections if connections are present if(maxsendconnections=maxsendconnections){ msg.msg_iovlen=1; iov[0].iov_len=maxsendconnections; }else{ iov[1].iov_len=maxsendconnections-iov[0].iov_len; } }else{ //??? } */ // If connections are left beyond the chunk that can be sent, // leave the number of bytes enough to pass those connections, if possible if(maxsendconnections=nbytes-lbytes){ msg.msg_iovlen=1; iov[0].iov_len=nbytes-lbytes; }else{ iov[1].iov_len=nbytes-lbytes-iov[0].iov_len; } }else{ //??? 
} } #ifdef BSD43_MSGHDR msg.msg_accrightslen=maxsendconnections*sizeof(int); msg.msg_accrights=(caddr_t)fdtmpbuffer; tmptail=connectionstail; #ifdef DEBUG log("sending %d connections...",maxsendconnections); #endif for(i=0;igethandle(); }else{ ((int*)msg.msg_accrights)[i]=0; /*stdin*/ } #else cm->cmsg_len=maxsendconnections*sizeof(int)+sizeof(struct cmsghdr); msg.msg_control=(caddr_t)cm; msg.msg_controllen=cm->cmsg_len; tmptail=connectionstail; #ifdef DEBUG log("sending %d connections...",maxsendconnections); #endif for(i=0;igethandle(); }else{ ((int*)CMSG_DATA(cm))[i]=0; /*stdin*/ } #else if(connections[tmptail]){ ((int*)cm->cmsg_data)[i]=connections[tmptail]->gethandle(); }else{ ((int*)cm->cmsg_data)[i]=0; /*stdin*/ } #endif #endif tmptail++; if(tmptail>=connectionssize) tmptail=0; } } l=sendmsg(h,&msg,0); if(l>=0&&connectionsinbuffer){ for(i=0;i=connectionssize) connectionstail=0; } } }else{ l=writev(h,iov,2); } #ifdef DEBUG log("sent %d",l); #endif if(errno&&errno!=EAGAIN&&errno!=EMSGSIZE) errno1=EPIPE; if(l<0) l=0; nbytes-=l; rptr+=l; if(rptr>=buff+size) rptr-=size; if(errno==EMSGSIZE){ log("writeout: buffer error, fallback"); errno=0; l=write(h,rptr,size-(rptr-buff)); if(errno&&errno!=EAGAIN) errno1=EPIPE; if(l<0) l=0; nbytes-=l; rptr+=l; if(rptr>=buff+size){ rptr=buff; l=write(h,rptr,nbytes); if(errno&&errno!=EAGAIN) errno1=EPIPE; if(l<0) l=0; nbytes-=l; rptr+=l; } } }else{ #ifdef DEBUG log("sending %d",nbytes); #endif if(connections){ iov[0].iov_base=rptr; iov[0].iov_len=nbytes; msg.msg_iov=iov; msg.msg_iovlen=1; msg.msg_name=(caddr_t)0; msg.msg_namelen=0; #ifdef BSD43_MSGHDR msg.msg_accrights=NULL; msg.msg_accrightslen=0; #else msg.msg_control=NULL; msg.msg_controllen=0; msg.msg_flags=0; #endif if(connectionsinbuffer){ maxsendconnections=connectionsinbuffer; if(maxsendconnections>MAX_CONNECTIONS_IN_ONE_SEND){ maxsendconnections=MAX_CONNECTIONS_IN_ONE_SEND; } /* // Don't pass more bytes than connections if connections are present 
if(maxsendconnections=maxsendconnections){ iov[0].iov_len=maxsendconnections; } }else{ //??? } */ // If connections are left beyond the chunk that can be sent, // leave the number of bytes enough to pass those connections, if possible if(maxsendconnectionsgethandle(); }else{ ((int*)msg.msg_accrights)[i]=0; /*stdin*/ } #else cm->cmsg_len=maxsendconnections*sizeof(int)+sizeof(struct cmsghdr); msg.msg_control=(caddr_t)cm; msg.msg_controllen=cm->cmsg_len; tmptail=connectionstail; #ifdef DEBUG log("sending %d connections...",maxsendconnections); #endif for(i=0;igethandle(); }else{ ((int*)CMSG_DATA(cm))[i]=0; /*stdin*/ } #else if(connections[tmptail]){ ((int*)cm->cmsg_data)[i]=connections[tmptail]->gethandle(); }else{ ((int*)cm->cmsg_data)[i]=0; /*stdin*/ } #endif #endif tmptail++; if(tmptail>=connectionssize) tmptail=0; } } l=sendmsg(h,&msg,0); if(l>=0&&connectionsinbuffer){ for(i=0;i=connectionssize) connectionstail=0; } } }else{ l=write(h,rptr,nbytes); } #ifdef DEBUG log("sent %d",l); #endif if(errno&&errno!=EAGAIN&&errno!=EMSGSIZE) errno1=EPIPE; if(l<0) l=0; nbytes-=l; rptr+=l; if(errno==EMSGSIZE){ log("writeout: buffer error, fallback"); errno=0; l=write(h,rptr,nbytes); if(errno&&errno!=EAGAIN) errno1=EPIPE; if(l<0) l=0; nbytes-=l; rptr+=l; } } if(rptr>=buff+size) rptr=buff; } if(!nbytes&&nexternalbuffers){ int deleteextbuffers=0; l=writev(h,externalbuffers,nexternalbuffers); if(l<0) l=0; totalextbufferssize-=l; if(totalextbufferssize<0){ l+=totalextbufferssize; totalextbufferssize=0; } for(i=0;isize){ l=size-(rptr-buff); memcpy(xbuff,rptr,l); p=xbuff+l; memcpy(p,buff,nbytes-l); }else{ memcpy(xbuff,rptr,nbytes); } } free(buff); buff=xbuff; size=xsize; rptr=buff; wptr=buff+nbytes; return 0; } } return -1; } /**************************************** *** purge buffer ****************************************/ void Wheel::purgebuffer(int index){ if(buffers[index]) buffers[index]->empty(); } /**************************************** *** flush buffer 
****************************************/ int Wheel::flushbuffer(int index){ int n=0; if(!buffers[index]) return -1; errno=EAGAIN; while(buffers[index]->query()&&errno==EAGAIN){ n=buffers[index]->writeout(clients[index]->gethandle()); } return n; } /**************************************** *** add server socket to wheel ****************************************/ int Wheel::addserversocket(ServerSocket *s){ int i,j; i=searchempty(sockets,socketarraysize); if(i>=0){ sockets[i]=s; s->wheel=this; s->indexforwheel=i; j=searchempty(fdarray,fdarraysize); if(j>=0){ if(j>=nindex) nindex=j+1; fdarray[j].fd=s->gethandle(); fdarray[j].events=POLLIN; fdarray[j].revents=0; pollout_allowed[j]=0; s->handleindexforwheel=j; } } return i; } /**************************************** *** add server connection to wheel ****************************************/ int Wheel::addserverconnection(ServerConnection *s){ int h,position; h=s->gethandle(); if(h>=0){ fcntl(h,F_SETFL,FNDELAY); position=addnewhandle(h,POLLIN|POLLPRI|POLLOUT_ALLOWED,buffsize); if(position>=0){ clients[position]=s; if(clients[position]->gethandle()<0){ if(!clients[position]->destr) delete clients[position]; #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); } clients[position]->wheel=this; clients[position]->indexforwheel=position; }else{ #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); #ifdef DEBUG log("Wheel::addserverconnection: no space"); #endif return -1; } }else{ #ifdef DEBUG log("Wheel::addserverconnection: not connected"); #endif return -1; } s->socket->connectfn(s); return 0; } /**************************************** *** remove server socket from wheel ****************************************/ void Wheel::removeserversocket(int i){ int j; if(sockets[i]){ for(j=0;jsocket==sockets[i]){ if(!clients[j]->destr) delete clients[j]; } } } if(sockets[i]->handleindexforwheel>=0){ fdarray[sockets[i]->handleindexforwheel].fd=-1; fdarray[sockets[i]->handleindexforwheel].events=0; 
fdarray[sockets[i]->handleindexforwheel].revents=0; pollout_allowed[sockets[i]->handleindexforwheel]=0; if(sockets[i]->handleindexforwheel+1==nindex){ nindex--; while(nindex>0&&fdarray[nindex-1].fd<0) nindex--; } } sockets[i]->wheel=NULL; sockets[i]->indexforwheel=-1; sockets[i]->handleindexforwheel=-1; sockets[i]=NULL; } } /**************************************** *** stop loop ****************************************/ void Wheel::stoploop(void){ lflag=0; } ServerProcess::ServerProcess(Wheel *xwheel,int i,int xbuffsize,int keeppipes, int xcloseonexit,int xnevercloseonexit){ int ch0,newfdpos,k,j; pid_t r; int fd1[2]; int fd2[2]; if(!this) return; destr=0; writehandle=-1; readhandle=-1; writehandlepos=-1; readhandlepos=-1; wheel=xwheel; indexforwheel=i; buffsize=xbuffsize; whoami=PARENT; pid=-1; clientconnection=NULL; clientbuffer=NULL; if(wheel){ if(wheel->processes){ if(wheel->processes[indexforwheel]) indexforwheel=wheel->searchempty(wheel->processes,wheel->fdarraysize); }else indexforwheel=-1; }else indexforwheel=-1; closeonexit=xcloseonexit; nevercloseonexit=xnevercloseonexit; ch0=wheel->getclientbynumber(i)->gethandle(); switch(keeppipes){ case 1: if(pipe(fd1)){ buffsize=0; r=-1; return; } if(pipe(fd2)){ close(fd1[0]); close(fd1[1]); buffsize=0; r=-1; return; } break; case 2: if(socketpair(AF_UNIX,SOCK_STREAM,0,fd1)){ buffsize=0; r=-1; return; } fd2[0]=fd1[1]; fd2[1]=fd1[0]; break; default:; } clientconnection=wheel->getclientbynumber(i); clientbuffer=wheel->getbufferbynumber(i); wheel->stoppolling(i); if((r=fork())>0){ pid=r; whoami=PARENT; if(keeppipes){ readhandle=fd1[0]; writehandle=fd2[1]; close(fd1[1]); if(keeppipes==1){ close(fd2[0]); newfdpos=wheel->addnewhandle(readhandle,POLLIN,0); if(newfdpos<0){ close(readhandle); close(writehandle); readhandle=-1; writehandle=-1; buffsize=0; kill(r,SIGKILL); wheel->startpolling(i); r=-1; return; } readhandlepos=newfdpos; wheel->setreadhandle(this,readhandlepos); 
newfdpos=wheel->addnewhandle(writehandle,POLLOUT_ALLOWED,buffsize); if(newfdpos<0) wheel->removepipes(readhandlepos,-1); }else{ newfdpos=wheel->addnewhandle(writehandle,POLLOUT_ALLOWED|POLLIN,buffsize,buffsize); readhandlepos=newfdpos; wheel->setreadhandle(this,readhandlepos); } if(newfdpos<0){ wheel->setreadhandle(NULL,readhandlepos); close(readhandle); close(writehandle); readhandle=-1; writehandle=-1; readhandlepos=-1; buffsize=0; kill(r,SIGKILL); wheel->startpolling(i); r=-1; return; } writehandlepos=newfdpos; } wheel->purgebuffer(i); wheel->setprocess(indexforwheel,this); if(keeppipes){ fcntl(readhandle,F_SETFL,FNDELAY); global_npipes++; if(readhandle!=writehandle){ fcntl(writehandle,F_SETFL,FNDELAY); global_npipes++; } } #ifdef DEBUG log("ServerProcess::ServerProcess: Process %ld created",r); #endif if(clientconnection) clientconnection->socket->splitparentfn(clientconnection,this); return; } if(r<0){ wheel->startpolling(i); errlog("ServerProcess::ServerProcess: Can\'t fork()"); r=-1; buffsize=0; return; } pid=getpid(); whoami=ITSELF; wheel->current_process=this; wheel->setprocess(indexforwheel,this); clientconnection=wheel->getclientbynumber(i); clientbuffer=wheel->getbufferbynumber(i); if(keeppipes){ close(fd1[0]); if(keeppipes==1){ close(fd2[1]); } readhandle=fd2[0]; writehandle=fd1[1]; #ifdef DEBUG log("readhandle %d, writehandle %d",readhandle,writehandle); #endif } fcntl(ch0,F_SETFL,0); for(k=wheel->nindex-1;k>=0;k--){ j=wheel->fdarray[k].fd; if(j>=0){ if(j!=readhandle&&j!=writehandle&&j!=ch0){ close(j); wheel->fdarray[k].fd=-1; wheel->fdarray[k].events=0; wheel->fdarray[k].revents=0; wheel->pollout_allowed[k]=0; if(k+1==wheel->nindex){ wheel->nindex=k; } } } } close(tmplogfd); tmplogfd=-1; errno=EAGAIN; wheel->flushbuffer(i); if(errno&&errno==EPIPE){ _exit(3); } if(clientconnection) clientconnection->socket->splitfn(clientconnection,this); mainfunction(); #ifdef SHUTDOWN_NEEDED if(closeonexit&&!nevercloseonexit) shutdown(ch0,2); #endif close(ch0); 
_exit(closeonexit?3:2); } int ServerProcess::killme(int sig){ if(pid<=0) return -1; return kill(pid,sig); } /**************************************** *** main user function ****************************************/ void ServerProcess::mainfunction(void){ } ServerProcess::~ServerProcess(void){ ServerProcess *tprocess; int i,dontstoppoll; #ifdef DEBUG log("ServerProcess::~ServerProcess: called"); #endif if(destr){ log("Error: ServerProcess::~ServerProcess: destructor recursion"); _exit(1); } destr=1; if(whoami==ITSELF){ if(clientconnection) clientconnection->socket->unsplitfn(clientconnection,this); #ifdef DEBUG log("ServerProcess::~ServerProcess: Process %d at line %d exiting from itself",pid,indexforwheel); #endif if(clientconnection){ #ifdef SHUTDOWN_NEEDED if(closeonexit&&!nevercloseonexit) shutdown(clientconnection->h,2); #endif close(clientconnection->h); } _exit(closeonexit?3:2); } #ifdef DEBUG log("ServerProcess::~ServerProcess: Process %d at line %d killed by destructor from process %d",pid,indexforwheel,getpid()); #endif if(pid>0) kill(pid,SIGTERM); if(readhandle>=0){ global_npipes--; if(readhandle!=writehandle){ global_npipes--; close(writehandle); } close(readhandle); wheel->removepipes(writehandlepos,readhandlepos); } if(clientconnection){ if(closeonexit&&!nevercloseonexit){ clientconnection->socket->afterunsplitfn(clientconnection,this); if(!(clientconnection->destr)){ #ifdef DEBUG log("ServerProcess::~ServerProcess: killing connection"); #endif delete clientconnection; }else{ #ifdef DEBUG log("ServerProcess::~ServerProcess: not killing connection at line %d",clientconnection->indexforwheel); #endif } }else{ #ifdef DEBUG log("ServerProcess::~ServerProcess: start polling"); #endif dontstoppoll=0; for(i=0;ifdarraysize;i++){ if(i!=indexforwheel){ tprocess=wheel->getprocess(i); if(tprocess){ if(tprocess->clientconnection==clientconnection){ dontstoppoll=1; } } } } if(!dontstoppoll) wheel->startpolling(clientconnection->indexforwheel); 
clientconnection->socket->afterunsplitfn(clientconnection,this); } } wheel->setprocess(indexforwheel,NULL); } Wheel::Wheel(int nsockets,int nclients,int xbuffsize){ int i; struct rlimit rl; if(!this) return; #ifdef DEBUG log("Wheel::Wheel: called"); #endif glob_wheel=this; critical=1; buffsize=xbuffsize; fdarraysize=nclients; socketarraysize=nsockets; current_process=NULL; int currmaxhandles; #ifdef AIX maxhandles=OPEN_MAX; #else if(getrlimit(RLIMIT_NOFILE,&rl)<0){ fdarraysize=0; socketarraysize=0; #ifdef DEBUG log("Wheel::Wheel: can't get file limit"); #endif return; } if(rl.rlim_max==RLIM_INFINITY){ maxhandles=1024; currmaxhandles=1024; }else{ maxhandles=rl.rlim_max; currmaxhandles=rl.rlim_cur; } #endif fdarraysize=(fdarraysize==-1||fdarraysize>maxhandles)?maxhandles:fdarraysize; #ifndef AIX if(fdarraysize>currmaxhandles){ rl.rlim_cur=fdarraysize; setrlimit(RLIMIT_NOFILE,&rl); } #endif log("%d handles available",fdarraysize); sockets=(ServerSocket**)malloc(socketarraysize*sizeof(ServerSocket*)); if(!sockets){ fdarraysize=0; socketarraysize=0; #ifdef DEBUG log("Wheel::Wheel: can't allocate sockets"); #endif return; } fdarray=(struct pollfd*)malloc(fdarraysize*sizeof(struct pollfd)); if(!fdarray){ fdarraysize=0; socketarraysize=0; #ifdef DEBUG log("Wheel::Wheel: can't allocate fdarray"); #endif return; } pollout_allowed=(int*)malloc(fdarraysize*sizeof(int)); if(!pollout_allowed){ free(fdarray); free(sockets); fdarraysize=0; socketarraysize=0; #ifdef DEBUG log("Wheel::Wheel: can't allocate pollout_allowed"); #endif return; } processes=(ServerProcess**)malloc(fdarraysize*sizeof(ServerProcess*)*2); if(!processes){ free(fdarray); free(pollout_allowed); free(sockets); fdarraysize=0; socketarraysize=0; #ifdef DEBUG log("Wheel::Wheel: can't allocate processes"); #endif return; }else{ readhandlepositions=processes+fdarraysize; } buffers=(WriteBuffer**)malloc(fdarraysize*sizeof(WriteBuffer*)); if(!buffers){ free(fdarray); free(pollout_allowed); free(sockets); 
free(processes); fdarraysize=0; socketarraysize=0; #ifdef DEBUG log("Wheel::Wheel: can't allocate buffers"); #endif return; } /* Table of ServerConnection pointers with fdarraysize entries. */ clients=(ServerConnection**)malloc(fdarraysize*sizeof(ServerConnection*)); /* Allocation failed: free the tables obtained earlier, report zero capacity and give up. */ if(!clients){ free(fdarray); free(pollout_allowed); free(sockets); free(buffers); free(processes); fdarraysize=0; socketarraysize=0; #ifdef DEBUG log("Wheel::Wheel: can't allocate clients"); #endif return; } /* NOTE(review): the loop header below is damaged - text is missing between the loop condition and the fd test (it looks as if everything between a less-than sign and a later greater-than sign was stripped from this copy, which here probably swallowed the end of the constructor and the head of another Wheel method). The same damage recurs in other loop headers of this file; restore them from a pristine copy before building. What remains calls empty() and setcloseonexit(1) on every buffer whose slot holds a valid fd. */ for(i=0;i=0&&buffers[i]){ buffers[i]->empty(); buffers[i]->setcloseonexit(1);; } } } /**************************************** *** pass ****************************************/ /* Runs one iteration of the wheel: reaps exited child processes, refreshes the POLLOUT interest of every slot from the state of its write buffer, waits in poll() (or pseudopoll()) with a timeout argument of 10000, accepts pending connections on entries of sockets[] whose status is 0, dispatches POLLOUT, POLLPRI and POLLIN events to the buffer and socket callbacks, deletes clients that disconnected or asked to be closed, and finally tries to write out buffers that hold data but were not polled for POLLOUT. Takes no arguments and returns nothing. */ void Wheel::onepass(void){ char s[128]; int i,jj,h,position,l,a,b,c; sockaddr_size_type hisaddresslength; /* Reap every child that has already exited, without blocking (WNOHANG). */ do{ c=waitpid(-1,&a,WNOHANG); if(c>0){ /* presumably the exit code of the child (wait status shifted right by 8) - TODO confirm against the platform wait macros */ b=a>>8; #ifdef DEBUG log("Wheel::onepass: (cleanup): Process cleanup, argument %d, process %d",b,c); #endif if(processes){ /* Search processes[] for the entry whose getprocid() equals the reaped pid; a drops to 0 on a match. NOTE(review): this loop header is damaged in this copy. */ a=1; for(i=0;igetprocid()==c){ a=0; } } } if(!a){ /* presumably undoes the final loop increment so that i indexes the matching entry - depends on the damaged loop header, verify */ i--; #ifdef DEBUG log("Wheel::onepass: (cleanup): Server cleanup, argument %d, line %d",b,i); #endif /* Code 2 clears closeonexit on the process entry, 3 sets it; other codes leave it unchanged. */ switch(b){ case 2: processes[i]->closeonexit=0; break; case 3: #ifdef DEBUG log("Wheel::onepass: (cleanup): Process must be disconnected, line %d",i); #endif processes[i]->closeonexit=1; break; } /* destroy the entry unless its destr flag is already set */ if(!(processes[i]->destr)) delete processes[i]; }else{ #ifdef DEBUG log("Wheel::onepass: (cleanup): Unknown process %d",c); #endif } } } }while(c>0); /* Refresh write interest: request POLLOUT for a slot whose buffer query() is non-zero (presumably: data waiting to be written) or, with SHUTDOWN_NEEDED, that has a close or start-poll request pending, provided pollout_allowed permits it; otherwise clear POLLOUT. NOTE(review): damaged loop header. */ for(i=0;i=0&&buffers[i]){ if(buffers[i]->query() #ifdef SHUTDOWN_NEEDED ||buffers[i]->getcloseonexit()||buffers[i]->getstartpollonexit() #endif ){ if(pollout_allowed[i]) fdarray[i].events|=POLLOUT; }else{ fdarray[i].events&=~POLLOUT; } } } /* Wait for events on the first nindex slots. */ #ifdef PSEUDOPOLL if(pseudopoll(fdarray,nindex,10000)<0) #else if(poll(fdarray,nindex,10000)<0) #endif /* NOTE(review): text is missing here as well - the handling of a failed poll and the header of the loop over sockets[] cannot be read in this copy. */ { for(i=0;istatus==0&&sockets[i]->handleindexforwheel>=0){ if(fdarray[sockets[i]->handleindexforwheel].revents&POLLIN){ fdarray[sockets[i]->handleindexforwheel].revents&=~POLLIN; /* 128 matches the size of the address scratch buffer s */ hisaddresslength=128; h=accept(sockets[i]->gethandle(),(struct sockaddr*)s,(socklen_t*)&hisaddresslength); if(h>=0){ #ifdef
DEBUG log("Wheel::onepass: accepting connection"); #endif /* FNDELAY puts the accepted descriptor into non-blocking mode */ fcntl(h,F_SETFL,FNDELAY); /* reserve a wheel slot and write buffer for the new descriptor */ position=addnewhandle(h,POLLIN|POLLPRI|POLLOUT_ALLOWED,buffsize); if(position>=0){ clients[position]=new ServerConnection(h,(struct sockaddr*)s,hisaddresslength,sockets[i]); /* a negative handle means the ServerConnection could not be set up: discard it and close the descriptor */ if(clients[position]->gethandle()<0){ #ifdef DEBUG log("Wheel::onepass: can't create connection"); #endif delete clients[position]; #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); }else{ #ifdef DEBUG log("Wheel::onepass: connection created, position %d",position); #endif /* link the connection to this wheel and notify the socket through connectfn */ clients[position]->wheel=this; clients[position]->indexforwheel=position; sockets[i]->connectfn(clients[position]); } }else{ /* addnewhandle() failed (no free slot or no buffer): refuse the connection */ #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); } #ifdef DEBUG log("Wheel::onepass: connection accepted"); #endif } } } } } /* Dispatch the events reported for each active slot. NOTE(review): damaged loop header. */ for(i=0;i=0&&buffers[i]){ if(fdarray[i].revents&POLLOUT){ /* writable: let the buffer write its contents to the descriptor */ buffers[i]->writeout(fdarray[i].fd); /* any errno other than EAGAIN is recorded as EPIPE in errno1. NOTE(review): errno is not cleared before the call in the visible code, so a stale value may be picked up - confirm */ if(errno&&errno!=EAGAIN) errno1=EPIPE; } /* POLLPRI events are handled only when the real poll() is used. */ #ifndef PSEUDOPOLL if(fdarray[i].revents&(POLLPRI)){ #ifdef DEBUG log("Wheel::onepass: Priority data received, line %d",i); #endif /* slot owned by a client connection: let its socket handle the data */ if(clients[i]){ l=clients[i]->socket->prioritydatafn(clients[i]); if(errno&&errno!=EAGAIN) errno1=EPIPE; }else{ /* otherwise the slot may be the read end of an internal pipe */ if(readhandlepositions[i]){ #ifdef DEBUG log("Wheel::onepass: Priority data received from internal pipe at %d",i); #endif if(readhandlepositions[i]->clientconnection){ l=readhandlepositions[i]->clientconnection->socket->pipeprioritydatafn(readhandlepositions[i]->clientconnection,readhandlepositions[i]); }else{ #ifdef DEBUG log("Wheel::onepass: Priority data received from internal pipe at %d - no client connection to serve",i); #endif /* nobody to serve: discard the data. NOTE(review): the loop repeats while read() returns non-zero, which includes -1, so it only stops at end-of-file - confirm this cannot spin on a non-blocking descriptor */ while(read(fdarray[i].fd,&jj,sizeof(jj))); l=-1; } }else{ #ifdef DEBUG log("Wheel::onepass: Unknown priority data received at line %d",i); #endif while(read(fdarray[i].fd,&jj,sizeof(jj))); } } } #endif // PSEUDOPOLL if(fdarray[i].revents&(POLLIN)){ #ifdef DEBUG log("Wheel::onepass: Data received, line %d",i); #endif if(clients[i]){ l=clients[i]->socket->datafn(clients[i]);
if(errno&&errno!=EAGAIN) errno1=EPIPE; }else{ /* ordinary data on a slot without a client object: same internal pipe handling as for priority data, using pipedatafn */ if(readhandlepositions[i]){ if(readhandlepositions[i]->clientconnection){ l=readhandlepositions[i]->clientconnection->socket->pipedatafn(readhandlepositions[i]->clientconnection,readhandlepositions[i]); }else{ #ifdef DEBUG log("Wheel::onepass: Data received from internal pipe at %d - no client connection to serve",i); #endif while(read(fdarray[i].fd,&jj,sizeof(jj))); l=-1; } }else{ #ifdef DEBUG log("Wheel::onepass: Unknown data received at line %d",i); #endif while(read(fdarray[i].fd,&jj,sizeof(jj))); } } } /* Disconnect detection: a hang-up, invalid or error event, a data callback result l of 0, or a write error recorded in errno1. NOTE(review): l keeps its previous value when the unknown-data branch ran. */ if(fdarray[i].revents&(POLLHUP|POLLNVAL|POLLERR) ||(fdarray[i].revents&(POLLIN|POLLPRI))&&!l||errno1==EPIPE){ #ifdef DEBUG log("Wheel::onepass: Client disconnected, line %d",i); #endif /* NOTE(review): clients[i] is tested again further down, so this presumably relies on the destructor clearing the table entry - not visible here */ if(clients[i]){ if(!clients[i]->destr) delete clients[i]; } } /* Buffer drained (query() is zero): honour a deferred close or a deferred start of polling. */ if(fdarray[i].fd>=0&&buffers[i]){ if(!buffers[i]->query() #ifdef SHUTDOWN_NEEDED &&(fdarray[i].revents&POLLOUT) #endif ){ if(buffers[i]->getcloseonexit()){ buffers[i]->setcloseonexit(0); fdarray[i].revents&=~POLLOUT; #ifdef DEBUG log("Wheel::onepass: Client disconnected by request, line %d",i); #endif if(clients[i]){ if(!clients[i]->destr) delete clients[i]; } }else{ if(buffers[i]->getstartpollonexit()){ buffers[i]->setstartpollonexit(0); fdarray[i].revents&=~POLLOUT; #ifdef DEBUG log("Wheel::onepass: Client re-enabled polling, line %d",i); #endif if(clients[i]){ startpolling(i); } } } } } /* call emptyfn on the socket of a client that is still present */ if(clients[i]) clients[i]->socket->emptyfn(clients[i]); } } /* Final sweep: write out buffers that have data and are allowed to send but were not registered for POLLOUT. NOTE(review): damaged loop header. */ for(i=0;i=0&&buffers[i]){ if(pollout_allowed[i]&&buffers[i]->query()&&!(fdarray[i].events&POLLOUT)){ buffers[i]->writeout(fdarray[i].fd); if(errno&&errno!=EAGAIN) errno1=EPIPE; } } } } } /**************************************** *** main program loop ****************************************/ /* Sets lflag to 1 and calls onepass() repeatedly until lflag is zero; the body always runs at least once. */ void Wheel::mainloop(void){ lflag=1; do{ onepass(); }while(lflag); } /* Destructor. When fdarraysize is non-zero it forces closeonexit on and nevercloseonexit off for the process entries and deletes those whose destr flag is clear, deletes the clients, detaches the sockets from the wheel (wheel=NULL, indexforwheel=-1) and frees the tables. NOTE(review): several loop headers in this function are damaged in this copy, so the handling of buffers in particular cannot be read. */ Wheel::~Wheel(void){ int i; if(fdarraysize){ if(processes){ for(i=0;icloseonexit=1; processes[i]->nevercloseonexit=0; //override nevercloseonexit if(!processes[i]->destr)
delete processes[i]; } free(processes); } if(clients){ for(i=0;idestr) delete clients[i]; } free(clients); } if(buffers){ for(i=0;iwheel=NULL; sockets[i]->indexforwheel=-1; } free(sockets); } } } /**************************************** *** daemon cleanup function, executed *** after SIGPIPE ****************************************/ /* Logs the dropped connection together with the current pid. NOTE(review): the rest of this handler and the beginning of the next function (cleanupserverdriver, judging by its log messages) are missing from this copy - the text jumps from the loop header below straight into that function. */ void droppipes(SIGACTARGS){ int i; log("Connection dropped, process %d",getpid()); if(getpid()!=g_pid){ for(i=0;ipid){ #ifdef DEBUG log("cleanupserverdriver: Main server exiting"); #endif /* tear down the global wheel */ delete glob_wheel; glob_wheel=NULL; } if(c==g_pid) cleanup(0); /* NOTE(review): glob_wheel is dereferenced below although the branch above may have set it to NULL - check against the full source */ #ifdef DEBUG if(glob_wheel->processes){ a=1; for(j=0;jfdarraysize&&a;j++){ if(glob_wheel->processes[j]){ if(glob_wheel->processes[j]->getprocid()==c){ a=0; } } } if(a) _exit(0); j--; log("cleanupserverdriver: Server killed, line %d",j); } #endif _exit(0); } /**************************************** *** add handle and create buffer ****************************************/ /* Registers a descriptor with the wheel. Finds a free slot with searchempty(), creates a WriteBuffer(buffsize,nconnections) for it, stores handle and the poll events from flags (with the POLLOUT_ALLOWED bit stripped and remembered separately in pollout_allowed), and extends nindex to cover the slot. Returns the slot index, or a negative value when no slot is free or the buffer could not be created. */ int Wheel::addnewhandle(int handle,int flags,int buffsize,int nconnections){ int i; i=searchempty(fdarray,fdarraysize); if(i<0) return i; buffers[i]=new WriteBuffer(buffsize,nconnections); if(!buffers[i]){ return -1; } /* a buffer was requested but reports no usable size: treat as failure */ if((buffers[i]->getsize()<=0)&&buffsize){ delete buffers[i]; buffers[i]=NULL; return -1; } /* a negative size is a failure even when buffsize is 0 */ if((buffers[i]->getsize()<0)){ delete buffers[i]; buffers[i]=NULL; return -1; } fdarray[i].fd=handle; fdarray[i].events=flags&~POLLOUT_ALLOWED; fdarray[i].revents=0; pollout_allowed[i]=((flags&POLLOUT_ALLOWED)==POLLOUT_ALLOWED); if(i>=nindex) nindex=i+1; return i; } /* Constructor. Keeps a malloc-ed copy of xname (NULL when xname is NULL or the allocation fails), stores the port and local bind parameters, and creates a stream socket in h. When xlocalport is positive the socket is bound to xlocaladdr (INADDR_ANY when 0) and that port after setting SO_REUSEADDR. On socket or bind failure fatal() is called and h is left negative. */ ClientConnection::ClientConnection(char *xname,int xport,int xlocalport,__s32 xlocaladdr){ /* NOTE(review): a null this cannot occur in well-defined C++, compilers may drop this test */ if(!this) return; resolved=0; h=-1; if(xname){ name=(char*)malloc(strlen(xname)+1); if(name){ strcpy(name,xname); } }else name=NULL; port=xport; localport=xlocalport; localaddr=xlocaladdr; h=socket(AF_INET,SOCK_STREAM,MY_PROTOCOL); if(h<0){ fatal("Can't create socket"); return; } if(localport>0){ if(localaddr==0) localaddr=INADDR_ANY; struct sockaddr_in myaddress;
bzero((char*)&myaddress,sizeof(myaddress)); myaddress.sin_family=AF_INET; myaddress.sin_addr.s_addr=htonl(localaddr); myaddress.sin_port=htons(localport); /* reuse is defined outside this chunk */ setsockopt(h,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse)); if(bind(h,(struct sockaddr*)&myaddress,sizeof(myaddress))){ fatal("Can't bind socket"); #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); h=-1; return; } } } /* Destructor: closes the socket if it is open and frees the stored name. */ ClientConnection::~ClientConnection(void){ if(h>=0){ #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); } if(name) free(name); } /* Fills address for the server. A non-NULL xname replaces the stored name. A name that does not start with a digit is resolved with gethostbyname(); otherwise it must parse as a dotted quad with every part at most 255. Sets the port and marks the object resolved. Returns 0 on success and -1 on failure (fatal() is called first, except when there is no name at all). */ int ClientConnection::lookupname(char *xname){ struct hostent *host; unsigned n1,n2,n3,n4; resolved=0; if(xname){ if(name) free(name); name=(char*)malloc(strlen(xname)+1); if(name){ strcpy(name,xname); } } if(name){ bzero((char*)&address,sizeof(struct sockaddr_in)); address.sin_family=AF_INET; /* the first character alone decides between host name and numeric address */ if(*name<'0'||*name>'9'){ host=gethostbyname(name); if(!host){ fatal("Can't resolve server hostname"); return -1; } address.sin_addr.s_addr=((struct in_addr*)(host->h_addr))->s_addr; }else{ if(sscanf(name,"%u.%u.%u.%u",&n4,&n3,&n2,&n1)==4){ /* NOTE(review): the last test is joined with a bitwise | rather than ||; the result is the same for these comparisons */ if(n1>255||n2>255||n3>255|n4>255){ fatal("Can't resolve server address with wrong numbers"); return -1; } /* n4 holds the first (most significant) octet */ address.sin_addr.s_addr=htonl((__s32)n1|(__s32)n2<<8|(__s32)n3<<16|(__s32)n4<<24); }else{ fatal("Can't resolve server address"); return -1; } } }else return -1; address.sin_port=htons(port); resolved=1; return 0; } /* Connects h to the server. Resolves first when nothing is resolved yet or xname is given; a non-negative xport overrides the stored port. On connect failure calls fatal(), closes the socket, sets h to -1 and returns -1; returns 0 on success. With SOCKOPTS_NEEDED it then sets SO_KEEPALIVE and SO_LINGER from alive and llinger, which are defined outside this chunk. */ int ClientConnection::connecttoserver(char *xname,int xport){ /* NOTE(review): the result of lookupname() is ignored, so connect() is attempted even after a failed lookup */ if(!resolved||xname) lookupname(xname); if(xport>=0) port=xport; address.sin_port=htons(port); if(connect(h,(struct sockaddr*)&address,sizeof(struct sockaddr_in))){ fatal("Can't connect socket"); #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif close(h); h=-1; return -1; } #ifdef SOCKOPTS_NEEDED setsockopt(h,SOL_SOCKET,SO_KEEPALIVE,(char*)&alive,sizeof(alive)); setsockopt(h,SOL_SOCKET,SO_LINGER,(char*)&llinger,sizeof(struct linger)); #endif return 0; } /* Closes the socket if it is open and marks it closed (h=-1). Returns 0 in every case. */ int ClientConnection::disconnectfromserver(void){ if(h>=0){ #ifdef SHUTDOWN_NEEDED shutdown(h,2); #endif
close(h); h=-1; return 0; } return 0; } /* End of library */