/* cdrdao - write audio CD-Rs in disc-at-once mode * * Copyright (C) 1998-2001 Andreas Mueller * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #include #include #include #include #include #include #include #include #include #include #ifdef linux #include #include #endif #ifdef HAVE_SYS_MMAN_H #include #endif #ifdef USE_POSIX_THREADS #include #else #include #include #endif #include "dao.h" #include "port.h" #include "util.h" #define DEBUG_WRITE 0 #if defined(__FreeBSD__) #define IPC_ARG_T void #else #define IPC_ARG_T msgbuf #endif struct ShmSegment { int id; char *buffer; }; struct Buffer { long bufLen; // number of blocks in buffer that should be written TrackData::Mode mode; // data mode for writing TrackData::Mode trackMode; // mode of track may differ from 'mode' if data // blocks must be encoded in audio blocks, // only used for message printing TrackData::SubChannelMode subChanMode; // sub-channel data mode int trackNr; // if != 0 a new track with given number has started int trackProgress; // reading progress of current track 0..1000 char *buffer; // address of buffer that should be written }; struct BufferHeader { long buffersRead; // number of blocks that are read and put to the buffer long buffersWritten; // number of blocks that were taken from the buffer int buffersFilled; // set to 1 by reader process when buffer is filled the // first time int readerFinished; int readerTerminated; int terminateReader; long nofBuffers; // number of available buffers Buffer *buffers; }; // buffer size in blocks int BUFFER_SIZE = 75; static int TERMINATE = 0; static int getSharedMemory(long nofBuffers, BufferHeader **header, long *nofSegments, ShmSegment **shmSegments); static void releaseSharedMemory(long nofSegments, ShmSegment *shmSegments); static RETSIGTYPE terminationRequest(int sig) { if (sig == SIGQUIT || sig == SIGTERM) TERMINATE = 1; #if 0 if (sig == SIGCHLD) { message(0, "SIGCHLD received."); } #endif } #ifndef USE_POSIX_THREADS // Waits or polls for termination of a child process. // noHang: 0: wait until child terminates, 1: just poll if child terminated // status: filled with status information, only valid if 0 is returned // return: 0: child exited // 1: no child exited, can only happen if 'noHang' is 1 // 2: wait failed, 'errno' contains cause static int waitForChild(int noHang, int *status) { int ret; do { if (noHang) ret = wait3(status, WNOHANG, NULL); else ret = wait(status); if (ret > 0) return 0; if (ret < 0 && errno != EINTR #ifdef ERESTARTSYS && errno != ERESTARTSYS #endif ) { return 2; } } while (ret < 0); return 1; } #endif // Blocks all signals that are handled by this module. static void blockSignals() { sigset_t set; sigemptyset(&set); sigaddset(&set, SIGCHLD); sigaddset(&set, SIGQUIT); sigaddset(&set, SIGTERM); #ifdef USE_POSIX_THREADS #ifdef HAVE_PTHREAD_SIGMASK pthread_sigmask(SIG_BLOCK, &set, NULL); #endif #else sigprocmask(SIG_BLOCK, &set, NULL); #endif } // Blocks all signals that are handled by this module. static void unblockSignals() { sigset_t set; sigemptyset(&set); sigaddset(&set, SIGCHLD); sigaddset(&set, SIGQUIT); sigaddset(&set, SIGTERM); #ifdef USE_POSIX_THREADS #ifdef HAVE_PTHREAD_SIGMASK pthread_sigmask(SIG_UNBLOCK, &set, NULL); #endif #else sigprocmask(SIG_UNBLOCK, &set, NULL); #endif } // return: 0: OK // 1: child process terminated and has been collected with 'wait()' // 2: error -> child process must be terminated static int writer(const Toc *toc, CdrDriver *cdr, BufferHeader *header, long lba, int speed) { long total = toc->length().lba() * AUDIO_BLOCK_LEN; long totalTracks = toc->nofTracks(); long cnt = 0; long blkCount = 0; long len = 0; long cntMb; long lastMb = 0; long buffered; int buffFill; int writerFill = 0; int minFill = 100; int maxFill = 0; int actTrackNr = 0; long actProgress; TrackData::Mode dataMode; TrackData::SubChannelMode subChanMode; #ifndef USE_POSIX_THREADS int status; #endif message (3, "Waiting for reader process"); while (header->buffersFilled == 0) { sleep(1); if (header->readerTerminated) { message(-2, "Reader process terminated abnormally."); return 1; } #ifndef USE_POSIX_THREADS // Check if child has terminated switch (waitForChild(1, &status)) { case 0: // Child exited message(-2, "Reader process terminated abnormally."); return 1; case 2: message(-2, "wait failed: %s", strerror(errno)); return 2; } #endif } #if DEBUG_WRITE FILE *fp = fopen("test.out", "w"); #endif message (3, "Awaken, will start writing"); if (cdr != NULL) { cdr->sendWriteCdProgressMsg(CdrDriver::WCD_LEADIN, totalTracks, 0, 0, 0, 100); blockSignals(); if (cdr->startDao() != 0) { unblockSignals(); return 2; } unblockSignals(); } do { //message(4, "Slave: waiting for master."); while (header->buffersWritten == header->buffersRead) { if (header->readerTerminated) { message(-2, "Reader process terminated abnormally."); return 1; } #ifndef USE_POSIX_THREADS // Check if child has terminated switch (waitForChild(1, &status)) { case 0: // Child exited message(-2, "Reader process terminated abnormally."); return 1; case 2: message(-2, "wait failed: %s", strerror(errno)); return 2; } #endif mSleep(10); } Buffer &buf = header->buffers[header->buffersWritten % header->nofBuffers]; len = buf.bufLen; dataMode = buf.mode; subChanMode = buf.subChanMode; if (header->readerFinished) { buffFill = 100; if (maxFill == 0) maxFill = 100; } else { buffered = header->buffersRead - header->buffersWritten; if (buffered == header->nofBuffers || buffered == header->nofBuffers - 1) { buffFill = 100; } else { buffFill = 100 * buffered; buffFill /= header->nofBuffers; } if (buffFill > maxFill) maxFill = buffFill; } if (buffFill < minFill) minFill = buffFill; if (len == 0) { // all data is written message(1, ""); if (cdr == NULL) message(1, "Read %ld blocks.", blkCount); else message(1, "Wrote %ld blocks. Buffer fill min %d%%/max %d%%.", blkCount, minFill, maxFill); #if DEBUG_WRITE if (fp != NULL) fclose(fp); #endif if (cdr != NULL) { cdr->sendWriteCdProgressMsg(CdrDriver::WCD_LEADOUT, totalTracks, 0xaa, 1000, 1000, 100); blockSignals(); if (cdr->finishDao() != 0) { unblockSignals(); return 2; } unblockSignals(); } return 0; } cnt += len * AUDIO_BLOCK_LEN; blkCount += len; if (buf.trackNr > 0) { message(1, "Writing track %02d (mode %s/%s %s)...", buf.trackNr, TrackData::mode2String(buf.trackMode), TrackData::mode2String(dataMode), TrackData::subChannelMode2String(subChanMode)); actTrackNr = buf.trackNr; } //message(4, "Slave: writing buffer %p (%ld).", buf, len); #if DEBUG_WRITE if (fp != NULL) { if (cdr != NULL) { message(0, "dao: blockSize: %ld", cdr->blockSize(dataMode, subChanMode)); fwrite(buf.buffer, cdr->blockSize(dataMode, subChanMode), len, fp); } else { fwrite(buf.buffer, 2352, len, fp); } } #endif // Write track data. if (cdr != NULL) { blockSignals(); if (cdr->writeData(dataMode, subChanMode, lba, buf.buffer, len) != 0) { message(-2, "Writing failed - buffer under run?"); unblockSignals(); return 2; } // Print stat line update every megabyte. cntMb = cnt >> 20; if (cntMb > lastMb) { long totalcap, availcap; if (cdr->readBufferCapacity(&totalcap, &availcap)) { writerFill = (int)((1.0 - ((double)availcap / (double)totalcap)) * 100.0); message(1, "Wrote %ld of %ld MB (Buffers %3d%% %3d%%).\r", cnt >> 20, total >> 20, buffFill, writerFill); } else { message(1, "Wrote %ld of %ld MB (Buffer %3d%%).\r", cnt >> 20, total >> 20, buffFill); } lastMb = cntMb; } unblockSignals(); actProgress = cnt; actProgress /= total / 1000; cdr->sendWriteCdProgressMsg(CdrDriver::WCD_DATA, totalTracks, actTrackNr, buf.trackProgress, actProgress, buffFill, writerFill); } else { if (speed > 0) { message(1, "Read %ld of %ld MB (Buffer %3d%%).\r", cnt >> 20, total >> 20, buffFill); mSleep(1000 / speed); } else { message(1, "Read %ld of %ld MB.\r", cnt >> 20, total >> 20); } } header->buffersWritten += 1; } while (!TERMINATE); message(-1, "Writing/simulation/read-test aborted on user request."); return 2; } struct ReaderArgs { const Toc *toc; CdrDriver *cdr; int swap; BufferHeader *header; long startLba; }; static void *reader(void *args) { const Toc *toc = ((ReaderArgs*)args)->toc; CdrDriver *cdr = ((ReaderArgs*)args)->cdr; int swap = ((ReaderArgs*)args)->swap; BufferHeader *header = ((ReaderArgs*)args)->header; long lba = ((ReaderArgs*)args)->startLba + 150; // used to encode the sector // header (MSF) long length = toc->length().lba(); long n, rn; int first = header->nofBuffers; const Track *track; int trackNr = 1; TrackData::Mode dataMode; TrackData::SubChannelMode subChanMode; int encodingMode = 0; int subChanEncodingMode = 1; int newTrack; long tact; // number of blocks already read from current track long tprogress; setRealTimeScheduling(4); giveUpRootPrivileges(); if (cdr != NULL) { if (cdr->bigEndianSamples() == 0) { // swap samples for little endian recorders swap = !swap; } encodingMode = cdr->encodingMode(); } message(4, "Swap: %d", swap); TrackIterator itr(toc); TrackReader reader; track = itr.first(); reader.init(track); if (reader.openData() != 0) { message(-2, "Opening of track data failed."); goto fail; } newTrack = 1; tact = 0; dataMode = (encodingMode == 0) ? TrackData::AUDIO : track->type(); subChanMode = track->subChannelType(); if (cdr != NULL) subChanEncodingMode = cdr->subChannelEncodingMode(subChanMode); do { n = (length > BUFFER_SIZE ? BUFFER_SIZE : length); Buffer &buf = header->buffers[header->buffersRead % header->nofBuffers]; do { rn = reader.readData(encodingMode, subChanEncodingMode, lba, buf.buffer, n); if (rn < 0) { message(-2, "Reading of track data failed."); goto fail; } if (rn == 0) { track = itr.next(); reader.init(track); if (reader.openData() != 0) { message(-2, "Opening of track data failed."); goto fail; } trackNr++; if (encodingMode != 0) dataMode = track->type(); subChanMode = track->subChannelType(); if (cdr != NULL) subChanEncodingMode = cdr->subChannelEncodingMode(subChanMode); newTrack = 1; tact = 0; } } while (rn == 0); lba += rn; tact += rn; if (cdr != NULL && ((track->type() == TrackData::AUDIO && swap) || (encodingMode == 0 && cdr->bigEndianSamples() == 0))) { // swap audio data long blockLen = cdr->blockSize(dataMode, subChanMode); char *brun = buf.buffer; int i; for (i = 0; i < rn; i++, brun += blockLen) swapSamples((Sample *)brun, SAMPLES_PER_BLOCK); } buf.bufLen = rn; buf.mode = dataMode; buf.trackMode = track->type(); buf.subChanMode = subChanMode; tprogress = tact * 1000; tprogress /= track->length().lba(); buf.trackProgress = tprogress; if (newTrack) { // inform write process that it should print message about new track buf.trackNr = trackNr; } else { buf.trackNr = 0; } header->buffersRead += 1; length -= rn; if (first > 0) { first--; if (first == 0 || length == 0) { message (3, "Buffer filled"); header->buffersFilled = 1; } } // wait for writing process to finish writing of previous buffer //message(4, "Reader: waiting for Writer."); while (header->buffersRead - header->buffersWritten == header->nofBuffers && header->terminateReader == 0) { mSleep(10); } newTrack = 0; } while (length > 0 && header->terminateReader == 0); header->readerFinished = 1; if (header->terminateReader == 0) { Buffer &buf1 = header->buffers[header->buffersRead % header->nofBuffers]; buf1.bufLen = 0; buf1.trackNr = 0; header->buffersRead += 1; } #ifndef USE_POSIX_THREADS // wait until we get killed while (1) sleep(1000); exit(0); #endif return NULL; fail: header->readerTerminated = 1; #ifndef USE_POSIX_THREADS exit(1); #endif return NULL; } int writeDiskAtOnce(const Toc *toc, CdrDriver *cdr, int nofBuffers, int swap, int testMode, int speed) { int err = 0; BufferHeader *header = NULL; long nofShmSegments = 0; ShmSegment *shmSegments = NULL; long startLba = 0; #ifdef USE_POSIX_THREADS pthread_t readerThread; pthread_attr_t readerThreadAttr; int threadStarted = 0; #else int pid = 0; int status; #endif #if 1 if (nofBuffers < 10) { nofBuffers = 10; message(-1, "Adjusted number of FIFO buffers to 10."); } #endif if (getSharedMemory(nofBuffers, &header, &nofShmSegments, &shmSegments) != 0) { releaseSharedMemory(nofShmSegments, shmSegments); return 1; } header->buffersRead = 0; header->buffersWritten = 0; header->buffersFilled = 0; header->readerFinished = 0; header->readerTerminated = 0; header->terminateReader = 0; TERMINATE = 0; installSignalHandler(SIGINT, SIG_IGN); installSignalHandler(SIGPIPE, SIG_IGN); installSignalHandler(SIGALRM, SIG_IGN); installSignalHandler(SIGCHLD, terminationRequest); installSignalHandler(SIGQUIT, terminationRequest); installSignalHandler(SIGTERM, terminationRequest); if (!testMode) { const DiskInfo *di; if (cdr->initDao(toc) != 0) { err = 1; goto fail; } if ((di = cdr->diskInfo()) != NULL) { startLba = di->thisSessionLba; } } // start reader process #ifdef USE_POSIX_THREADS if (pthread_attr_init(&readerThreadAttr) != 0) { message(-2, "pthread_attr_init failed: %s", strerror(errno)); err = 1; goto fail; } ReaderArgs rargs; rargs.toc = toc; rargs.cdr = cdr; rargs.swap = swap; rargs.header = header; rargs.startLba = startLba; if (pthread_create(&readerThread, &readerThreadAttr, reader, &rargs) != 0) { message(-2, "Cannot create thread: %s", strerror(errno)); pthread_attr_destroy(&readerThreadAttr); err = 1; goto fail; } else { threadStarted = 1; } #else /* USE_POSIX_THREADS */ if ((pid = fork()) == 0) { // we are the new process setsid(); // detach from controlling terminal #ifdef HAVE_MLOCKALL if (geteuid() == 0) { if (mlockall(MCL_CURRENT|MCL_FUTURE) != 0) { message(-1, "Cannot lock memory pages: %s", strerror(errno)); } message(4, "Reader process memory locked"); } #endif ReaderArgs rargs; rargs.toc = toc; rargs.cdr = cdr; rargs.swap = swap; rargs.header = header; rargs.startLba = startLba; reader(&rargs); } else if (pid < 0) { message(-2, "fork failed: %s", strerror(errno)); err = 1; goto fail; } #endif /* USE_POSIX_THREADS */ switch (setRealTimeScheduling(5)) { case 1: message(-1, "No super user permission to setup real time scheduling."); break; case 2: message(2, "Real time scheduling not available."); break; } #ifdef HAVE_MLOCKALL if (geteuid() == 0) { if (mlockall(MCL_CURRENT|MCL_FUTURE) != 0) { message(-1, "Cannot lock memory pages: %s", strerror(errno)); } message(4, "Memory locked"); } #endif giveUpRootPrivileges(); switch (writer(toc, cdr, header, startLba, speed)) { case 1: // error, reader process terminated abnormally #ifndef USE_POSIX_THREADS pid = 0; #endif err = 1; break; case 2: // error, reader process must be terminated err = 1; break; } if (err != 0 && cdr != NULL) cdr->abortDao(); // abort writing process fail: #ifdef HAVE_MUNLOCKALL munlockall(); #endif #ifdef USE_POSIX_THREADS if (threadStarted) { header->terminateReader = 1; if (pthread_join(readerThread, NULL) != 0) { message(-2, "pthread_join failed: %s", strerror(errno)); err = 1; } pthread_attr_destroy(&readerThreadAttr); } #else if (pid != 0) { if (kill(pid, SIGKILL) == 0) { waitForChild(0, &status); } } #endif releaseSharedMemory(nofShmSegments, shmSegments); installSignalHandler(SIGINT, SIG_DFL); installSignalHandler(SIGPIPE, SIG_DFL); installSignalHandler(SIGALRM, SIG_DFL); installSignalHandler(SIGCHLD, SIG_DFL); installSignalHandler(SIGQUIT, SIG_DFL); installSignalHandler(SIGTERM, SIG_DFL); return err; } #ifdef USE_POSIX_THREADS static int getSharedMemory(long nofBuffers, BufferHeader **header, long *nofSegments, ShmSegment **shmSegment) { long b; long bufferSize = BUFFER_SIZE * (AUDIO_BLOCK_LEN + PW_SUBCHANNEL_LEN); *header = NULL; *nofSegments = 0; *shmSegment = NULL; if (nofBuffers <= 0) { return 1; } *shmSegment = new ShmSegment; *nofSegments = 1; (*shmSegment)->id = -1; (*shmSegment)->buffer = new char[sizeof(BufferHeader) + nofBuffers * sizeof(Buffer) + nofBuffers * bufferSize]; if ( (*shmSegment)->buffer == NULL) { message(-2, "Cannot allocated memory for ring buffer."); return 1; } *header = (BufferHeader*)((*shmSegment)->buffer); (*header)->nofBuffers = nofBuffers; (*header)->buffers = (Buffer*)((*shmSegment)->buffer + sizeof(BufferHeader)); char *bufferBase = (*shmSegment)->buffer + sizeof(BufferHeader) + nofBuffers * sizeof(Buffer); for (b = 0; b < nofBuffers; b++) (*header)->buffers[b].buffer = bufferBase + b * bufferSize; return 0; } static void releaseSharedMemory(long nofSegments, ShmSegment *shmSegment) { if (shmSegment == NULL || nofSegments == 0) return; if (shmSegment->buffer != NULL) { delete[] shmSegment->buffer; shmSegment->buffer = NULL; } delete shmSegment; } #else /* USE_POSIX_THREADS */ static int getSharedMemory(long nofBuffers, BufferHeader **header, long *nofSegments, ShmSegment **shmSegments) { long i, b; long bufferSize = BUFFER_SIZE * (AUDIO_BLOCK_LEN + PW_SUBCHANNEL_LEN); long maxSegmentSize = 0; long bcnt = 0; *header = NULL; *nofSegments = 0; *shmSegments = NULL; if (nofBuffers <= 0) { return 1; } #if defined(linux) && defined(IPC_INFO) struct shminfo info; if (shmctl(0, IPC_INFO, (struct shmid_ds*)&info) < 0) { message(-1, "Cannot get IPC info: %s", strerror(errno)); maxSegmentSize = 4 * 1024 * 1024; message(-1, "Assuming %ld MB shared memory segment size.", maxSegmentSize >> 20); } else { maxSegmentSize = info.shmmax; } #elif defined(__FreeBSD__) maxSegmentSize = 4 * 1024 * 1024; // 4 MB #else maxSegmentSize = 1 * 1024 * 1024; // 1 MB #endif message(4, "Shm max segement size: %ld (%ld MB)", maxSegmentSize, maxSegmentSize >> 20); if (maxSegmentSize < sizeof(BufferHeader) + nofBuffers * sizeof(Buffer)) { message(-2, "Shared memory segment cannot hold a single buffer."); return 1; } maxSegmentSize -= sizeof(BufferHeader) + nofBuffers * sizeof(Buffer); long buffersPerSegment = maxSegmentSize / bufferSize; if (buffersPerSegment == 0) { message(-2, "Shared memory segment cannot hold a single buffer."); return 1; } *nofSegments = nofBuffers / buffersPerSegment; if (nofBuffers % buffersPerSegment != 0) *nofSegments += 1; *shmSegments = new ShmSegment[*nofSegments]; message(4, "Using %ld shared memory segments.", *nofSegments); for (i = 0; i < *nofSegments; i++) { (*shmSegments)[i].id = -1; (*shmSegments)[i].buffer = NULL; } long bufCnt = nofBuffers; long n; long segmentLength; char *bufferBase; for (i = 0; i < *nofSegments; i++) { n = (bufCnt > buffersPerSegment ? buffersPerSegment : bufCnt); segmentLength = n * bufferSize; if (*header == NULL) { // first segment contains the buffer header segmentLength += sizeof(BufferHeader) + nofBuffers * sizeof(Buffer); } (*shmSegments)[i].id = shmget(IPC_PRIVATE, segmentLength, 0600|IPC_CREAT); if ((*shmSegments)[i].id < 0) { message(-2, "Cannot create shared memory segment: %s", strerror(errno)); message(-2, "Try to reduce the buffer count (option --buffers)."); return 1; } (*shmSegments)[i].buffer = (char *)shmat((*shmSegments)[i].id, 0, 0); if (((*shmSegments)[i].buffer) == NULL || ((*shmSegments)[i].buffer) == (char *)-1) { (*shmSegments)[i].buffer = NULL; message(-2, "Cannot get shared memory: %s", strerror(errno)); message(-2, "Try to reduce the buffer count (option --buffers)."); return 1; } if (*header == NULL) { bufferBase = (*shmSegments)[i].buffer + sizeof(BufferHeader) + nofBuffers * sizeof(Buffer); *header = (BufferHeader*)(*shmSegments)[i].buffer; (*header)->nofBuffers = nofBuffers; (*header)->buffers = (Buffer*)((*shmSegments)[i].buffer + sizeof(BufferHeader)); } else { bufferBase = (*shmSegments)[i].buffer; } for (b = 0; b < n; b++) (*header)->buffers[bcnt++].buffer = bufferBase + b * bufferSize; bufCnt -= n; } assert(bcnt == nofBuffers); return 0; } static void releaseSharedMemory(long nofSegments, ShmSegment *shmSegments) { long i; if (shmSegments == NULL || nofSegments == 0) return; for (i = 0; i < nofSegments; i++) { if (shmSegments[i].id >= 0) { if (shmSegments[i].buffer != NULL) { if (shmdt(shmSegments[i].buffer) != 0) { message(-2, "shmdt: %s", strerror(errno)); } } if (shmctl(shmSegments[i].id, IPC_RMID, NULL) != 0) { message(-2, "Cannot remove shared memory: %s", strerror(errno)); } } } delete[] shmSegments; } #endif /* USE_POSIX_THREADS */