feat: start implementing VM editing and configuration

Introduced a new set of Python classes under `classes/` directory,
enabling functionality to edit and configure virtual machines (VMs)
through a text-based user interface (TUI). Key components include
handling for Linux distribution-specific configurations, partition
mounting, and VM disk manipulation using QEMU tools. This structure
allows for scalable VM management tasks such as user modification,
network configuration, hostname setting, and disk conversion from VMDK
to raw image format.

Additionally, basic infrastructure such as `.gitignore`, `README.md`,
and `requirements.txt` setup is included to ensure a clean development
environment and provide necessary instructions and dependencies for
running the script.

The TUI, powered by npyscreen, facilitates user interaction for
configuring newly cloned VMs, focusing on ease of use and automation of
repetitive tasks.

This comprehensive suite marks a pivotal step towards automating VM
management and customization processes, significantly reducing manual
overhead for system administrators and developers dealing with virtual
environments.
This commit is contained in:
Kumi 2024-03-29 13:54:37 +01:00
commit b09bec7a70
Signed by: kumi
GPG key ID: ECBCC9082395383F
11 changed files with 321 additions and 0 deletions

5
.gitignore vendored Normal file
View file

@ -0,0 +1,5 @@
.venv/
venv/
*.pyc
__pycache__/
.vscode/

1
README.md Normal file
View file

@ -0,0 +1 @@
To use this script, you need the `qemu-img` tool, which is included with `qemu`. You also need `fdisk`, which is usually preinstalled anyway, as well as the `losetup` and `mount` utilities which should also come preinstalled.

0
__init__.py Normal file
View file

0
classes/__init__.py Normal file
View file

28
classes/linux.py Normal file
View file

@ -0,0 +1,28 @@
from abc import ABC, abstractmethod
class LinuxDistroHandler(ABC):
@abstractmethod
def list_modify_users(self, mount_point):
pass
@abstractmethod
def configure_networking(self, mount_point):
pass
@abstractmethod
def set_hostname(self, mount_point, hostname):
pass
class GenericLinuxHandler(LinuxDistroHandler):
def list_modify_users(self, mount_point):
# TODO: List /etc/passwd contents and allow modifications
pass
def configure_networking(self, mount_point):
# TODO: Allow network interface configuration
pass
def set_hostname(self, mount_point, hostname):
# TODO: Set the system hostname
pass

50
classes/mount.py Normal file
View file

@ -0,0 +1,50 @@
import subprocess
import tempfile
import os
import sys
class Mountpoint(tempfile.TemporaryDirectory):
pass
class PartitionMounter:
def __init__(self, img_file, partition_number):
self.img_file = img_file
self.partition_number = partition_number
self.mountpoint = None
self.loop_device = None
def setup_loop_device(self):
self.loop_device = subprocess.check_output(['losetup', '--show', '-fP', self.img_file], text=True).strip()
print(f"Loop device created: {self.loop_device}")
def mount_partition(self):
if self.loop_device is None:
self.setup_loop_device()
# Use partprobe to make the kernel aware of the partition
subprocess.run(['partprobe', self.loop_device], check=True)
# Calculate partition path
partition_path = f"{self.loop_device}p{self.partition_number}"
# Create a temporary directory to mount the partition
self.mountpoint = Mountpoint()
# Mount the partition to the temporary directory
subprocess.run(['mount', partition_path, self.mountpoint.name], check=True)
def cleanup(self):
if self.mountpoint:
subprocess.run(['umount', self.mountpoint.name], check=True)
self.mountpoint.cleanup()
if self.loop_device:
subprocess.run(['losetup', '-d', self.loop_device], check=True)
def __enter__(self):
self.mount_partition()
return self.mountpoint.name
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()

91
classes/qemu.py Normal file
View file

@ -0,0 +1,91 @@
from pathlib import Path
from typing import Tuple
import os
import shutil
import subprocess
import utils.fdisk
from .mount import PartitionMounter
class VMEditor:
def __init__(self, source: os.PathLike, destination: os.PathLike):
self.source_path = Path(source)
self.destination_path = Path(destination)
self.system_partition: Tuple[str, int] = None
self.mount: PartitionMounter = None
def create_copy(self):
shutil.copytree(self.source_path, self.destination_path)
def update_filenames(self):
for file in self.destination_path.iterdir():
if self.source_path.name in file.name:
file.rename(
self.destination_path
/ file.name.replace(
self.source_path.name, self.destination_path.name
)
)
def update_file_contents(self):
vmxpath = self.destination_path / f"{self.destination_path.name}.vmx"
content = ""
with (vmxpath).open("r") as vmx_in:
content = vmx_in.read()
content = content.replace(self.source_path.name, self.destination_path.name)
with vmxpath.open("w") as vmx_out:
vmx_out.write(content)
for vmdk in self.destination_path.glob("*.vmdk"):
if vmdk.stat().st_size < 1e5:
with vmdk.open("r") as vmdk_in:
content = vmdk_in.read()
content = content.replace(
self.source_path.name, self.destination_path.name
)
with vmdk.open("w") as vmdk_out:
content = vmdk_out.write(content)
def vmdk_to_img(self):
for vmdk in self.destination_path.glob("*.vmdk"):
if vmdk.stem.endswith("-flat"):
continue
img = self.destination_path / f"{vmdk.stem}.img"
subprocess.run(
[
"qemu-img",
"convert",
"-O",
"raw",
str(vmdk),
str(img),
],
check=True,
)
vmdk.unlink()
if (flat := self.destination_path / f"{vmdk.stem}-flat.vmdk").exists():
flat.unlink()
def get_partitions(self):
files = []
for img in self.destination_path.glob("*.img"):
files += utils.fdisk.get_partitions(img)
return files
def mount_system_partition(self):
self.mount = PartitionMounter(*self.system_partition)
self.mount.mount_partition()

104
classes/tui.py Normal file
View file

@ -0,0 +1,104 @@
import npyscreen
import time
import sys
from .qemu import VMEditor
class VMApp(npyscreen.NPSAppManaged):
editor: VMEditor = None
def onStart(self):
self.addForm("MAIN", VMConfiguratorForm, name="VM Configurator")
self.addForm("PARTITION", PartitionSelectorForm, name="Partition Selector")
class VMConfiguratorForm(npyscreen.FormBaseNew):
def create(self):
self.sourceDir = self.add(npyscreen.TitleFilename, name="Source Directory:")
self.newName = self.add(npyscreen.TitleText, name="New Name:")
self.button = self.add(
npyscreen.ButtonPress, name="Start", when_pressed_function=self.on_start
)
def on_start(self):
source_dir = self.sourceDir.value
new_name = self.newName.value
if npyscreen.notify_ok_cancel(
f"Directory: {source_dir}\nNew Name: {new_name}",
title="Info",
form_color="CONTROL",
):
self.process()
def process(self):
self.parentApp.editor = VMEditor(self.sourceDir.value, self.newName.value)
message = "Processing:\n\n"
npyscreen.notify_wait(
message := message + "- Copying files\n", title="Please wait..."
)
# self.parentApp.editor.create_copy()
npyscreen.notify_wait(
message := message + "- Updating file names\n", title="Please wait..."
)
# self.parentApp.editor.update_filenames()
npyscreen.notify_wait(
message := message + "- Updating configuration file\n",
title="Please wait...",
)
# self.parentApp.editor.update_file_contents()
npyscreen.notify_wait(
message := message + "- Converting disk to .img\n", title="Please wait..."
)
# self.parentApp.editor.vmdk_to_img()
npyscreen.notify_wait(
message + "- Preparing partition selector", title="Please wait..."
)
self.parentApp.switchForm("PARTITION")
class PartitionSelectorForm(npyscreen.FormBaseNew):
def create(self):
self.partition = self.add(npyscreen.SelectOne, values=[], max_height=10)
self.button = self.add(
npyscreen.ButtonPress, name="Select", when_pressed_function=self.on_select
)
def pre_edit_loop(self):
self.partition.values = [
f'{partition["device"]} {partition["size"]} ({partition["type"]}{partition["boot"]})'
for partition in self.parentApp.editor.get_partitions()
]
def on_select(self):
partition = self.partition.values[self.partition.value[0]]
path = partition.split(" ")[0]
file = ".img".join(path.split(".img")[:-1]) + ".img"
number = path.split(".img")[-1]
message = "Processing:\n\n"
npyscreen.notify_wait(
message := message + "- Mounting system partition\n", title="Please wait..."
)
self.parentApp.editor.system_partition = (file, number)
self.parentApp.editor.mount_system_partition()
if npyscreen.notify_ok_cancel(
f"Mountpoint: {self.parentApp.editor.mount.mountpoint.name}",
title="Info",
form_color="CONTROL",
):
sys.exit()

1
requirements.txt Normal file
View file

@ -0,0 +1 @@
npyscreen

11
run.py Normal file
View file

@ -0,0 +1,11 @@
from classes.tui import VMApp
def main():
try:
app = VMApp()
app.run()
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()

30
utils/fdisk.py Normal file
View file

@ -0,0 +1,30 @@
import subprocess
def get_partitions(image_file):
result = subprocess.run(
["fdisk", "-l", image_file], stdout=subprocess.PIPE, text=True
)
output = result.stdout
headers = []
for line in output.splitlines():
if line.startswith("Device"):
headers = line.split()
partitions = []
for line in output.splitlines():
if line.startswith(str(image_file)):
parts = line.split()
boot_flag = "*" if "*" in parts else ""
corr = int(bool(boot_flag)) - 1
partition_info = {
"device": parts[0],
"boot": boot_flag,
"size": parts[headers.index("Size") + corr],
"type": " ".join(parts[headers.index("Type") + corr :]),
}
partitions.append(partition_info)
return partitions