diff --git a/semver/__init__.py b/semver/__init__.py index e0075ee..ae27fa8 100644 --- a/semver/__init__.py +++ b/semver/__init__.py @@ -3,176 +3,93 @@ import re import subprocess import sys import traceback -from enum import IntEnum -from semver.utils import get_tag_version +from typing import List, Union + +import toml + from semver.logger import logging, logger, console_logger -from semver.bump import bump_version +from semver.scm import SCM +from semver.scm.git import Git -try: - from configparser import ConfigParser -except ImportError: - # Python < 3 - from ConfigParser import ConfigParser +from semver.semver import SemVer -version = '0.0.0' +from semver.exceptions import ( + NoMergeFoundException, + NotMainBranchException, + NoGitFlowException, + SemverException, +) -class VersionType(IntEnum): - MAJOR = 0 - MINOR = 1 - PATCH = 2 +version = "0.0.0" -# Define common exceptions; -NO_MERGE_FOUND = Exception('No merge found') -NOT_MAIN_BRANCH = Exception('Not merging into a main branch') -NO_GIT_FLOW = Exception('No git flow branch found') -# Important regex -GET_COMMIT_MESSAGE = re.compile(r"Merge (branch|pull request) '?([^']+)'? (into|from) (?:'(.+)'|[^\/]+\/([^\n\\]+))") +def _setting_to_array(setting) -> List[str]: + """ + Get a setting from the config file and return it as a list + :param setting: The setting to get from the config file + :return: The setting as a list + """ + config: dict = toml.load("./.bumpversion.cfg") + semver: dict = config.get("semver", {}) + value: str = semver.get(setting, "") -class SemVer(object): + return [v.strip() for v in value.split(",") if v.strip()] - # Merge pull request #1 from RightBrain-Networks/feature/PLAT-185-versioning - - def __init__(self,global_user=False): - self.global_user = '--local' if global_user else '--global' - self.merged_branch = None - self.main_branch = None - self.version_type = None - - self.main_branches = self._setting_to_array('main_branches') - self.major_branches = self._setting_to_array('major_branches') - self.minor_branches = self._setting_to_array('minor_branches') - self.patch_branches = self._setting_to_array('patch_branches') - - def _setting_to_array(self, setting): - config = ConfigParser() - config.read('./.bumpversion.cfg') - value = config.get('semver', setting) - # filter() removes empty string which is what we get if setting is blank - return list(filter(bool, [v.strip() for v in value.split(',')])) - - # based on commit message see what branches are involved in the merge - def get_branches(self): - p = subprocess.Popen(['git', 'log', '-1'], stdout=subprocess.PIPE, - cwd='.') - #check current branch - b = subprocess.Popen(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], stdout=subprocess.PIPE, - cwd='.') - message = str(p.stdout.read()) - branch = b.stdout.read().decode('utf-8').rstrip() - logger.info('Main branch is ' + branch) - matches = GET_COMMIT_MESSAGE.search(message.replace('\\n','\n').replace('\\','')) - if matches: - if str(matches.group(4)) == branch: - self.merged_branch = matches.group(2) - else: - self.merged_branch = matches.group(5) - self.main_branch = branch - return bool(matches) - - # based on branches involved see what type of versioning should be done - def get_version_type(self): - logger.info('Merged branch is ' + self.merged_branch) - - merged_prefix = None - matches = re.findall("[^\/]*/", self.merged_branch) - if len(matches) >= 1: - merged_prefix = matches[-1][0:-1] - - if merged_prefix: - for prefix in self.major_branches: - if prefix == merged_prefix: - self.version_type = VersionType.MAJOR - return self.version_type - for prefix in self.minor_branches: - if prefix == merged_prefix: - self.version_type = VersionType.MINOR - return self.version_type - for prefix in self.patch_branches: - if prefix == merged_prefix: - self.version_type = VersionType.PATCH - return self.version_type - return False - - # setup git settings so we can commit and tag - def setup_git_user(self): - # setup git user - p = subprocess.Popen(['git', 'config', self.global_user, 'user.email', - '"versioner@semver.com"'], - cwd='.') - p = subprocess.Popen(['git', 'config', self.global_user, 'user.name', - '"Semantic Versioner"'], - cwd='.') - p.wait() - return self - - # use bumpversion to increment the appropriate version type - def version_repo(self): - config_file = "" - with open(".bumpversion.cfg", "r") as file: - config_file = file.read() - - # version repo - logger.debug("Running bumpversion of type: " + str(self.version_type.name)) - bump_version(get_tag_version(), self.version_type) - return self - - def commit_and_push(self): - # push versioning commit - p = subprocess.Popen(['git', 'push', 'origin', self.main_branch], - cwd='.') - p.wait() - - # push versioning tag - p = subprocess.Popen(['git', 'push', 'origin', '--tags'], - cwd='.') - p.wait() - return self - - # 1) get branches from last commit message - # 2) see if we're merging into a main branch - # 3) see what type of versioning we should do - # 4) version the repo - def run(self,push=True): - if not self.get_branches(): - raise NO_MERGE_FOUND - if self.main_branch not in self.main_branches: - raise NOT_MAIN_BRANCH - if not self.get_version_type(): - raise NO_GIT_FLOW - if push: - self.setup_git_user() - self.version_repo() - if push: - self.commit_and_push() - return self def main(): - parser = argparse.ArgumentParser(description='Bump Semantic Version.') - parser.add_argument('-n','--no-push', help='Do not try to push', action='store_false', dest='push') - parser.add_argument('-g','--global-user', help='Set git user at a global level, helps in jenkins', action='store_true', dest='global_user') - parser.add_argument('-D', '--debug', help='Sets logging level to DEBUG', action='store_true', dest='debug', default=False) + """Main entry point for the application""" + parser = argparse.ArgumentParser(description="Bump Semantic Version.") + parser.add_argument( + "-n", "--no-push", help="Do not try to push", action="store_false", dest="push" + ) + parser.add_argument( + "-g", + "--global-user", + help="Set git user at a global level, helps in jenkins", + action="store_true", + dest="global_user", + ) + parser.add_argument( + "-D", + "--debug", + help="Sets logging level to DEBUG", + action="store_true", + dest="debug", + default=False, + ) args = parser.parse_args() + scm: SCM = Git(global_user=args.global_user) + + app = SemVer( + scm=scm, + main_branches=_setting_to_array("main_branches"), + major_branches=_setting_to_array("major_branches"), + minor_branches=_setting_to_array("minor_branches"), + patch_branches=_setting_to_array("patch_branches"), + ) if args.debug: console_logger.setLevel(logging.DEBUG) try: - SemVer(global_user=args.global_user).run(push=args.push) + app.run(push=args.push) except Exception as e: logger.error(e) if args.debug: tb = sys.exc_info()[2] traceback.print_tb(tb) - if e == NO_MERGE_FOUND: + if e is NoMergeFoundException: exit(1) - elif e == NOT_MAIN_BRANCH: + elif e == NotMainBranchException: exit(2) - elif e == NO_GIT_FLOW: + elif e == NoGitFlowException: exit(3) else: exit(128) -if __name__ == '__main__': - try: main() - except: raise + +if __name__ == "__main__": + try: + main() + except Exception: + raise diff --git a/semver/bump.py b/semver/bump.py deleted file mode 100644 index 1694a8e..0000000 --- a/semver/bump.py +++ /dev/null @@ -1,65 +0,0 @@ -from enum import IntEnum -import subprocess, os -from semver.logger import logging, logger, console_logger - - -try: - from configparser import ConfigParser -except ImportError: - # Python < 3 - from ConfigParser import ConfigParser - -def bump_version(version, index=2, tag_repo = True, update_files=True): - v = version.split('.') - - # Bump version - v[index] = str(int(v[index]) + 1) - - # Reset subversions - i = len(v) - 1 - while i > index: - v[i] = '0' - i = i - 1 - - # Get new version - new_version = '.'.join(v) - - # Tag new version - if tag_repo and version != new_version: - p = subprocess.Popen(['git', 'tag', new_version], cwd='.') - p.wait() - - # Update local files - if update_files: - update_file_version(new_version, version) - - return new_version - -def update_file_version(new_version, version="0.0.0"): - # Open up config file - config = ConfigParser() - config.read('./.bumpversion.cfg') - - for section in config.sections(): - if len(section) > 17 and section[0:17] == "bumpversion:file:": - file_name = section[17:] - if os.path.isfile(file_name): - # Get search val from config - search_val = config.get(section, "search") - search_val = process_config_string(search_val, new_version, version) - - # Get replace val from config - replace_val = config.get(section, "replace") - replace_val = process_config_string(replace_val, new_version, version) - - # Update replace values in file - with open(file_name, 'r') as file: - filedata = file.read() - filedata =filedata.replace(search_val,replace_val) - with open(file_name, 'w') as file: - file.write(filedata) - else: - logger.warning("Tried to version file: `" + file_name + "` but it doesn't exist!") - -def process_config_string(cfg_string, new_version, version): - return cfg_string.replace("{new_version}", new_version).replace("{current_version}", version) \ No newline at end of file diff --git a/semver/exceptions.py b/semver/exceptions.py new file mode 100644 index 0000000..cbcaaed --- /dev/null +++ b/semver/exceptions.py @@ -0,0 +1,18 @@ +class SemverException(Exception): + """Semver base exception""" + pass + + +class NoMergeFoundException(SemverException): + """No merge found in commit message""" + pass + + +class NotMainBranchException(SemverException): + """Not on main branch""" + pass + + +class NoGitFlowException(SemverException): + """No git flow branch found""" + pass diff --git a/semver/logger.py b/semver/logger.py index 0b5d96d..a52bf83 100644 --- a/semver/logger.py +++ b/semver/logger.py @@ -2,7 +2,7 @@ import logging import subprocess # create logger -logger = logging.getLogger('simple_example') +logger = logging.getLogger("simple_example") logger.setLevel(logging.DEBUG) # create console handler and set level to INFO @@ -10,10 +10,10 @@ console_logger = logging.StreamHandler() console_logger.setLevel(logging.INFO) # create formatter -formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") # add formatter to console_logger console_logger.setFormatter(formatter) # add console_logger to logger -logger.addHandler(console_logger) \ No newline at end of file +logger.addHandler(console_logger) diff --git a/semver/scm/__init__.py b/semver/scm/__init__.py new file mode 100644 index 0000000..be19322 --- /dev/null +++ b/semver/scm/__init__.py @@ -0,0 +1,76 @@ +from abc import ABC, abstractmethod +from typing import Union, List + +from semver.version_type import VersionType +from semver.logger import logger + + +class SCM(ABC): + @abstractmethod + def get_tag_version(self) -> str: + raise NotImplementedError + + @abstractmethod + def get_branch(self) -> str: + raise NotImplementedError + + @abstractmethod + def get_merge_branch(self) -> Union[str, None]: + raise NotImplementedError + + @abstractmethod + def commit_and_push(self, branch: str) -> None: + raise NotImplementedError + + @abstractmethod + def tag_version(self, version: str) -> None: + raise NotImplementedError + + def get_file_version(self, config: dict) -> str: + """ + :param config: The bumpversion config as a dict + :return: The current version from the config file + """ + bumpversion: Union[str, None] = config.get("bumpversion", None) + version: Union[str, None] = ( + bumpversion.get("current_version", None) if bumpversion else None + ) + + if not bumpversion: + config["bumpversion"] = {} + version = "0.0.0" + + if not version: + config["bumpversion"]["current_version"] = "0.0.0" + version = "0.0.0" + + return version + + def get_version_type( + self, + merged_branch: str, + major_branches: List[str], + minor_branches: List[str], + patch_branches: List[str], + ) -> Union[VersionType, None]: + """ + Get the version type based on the branches involved in the merge + :param merged_branch: The branch that was merged + :param major_branches: List of prefixes for major branches + :param minor_branches: List of prefixes for minor branches + :param patch_branches: List of prefixes for patch branches + :return: The version type + """ + logger.info(f"Merged branch is {merged_branch}") + + merged_prefix = merged_branch.split("/")[-1].rstrip("/") + + version_type: Union[VersionType, None] = None + if merged_prefix: + if merged_prefix in major_branches: + version_type = VersionType.MAJOR + if merged_prefix in minor_branches: + version_type = VersionType.MINOR + if merged_prefix in patch_branches: + version_type = VersionType.PATCH + return version_type diff --git a/semver/scm/git.py b/semver/scm/git.py new file mode 100644 index 0000000..bd6d26e --- /dev/null +++ b/semver/scm/git.py @@ -0,0 +1,137 @@ +import re +import subprocess +from typing import Union, List +from functools import cache + +import toml + +from semver.scm import SCM +from semver.logger import logger +from semver.version_type import VersionType +from semver.exceptions import SemverException + + +class Git(SCM): + def __init__(self, global_user: bool = False) -> None: + self.git_commit_pattern = re.compile( + r"Merge (branch|pull request) '?([^']+)'? (into|from) (?:'(.+)'|[^\/]+\/([^\n\\]+))" + ) + + self.git_bin = "git" + + self.global_user: bool = global_user + self.git_email: str = "versioner@semver.com" + self.git_user: str = "Semantic Versioner" + self._setup_git_user() + + super().__init__() + + def _run_command(self, *args: str) -> subprocess.CompletedProcess[str]: + return subprocess.run( + args, + capture_output=True, + text=True, + check=True, + ) + + def _setup_git_user(self) -> None: + self._run_command( + self.git_bin, + "config", + "--global" if self.global_user else "--local", + "user.email", + f'"{self.git_email}"', + ) + + self._run_command( + self.git_bin, + "config", + "--global" if self.global_user else "--local", + "user.name", + f'"{self.git_user}"', + ) + + def get_tag_version(self) -> str: + """ + Get the latest tagged version from git tags + :return: The latest tagged version + """ + config: dict = toml.load("./.bumpversion.cfg") + + tag_expression: str = config["bumpversion"]["tag_name"].replace( + "{new_version}", "[0-9]*.[0-9]*.[0-9]*" + ) + + logger.debug(f"Tag expression: {tag_expression}") + + # Default version is `0.0.0` or what is found in + version = self.get_file_version(config) + + # If a version is found in git tags, use that the latest tagged version + tagged_versions: Union[List[str], None] = None + try: + proc = self._run_command( + self.git_bin, "tag", "--sort=v:refname", "-l", tag_expression + ) + tagged_versions = proc.stdout.rstrip().split("\n") + except subprocess.CalledProcessError as e: + raise RuntimeError( + f"Error getting latest tagged git version: {str(e.stderr).rstrip()}" + ) + + if len(tagged_versions) > 0 and tagged_versions[-1] != "": + version = tagged_versions[-1] + + logger.debug(f"Tag Version: {version}") + return version + + @cache + def get_branch(self) -> str: + """ + Get the main branch + """ + proc = self._run_command(self.git_bin, "rev-parse", "--abbrev-ref", "HEAD") + return proc.stdout.rstrip() + + @cache + def get_merge_branch(self) -> Union[str, None]: + """ + Get the branches involved in the merge + :return: The branch involved in the merge + """ + proc = self._run_command(self.git_bin, "log", "-1") + message: str = proc.stdout + + branch: str = self.get_branch() + + logger.info(f"Main branch is {branch}") + + matches = self.git_commit_pattern.search( + message.replace("\\n", "\n").replace("\\", "") + ) + merged_branch: Union[str, None] = None + if matches: + merged_branch = str( + matches.group(2) if matches.group(4) == branch else matches.group(5) + ) + + return merged_branch + + def commit_and_push(self, branch: str) -> None: + """ + Commit and push the versioning changes + :param branch: The branch to push + """ + proc = self._run_command(self.git_bin, "push", "origin", branch) + if proc.returncode != 0: + raise SemverException( + f"Error pushing versioning changes to {branch}: {proc.stderr}" + ) + proc = self._run_command(self.git_bin, "push", "origin", "--tags") + if proc.returncode != 0: + raise SemverException( + f"Error pushing versioning changes to {branch}: {proc.stderr}" + ) + + def tag_version(self, version: str) -> None: + self._run_command(self.git_bin, "tag", version) \ No newline at end of file diff --git a/semver/scm/perforce.py b/semver/scm/perforce.py new file mode 100644 index 0000000..a82a348 --- /dev/null +++ b/semver/scm/perforce.py @@ -0,0 +1,49 @@ +import subprocess +from typing import Union, List + +import toml + +from semver.scm import SCM +from semver.logger import logger + + +class Perforce(SCM): + def __init__(self) -> None: + super().__init__() + + def get_tag_version(self) -> str: + """ + Get the latest tagged version from Perforce labels + :return: The latest tagged version + """ + config: dict = toml.load("./.bumpversion.cfg") + + tag_expression: str = config["bumpversion"]["tag_name"].replace( + "{new_version}", "[0-9]*.[0-9]*.[0-9]*" + ) + + logger.debug("Tag expression: " + str(tag_expression)) + + # Default version is `0.0.0` or what is found in + version = self.get_file_version(config) + + # If a version is found in Perforce labels, use that the latest labeled version + labeled_versions: Union[List[str], None] = None + try: + proc = subprocess.run( + ["p4", "labels", "-e", tag_expression, "-m1"], + capture_output=True, + text=True, + check=True, + ) + labeled_versions = proc.stdout.rstrip().split("\n") + except subprocess.CalledProcessError as e: + raise RuntimeError( + f"Error getting latest labeled Perforce version: {str(e.stderr).rstrip()}" + ) + + if len(labeled_versions) > 0 and labeled_versions[-1] != "": + version = labeled_versions[-1] + + logger.debug("Label Version: " + str(version)) + return version diff --git a/semver/semver.py b/semver/semver.py new file mode 100644 index 0000000..657fd55 --- /dev/null +++ b/semver/semver.py @@ -0,0 +1,170 @@ +from typing import List, Union +from pathlib import Path + +import toml + +from semver.logger import logger +from semver.scm import SCM +from semver.version_type import VersionType +from semver.exceptions import ( + NoMergeFoundException, + NotMainBranchException, + NoGitFlowException, +) + + +class SemVer: + """Primary class for handling auto-semver processing""" + + def __init__( + self, + scm: SCM, + main_branches: Union[List[str], None] = None, + major_branches: Union[List[str], None] = None, + minor_branches: Union[List[str], None] = None, + patch_branches: Union[List[str], None] = None, + ): + """ + Initialize the SemVer object + :param global_user: Toggles git user at a global level, useful for build servers + :param scm: The source control manager to use + :param main_branches: Branches to run versioning on + :param major_branches: List of prefixes for major branches + :param minor_branches: List of prefixes for minor branches + :param patch_branches: List of prefixes for patch branches + """ + self._merged_branch: Union[str, None] = None + self._branch: Union[str, None] = None + self._version_type: Union[VersionType, None] = None + + self._main_branches: List[str] = main_branches if main_branches else [] + self._major_branches: List[str] = major_branches if major_branches else [] + self._minor_branches: List[str] = minor_branches if minor_branches else [] + self._patch_branches: List[str] = patch_branches if patch_branches else [] + + self._scm: SCM = scm + + def _version_repo(self) -> None: + """ + Use bump_version to update the repo version + """ + version = self._scm.get_tag_version() + if not self._version_type: + raise NoMergeFoundException() + + logger.debug(f"Running bumpversion of type: {self._version_type.name}") + self._bump_version(version, self._version_type) + + def _process_config_string(self, cfg_string, new_version, version): + return cfg_string.replace("{new_version}", new_version).replace( + "{current_version}", version + ) + + def _bump_version( + self, + version: str, + index: VersionType = VersionType.MINOR, + tag_repo: bool = True, + update_files: bool = True, + ) -> str: + """ + Bump the version of the repo + :param version: The current version + :param index: The index of the version to bump + :param tag_repo: Whether or not to tag the repo + :param update_files: Whether or not to update the files + :return: The new version + """ + v: List[str] = version.split(".") + + # Bump version + v[index] = str(int(v[index]) + 1) + + # Reset subversions + i = len(v) - 1 + while i > index: + v[i] = "0" + i = i - 1 + + # Get new version + new_version = ".".join(v) + + # Tag new version + if tag_repo and version != new_version: + self._scm.tag_version(new_version) + + # Update local files + if update_files: + self._update_file_version(new_version, version) + + return new_version + + def _update_file_version(self, new_version: str, version: str = "0.0.0"): + """ + Update the version in the config file + :param new_version: The new version + :param version: The current version + """ + # Open up config file + config = toml.load("./.bumpversion.cfg") + + bump_version_file_prefix = "bumpversion:file:" + bump_version_file_prefix_len = len(bump_version_file_prefix) + for section in config: + if section.startswith(bump_version_file_prefix): + file_path = Path(section[bump_version_file_prefix_len:]) + section_data = config[section] + if file_path.is_file(): + # Get search val from config + search_val = section_data["search"] + search_val = self._process_config_string( + search_val, new_version, version + ) + + # Get replace val from config + replace_val = section_data["replace"] + replace_val = self._process_config_string( + replace_val, new_version, version + ) + + # Update replace values in file + with open(file_path, "r") as file: + filedata = file.read() + filedata = filedata.replace(search_val, replace_val) + with open(file_path, "w") as file: + file.write(filedata) + else: + logger.warning( + f"Tried to version file: '{file_path}' but it doesn't exist!" + ) + + def run(self, push=True): + """ + Run the versioning process + 1) get branches from last commit message + 2) see if we're merging into a main branch + 3) see what type of versioning we should do + 4) version the repo + :param push: Whether or not to push the changes + """ + self._branch = self._scm.get_branch() + self._merged_branch = self._scm.get_merge_branch() + + if not self._merged_branch: + raise NoMergeFoundException() + if self._branch not in self._main_branches: + raise NotMainBranchException() + + self._version_type = self._scm.get_version_type( + self._branch, + self._major_branches, + self._minor_branches, + self._patch_branches, + ) + + if not self._version_type: + raise NoGitFlowException() + + self._version_repo() + if push: + self._scm.commit_and_push(self._branch) diff --git a/semver/utils.py b/semver/utils.py deleted file mode 100644 index d0d325a..0000000 --- a/semver/utils.py +++ /dev/null @@ -1,72 +0,0 @@ -import subprocess -from typing import Union, List - -try: - from subprocess import DEVNULL # py3k -except ImportError: - import os - - DEVNULL = open(os.devnull, "wb") - -import toml - -from semver.logger import logging, logger, console_logger - - -def get_tag_version() -> str: - """ - Get the latest tagged version from git tags - :return: The latest tagged version - """ - config: dict = toml.load("./.bumpversion.cfg") - - tag_expression: str = config["bumpversion"]["tag_name"].replace( - "{new_version}", "[0-9]*.[0-9]*.[0-9]*" - ) - - logger.debug("Tag expression: " + str(tag_expression)) - - # Default version is `0.0.0` or what is found in - version = get_file_version(config) - - # If a version is found in git tags, use that the latest tagged version - tagged_versions: Union[List[str], None] = None - try: - proc = subprocess.run( - ["git", "tag", "--sort=v:refname", "-l", tag_expression], - capture_output=True, - text=True, - check=True, - ) - tagged_versions = proc.stdout.rstrip().split("\n") - except subprocess.CalledProcessError as e: - raise RuntimeError( - f"Error getting latest tagged git version: {str(e.stderr).rstrip()}" - ) - - if len(tagged_versions) > 0 and tagged_versions[-1] != "": - version = tagged_versions[-1] - - logger.debug("Tag Version: " + str(version)) - return version - - -def get_file_version(config: dict) -> str: - """ - :param config: The bumpversion config as a dict - :return: The current version from the config file - """ - bumpversion: Union[str, None] = config.get("bumpversion", None) - version: Union[str, None] = ( - bumpversion.get("current_version", None) if bumpversion else None - ) - - if not bumpversion: - config["bumpversion"] = {} - version = "0.0.0" - - if not version: - config["bumpversion"]["current_version"] = "0.0.0" - version = "0.0.0" - - return version diff --git a/semver/version_type.py b/semver/version_type.py new file mode 100644 index 0000000..a22dac1 --- /dev/null +++ b/semver/version_type.py @@ -0,0 +1,7 @@ +from enum import IntEnum + + +class VersionType(IntEnum): + MAJOR = 0 + MINOR = 1 + PATCH = 2