# vim:fileencoding=utf-8 # (c) 2011-2012 Michał Górny # Released under the terms of the 2-clause BSD license. import copy, itertools, random, re from gentoopm.util import ABCObject, BoolCompat import dbus.service from abc import ABCMeta, abstractmethod, abstractproperty from pmstestsuite.dbus_handler import dbus_interface_name, dbus_object_prefix # XXX: move to some consts module? phase_func_names = [ 'pkg_pretend', 'pkg_setup', 'src_unpack', 'src_prepare', 'src_configure', 'src_compile', 'src_install', 'pkg_preinst', 'pkg_postinst' ] """ Names of all phase functions supported in EAPIs. """ known_eapis = frozenset((0, 1, 2, 3, 4)) """ All known EAPIs. """ ebuild_header = '''# Copyright 1999-2011 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 # $Header: $ EAPI=%d inherit %s ''' """ A common ebuild header. """ pn_re = re.compile('([^A-Z])([A-Z])') def cleanup_test_case_name(classname): """ Create the ebuild PN from classname. >>> cleanup_test_case_name('Testzor') 'testzor' >>> cleanup_test_case_name('TestzorTest') 'testzor' >>> cleanup_test_case_name('TestZorTest') 'test-zor' >>> cleanup_test_case_name('veryRandomCamelCaseTest') 'very-random-camel-case' >>> cleanup_test_case_name('RDependTest') 'rdepend' @param classname: the class name to clean up @type classname: string """ if classname.endswith('Test'): classname = classname[:-4] return pn_re.sub('\\1-\\2', classname).lower() class AssertionResult(ABCObject, BoolCompat): """ Base class for assertion results. """ def __init__(self, name): self._name = name self._undefined = False @property def name(self): """ The assertion name. @type: str """ return self._name @property def prefix(self): """ The assertion name prefix. @type: str """ return None @property def unprefixed_name(self): """ The assertion name without prefix. @type: str """ return self.name @property def undefined(self): """ Whether the assertion has undefined result. @type: bool """ return self._undefined @abstractproperty def expected(self): """ Expected value. @type: any """ pass @abstractproperty def actual(self): """ Actual value. @type: any """ pass @abstractmethod def __bool__(self): """ Check whether the assertion succeeds. @return: whether the assertion matches @rtype: bool """ pass def __str__(self): """ Return the stringified assertion description. @return: stringified assertion @rtype: str """ return '%s == %s' % (self.actual, self.expected) class BoolAssertionResult(AssertionResult): """ Assertion for a boolean match. """ def __init__(self, name, expect, cond): AssertionResult.__init__(self, name) self._expect = bool(expect) self._cond = bool(cond) if cond is not None else None @property def expected(self): return self._expect @property def actual(self): return self._cond if self._cond is not None else '?' def __bool__(self): if self._cond is None: return False return self._expect == self._cond class ContainsAssertionResult(AssertionResult): """ Assertion checking whether a value is on the list. """ def __init__(self, name, needle, container): AssertionResult.__init__(self, name) self._cont = container self._need = needle @property def expected(self): return 'contains %s' % repr(self._need) @property def actual(self): return repr(self._cont) if self._cont is not None else '?' def __str__(self): return '%s %s' % (self.actual, self.expected) def __bool__(self): if self._cont is None: return False return self._need in self._cont class EqualAssertionResult(AssertionResult): """ Assertion checking universal equality. """ def __init__(self, name, expect, value): AssertionResult.__init__(self, name) self._expect = expect self._value = value @property def expected(self): return repr(self._expect) @property def actual(self): return repr(self._value) if self._value is not None else '?' def __bool__(self): return self._expect == self._value class NotEqualAssertionResult(EqualAssertionResult): """ Assertion checking universal non-equality. """ def __bool__(self): if self._value is None: return False return self._expect != self._value @property def expected(self): return 'not %s' % repr(self._expect) def __str__(self): return '%s != %s' % (self.actual, repr(self._expect)) class TestCase(dbus.service.Object): # was: ABCObject """ Base class for a test case. @ivar _finalized: has the case initialization been finished yet? Set by L{_finalize()}. @type _finalized: bool """ _finalized = False def __init__(self, short_name, dbus_hdlr): """ Initialize the test class and the D-Bus interface for it. """ self.assertions = [] self._short_name = short_name dbus.service.Object.__init__( self, dbus_hdlr.bus, '%s/%s' % (dbus_object_prefix, self.p.replace('-', '_')) ) @property def short_name(self): return self._short_name @property def _stripped_docstring(self): descdoc = ' '.join(self.__doc__.split()) return descdoc.rstrip('.') def __str__(self): """ Return freetext test description. """ return '%s (%s)' % (self.short_name, self._stripped_docstring) def _finalize(self): """ Do any final modifications to test case data. Mark it finalized. Ensure that C{pkg_setup()} will be called. This function shall be called at most once per object. """ self._finalized = True @abstractmethod def get_output_files(self): """ Get a dict of files to output in the repository for the test case. @return: a dict where keys are file paths (relative to the repository root), and values evaluate to file contents @rtype: dict (string -> stringifiable) """ pass @abstractmethod def clean(self, pm): """ Schedule cleaning the test. @param pm: the package manager instance @type pm: L{PackageManager} """ pass @abstractmethod def start(self, pm): """ Schedule starting the test. @param pm: the package manager instance @type pm: L{PackageManager} """ pass def _append_assert(self, a, undefined = False): all_eapis = itertools.chain.from_iterable(self.supported_eapis) if undefined or self.eapi not in all_eapis: a._undefined = True self.assertions.append(a) if not a.undefined and not a: raise AssertionError(str(a)) def assertTrue(self, cond, msg, undefined = False): """ Assert that the condition evaluates to True. @param cond: the condition @type cond: bool @param msg: assertion description @type msg: string @param undefined: whether the result is undefined @type undefined: bool """ self.assertBool(True, cond, msg, undefined) def assertFalse(self, cond, msg, undefined = False): """ Assert that the condition evaluates to False. @param cond: the condition @type cond: bool @param msg: assertion description @type msg: string @param undefined: whether the result is undefined @type undefined: bool """ self.assertBool(False, cond, msg, undefined) def assertBool(self, expect, cond, msg, undefined = False): """ Assert that the condition evaluates to expected boolean result. @param expect: expected result @type expect: bool @param cond: the condition @type cond: bool @param msg: assertion description @type msg: string @param undefined: whether the result is undefined @type undefined: bool """ self._append_assert(BoolAssertionResult(msg, expect, cond), undefined = undefined) def assertContains(self, needle, container, msg = None, undefined = False): """ Assert the following condition: C{needle in container}. @param needle: the needle to look for @type needle: any @param container: the container to look for needle in @type container: iterable @param msg: assertion description or C{None} for default one @type msg: string/C{None} @param undefined: whether the result is undefined @type undefined: bool """ if msg is None: msg = '%s in %s' % (repr(needle), repr(container)) self._append_assert(ContainsAssertionResult(msg, needle, container), undefined = undefined) def assertEqual(self, value, expect, msg, undefined = False): """ Assert that the value is equal to expected one. @param value: the actual value @type value: any @param expect: the expected value @type expect: any @param msg: assertion description @type msg: string @param undefined: whether the result is undefined @type undefined: bool """ self._append_assert(EqualAssertionResult(msg, expect, value), undefined = undefined) def assertNotEqual(self, value, unallowed, msg, undefined = False): """ Assert that the value is other than an unallowed one. @param value: the actual value @type value: any @param unallowed: the unallowed value @type unallowed: any @param msg: assertion description @type msg: string @param undefined: whether the result is undefined @type undefined: bool """ self._append_assert(NotEqualAssertionResult(msg, unallowed, value), undefined = undefined) @abstractmethod def check_result(self, pm): """ Check the correctness of the result of test execution. @param pm: the package manager instance @type pm: L{PackageManager} @return: True if the test succeeded, False otherwise @rtype: bool """ pass def pop_assertions(self): """ Get a copy of the assertion list and clear its container afterwards. This way, the test case can be reused with other PM. @return: assertion list @rtype: list(L{AssertionResult}) """ ret = self.assertions self.assertions = [] return ret class EbuildTestCase(TestCase): """ Test case using a single ebuild (per EAPI). The __init__() function is going to clone (deepcopy()) all the instance variables to allow modifying them easily. @ivar ebuild_vars: additional global variables @type ebuild_vars: dict (string -> string) @ivar expect_failure: if set to False (the default), the test is supposed to merge successfully. Otherwise, the test ebuild is supposed to fail to merge (die) @type expect_failure: bool @ivar expect_started: if set to True (the default), the test is supposed to be started (ack via D-Bus). Otherwise, the test ebuild is supposed to not ever start @type expect_started: bool @ivar inherits: additional eclasses to inherit @type inherits: list (string) @ivar phase_funcs: phase function contents @type phase_funcs: dict (string -> list(string)) """ ebuild_vars = {} expect_failure = False expect_started = True inherits = [] phase_funcs = {} @classmethod def _eval_prop(cls, prop_or_obj): """ Evaluate and return the value of a property when passed a property object, or return the passed value otherwise. @param prop_or_obj: the object to process @type prop_or_obj: property/object @return: value of the property @rtype: object """ if isinstance(prop_or_obj, property): return prop_or_obj.fget(cls) else: return prop_or_obj @property def supported_eapis(cls): """ A list of EAPI groups for which the test can be run and gives predictible results. The EAPIs are supposed to be grouped by consistent behavior. Defaults to a single group with all available EAPIs. For example, a value of C{((1, 2), (3, 4))} would mean the test is available since EAPI 1, and it gives same results for EAPI 1 & 2, and for EAPI 3 & 4. @type: iterable(iterable(string)) """ return (tuple(known_eapis),) @classmethod def inst_all(cls, short_name, dbus_hdlr, thorough = False, undefined = False): """ Instantiate the test case, choosing a single EAPI from each EAPI group listed in L{supported_eapis}. If thorough mode is enabled, all EAPIs from each group will be used instead. @param thorough: whether to use the thorough mode @type thorough: bool @param undefined: whether to run tests on undefined-behavior EAPIs @type undefined: bool @return: an iterable over test case instances @rtype: generator(L{EbuildTestCase}) """ supported_eapis = cls._eval_prop(cls.supported_eapis) if undefined: all_supp_eapis = set(itertools.chain.from_iterable(supported_eapis)) remaining = known_eapis - all_supp_eapis if remaining: supported_eapis = tuple(supported_eapis) + (tuple(remaining),) if thorough: eapis = itertools.chain.from_iterable(supported_eapis) else: eapis = [random.choice(x) for x in supported_eapis] for eapi in eapis: yield cls(eapi = eapi, short_name = short_name, dbus_hdlr = dbus_hdlr) @property def pn(self): return cleanup_test_case_name(self.__class__.__name__) @property def pv(self): return self.eapi @property def p(self): return '%s-%s' % (self.pn, self.pv) @property def cpv(self): """ Return CPV for the test. """ return 'pms-test/%s' % self.p def atom(self, pm): """ Return an exact-match atom for the test. """ return pm.Atom('=%s' % self.cpv) def _finalize(self): TestCase._finalize(self) if self.phase_funcs['pkg_setup']: self.phase_funcs['pkg_setup'].insert(0, 'pms-test_pkg_setup') if 'DESCRIPTION' not in self.ebuild_vars: self.ebuild_vars['DESCRIPTION'] = self._stripped_docstring def __str__(self): """ Return freetext test description. """ return '%s:%s (%s)' % (self.short_name, self.eapi, self._stripped_docstring) def __init__(self, eapi, *args, **kwargs): """ Instiantate the test case for a particular EAPI. @param eapi: the EAPI @type eapi: string """ self.eapi = eapi self.reset() TestCase.__init__(self, *args, **kwargs) for v in ('ebuild_vars', 'inherits', 'phase_funcs'): setattr(self, v, copy.deepcopy(getattr(self, v))) for pf in phase_func_names: if pf not in self.phase_funcs: self.phase_funcs[pf] = [] # add KEYWORDS to the ebuild self.ebuild_vars['KEYWORDS'] = 'alpha amd64 arm hppa ia64 ' + \ 'm68k ~mips ppc ppc64 s390 sh sparc x86' def reset(self): """ Reset (D-Bus) test results. """ self.dbus_output = [] self.dbus_started = False @dbus.service.method( dbus_interface=dbus_interface_name, in_signature='', out_signature='') def test_started(self): """ Notify the test suite that a particular test has been started. """ self.dbus_started = True @dbus.service.method( dbus_interface=dbus_interface_name, in_signature='s', out_signature='') def append_output(self, l): """ Append the string to the test output. @param l: result string @type l: C{dbus.UTF8String} """ self.dbus_output.append(str(l)) def get_output_files(self): class EbuildTestCaseEbuildFile(object): """ Lazy ebuild contents evaluator for EbuildTestCase. """ def __init__(self, parent): """ Instantiate the evaluator for test case. @param parent: relevant test case @type parent: L{EbuildTestCase} """ assert(isinstance(parent, EbuildTestCase)) self._parent = parent def __str__(self): """ Return the ebuild contents as string. @return: ebuild contents @rtype: str """ contents = [ebuild_header % (self._parent.eapi, ' '.join(['pms-test'] + self._parent.inherits))] for k, v in self._parent.ebuild_vars.items(): contents.append('%s="%s"\n' % (k, v)) # XXX: escaping for f, lines in self._parent.phase_funcs.items(): if not lines: continue contents.append('\n%s() {\n' % f) for l in lines: contents.append('\t%s\n' % l) # XXX: smarter tabs contents.append('}\n') return ''.join(contents) if not self._finalized: self._finalize() fn = 'pms-test/%s/%s.ebuild' % (self.pn, self.p) return {fn: EbuildTestCaseEbuildFile(self)} def clean(self, pm): if self.atom(pm) in pm.installed: pm.unmerge(self.cpv) def start(self, pm): pm.merge(self.cpv) def check_dbus_result(self, output, pm): """ Check whether the output sent through D-Bus matches expected test output. The default implementation simply checks whether the test was merged alike L{EbuildTestCase.check_result()}. @param output: the D-Bus output @type output: list(str) @param pm: the package manager instance @type pm: L{PackageManager} @return: C{True} if output matches expected test result, C{False} otherwise @rtype: bool """ pass def _pop_dbus_output(self): ret = self.dbus_output self.reset() return ret def check_result(self, pm): """ Check the correctness of the result of test execution. By default, checks whether the ebuild was actually merged. @param pm: the package manager instance @type pm: L{PackageManager} """ exc = None try: merged = self.atom(pm) in pm.installed self.assertBool(not self.expect_failure, merged, 'package merged') except AssertionError as e: exc = e try: self.assertBool(self.expect_started, self.dbus_started, 'build started') except AssertionError as e: exc = e self.check_dbus_result(self._pop_dbus_output(), pm) if exc is not None: raise exc