diff --git a/tests/image-fuzzer/runner.py b/tests/image-fuzzer/runner.py new file mode 100755 index 0000000000..58079d331e --- /dev/null +++ b/tests/image-fuzzer/runner.py @@ -0,0 +1,405 @@ +#!/usr/bin/env python + +# Tool for running fuzz tests +# +# Copyright (C) 2014 Maria Kustova +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import sys +import os +import signal +import subprocess +import random +import shutil +from itertools import count +import getopt +import StringIO +import resource + +try: + import json +except ImportError: + try: + import simplejson as json + except ImportError: + print >>sys.stderr, \ + "Warning: Module for JSON processing is not found.\n" \ + "'--config' and '--command' options are not supported." + +# Backing file sizes in MB +MAX_BACKING_FILE_SIZE = 10 +MIN_BACKING_FILE_SIZE = 1 + + +def multilog(msg, *output): + """ Write an object to all of specified file descriptors.""" + for fd in output: + fd.write(msg) + fd.flush() + + +def str_signal(sig): + """ Convert a numeric value of a system signal to the string one + defined by the current operational system. + """ + for k, v in signal.__dict__.items(): + if v == sig: + return k + + +def run_app(fd, q_args): + """Start an application with specified arguments and return its exit code + or kill signal depending on the result of execution. + """ + devnull = open('/dev/null', 'r+') + process = subprocess.Popen(q_args, stdin=devnull, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = process.communicate() + fd.write(out) + fd.write(err) + return process.returncode + + +class TestException(Exception): + """Exception for errors risen by TestEnv objects.""" + pass + + +class TestEnv(object): + + """Test object. + + The class sets up test environment, generates backing and test images + and executes application under tests with specified arguments and a test + image provided. + + All logs are collected. + + The summary log will contain short descriptions and statuses of tests in + a run. + + The test log will include application (e.g. 'qemu-img') logs besides info + sent to the summary log. + """ + + def __init__(self, test_id, seed, work_dir, run_log, + cleanup=True, log_all=False): + """Set test environment in a specified work directory. + + Path to qemu-img and qemu-io will be retrieved from 'QEMU_IMG' and + 'QEMU_IO' environment variables. + """ + if seed is not None: + self.seed = seed + else: + self.seed = str(random.randint(0, sys.maxint)) + random.seed(self.seed) + + self.init_path = os.getcwd() + self.work_dir = work_dir + self.current_dir = os.path.join(work_dir, 'test-' + test_id) + self.qemu_img = os.environ.get('QEMU_IMG', 'qemu-img')\ + .strip().split(' ') + self.qemu_io = os.environ.get('QEMU_IO', 'qemu-io').strip().split(' ') + self.commands = [['qemu-img', 'check', '-f', 'qcow2', '$test_img'], + ['qemu-img', 'info', '-f', 'qcow2', '$test_img'], + ['qemu-io', '$test_img', '-c', 'read $off $len'], + ['qemu-io', '$test_img', '-c', 'write $off $len'], + ['qemu-io', '$test_img', '-c', + 'aio_read $off $len'], + ['qemu-io', '$test_img', '-c', + 'aio_write $off $len'], + ['qemu-io', '$test_img', '-c', 'flush'], + ['qemu-io', '$test_img', '-c', + 'discard $off $len'], + ['qemu-io', '$test_img', '-c', + 'truncate $off']] + for fmt in ['raw', 'vmdk', 'vdi', 'cow', 'qcow2', 'file', + 'qed', 'vpc']: + self.commands.append( + ['qemu-img', 'convert', '-f', 'qcow2', '-O', fmt, + '$test_img', 'converted_image.' + fmt]) + + try: + os.makedirs(self.current_dir) + except OSError, e: + print >>sys.stderr, \ + "Error: The working directory '%s' cannot be used. Reason: %s"\ + % (self.work_dir, e[1]) + raise TestException + self.log = open(os.path.join(self.current_dir, "test.log"), "w") + self.parent_log = open(run_log, "a") + self.failed = False + self.cleanup = cleanup + self.log_all = log_all + + def _create_backing_file(self): + """Create a backing file in the current directory. + + Return a tuple of a backing file name and format. + + Format of a backing file is randomly chosen from all formats supported + by 'qemu-img create'. + """ + # All formats supported by the 'qemu-img create' command. + backing_file_fmt = random.choice(['raw', 'vmdk', 'vdi', 'cow', 'qcow2', + 'file', 'qed', 'vpc']) + backing_file_name = 'backing_img.' + backing_file_fmt + backing_file_size = random.randint(MIN_BACKING_FILE_SIZE, + MAX_BACKING_FILE_SIZE) * (1 << 20) + cmd = self.qemu_img + ['create', '-f', backing_file_fmt, + backing_file_name, str(backing_file_size)] + temp_log = StringIO.StringIO() + retcode = run_app(temp_log, cmd) + if retcode == 0: + temp_log.close() + return (backing_file_name, backing_file_fmt) + else: + multilog("Warning: The %s backing file was not created.\n\n" + % backing_file_fmt, sys.stderr, self.log, self.parent_log) + self.log.write("Log for the failure:\n" + temp_log.getvalue() + + '\n\n') + temp_log.close() + return (None, None) + + def execute(self, input_commands=None, fuzz_config=None): + """ Execute a test. + + The method creates backing and test images, runs test app and analyzes + its exit status. If the application was killed by a signal, the test + is marked as failed. + """ + if input_commands is None: + commands = self.commands + else: + commands = input_commands + + os.chdir(self.current_dir) + backing_file_name, backing_file_fmt = self._create_backing_file() + img_size = image_generator.create_image('test.img', + backing_file_name, + backing_file_fmt, + fuzz_config) + for item in commands: + shutil.copy('test.img', 'copy.img') + # 'off' and 'len' are multiple of the sector size + sector_size = 512 + start = random.randrange(0, img_size + 1, sector_size) + end = random.randrange(start, img_size + 1, sector_size) + + if item[0] == 'qemu-img': + current_cmd = list(self.qemu_img) + elif item[0] == 'qemu-io': + current_cmd = list(self.qemu_io) + else: + multilog("Warning: test command '%s' is not defined.\n" \ + % item[0], sys.stderr, self.log, self.parent_log) + continue + # Replace all placeholders with their real values + for v in item[1:]: + c = (v + .replace('$test_img', 'copy.img') + .replace('$off', str(start)) + .replace('$len', str(end - start))) + current_cmd.append(c) + + # Log string with the test header + test_summary = "Seed: %s\nCommand: %s\nTest directory: %s\n" \ + "Backing file: %s\n" \ + % (self.seed, " ".join(current_cmd), + self.current_dir, backing_file_name) + + temp_log = StringIO.StringIO() + try: + retcode = run_app(temp_log, current_cmd) + except OSError, e: + multilog(test_summary + "Error: Start of '%s' failed. " \ + "Reason: %s\n\n" % (os.path.basename( + current_cmd[0]), e[1]), + sys.stderr, self.log, self.parent_log) + raise TestException + + if retcode < 0: + self.log.write(temp_log.getvalue()) + multilog(test_summary + "FAIL: Test terminated by signal " + + "%s\n\n" % str_signal(-retcode), sys.stderr, self.log, + self.parent_log) + self.failed = True + else: + if self.log_all: + self.log.write(temp_log.getvalue()) + multilog(test_summary + "PASS: Application exited with" + + " the code '%d'\n\n" % retcode, sys.stdout, + self.log, self.parent_log) + temp_log.close() + os.remove('copy.img') + + def finish(self): + """Restore the test environment after a test execution.""" + self.log.close() + self.parent_log.close() + os.chdir(self.init_path) + if self.cleanup and not self.failed: + shutil.rmtree(self.current_dir) + +if __name__ == '__main__': + + def usage(): + print """ + Usage: runner.py [OPTION...] TEST_DIR IMG_GENERATOR + + Set up test environment in TEST_DIR and run a test in it. A module for + test image generation should be specified via IMG_GENERATOR. + Example: + runner.py -c '[["qemu-img", "info", "$test_img"]]' /tmp/test qcow2 + + Optional arguments: + -h, --help display this help and exit + -c, --command=JSON run tests for all commands specified in + the JSON array + -s, --seed=STRING seed for a test image generation, + by default will be generated randomly + --config=JSON take fuzzer configuration from the JSON + array + -k, --keep_passed don't remove folders of passed tests + -v, --verbose log information about passed tests + + JSON: + + '--command' accepts a JSON array of commands. Each command presents + an application under test with all its paramaters as a list of strings, + e.g. + ["qemu-io", "$test_img", "-c", "write $off $len"] + + Supported application aliases: 'qemu-img' and 'qemu-io'. + Supported argument aliases: $test_img for the fuzzed image, $off + for an offset, $len for length. + + Values for $off and $len will be generated based on the virtual disk + size of the fuzzed image + Paths to 'qemu-img' and 'qemu-io' are retrevied from 'QEMU_IMG' and + 'QEMU_IO' environment variables + + '--config' accepts a JSON array of fields to be fuzzed, e.g. + '[["header"], ["header", "version"]]' + Each of the list elements can consist of a complex image element only + as ["header"] or ["feature_name_table"] or an exact field as + ["header", "version"]. In the first case random portion of the element + fields will be fuzzed, in the second one the specified field will be + fuzzed always. + + If '--config' argument is specified, fields not listed in + the configuration array will not be fuzzed. + """ + + def run_test(test_id, seed, work_dir, run_log, cleanup, log_all, + command, fuzz_config): + """Setup environment for one test and execute this test.""" + try: + test = TestEnv(test_id, seed, work_dir, run_log, cleanup, + log_all) + except TestException: + sys.exit(1) + + # Python 2.4 doesn't support 'finally' and 'except' in the same 'try' + # block + try: + try: + test.execute(command, fuzz_config) + except TestException: + sys.exit(1) + finally: + test.finish() + + try: + opts, args = getopt.gnu_getopt(sys.argv[1:], 'c:hs:kv', + ['command=', 'help', 'seed=', 'config=', + 'keep_passed', 'verbose']) + except getopt.error, e: + print >>sys.stderr, \ + "Error: %s\n\nTry 'runner.py --help' for more information" % e + sys.exit(1) + + command = None + cleanup = True + log_all = False + seed = None + config = None + for opt, arg in opts: + if opt in ('-h', '--help'): + usage() + sys.exit() + elif opt in ('-c', '--command'): + try: + command = json.loads(arg) + except (TypeError, ValueError, NameError), e: + print >>sys.stderr, \ + "Error: JSON array of test commands cannot be loaded.\n" \ + "Reason: %s" % e + sys.exit(1) + elif opt in ('-k', '--keep_passed'): + cleanup = False + elif opt in ('-v', '--verbose'): + log_all = True + elif opt in ('-s', '--seed'): + seed = arg + elif opt == '--config': + try: + config = json.loads(arg) + except (TypeError, ValueError, NameError), e: + print >>sys.stderr, \ + "Error: JSON array with the fuzzer configuration cannot" \ + " be loaded\nReason: %s" % e + sys.exit(1) + + if not len(args) == 2: + print >>sys.stderr, \ + "Expected two parameters\nTry 'runner.py --help'" \ + " for more information." + sys.exit(1) + + work_dir = os.path.realpath(args[0]) + # run_log is created in 'main', because multiple tests are expected to + # log in it + run_log = os.path.join(work_dir, 'run.log') + + # Add the path to the image generator module to sys.path + sys.path.append(os.path.realpath(os.path.dirname(args[1]))) + # Remove a script extension from image generator module if any + generator_name = os.path.splitext(os.path.basename(args[1]))[0] + + try: + image_generator = __import__(generator_name) + except ImportError, e: + print >>sys.stderr, \ + "Error: The image generator '%s' cannot be imported.\n" \ + "Reason: %s" % (generator_name, e) + sys.exit(1) + + # Enable core dumps + resource.setrlimit(resource.RLIMIT_CORE, (-1, -1)) + # If a seed is specified, only one test will be executed. + # Otherwise runner will terminate after a keyboard interruption + for test_id in count(1): + try: + run_test(str(test_id), seed, work_dir, run_log, cleanup, + log_all, command, config) + except (KeyboardInterrupt, SystemExit): + sys.exit(1) + + if seed is not None: + break