""" virt execution module unit tests """ # pylint: disable=3rd-party-module-not-gated import datetime import os import shutil import tempfile import salt.config import salt.modules.config as config import salt.modules.virt as virt import salt.syspaths import salt.utils.yaml from salt._compat import ElementTree as ET from salt.exceptions import CommandExecutionError, SaltInvocationError # pylint: disable=import-error from salt.ext.six.moves import range # pylint: disable=redefined-builtin from tests.support.helpers import dedent from tests.support.mixins import LoaderModuleMockMixin from tests.support.mock import MagicMock, patch from tests.support.unit import TestCase # pylint: disable=invalid-name,protected-access,attribute-defined-outside-init,too-many-public-methods,unused-argument class LibvirtMock(MagicMock): # pylint: disable=too-many-ancestors """ Libvirt library mock """ class virDomain(MagicMock): """ virDomain mock """ class libvirtError(Exception): """ libvirtError mock """ def __init__(self, msg): super().__init__(msg) self.msg = msg def get_error_message(self): return self.msg class VirtTestCase(TestCase, LoaderModuleMockMixin): """ Test cases for salt.module.virt """ def setup_loader_modules(self): self.mock_libvirt = LibvirtMock() self.mock_conn = MagicMock() self.mock_conn.getStoragePoolCapabilities.return_value = ( "" ) self.mock_libvirt.openAuth.return_value = self.mock_conn self.mock_popen = MagicMock() self.addCleanup(delattr, self, "mock_libvirt") self.addCleanup(delattr, self, "mock_conn") self.addCleanup(delattr, self, "mock_popen") self.mock_subprocess = MagicMock() self.mock_subprocess.return_value = ( self.mock_subprocess ) # pylint: disable=no-member self.mock_subprocess.Popen.return_value = ( self.mock_popen ) # pylint: disable=no-member loader_globals = { "__salt__": {"config.get": config.get, "config.option": config.option}, "libvirt": self.mock_libvirt, "subprocess": self.mock_subprocess, } return {virt: loader_globals, config: loader_globals} def set_mock_vm(self, name, xml): """ Define VM to use in tests """ self.mock_conn.listDefinedDomains.return_value = [ name ] # pylint: disable=no-member mock_domain = self.mock_libvirt.virDomain() self.mock_conn.lookupByName.return_value = ( mock_domain # pylint: disable=no-member ) mock_domain.XMLDesc.return_value = xml # pylint: disable=no-member # Return state as shutdown mock_domain.info.return_value = [ 4, 2048 * 1024, 1024 * 1024, 2, 1234, ] # pylint: disable=no-member mock_domain.ID.return_value = 1 mock_domain.name.return_value = name return mock_domain def test_disk_profile_merge(self): """ Test virt._disk_profile() when merging with user-defined disks """ root_dir = os.path.join(salt.syspaths.ROOT_DIR, "srv", "salt-images") userdisks = [ {"name": "system", "image": "/path/to/image"}, {"name": "data", "size": 16384, "format": "raw"}, ] disks = virt._disk_profile(self.mock_conn, "default", "kvm", userdisks, "myvm") self.assertEqual( [ { "name": "system", "device": "disk", "size": 8192, "format": "qcow2", "model": "virtio", "filename": "myvm_system.qcow2", "image": "/path/to/image", "source_file": "{}{}myvm_system.qcow2".format(root_dir, os.sep), }, { "name": "data", "device": "disk", "size": 16384, "format": "raw", "model": "virtio", "filename": "myvm_data.raw", "source_file": "{}{}myvm_data.raw".format(root_dir, os.sep), }, ], disks, ) def test_boot_default_dev(self): """ Test virt._gen_xml() default boot device """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64" ) root = ET.fromstring(xml_data) self.assertEqual(root.find("os/boot").attrib["dev"], "hd") self.assertEqual(root.find("os/type").attrib["arch"], "x86_64") self.assertEqual(root.find("os/type").text, "hvm") def test_boot_custom_dev(self): """ Test virt._gen_xml() custom boot device """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", boot_dev="cdrom", ) root = ET.fromstring(xml_data) self.assertEqual(root.find("os/boot").attrib["dev"], "cdrom") def test_boot_multiple_devs(self): """ Test virt._gen_xml() multiple boot devices """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", boot_dev="cdrom network", ) root = ET.fromstring(xml_data) devs = root.findall(".//boot") self.assertTrue(len(devs) == 2) def test_gen_xml_no_nic(self): """ Test virt._gen_xml() serial console """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", serial_type="pty", console=True, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/serial").attrib["type"], "pty") self.assertEqual(root.find("devices/console").attrib["type"], "pty") def test_gen_xml_for_serial_console(self): """ Test virt._gen_xml() serial console """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", serial_type="pty", console=True, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/serial").attrib["type"], "pty") self.assertEqual(root.find("devices/console").attrib["type"], "pty") def test_gen_xml_for_telnet_console(self): """ Test virt._gen_xml() telnet console """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", serial_type="tcp", console=True, telnet_port=22223, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/serial").attrib["type"], "tcp") self.assertEqual(root.find("devices/console").attrib["type"], "tcp") self.assertEqual(root.find("devices/console/source").attrib["service"], "22223") def test_gen_xml_for_telnet_console_unspecified_port(self): """ Test virt._gen_xml() telnet console without any specified port """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", serial_type="tcp", console=True, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/serial").attrib["type"], "tcp") self.assertEqual(root.find("devices/console").attrib["type"], "tcp") self.assertIsInstance( int(root.find("devices/console/source").attrib["service"]), int ) def test_gen_xml_for_serial_no_console(self): """ Test virt._gen_xml() with no serial console """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", serial_type="pty", console=False, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/serial").attrib["type"], "pty") self.assertEqual(root.find("devices/console"), None) def test_gen_xml_for_telnet_no_console(self): """ Test virt._gen_xml() with no telnet console """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", serial_type="tcp", console=False, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/serial").attrib["type"], "tcp") self.assertEqual(root.find("devices/console"), None) def test_gen_xml_nographics_default(self): """ Test virt._gen_xml() with default no graphics device """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64" ) root = ET.fromstring(xml_data) self.assertIsNone(root.find("devices/graphics")) def test_gen_xml_noloader_default(self): """ Test virt._gen_xml() with default no loader """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64" ) root = ET.fromstring(xml_data) self.assertIsNone(root.find("os/loader")) def test_gen_xml_vnc_default(self): """ Test virt._gen_xml() with default vnc graphics device """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", graphics={ "type": "vnc", "port": 1234, "tlsPort": 5678, "listen": {"type": "address", "address": "myhost"}, }, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/graphics").attrib["type"], "vnc") self.assertEqual(root.find("devices/graphics").attrib["autoport"], "no") self.assertEqual(root.find("devices/graphics").attrib["port"], "1234") self.assertFalse("tlsPort" in root.find("devices/graphics").attrib) self.assertEqual(root.find("devices/graphics").attrib["listen"], "myhost") self.assertEqual(root.find("devices/graphics/listen").attrib["type"], "address") self.assertEqual( root.find("devices/graphics/listen").attrib["address"], "myhost" ) def test_gen_xml_spice_default(self): """ Test virt._gen_xml() with default spice graphics device """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", graphics={"type": "spice"}, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/graphics").attrib["type"], "spice") self.assertEqual(root.find("devices/graphics").attrib["autoport"], "yes") self.assertEqual(root.find("devices/graphics").attrib["listen"], "0.0.0.0") self.assertEqual(root.find("devices/graphics/listen").attrib["type"], "address") self.assertEqual( root.find("devices/graphics/listen").attrib["address"], "0.0.0.0" ) def test_gen_xml_spice(self): """ Test virt._gen_xml() with spice graphics device """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", graphics={ "type": "spice", "port": 1234, "tls_port": 5678, "listen": {"type": "none"}, }, ) root = ET.fromstring(xml_data) self.assertEqual(root.find("devices/graphics").attrib["type"], "spice") self.assertEqual(root.find("devices/graphics").attrib["autoport"], "no") self.assertEqual(root.find("devices/graphics").attrib["port"], "1234") self.assertEqual(root.find("devices/graphics").attrib["tlsPort"], "5678") self.assertFalse("listen" in root.find("devices/graphics").attrib) self.assertEqual(root.find("devices/graphics/listen").attrib["type"], "none") self.assertFalse("address" in root.find("devices/graphics/listen").attrib) def test_default_disk_profile_hypervisor_esxi(self): """ Test virt._disk_profile() default ESXi profile """ mock = MagicMock(return_value={}) with patch.dict( virt.__salt__, {"config.get": mock} # pylint: disable=no-member ): ret = virt._disk_profile( self.mock_conn, "nonexistent", "vmware", None, "test-vm" ) self.assertTrue(len(ret) == 1) found = [disk for disk in ret if disk["name"] == "system"] self.assertTrue(bool(found)) system = found[0] self.assertEqual(system["format"], "vmdk") self.assertEqual(system["model"], "scsi") self.assertTrue(int(system["size"]) >= 1) def test_default_disk_profile_hypervisor_kvm(self): """ Test virt._disk_profile() default KVM profile """ mock = MagicMock(side_effect=[{}, "/images/dir"]) with patch.dict( virt.__salt__, {"config.get": mock} # pylint: disable=no-member ): ret = virt._disk_profile( self.mock_conn, "nonexistent", "kvm", None, "test-vm" ) self.assertTrue(len(ret) == 1) found = [disk for disk in ret if disk["name"] == "system"] self.assertTrue(bool(found)) system = found[0] self.assertEqual(system["format"], "qcow2") self.assertEqual(system["model"], "virtio") self.assertTrue(int(system["size"]) >= 1) def test_default_disk_profile_hypervisor_xen(self): """ Test virt._disk_profile() default XEN profile """ mock = MagicMock(side_effect=[{}, "/images/dir"]) with patch.dict( virt.__salt__, {"config.get": mock} # pylint: disable=no-member ): ret = virt._disk_profile( self.mock_conn, "nonexistent", "xen", None, "test-vm" ) self.assertTrue(len(ret) == 1) found = [disk for disk in ret if disk["name"] == "system"] self.assertTrue(bool(found)) system = found[0] self.assertEqual(system["format"], "qcow2") self.assertEqual(system["model"], "xen") self.assertTrue(int(system["size"]) >= 1) def test_default_nic_profile_hypervisor_esxi(self): """ Test virt._nic_profile() default ESXi profile """ mock = MagicMock(return_value={}) with patch.dict( virt.__salt__, {"config.get": mock} # pylint: disable=no-member ): ret = virt._nic_profile("nonexistent", "vmware") self.assertTrue(len(ret) == 1) eth0 = ret[0] self.assertEqual(eth0["name"], "eth0") self.assertEqual(eth0["type"], "bridge") self.assertEqual(eth0["source"], "DEFAULT") self.assertEqual(eth0["model"], "e1000") def test_default_nic_profile_hypervisor_kvm(self): """ Test virt._nic_profile() default KVM profile """ mock = MagicMock(return_value={}) with patch.dict( virt.__salt__, {"config.get": mock} # pylint: disable=no-member ): ret = virt._nic_profile("nonexistent", "kvm") self.assertTrue(len(ret) == 1) eth0 = ret[0] self.assertEqual(eth0["name"], "eth0") self.assertEqual(eth0["type"], "bridge") self.assertEqual(eth0["source"], "br0") self.assertEqual(eth0["model"], "virtio") def test_default_nic_profile_hypervisor_xen(self): """ Test virt._nic_profile() default XEN profile """ mock = MagicMock(return_value={}) with patch.dict( virt.__salt__, {"config.get": mock} # pylint: disable=no-member ): ret = virt._nic_profile("nonexistent", "xen") self.assertTrue(len(ret) == 1) eth0 = ret[0] self.assertEqual(eth0["name"], "eth0") self.assertEqual(eth0["type"], "bridge") self.assertEqual(eth0["source"], "br0") self.assertFalse(eth0["model"]) def test_gen_vol_xml_esx(self): """ Test virt._get_vol_xml() for the ESX case """ xml_data = virt._gen_vol_xml("vmname/system.vmdk", 8192, format="vmdk") root = ET.fromstring(xml_data) self.assertIsNone(root.get("type")) self.assertEqual(root.find("name").text, "vmname/system.vmdk") self.assertEqual(root.find("capacity").attrib["unit"], "KiB") self.assertEqual(root.find("capacity").text, str(8192 * 1024)) self.assertEqual(root.find("allocation").text, str(0)) self.assertEqual(root.find("target/format").get("type"), "vmdk") self.assertIsNone(root.find("target/permissions")) self.assertIsNone(root.find("target/nocow")) self.assertIsNone(root.find("backingStore")) def test_gen_vol_xml_file(self): """ Test virt._get_vol_xml() for a file volume """ xml_data = virt._gen_vol_xml( "myvm_system.qcow2", 8192, format="qcow2", allocation=4096, type="file", permissions={ "mode": "0775", "owner": "123", "group": "456", "label": "sec_label", }, backing_store={"path": "/backing/image", "format": "raw"}, nocow=True, ) root = ET.fromstring(xml_data) self.assertEqual(root.get("type"), "file") self.assertEqual(root.find("name").text, "myvm_system.qcow2") self.assertIsNone(root.find("key")) self.assertIsNone(root.find("target/path")) self.assertEqual(root.find("target/format").get("type"), "qcow2") self.assertEqual(root.find("capacity").attrib["unit"], "KiB") self.assertEqual(root.find("capacity").text, str(8192 * 1024)) self.assertEqual(root.find("capacity").attrib["unit"], "KiB") self.assertEqual(root.find("allocation").text, str(4096 * 1024)) self.assertEqual(root.find("target/permissions/mode").text, "0775") self.assertEqual(root.find("target/permissions/owner").text, "123") self.assertEqual(root.find("target/permissions/group").text, "456") self.assertEqual(root.find("target/permissions/label").text, "sec_label") self.assertIsNotNone(root.find("target/nocow")) self.assertEqual(root.find("backingStore/path").text, "/backing/image") self.assertEqual(root.find("backingStore/format").get("type"), "raw") def test_gen_xml_for_kvm_default_profile(self): """ Test virt._gen_xml(), KVM default profile case """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) self.assertEqual(root.attrib["type"], "kvm") self.assertEqual(root.find("vcpu").text, "1") self.assertEqual(root.find("memory").text, str(512 * 1024)) self.assertEqual(root.find("memory").attrib["unit"], "KiB") disks = root.findall(".//disk") self.assertEqual(len(disks), 1) disk = disks[0] root_dir = salt.config.DEFAULT_MINION_OPTS.get("root_dir") self.assertTrue(disk.find("source").attrib["file"].startswith(root_dir)) self.assertTrue("hello_system" in disk.find("source").attrib["file"]) self.assertEqual(disk.find("target").attrib["dev"], "vda") self.assertEqual(disk.find("target").attrib["bus"], "virtio") self.assertEqual(disk.find("driver").attrib["name"], "qemu") self.assertEqual(disk.find("driver").attrib["type"], "qcow2") interfaces = root.findall(".//interface") self.assertEqual(len(interfaces), 1) iface = interfaces[0] self.assertEqual(iface.attrib["type"], "bridge") self.assertEqual(iface.find("source").attrib["bridge"], "br0") self.assertEqual(iface.find("model").attrib["type"], "virtio") def test_gen_xml_for_esxi_default_profile(self): """ Test virt._gen_xml(), ESXi/vmware default profile case """ diskp = virt._disk_profile(self.mock_conn, "default", "vmware", [], "hello") nicp = virt._nic_profile("default", "vmware") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "vmware", "hvm", "x86_64", ) root = ET.fromstring(xml_data) self.assertEqual(root.attrib["type"], "vmware") self.assertEqual(root.find("vcpu").text, "1") self.assertEqual(root.find("memory").text, str(512 * 1024)) self.assertEqual(root.find("memory").attrib["unit"], "KiB") disks = root.findall(".//disk") self.assertEqual(len(disks), 1) disk = disks[0] self.assertTrue("[0]" in disk.find("source").attrib["file"]) self.assertTrue("hello_system" in disk.find("source").attrib["file"]) self.assertEqual(disk.find("target").attrib["dev"], "sda") self.assertEqual(disk.find("target").attrib["bus"], "scsi") self.assertEqual(disk.find("address").attrib["unit"], "0") interfaces = root.findall(".//interface") self.assertEqual(len(interfaces), 1) iface = interfaces[0] self.assertEqual(iface.attrib["type"], "bridge") self.assertEqual(iface.find("source").attrib["bridge"], "DEFAULT") self.assertEqual(iface.find("model").attrib["type"], "e1000") def test_gen_xml_for_xen_default_profile(self): """ Test virt._gen_xml(), XEN PV default profile case """ diskp = virt._disk_profile(self.mock_conn, "default", "xen", [], "hello") nicp = virt._nic_profile("default", "xen") with patch.dict( virt.__grains__, {"os_family": "Suse"} # pylint: disable=no-member ): xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "xen", "xen", "x86_64", boot=None, ) root = ET.fromstring(xml_data) self.assertEqual(root.attrib["type"], "xen") self.assertEqual(root.find("vcpu").text, "1") self.assertEqual(root.find("memory").text, str(512 * 1024)) self.assertEqual(root.find("memory").attrib["unit"], "KiB") self.assertEqual( root.find(".//kernel").text, "/usr/lib/grub2/x86_64-xen/grub.xen" ) disks = root.findall(".//disk") self.assertEqual(len(disks), 1) disk = disks[0] root_dir = salt.config.DEFAULT_MINION_OPTS.get("root_dir") self.assertTrue(disk.find("source").attrib["file"].startswith(root_dir)) self.assertTrue("hello_system" in disk.find("source").attrib["file"]) self.assertEqual(disk.find("target").attrib["dev"], "xvda") self.assertEqual(disk.find("target").attrib["bus"], "xen") self.assertEqual(disk.find("driver").attrib["name"], "qemu") self.assertEqual(disk.find("driver").attrib["type"], "qcow2") interfaces = root.findall(".//interface") self.assertEqual(len(interfaces), 1) iface = interfaces[0] self.assertEqual(iface.attrib["type"], "bridge") self.assertEqual(iface.find("source").attrib["bridge"], "br0") self.assertIsNone(iface.find("model")) def test_gen_xml_for_esxi_custom_profile(self): """ Test virt._gen_xml(), ESXi/vmware custom profile case """ disks = { "noeffect": [ {"first": {"size": 8192, "pool": "datastore1"}}, {"second": {"size": 4096, "pool": "datastore2"}}, ] } nics = { "noeffect": [ {"name": "eth1", "source": "ONENET"}, {"name": "eth2", "source": "TWONET"}, ] } with patch.dict( virt.__salt__, # pylint: disable=no-member {"config.get": MagicMock(side_effect=[disks, nics])}, ): diskp = virt._disk_profile( self.mock_conn, "noeffect", "vmware", [], "hello" ) nicp = virt._nic_profile("noeffect", "vmware") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "vmware", "hvm", "x86_64", ) root = ET.fromstring(xml_data) self.assertEqual(root.attrib["type"], "vmware") self.assertEqual(root.find("vcpu").text, "1") self.assertEqual(root.find("memory").text, str(512 * 1024)) self.assertEqual(root.find("memory").attrib["unit"], "KiB") self.assertTrue(len(root.findall(".//disk")) == 2) self.assertTrue(len(root.findall(".//interface")) == 2) def test_gen_xml_for_kvm_custom_profile(self): """ Test virt._gen_xml(), KVM custom profile case """ disks = { "noeffect": [ {"first": {"size": 8192, "pool": "/var/lib/images"}}, {"second": {"size": 4096, "pool": "/var/lib/images"}}, ] } nics = { "noeffect": [ {"name": "eth1", "source": "b2"}, {"name": "eth2", "source": "b2"}, ] } with patch.dict( virt.__salt__, # pylint: disable=no-member {"config.get": MagicMock(side_effect=[disks, nics])}, ): diskp = virt._disk_profile(self.mock_conn, "noeffect", "kvm", [], "hello") nicp = virt._nic_profile("noeffect", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) self.assertEqual(root.attrib["type"], "kvm") self.assertEqual(root.find("vcpu").text, "1") self.assertEqual(root.find("memory").text, str(512 * 1024)) self.assertEqual(root.find("memory").attrib["unit"], "KiB") disks = root.findall(".//disk") self.assertTrue(len(disks) == 2) self.assertEqual(disks[0].find("target").get("dev"), "vda") self.assertEqual(disks[1].find("target").get("dev"), "vdb") self.assertTrue(len(root.findall(".//interface")) == 2) def test_disk_profile_kvm_disk_pool(self): """ Test virt._disk_profile(), KVM case with pools defined. """ disks = { "noeffect": [ {"first": {"size": 8192, "pool": "mypool"}}, {"second": {"size": 4096}}, ] } # pylint: disable=no-member with patch.dict( virt.__salt__, { "config.get": MagicMock( side_effect=[ disks, os.path.join(salt.syspaths.ROOT_DIR, "default", "path"), ] ) }, ): diskp = virt._disk_profile(self.mock_conn, "noeffect", "kvm", [], "hello") pools_path = ( os.path.join(salt.syspaths.ROOT_DIR, "pools", "mypool") + os.sep ) default_path = ( os.path.join(salt.syspaths.ROOT_DIR, "default", "path") + os.sep ) self.assertEqual(len(diskp), 2) self.assertTrue(diskp[1]["source_file"].startswith(default_path)) # pylint: enable=no-member def test_disk_profile_kvm_disk_external_image(self): """ Test virt._gen_xml(), KVM case with an external image. """ with patch.dict(os.path.__dict__, {"exists": MagicMock(return_value=True)}): diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "mydisk", "source_file": "/path/to/my/image.qcow2"}], "hello", ) self.assertEqual(len(diskp), 1) self.assertEqual(diskp[0]["source_file"], ("/path/to/my/image.qcow2")) def test_disk_profile_cdrom_default(self): """ Test virt._gen_xml(), KVM case with cdrom. """ with patch.dict(os.path.__dict__, {"exists": MagicMock(return_value=True)}): diskp = virt._disk_profile( self.mock_conn, None, "kvm", [ { "name": "mydisk", "device": "cdrom", "source_file": "/path/to/my.iso", } ], "hello", ) self.assertEqual(len(diskp), 1) self.assertEqual(diskp[0]["model"], "ide") self.assertEqual(diskp[0]["format"], "raw") def test_disk_profile_pool_disk_type(self): """ Test virt._disk_profile(), with a disk pool of disk type """ self.mock_conn.listStoragePools.return_value = ["test-vdb"] self.mock_conn.storagePoolLookupByName.return_value.XMLDesc.return_value = """ test-vdb /dev """ # No existing disk case self.mock_conn.storagePoolLookupByName.return_value.listVolumes.return_value = ( [] ) diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "mydisk", "pool": "test-vdb"}], "hello", ) self.assertEqual(diskp[0]["filename"], ("vdb1")) # Append to the end case self.mock_conn.storagePoolLookupByName.return_value.listVolumes.return_value = [ "vdb1", "vdb2", ] diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "mydisk", "pool": "test-vdb"}], "hello", ) self.assertEqual(diskp[0]["filename"], ("vdb3")) # Hole in the middle case self.mock_conn.storagePoolLookupByName.return_value.listVolumes.return_value = [ "vdb1", "vdb3", ] diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "mydisk", "pool": "test-vdb"}], "hello", ) self.assertEqual(diskp[0]["filename"], ("vdb2")) # Reuse existing volume case diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "mydisk", "pool": "test-vdb", "source_file": "vdb1"}], "hello", ) self.assertEqual(diskp[0]["filename"], ("vdb1")) def test_gen_xml_volume(self): """ Test virt._gen_xml(), generating a disk of volume type """ self.mock_conn.listStoragePools.return_value = ["default"] self.mock_conn.storagePoolLookupByName.return_value.XMLDesc.return_value = ( "" ) self.mock_conn.storagePoolLookupByName.return_value.listVolumes.return_value = [ "myvolume" ] diskp = virt._disk_profile( self.mock_conn, None, "kvm", [ {"name": "system", "pool": "default"}, {"name": "data", "pool": "default", "source_file": "myvolume"}, ], "hello", ) self.mock_conn.storagePoolLookupByName.return_value.XMLDesc.return_value = ( "" ) nicp = virt._nic_profile(None, "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) disk = root.findall(".//disk")[0] self.assertEqual(disk.attrib["device"], "disk") self.assertEqual(disk.attrib["type"], "volume") source = disk.find("source") self.assertEqual("default", source.attrib["pool"]) self.assertEqual("hello_system", source.attrib["volume"]) self.assertEqual("myvolume", root.find(".//disk[2]/source").get("volume")) # RBD volume usage auth test case self.mock_conn.listStoragePools.return_value = ["test-rbd"] self.mock_conn.storagePoolLookupByName.return_value.XMLDesc.return_value = """ test-rbd ede33e0a-9df0-479f-8afd-55085a01b244 526133493760 589928 515081306112 libvirt-pool """ self.mock_conn.getStoragePoolCapabilities.return_value = """ """ diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "system", "pool": "test-rbd"}], "test-vm", ) xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) disk = root.findall(".//disk")[0] self.assertDictEqual( { "type": "network", "device": "disk", "source": { "protocol": "rbd", "name": "libvirt-pool/test-vm_system", "host": [ {"name": "ses2.tf.local"}, {"name": "ses3.tf.local", "port": "1234"}, ], "auth": { "username": "libvirt", "secret": {"type": "ceph", "usage": "pool_test-rbd"}, }, }, "target": {"dev": "vda", "bus": "virtio"}, "driver": { "name": "qemu", "type": "raw", "cache": "none", "io": "native", }, }, salt.utils.xmlutil.to_dict(disk, True), ) # RBD volume UUID auth test case self.mock_conn.storagePoolLookupByName.return_value.XMLDesc.return_value = """ test-rbd ede33e0a-9df0-479f-8afd-55085a01b244 526133493760 589928 515081306112 libvirt-pool """ self.mock_conn.secretLookupByUUIDString.return_value.usageID.return_value = ( "pool_test-rbd" ) diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "system", "pool": "test-rbd"}], "test-vm", ) xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) self.assertDictEqual( { "username": "libvirt", "secret": {"type": "ceph", "usage": "pool_test-rbd"}, }, salt.utils.xmlutil.to_dict(root.find(".//disk/source/auth"), True), ) self.mock_conn.secretLookupByUUIDString.assert_called_once_with("some-uuid") # Disk volume test case self.mock_conn.getStoragePoolCapabilities.return_value = """ none linux fat16 """ self.mock_conn.storagePoolLookupByName.return_value.XMLDesc.return_value = """ test-vdb """ self.mock_conn.listStoragePools.return_value = ["test-vdb"] self.mock_conn.storagePoolLookupByName.return_value.listVolumes.return_value = [ "vdb1", ] diskp = virt._disk_profile( self.mock_conn, None, "kvm", [{"name": "system", "pool": "test-vdb"}], "test-vm", ) xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) disk = root.findall(".//disk")[0] self.assertEqual(disk.attrib["type"], "volume") source = disk.find("source") self.assertEqual("test-vdb", source.attrib["pool"]) self.assertEqual("vdb2", source.attrib["volume"]) self.assertEqual("raw", disk.find("driver").get("type")) def test_get_xml_volume_xen_dir(self): """ Test virt._gen_xml generating disks for a Xen hypervisor """ self.mock_conn.listStoragePools.return_value = ["default"] pool_mock = MagicMock() pool_mock.XMLDesc.return_value = ( "/path/to/images" ) volume_xml = "/path/to/images/hello_system" pool_mock.storageVolLookupByName.return_value.XMLDesc.return_value = volume_xml self.mock_conn.storagePoolLookupByName.return_value = pool_mock diskp = virt._disk_profile( self.mock_conn, None, "xen", [{"name": "system", "pool": "default"}], "hello", ) xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, [], "xen", "hvm", "x86_64", ) root = ET.fromstring(xml_data) disk = root.findall(".//disk")[0] self.assertEqual(disk.attrib["type"], "file") self.assertEqual( "/path/to/images/hello_system", disk.find("source").attrib["file"] ) def test_get_xml_volume_xen_block(self): """ Test virt._gen_xml generating disks for a Xen hypervisor """ self.mock_conn.listStoragePools.return_value = ["default"] pool_mock = MagicMock() pool_mock.listVolumes.return_value = ["vol01"] volume_xml = "/dev/to/vol01" pool_mock.storageVolLookupByName.return_value.XMLDesc.return_value = volume_xml self.mock_conn.storagePoolLookupByName.return_value = pool_mock for pool_type in ["logical", "disk", "iscsi", "scsi"]: pool_mock.XMLDesc.return_value = "".format( pool_type ) diskp = virt._disk_profile( self.mock_conn, None, "xen", [{"name": "system", "pool": "default", "source_file": "vol01"}], "hello", ) xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, [], "xen", "hvm", "x86_64", ) root = ET.fromstring(xml_data) disk = root.findall(".//disk")[0] self.assertEqual(disk.attrib["type"], "block") self.assertEqual("/dev/to/vol01", disk.find("source").attrib["dev"]) def test_gen_xml_cdrom(self): """ Test virt._gen_xml(), generating a cdrom device (different disk type, no source) """ self.mock_conn.storagePoolLookupByName.return_value.XMLDesc.return_value = ( "" ) diskp = virt._disk_profile( self.mock_conn, None, "kvm", [ {"name": "system", "pool": "default"}, { "name": "tested", "device": "cdrom", "source_file": None, "model": "ide", }, { "name": "remote", "device": "cdrom", "source_file": "http://myhost:8080/url/to/image?query=foo&filter=bar", "model": "ide", }, ], "hello", ) nicp = virt._nic_profile(None, "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) disk = root.findall(".//disk")[1] self.assertEqual(disk.get("type"), "file") self.assertEqual(disk.attrib["device"], "cdrom") self.assertIsNone(disk.find("source")) self.assertEqual(disk.find("target").get("dev"), "hda") disk = root.findall(".//disk")[2] self.assertEqual(disk.get("type"), "network") self.assertEqual(disk.attrib["device"], "cdrom") self.assertEqual( { "protocol": "http", "name": "/url/to/image", "query": "query=foo&filter=bar", "host": {"name": "myhost", "port": "8080"}, }, salt.utils.xmlutil.to_dict(disk.find("source"), True), ) def test_controller_for_esxi(self): """ Test virt._gen_xml() generated device controller for ESXi/vmware """ diskp = virt._disk_profile(self.mock_conn, "default", "vmware", [], "hello") nicp = virt._nic_profile("default", "vmware") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "vmware", "hvm", "x86_64", ) root = ET.fromstring(xml_data) controllers = root.findall(".//devices/controller") self.assertTrue(len(controllers) == 1) controller = controllers[0] self.assertEqual(controller.attrib["model"], "lsilogic") def test_controller_for_kvm(self): """ Test virt._gen_xml() generated device controller for KVM """ diskp = virt._disk_profile(self.mock_conn, "default", "kvm", [], "hello") nicp = virt._nic_profile("default", "kvm") xml_data = virt._gen_xml( self.mock_conn, "hello", 1, 512, diskp, nicp, "kvm", "hvm", "x86_64", ) root = ET.fromstring(xml_data) controllers = root.findall(".//devices/controller") # There should be no controller self.assertTrue(len(controllers) == 0) def test_diff_disks(self): """ Test virt._diff_disks() """ old_disks = ET.fromstring( """ """ ).findall("disk") new_disks = ET.fromstring( """ """ ).findall("disk") ret = virt._diff_disk_lists(old_disks, new_disks) self.assertEqual( [ disk.find("source").get("file") if disk.find("source") is not None else None for disk in ret["unchanged"] ], [], ) self.assertEqual( [ disk.find("source").get("file") if disk.find("source") is not None else None for disk in ret["new"] ], ["/path/to/img3.qcow2", "/path/to/img0.qcow2", "/path/to/img4.qcow2", None], ) self.assertEqual( [disk.find("target").get("dev") for disk in ret["sorted"]], ["vda", "vdb", "vdc", "hda"], ) self.assertEqual( [ disk.find("source").get("file") if disk.find("source") is not None else None for disk in ret["sorted"] ], ["/path/to/img3.qcow2", "/path/to/img0.qcow2", "/path/to/img4.qcow2", None], ) self.assertEqual(ret["new"][1].find("target").get("bus"), "virtio") self.assertEqual( [ disk.find("source").get("file") if disk.find("source") is not None else None for disk in ret["deleted"] ], [ "/path/to/img0.qcow2", "/path/to/img1.qcow2", "/path/to/img2.qcow2", "/path/to/img4.qcow2", None, ], ) def test_diff_nics(self): """ Test virt._diff_nics() """ old_nics = ET.fromstring( """
""" ).findall("interface") new_nics = ET.fromstring( """ """ ).findall("interface") ret = virt._diff_interface_lists(old_nics, new_nics) self.assertEqual( [nic.find("mac").get("address") for nic in ret["unchanged"]], ["52:54:00:39:02:b1"], ) self.assertEqual( [nic.find("mac").get("address") for nic in ret["new"]], ["52:54:00:39:02:b2", "52:54:00:39:02:b4"], ) self.assertEqual( [nic.find("mac").get("address") for nic in ret["deleted"]], ["52:54:00:39:02:b2", "52:54:00:39:02:b3"], ) def test_init(self): """ Test init() function """ xml = """ 44454c4c-3400-105a-8033-b3c04f4b344a x86_64 Nehalem Intel tcp rdma 12367120 3091780 0 apparmor 0 dac 0 +487:+486 +487:+486 hvm 32 /usr/bin/qemu-system-i386 pc-i440fx-2.6 pc pc-0.12 /usr/bin/qemu-kvm pc-i440fx-2.6 pc pc-0.12 hvm 64 /usr/bin/qemu-system-x86_64 pc-i440fx-2.6 pc pc-0.12 /usr/bin/qemu-kvm pc-i440fx-2.6 pc pc-0.12 """ self.mock_conn.getCapabilities.return_value = xml # pylint: disable=no-member root_dir = os.path.join(salt.syspaths.ROOT_DIR, "srv", "salt-images") defineMock = MagicMock(return_value=1) self.mock_conn.defineXML = defineMock mock_chmod = MagicMock() mock_run = MagicMock() with patch.dict( os.__dict__, {"chmod": mock_chmod, "makedirs": MagicMock()} ): # pylint: disable=no-member with patch.dict( virt.__salt__, {"cmd.run": mock_run} ): # pylint: disable=no-member # Ensure the init() function allows creating VM without NIC and disk virt.init( "test vm", 2, 1234, nic=None, disk=None, seed=False, start=False ) definition = defineMock.call_args_list[0][0][0] self.assertFalse(" my_vm 1048576 1048576 1 hvm