# test_builder.py vi:ts=4:sw=4:expandtab:
#
# Copyright (c) 2006 Three Rings Design, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. Neither the name of the copyright owner nor the names of contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
""" Builder Unit Tests """
import os, shutil
from twisted.trial import unittest
from twisted.internet import reactor, defer
import farb
from farb import builder, utils
# Useful Constants
from farb.test import DATA_DIR, CMD_DIR, rewrite_config
# Scratch locations used by the tests. FREEBSD_REL_PATH and BUILDROOT
# resolve to the same 'buildtest' directory under DATA_DIR.
FREEBSD_REL_PATH = os.path.join(DATA_DIR, 'buildtest')
PROCESS_LOG = os.path.join(FREEBSD_REL_PATH, 'process.log')
PROCESS_OUT = os.path.join(FREEBSD_REL_PATH, 'process.out')
BUILDROOT = os.path.join(DATA_DIR, 'buildtest')
RELEASEROOT = os.path.join(BUILDROOT, 'releaseroot')
PKGROOT = os.path.join(BUILDROOT, 'pkgroot')
CVSROOT = os.path.join(DATA_DIR, 'fakencvs')
INSTALLROOT = os.path.join(DATA_DIR, 'netinstall')
TFTPROOT = os.path.join(DATA_DIR, 'test_tftproot')

# CVS tags checked out by the CVS and release builder tests
CVSTAG = 'RELENG_6_0'
CVSTAG_OLD = 'RELENG_5_3'

# File whose presence the CVS checkout tests assert on
EXPORT_FILE = os.path.join(BUILDROOT, 'newvers.sh')

# Configuration fixtures; CDROM_INF is generated from CDROM_INF_IN by
# rewrite_config() in the tests that need it and removed again in tearDown.
INSTALL_CFG = os.path.join(DATA_DIR, 'test_configs', 'install.cfg')
ISO_MOUNTPOINT = os.path.join(DATA_DIR, 'fake_iso_mount')
CDROM_INF_IN = os.path.join(DATA_DIR, 'test_configs', 'cdrom.inf.in')
CDROM_INF = os.path.join(ISO_MOUNTPOINT, 'cdrom.inf')

# Shell scripts under CMD_DIR that are substituted for the builder's
# command paths below (presumably harmless stand-ins for the real
# system commands -- confirm against the scripts themselves).
MDCONFIG_PATH = os.path.join(CMD_DIR, 'mdconfig.sh')
CHROOT_PATH = os.path.join(CMD_DIR, 'chroot.sh')
MOUNT_PATH = os.path.join(CMD_DIR, 'mount.sh')
UMOUNT_PATH = os.path.join(CMD_DIR, 'umount.sh')
PORTSNAP_PATH = os.path.join(CMD_DIR, 'portsnap.sh')
TAR_PATH = os.path.join(CMD_DIR, 'tar.sh')
CHFLAGS_PATH = os.path.join(CMD_DIR, 'chflags.sh')

# System binaries spawned directly by the process protocol tests; these
# are not pushed into the builder module.
ECHO_PATH = '/bin/echo'
SH_PATH = '/bin/sh'
ROOT_PATH = os.path.join(DATA_DIR, 'fake_root')

# Reach in and tweak various path constants so that the builder module
# uses the test locations and scripts defined above.
builder.FREEBSD_REL_PATH = FREEBSD_REL_PATH
builder.MDCONFIG_PATH = MDCONFIG_PATH
builder.CHROOT_PATH = CHROOT_PATH
builder.MOUNT_PATH = MOUNT_PATH
builder.UMOUNT_PATH = UMOUNT_PATH
builder.PORTSNAP_PATH = PORTSNAP_PATH
builder.TAR_PATH = TAR_PATH
builder.CHFLAGS_PATH = CHFLAGS_PATH
builder.ROOT_PATH = ROOT_PATH
class LoggingProcessProtocolTestCase(unittest.TestCase):
    """Run real child processes through builder.LoggingProcessProtocol."""

    def setUp(self):
        # 'w+' so the same handle can be rewound and read back by the checks
        self.log = open(PROCESS_LOG, 'w+')

    def tearDown(self):
        self.log.close()
        os.unlink(PROCESS_LOG)

    def _processResult(self, result):
        # Rewind, then compare everything that was logged
        self.log.seek(0)
        logged = self.log.read()
        self.assertEquals('hello\n', logged)
        self.assertEquals(result, 0)

    def test_spawnProcess(self):
        # Echo a word; expect it in the log and a zero result
        finished = defer.Deferred()
        protocol = builder.LoggingProcessProtocol(finished, self.log)
        finished.addCallback(self._processResult)
        argv = [ECHO_PATH, 'hello']
        reactor.spawnProcess(protocol, ECHO_PATH, argv)
        return finished

    def _processSuccess(self, result):
        self.fail("This call should not have succeeded")

    def _processError(self, result):
        self.assertNotEqual(result, 0)

    def test_processError(self):
        # A shell exiting with status 5 must take the errback path
        finished = defer.Deferred()
        protocol = builder.LoggingProcessProtocol(finished, self.log)
        finished.addCallbacks(self._processSuccess, self._processError)
        argv = [SH_PATH, '-c', 'exit 5']
        reactor.spawnProcess(protocol, SH_PATH, argv)
        return finished
class CVSCommandTestCase(unittest.TestCase):
    """Check files out of the test CVS repository with builder.CVSCommand."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')

    def tearDown(self):
        self.log.close()
        # A failed checkout never creates the exported file. Guard the unlink
        # so cleanup does not raise and hide the real test failure.
        if os.path.exists(EXPORT_FILE):
            os.unlink(EXPORT_FILE)
        os.unlink(PROCESS_LOG)

    def _cvsResult(self, result):
        # The checkout must have produced the file and reported success
        self.assert_(os.path.exists(EXPORT_FILE))
        self.assertEquals(result, 0)

    def test_cvsCheckout(self):
        cvs = builder.CVSCommand(CVSROOT)
        d = cvs.checkout(CVSTAG, builder.NEWVERS_PATH, BUILDROOT, self.log)
        d.addCallback(self._cvsResult)
        return d

    # Make sure cvs.checkout can handle updating an older version of an already
    # checked out file.
    def test_cvsUpdate(self):
        cvs = builder.CVSCommand(CVSROOT)

        def _checkoutCurrent(result):
            # Runs only once the old-tag checkout has finished. Returning the
            # new Deferred chains it, so its result reaches _cvsResult.
            return cvs.checkout(CVSTAG, builder.NEWVERS_PATH, BUILDROOT, self.log)

        # Chain the two checkouts. Starting the second without waiting for the
        # first would run both concurrently in the same directory and drop any
        # error from the first one.
        d = cvs.checkout(CVSTAG_OLD, builder.NEWVERS_PATH, BUILDROOT, self.log)
        d.addCallback(_checkoutCurrent)
        d.addCallback(self._cvsResult)
        return d
class MountCommandTestCase(unittest.TestCase):
    """Mount and unmount a device through builder.MountCommand."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        self.mc = builder.MountCommand('/dev/md0', '/mnt/md0')

    def tearDown(self):
        self.log.close()
        os.unlink(PROCESS_LOG)

    def _cbMountResult(self, result):
        # The log must hold exactly the device and the mount point, one
        # per line
        self.log.seek(0)
        logged = self.log.read()
        self.assertEquals(logged, '/dev/md0\n/mnt/md0\n')
        self.assertEquals(result, 0)

    def test_mount(self):
        mounted = self.mc.mount(self.log)
        mounted.addCallback(self._cbMountResult)
        return mounted

    def _cbUmountResult(self, result):
        self.assertEquals(result, 0)

    def test_umount(self):
        unmounted = self.mc.umount(self.log)
        unmounted.addCallback(self._cbUmountResult)
        return unmounted
class MountCommandTypeTestCase(unittest.TestCase):
    """Mount with an explicit filesystem type through builder.MountCommand."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        self.mc = builder.MountCommand('devfs', '/dev', fstype='devfs')

    def tearDown(self):
        self.log.close()
        os.unlink(PROCESS_LOG)

    def _cbMountResult(self, result):
        # With fstype given, the log must hold the device, the mount point
        # and the filesystem type, one per line
        self.log.seek(0)
        logged = self.log.read()
        self.assertEquals(logged, 'devfs\n/dev\ndevfs\n')
        self.assertEquals(result, 0)

    def test_mount(self):
        mounted = self.mc.mount(self.log)
        mounted.addCallback(self._cbMountResult)
        return mounted
class MDMountCommandTestCase(unittest.TestCase):
    """Mount and unmount an md device through builder.MDMountCommand."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        self.mdc = builder.MDConfigCommand('/nonexistent')
        self.mc = builder.MDMountCommand(self.mdc, '/mnt/md0')

    def tearDown(self):
        self.log.close()
        os.unlink(PROCESS_LOG)

    def _cbMountResult(self, result):
        self.assertEquals(result, 0)

    def test_mount(self):
        mounted = self.mc.mount(self.log)
        mounted.addCallback(self._cbMountResult)
        return mounted

    def _cbUmountResult(self, result):
        self.assertEquals(result, 0)

    def _cbAttach(self, result):
        # The md device is attached now, so the umount can be attempted.
        # Returning the Deferred makes the test wait for it.
        unmounted = self.mc.umount(self.log)
        unmounted.addCallback(self._cbUmountResult)
        return unmounted

    def test_umount(self):
        # Detaching needs an attached mdconfig device, so attach first
        attached = self.mdc.attach()
        attached.addCallback(self._cbAttach)
        return attached
class MakeCommandTestCase(unittest.TestCase):
    """Run make targets in BUILDROOT through builder.MakeCommand."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')

    def tearDown(self):
        self.log.close()
        # Not every test produces the output file
        if os.path.exists(PROCESS_OUT):
            os.unlink(PROCESS_OUT)
        os.unlink(PROCESS_LOG)

    def _makeResult(self, result):
        # The target must have written both option values to the output file
        outfile = open(PROCESS_OUT, 'r')
        written = outfile.read()
        outfile.close()
        self.assertEquals('MakeCommand 1 2\n', written)
        self.assertEquals(result, 0)

    def test_make(self):
        options = {'TEST1': '1', 'TEST2': '2'}
        targets = ('makecommand',)
        mc = builder.MakeCommand(BUILDROOT, targets, options)
        made = mc.make(self.log)
        made.addCallback(self._makeResult)
        return made

    def _makeMultipleResult(self, result):
        # NOTE(review): no test in this class attaches this callback;
        # test_makeMultiple checks with _makeResult instead. Confirm which
        # output is expected when a target is named twice.
        outfile = open(PROCESS_OUT, 'r')
        written = outfile.read()
        outfile.close()
        self.assertEquals('MakeCommand 1 2\nMakeCommand 1 2\n', written)
        self.assertEquals(result, 0)

    def test_makeMultiple(self):
        options = {'TEST1': '1', 'TEST2': '2'}
        targets = ('makecommand', 'makecommand')
        mc = builder.MakeCommand(BUILDROOT, targets, options)
        made = mc.make(self.log)
        made.addCallback(self._makeResult)
        return made

    def _makeChrootResult(self, result):
        self.log.seek(0)
        # Only the first logged line is compared, since make reports the
        # directories it enters and leaves on certain platforms
        firstLine = self.log.read().splitlines(1)[0]
        expected = '%s /nonexistant %s -C %s makecommand\n' % (CHROOT_PATH, builder.MAKE_PATH, BUILDROOT)
        self.assertEquals(firstLine, expected)
        self.assertEquals(result, 0)

    def test_makeChroot(self):
        targets = ('makecommand',)
        mc = builder.MakeCommand(BUILDROOT, targets, chrootdir='/nonexistant')
        made = mc.make(self.log)
        made.addCallback(self._makeChrootResult)
        return made
class PortsnapCommandTestCase(unittest.TestCase):
    """Fetch and extract a ports tree through builder.PortsnapCommand."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        self.pc = builder.PortsnapCommand()

    def tearDown(self):
        self.log.close()
        os.unlink(PROCESS_LOG)

    def _cbPortsnapResult(self, result):
        # Only the result is checked; the log is rewound but not read
        self.log.seek(0)
        self.assertEquals(result, 0)

    def test_fetch(self):
        fetched = self.pc.fetch(self.log)
        fetched.addCallback(self._cbPortsnapResult)
        return fetched

    def test_extract(self):
        extracted = self.pc.extract('/nonexistent', self.log)
        extracted.addCallback(self._cbPortsnapResult)
        return extracted
class ChflagsCommandTestCase(unittest.TestCase):
    """Remove file flags through builder.ChflagsCommand."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        self.cc = builder.ChflagsCommand('/nonexistent')

    def tearDown(self):
        self.log.close()
        os.unlink(PROCESS_LOG)

    def _cbChflagsResult(self, result):
        # Only the result is checked; the log is rewound but not read
        self.log.seek(0)
        self.assertEquals(result, 0)

    def test_removeAll(self):
        removed = self.cc.removeAll(self.log)
        removed.addCallback(self._cbChflagsResult)
        return removed
class NCVSBuildnameProcessProtocolTestCase(unittest.TestCase):
    """Spawn cvs directly and check builder.NCVSBuildnameProcessProtocol."""

    def _cvsResult(self, result):
        # The protocol is expected to hand back this build name for CVSTAG
        self.assertEquals(result, '6.0-RELEASE-p4')

    def test_spawnProcess(self):
        d = defer.Deferred()
        pp = builder.NCVSBuildnameProcessProtocol(d)
        d.addCallback(self._cvsResult)
        reactor.spawnProcess(pp, builder.CVS_PATH, [builder.CVS_PATH, '-d', CVSROOT, 'co', '-p', '-r', CVSTAG, builder.NEWVERS_PATH])
        return d

    def _cvsSuccess(self, result):
        self.fail("This call should not have succeeded")

    def _cvsFailure(self, failure):
        self.assert_(isinstance(failure.value, builder.CVSCommandError))

    def test_failureHandling(self):
        d = defer.Deferred()
        pp = builder.NCVSBuildnameProcessProtocol(d)
        # Pair the errback with a failing success callback. With only an
        # errback, the test would pass silently if cvs unexpectedly succeeded.
        d.addCallbacks(self._cvsSuccess, self._cvsFailure)
        reactor.spawnProcess(pp, builder.CVS_PATH, [builder.CVS_PATH, 'die horribly'])
        return d
class MDConfigProcessProtocolTestCase(unittest.TestCase):
    """Spawn the mdconfig script and check builder.MDConfigProcessProtocol."""

    def _mdResult(self, result):
        # Attaching is expected to yield this device name
        self.assertEquals(result, 'md0')

    def test_attach(self):
        d = defer.Deferred()
        pp = builder.MDConfigProcessProtocol(d)
        d.addCallback(self._mdResult)
        reactor.spawnProcess(pp, MDCONFIG_PATH, [MDCONFIG_PATH, '-a', '-t', 'vnode', '-f', '/nonexistent'])
        return d

    def _mdDetachResult(self, result):
        # Nothing to verify beyond the Deferred firing without an error
        pass

    def test_detach(self):
        d = defer.Deferred()
        pp = builder.MDConfigProcessProtocol(d)
        d.addCallback(self._mdDetachResult)
        reactor.spawnProcess(pp, MDCONFIG_PATH, [MDCONFIG_PATH, '-d', '-u', 'md0'])
        return d

    def _mdSuccess(self, result):
        self.fail("This call should not have succeeded")

    def _mdFailure(self, failure):
        self.assert_(isinstance(failure.value, builder.MDConfigCommandError))

    def test_failureHandling(self):
        d = defer.Deferred()
        pp = builder.MDConfigProcessProtocol(d)
        # Pair the errback with a failing success callback. With only an
        # errback, the test would pass silently if the command unexpectedly
        # succeeded.
        d.addCallbacks(self._mdSuccess, self._mdFailure)
        reactor.spawnProcess(pp, MDCONFIG_PATH, [MDCONFIG_PATH, 'die horribly'])
        return d
class MDConfigCommandTestCase(unittest.TestCase):
    """Attach and detach an md device through builder.MDConfigCommand."""

    def setUp(self):
        self.mdc = builder.MDConfigCommand('/nonexistent')

    def _cbAttachResult(self, result):
        # After attaching, the command object records the device name
        self.assertEquals(self.mdc.md, 'md0')

    def test_attach(self):
        attached = self.mdc.attach()
        attached.addCallback(self._cbAttachResult)
        return attached

    def _cbAttachDetachResult(self, result):
        # Returning the detach Deferred makes the test wait for it
        return self.mdc.detach()

    def test_detach(self):
        attached = self.mdc.attach()
        attached.addCallback(self._cbAttachDetachResult)
        return attached
class ChrootCleanerTestCase(unittest.TestCase):
    """Clean RELEASEROOT through builder.ChrootCleaner."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        self.cleaner = builder.ChrootCleaner(RELEASEROOT)

    def tearDown(self):
        self.log.close()
        # Remove whatever the test left behind
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        if os.path.exists(RELEASEROOT):
            shutil.rmtree(RELEASEROOT)
        if os.path.exists(CDROM_INF):
            os.unlink(CDROM_INF)

    def _dirCreated(self, result):
        # Cleaning must leave an existing but empty directory
        self.assert_(os.path.isdir(RELEASEROOT))
        entries = os.listdir(RELEASEROOT)
        self.assertEquals(len(entries), 0)

    def test_cleanNonexistent(self):
        # The chroot does not exist yet: cleaning must not choke, and a new
        # empty one must be created
        cleaned = self.cleaner.clean(self.log)
        cleaned.addCallback(self._dirCreated)
        return cleaned

    def test_clean(self):
        # Same check, but starting from a chroot that is not empty
        substitutions = {'@CD_VERSION_LINE@' : 'CD_VERSION = 6.2-RELEASE'}
        rewrite_config(CDROM_INF_IN, CDROM_INF, substitutions)
        cdPath = os.path.join(RELEASEROOT, builder.RELEASE_CD_PATH)
        utils.copyRecursive(ISO_MOUNTPOINT, cdPath)
        cleaned = self.cleaner.clean(self.log)
        cleaned.addCallback(self._dirCreated)
        return cleaned
class ReleaseBuilderTestCase(unittest.TestCase):
    """Build a release through builder.ReleaseBuilder."""

    def setUp(self):
        self.builder = builder.ReleaseBuilder(CVSROOT, CVSTAG, RELEASEROOT, makecds=True)
        self.log = open(PROCESS_LOG, 'w+')

    def tearDown(self):
        self.log.close()
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        if os.path.exists(PROCESS_OUT):
            os.unlink(PROCESS_OUT)

    def _buildResult(self, result):
        # The build writes the options it was given to the output file
        outfile = open(PROCESS_OUT, 'r')
        written = outfile.read()
        outfile.close()
        expected = 'ReleaseBuilder: 6.0-RELEASE-p4 %s %s %s no no yes\n' % (RELEASEROOT, CVSROOT, CVSTAG)
        self.assertEquals(written, expected)
        self.assertEquals(result, 0)

    def test_build(self):
        built = self.builder.build(self.log)
        built.addCallback(self._buildResult)
        return built

    def _buildSuccess(self, result):
        self.fail("This call should not have succeeded")

    def _buildError(self, failure):
        failure.trap(builder.ReleaseBuildError)

    def test_buildFailure(self):
        # Force an implosion by swapping in a make target that errors
        self.builder.makeTarget = ('error',)
        built = self.builder.build(self.log)
        built.addCallbacks(self._buildSuccess, self._buildError)
        return built

    def _buildCVSSuccess(self, result):
        self.fail("This call should not have succeeded")

    def _buildCVSError(self, failure):
        failure.trap(builder.ReleaseBuildError)

    def test_cvsFailure(self):
        # Force a CVS implosion by pointing the builder at a bogus CVS root
        self.builder.cvsroot = 'nonexistent'
        built = self.builder.build(self.log)
        built.addCallbacks(self._buildCVSSuccess, self._buildCVSError)
        return built
class ISOReaderTestCase(unittest.TestCase):
    """Copy a release from the fake ISO mount through builder.ISOReader."""

    def setUp(self):
        self.reader = builder.ISOReader(ISO_MOUNTPOINT, RELEASEROOT)
        self.log = open(PROCESS_LOG, 'w+')

    def tearDown(self):
        self.log.close()
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        if os.path.exists(PROCESS_OUT):
            os.unlink(PROCESS_OUT)
        if os.path.exists(CDROM_INF):
            os.unlink(CDROM_INF)
        if os.path.exists(RELEASEROOT):
            shutil.rmtree(RELEASEROOT)

    def _copyResult(self, result):
        cdroot = os.path.join(RELEASEROOT, builder.RELEASE_CD_PATH)
        distdir = os.path.join(cdroot, '6.2-RELEASE')
        bootdir = os.path.join(cdroot, 'boot')
        # Everything that must have been copied into the release root
        expected = (
            os.path.join(distdir, 'base', 'base.aa'),
            os.path.join(distdir, 'base', 'base.ab'),
            os.path.join(distdir, 'base', 'base.ac'),
            os.path.join(distdir, 'src', 'swtf.aa'),
            os.path.join(distdir, 'src', 'swtf.ab'),
            os.path.join(distdir, 'src', 'szomg.aa'),
            os.path.join(distdir, 'src', 'szomg.ab'),
            os.path.join(bootdir, 'mfsroot.gz'),
            os.path.join(bootdir, 'kernel', 'kernel'),
        )
        for path in expected:
            self.assert_(os.path.exists(path))
        # Anything that was in the release root beforehand must be gone
        stale = os.path.join(RELEASEROOT, 'afile')
        self.assert_(not os.path.exists(stale))

    def test_copy(self):
        # Copy the dists from the CD into the release root
        copied = self.reader.copy(self.log)
        copied.addCallback(self._copyResult)
        return copied

    def test_copyReplace(self):
        # Copy again, this time with something already in RELEASEROOT
        os.mkdir(RELEASEROOT)
        marker = open(os.path.join(RELEASEROOT, 'afile'), 'w')
        marker.close()
        copied = self.reader.copy(self.log)
        copied.addCallback(self._copyResult)
        return copied
class PackageChrootAssemblerTestCase(unittest.TestCase):
    """Extract dists into PKGROOT through builder.PackageChrootAssembler."""

    def setUp(self):
        self.assembler = builder.PackageChrootAssembler(RELEASEROOT, PKGROOT)
        self.log = open(PROCESS_LOG, 'w+')
        self.dists = {'base' : ['base'], 'src' : ['szomg', 'swtf']}
        # Populate RELEASEROOT with a release to extract from
        substitutions = {'@CD_VERSION_LINE@' : 'CD_VERSION = 6.2-RELEASE'}
        rewrite_config(CDROM_INF_IN, CDROM_INF, substitutions)
        cdPath = os.path.join(RELEASEROOT, builder.RELEASE_CD_PATH)
        utils.copyRecursive(ISO_MOUNTPOINT, cdPath)

    def tearDown(self):
        self.log.close()
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        if os.path.exists(PROCESS_OUT):
            os.unlink(PROCESS_OUT)
        if os.path.exists(PKGROOT):
            shutil.rmtree(PKGROOT)
        if os.path.exists(RELEASEROOT):
            shutil.rmtree(RELEASEROOT)
        if os.path.exists(CDROM_INF):
            os.unlink(CDROM_INF)

    def _extractResult(self, result):
        # Files that must be present in the chroot after extraction
        expected = (
            os.path.join(PKGROOT, 'usr', 'src', 'wtf.c'),
            os.path.join(PKGROOT, 'usr', 'src', 'zomg.c'),
            os.path.join(PKGROOT, 'usr', 'bin', 'foo.sh'),
            os.path.join(PKGROOT, 'usr', 'bin', 'bar.sh'),
            os.path.join(PKGROOT, builder.RESOLV_CONF),
        )
        for path in expected:
            self.assert_(os.path.exists(path))
        # Anything that was in the chroot beforehand must be gone
        stale = os.path.join(PKGROOT, 'afile')
        self.assert_(not os.path.exists(stale))

    def test_extract(self):
        # Extract the fake dists into the chroot
        extracted = self.assembler.extract(self.dists, self.log)
        extracted.addCallback(self._extractResult)
        return extracted

    def test_extractReplace(self):
        # Extract again, this time into a chroot that already exists
        os.mkdir(PKGROOT)
        marker = open(os.path.join(PKGROOT, 'afile'), 'w')
        marker.close()
        extracted = self.assembler.extract(self.dists, self.log)
        extracted.addCallback(self._extractResult)
        return extracted
class PackageBuilderTestCase(unittest.TestCase):
    """Build packages through builder.PackageBuilder."""

    def setUp(self):
        options = {'TEST1': '1', 'TEST2': '2'}
        self.builder = builder.PackageBuilder(PKGROOT, BUILDROOT, options)
        self.log = open(PROCESS_LOG, 'w+')

    def tearDown(self):
        self.log.close()
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        if os.path.exists(PROCESS_OUT):
            os.unlink(PROCESS_OUT)

    def _buildResult(self, result):
        # The build writes the option values it was given to the output file
        outfile = open(PROCESS_OUT, 'r')
        written = outfile.read()
        outfile.close()
        self.assertEquals(written, 'PackageBuilder: 1 2\n')
        self.assertEquals(result, 0)

    def test_build(self):
        built = self.builder.build(self.log)
        built.addCallback(self._buildResult)
        return built

    def _buildSuccess(self, result):
        self.fail("This call should not have succeeded")

    def _buildError(self, failure):
        failure.trap(builder.PackageBuildError)

    def test_buildFailure(self):
        # Force an implosion by swapping in a make target that errors
        self.builder.makeTarget = ('error',)
        built = self.builder.build(self.log)
        built.addCallbacks(self._buildSuccess, self._buildError)
        return built
class InstallAssemblerTestCase(unittest.TestCase):
    """Assemble a single install through builder.InstallAssembler."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        # Locations the assertions look at after a build
        self.destdir = os.path.join(TFTPROOT, 'testinstall')
        self.mfsroot = os.path.join(self.destdir, 'mfsroot')
        self.installCfg = os.path.join(self.destdir, 'mnt', 'install.cfg')
        self.bootConf = os.path.join(self.destdir, 'boot.conf')
        self.builder = builder.InstallAssembler('testinstall', 'Test Install', RELEASEROOT, INSTALL_CFG)
        os.mkdir(TFTPROOT)
        os.mkdir(self.destdir)
        # Dummy "release" providing the kernel and mfsroot the build needs
        substitutions = {'@CD_VERSION_LINE@' : 'CD_VERSION = 6.2-RELEASE'}
        rewrite_config(CDROM_INF_IN, CDROM_INF, substitutions)
        cdPath = os.path.join(RELEASEROOT, builder.RELEASE_CD_PATH)
        utils.copyRecursive(ISO_MOUNTPOINT, cdPath)

    def tearDown(self):
        self.log.close()
        # Process log
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        # Builder output; destdir lives under TFTPROOT
        if os.path.exists(TFTPROOT):
            shutil.rmtree(TFTPROOT)
        if os.path.exists(self.destdir):
            shutil.rmtree(self.destdir)
        # Dummy release
        if os.path.exists(RELEASEROOT):
            shutil.rmtree(RELEASEROOT)
        if os.path.exists(CDROM_INF):
            os.unlink(CDROM_INF)

    def _buildResult(self, result):
        # The mfsroot must have been uncompressed
        mfs = open(self.mfsroot, 'r')
        contents = mfs.read()
        mfs.close()
        self.assertEquals(contents, 'Uncompress worked.\n')
        # install.cfg must have been copied to the mount point
        self.assert_(os.path.exists(self.installCfg))
        # The kernel module must have been copied
        kmod = os.path.join(self.destdir, 'kernel', 'righthook.ko')
        self.assert_(os.path.exists(kmod))
        # boot.conf must have been created
        self.assert_(os.path.exists(self.bootConf))

    def test_build(self):
        built = self.builder.build(self.destdir, self.log)
        built.addCallback(self._buildResult)
        return built

    def _buildSuccess(self, result):
        self.fail("This call should not have succeeded")

    def _buildError(self, failure):
        failure.trap(builder.InstallAssembleError)

    def test_buildFailure(self):
        # Force an implosion by pointing the builder at a missing mfsroot
        self.builder.mfsCompressed = '/nonexistent'
        built = self.builder.build(self.destdir, self.log)
        built.addCallbacks(self._buildSuccess, self._buildError)
        return built
class ReleaseAssemblerTestCase(unittest.TestCase):
    """Assemble release data through builder.ReleaseAssembler."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        self.destdir = os.path.join(INSTALLROOT, 'buildtest')
        # Dummy "release" to assemble from
        substitutions = {'@CD_VERSION_LINE@' : 'CD_VERSION = 6.2-RELEASE'}
        rewrite_config(CDROM_INF_IN, CDROM_INF, substitutions)
        cdPath = os.path.join(RELEASEROOT, builder.RELEASE_CD_PATH)
        utils.copyRecursive(ISO_MOUNTPOINT, cdPath)

    def tearDown(self):
        self.log.close()
        # Process log
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        # Install root
        if os.path.exists(INSTALLROOT):
            shutil.rmtree(INSTALLROOT)
        # Dummy release
        if os.path.exists(RELEASEROOT):
            shutil.rmtree(RELEASEROOT)
        if os.path.exists(CDROM_INF):
            os.unlink(CDROM_INF)

    def _cbBuild(self, result):
        # The release data must have been copied over
        self.assert_(os.path.exists(os.path.join(self.destdir, 'base', 'base.aa')))
        # The package installation script must have been copied and be
        # executable
        script = os.path.join(self.destdir, os.path.basename(farb.INSTALL_PACKAGE_SH))
        self.assert_(os.path.exists(script))
        self.assert_(os.access(script, os.X_OK))
        # Without local data there must be no local directory
        localdir = os.path.join(self.destdir, 'local')
        self.assert_(not os.path.exists(localdir))

    def test_build(self):
        rib = builder.ReleaseAssembler('6.2', RELEASEROOT, PKGROOT)
        built = rib.build(self.destdir, self.log)
        built.addCallback(self._cbBuild)
        return built

    def _cbBuildLocalData(self, result):
        # The localdata file must have been copied
        copiedFile = os.path.join(self.destdir, 'local', os.path.basename(INSTALL_CFG))
        self.assert_(os.path.exists(copiedFile))
        # The localdata directory must have been copied
        # NOTE(review): RELEASEROOT is used as a join component here; if it
        # is an absolute path, os.path.join drops the preceding components
        # and this checks the source tree instead -- confirm.
        copiedDist = os.path.join(self.destdir, 'local', RELEASEROOT, builder.RELEASE_CD_PATH, '6.2-RELEASE', 'base', 'base.aa')
        self.assert_(os.path.exists(copiedDist))

    def test_buildLocalData(self):
        # Local data made of one directory and one regular file
        localData = [RELEASEROOT, INSTALL_CFG]
        rib = builder.ReleaseAssembler('6.2', RELEASEROOT, PKGROOT, localData)
        built = rib.build(self.destdir, self.log)
        built.addCallback(self._cbBuildLocalData)
        return built
class NetInstallAssemblerTestCase(unittest.TestCase):
    """Assemble a full netinstall tree through builder.NetInstallAssembler."""

    def setUp(self):
        self.log = open(PROCESS_LOG, 'w+')
        install = builder.InstallAssembler('testinstall', 'Test Install', RELEASEROOT, INSTALL_CFG)
        self.installs = [install]
        release = builder.ReleaseAssembler('6.2', RELEASEROOT, PKGROOT)
        self.releaseInstalls = [release]
        self.irb = builder.NetInstallAssembler(INSTALLROOT, self.releaseInstalls, self.installs)
        # Dummy "release" to be copied to the netinstall directory
        substitutions = {'@CD_VERSION_LINE@' : 'CD_VERSION = 6.2-RELEASE'}
        rewrite_config(CDROM_INF_IN, CDROM_INF, substitutions)
        cdPath = os.path.join(RELEASEROOT, builder.RELEASE_CD_PATH)
        utils.copyRecursive(ISO_MOUNTPOINT, cdPath)

    def tearDown(self):
        self.log.close()
        # Process log
        if os.path.exists(PROCESS_LOG):
            os.unlink(PROCESS_LOG)
        # Builder output
        if os.path.exists(INSTALLROOT):
            shutil.rmtree(INSTALLROOT)
        # Dummy release
        if os.path.exists(RELEASEROOT):
            shutil.rmtree(RELEASEROOT)
        if os.path.exists(CDROM_INF):
            os.unlink(CDROM_INF)

    def _cbBuild(self, result):
        ## Per-release data
        # The release data must have been copied over
        self.assert_(os.path.exists(os.path.join(INSTALLROOT, '6.2', 'base', 'base.aa')))

        tftproot = os.path.join(INSTALLROOT, 'tftproot')
        installdir = os.path.join(tftproot, 'testinstall')
        bootdir = os.path.join(tftproot, 'boot')

        ## Per-install data
        # The install kernel module must have been copied
        kmod = os.path.join(installdir, 'kernel', 'righthook.ko')
        self.failUnless(os.path.exists(kmod), msg='The pre-install kernel was not copied to the tftproot directory.')
        # boot.conf must have been created
        self.failUnless(os.path.exists(os.path.join(installdir, 'boot.conf')), msg='The per-install boot.conf file was not created.')

        ## Shared boot loader data
        # The boot loader must have been copied over
        self.failUnless(os.path.exists(bootdir), msg='The shared boot loader was not copied to the tftproot directory.')
        # netinstall.4th, loader.conf and loader.rc must all be present
        self.failUnless(os.path.exists(os.path.join(bootdir, 'netinstall.4th')), msg='The netinstall.4th file was not generated in the tftproot directory.')
        self.failUnless(os.path.exists(os.path.join(bootdir, 'loader.conf')), msg='The FarBot loader.conf file was not copied to the tftproot directory.')
        self.failUnless(os.path.exists(os.path.join(bootdir, 'loader.rc')), msg='The FarBot loader.rc file was not copied to the tftproot directory.')

    def test_build(self):
        built = self.irb.build(self.log)
        built.addCallback(self._cbBuild)
        return built
class GetCDReleaseTestCase(unittest.TestCase):
    """Read the release name from cdrom.inf through builder._getCDRelease."""

    def tearDown(self):
        # Only some tests generate the file
        if os.path.exists(CDROM_INF):
            os.unlink(CDROM_INF)

    def test_getCDRelease(self):
        substitutions = {'@CD_VERSION_LINE@' : 'CD_VERSION = 6.2-RELEASE'}
        rewrite_config(CDROM_INF_IN, CDROM_INF, substitutions)
        cdDir = os.path.dirname(CDROM_INF)
        self.assertEquals(builder._getCDRelease(cdDir), '6.2-RELEASE')

    def test_badCDInf(self):
        # A CD_VERSION line that looks wrong must raise an exception
        substitutions = {'@CD_VERSION_LINE@' : 'hi I am a CD!'}
        rewrite_config(CDROM_INF_IN, CDROM_INF, substitutions)
        cdDir = os.path.dirname(CDROM_INF)
        self.assertRaises(builder.CDReleaseError, builder._getCDRelease, cdDir)

    def test_missingCDInf(self):
        # A missing cdrom.inf must raise the same exception
        cdDir = os.path.dirname(CDROM_INF)
        self.assertRaises(builder.CDReleaseError, builder._getCDRelease, cdDir)
# syntax highlighted by Code2HTML, v. 0.9.1