Major semver update and refactor

This commit is contained in:
Layla 2023-06-30 22:53:38 -04:00
parent 22035b89e0
commit 92eb0cff39
10 changed files with 523 additions and 286 deletions

View File

@ -3,176 +3,93 @@ import re
import subprocess
import sys
import traceback
from enum import IntEnum
from semver.utils import get_tag_version
from typing import List, Union
import toml
from semver.logger import logging, logger, console_logger
from semver.bump import bump_version
from semver.scm import SCM
from semver.scm.git import Git
try:
from configparser import ConfigParser
except ImportError:
# Python < 3
from ConfigParser import ConfigParser
from semver.semver import SemVer
version = '0.0.0'
from semver.exceptions import (
NoMergeFoundException,
NotMainBranchException,
NoGitFlowException,
SemverException,
)
class VersionType(IntEnum):
MAJOR = 0
MINOR = 1
PATCH = 2
version = "0.0.0"
# Define common exceptions;
NO_MERGE_FOUND = Exception('No merge found')
NOT_MAIN_BRANCH = Exception('Not merging into a main branch')
NO_GIT_FLOW = Exception('No git flow branch found')
# Important regex
GET_COMMIT_MESSAGE = re.compile(r"Merge (branch|pull request) '?([^']+)'? (into|from) (?:'(.+)'|[^\/]+\/([^\n\\]+))")
def _setting_to_array(setting) -> List[str]:
"""
Get a setting from the config file and return it as a list
:param setting: The setting to get from the config file
:return: The setting as a list
"""
config: dict = toml.load("./.bumpversion.cfg")
semver: dict = config.get("semver", {})
value: str = semver.get(setting, "")
class SemVer(object):
return [v.strip() for v in value.split(",") if v.strip()]
# Merge pull request #1 from RightBrain-Networks/feature/PLAT-185-versioning
def __init__(self,global_user=False):
self.global_user = '--local' if global_user else '--global'
self.merged_branch = None
self.main_branch = None
self.version_type = None
self.main_branches = self._setting_to_array('main_branches')
self.major_branches = self._setting_to_array('major_branches')
self.minor_branches = self._setting_to_array('minor_branches')
self.patch_branches = self._setting_to_array('patch_branches')
def _setting_to_array(self, setting):
config = ConfigParser()
config.read('./.bumpversion.cfg')
value = config.get('semver', setting)
# filter() removes empty string which is what we get if setting is blank
return list(filter(bool, [v.strip() for v in value.split(',')]))
# based on commit message see what branches are involved in the merge
def get_branches(self):
p = subprocess.Popen(['git', 'log', '-1'], stdout=subprocess.PIPE,
cwd='.')
#check current branch
b = subprocess.Popen(['git', 'rev-parse', '--abbrev-ref', 'HEAD'], stdout=subprocess.PIPE,
cwd='.')
message = str(p.stdout.read())
branch = b.stdout.read().decode('utf-8').rstrip()
logger.info('Main branch is ' + branch)
matches = GET_COMMIT_MESSAGE.search(message.replace('\\n','\n').replace('\\',''))
if matches:
if str(matches.group(4)) == branch:
self.merged_branch = matches.group(2)
else:
self.merged_branch = matches.group(5)
self.main_branch = branch
return bool(matches)
# based on branches involved see what type of versioning should be done
def get_version_type(self):
logger.info('Merged branch is ' + self.merged_branch)
merged_prefix = None
matches = re.findall("[^\/]*/", self.merged_branch)
if len(matches) >= 1:
merged_prefix = matches[-1][0:-1]
if merged_prefix:
for prefix in self.major_branches:
if prefix == merged_prefix:
self.version_type = VersionType.MAJOR
return self.version_type
for prefix in self.minor_branches:
if prefix == merged_prefix:
self.version_type = VersionType.MINOR
return self.version_type
for prefix in self.patch_branches:
if prefix == merged_prefix:
self.version_type = VersionType.PATCH
return self.version_type
return False
# setup git settings so we can commit and tag
def setup_git_user(self):
# setup git user
p = subprocess.Popen(['git', 'config', self.global_user, 'user.email',
'"versioner@semver.com"'],
cwd='.')
p = subprocess.Popen(['git', 'config', self.global_user, 'user.name',
'"Semantic Versioner"'],
cwd='.')
p.wait()
return self
# use bumpversion to increment the appropriate version type
def version_repo(self):
config_file = ""
with open(".bumpversion.cfg", "r") as file:
config_file = file.read()
# version repo
logger.debug("Running bumpversion of type: " + str(self.version_type.name))
bump_version(get_tag_version(), self.version_type)
return self
def commit_and_push(self):
# push versioning commit
p = subprocess.Popen(['git', 'push', 'origin', self.main_branch],
cwd='.')
p.wait()
# push versioning tag
p = subprocess.Popen(['git', 'push', 'origin', '--tags'],
cwd='.')
p.wait()
return self
# 1) get branches from last commit message
# 2) see if we're merging into a main branch
# 3) see what type of versioning we should do
# 4) version the repo
def run(self,push=True):
if not self.get_branches():
raise NO_MERGE_FOUND
if self.main_branch not in self.main_branches:
raise NOT_MAIN_BRANCH
if not self.get_version_type():
raise NO_GIT_FLOW
if push:
self.setup_git_user()
self.version_repo()
if push:
self.commit_and_push()
return self
def main():
parser = argparse.ArgumentParser(description='Bump Semantic Version.')
parser.add_argument('-n','--no-push', help='Do not try to push', action='store_false', dest='push')
parser.add_argument('-g','--global-user', help='Set git user at a global level, helps in jenkins', action='store_true', dest='global_user')
parser.add_argument('-D', '--debug', help='Sets logging level to DEBUG', action='store_true', dest='debug', default=False)
"""Main entry point for the application"""
parser = argparse.ArgumentParser(description="Bump Semantic Version.")
parser.add_argument(
"-n", "--no-push", help="Do not try to push", action="store_false", dest="push"
)
parser.add_argument(
"-g",
"--global-user",
help="Set git user at a global level, helps in jenkins",
action="store_true",
dest="global_user",
)
parser.add_argument(
"-D",
"--debug",
help="Sets logging level to DEBUG",
action="store_true",
dest="debug",
default=False,
)
args = parser.parse_args()
scm: SCM = Git(global_user=args.global_user)
app = SemVer(
scm=scm,
main_branches=_setting_to_array("main_branches"),
major_branches=_setting_to_array("major_branches"),
minor_branches=_setting_to_array("minor_branches"),
patch_branches=_setting_to_array("patch_branches"),
)
if args.debug:
console_logger.setLevel(logging.DEBUG)
try:
SemVer(global_user=args.global_user).run(push=args.push)
app.run(push=args.push)
except Exception as e:
logger.error(e)
if args.debug:
tb = sys.exc_info()[2]
traceback.print_tb(tb)
if e == NO_MERGE_FOUND:
if e is NoMergeFoundException:
exit(1)
elif e == NOT_MAIN_BRANCH:
elif e == NotMainBranchException:
exit(2)
elif e == NO_GIT_FLOW:
elif e == NoGitFlowException:
exit(3)
else:
exit(128)
if __name__ == '__main__':
try: main()
except: raise
if __name__ == "__main__":
try:
main()
except Exception:
raise

View File

@ -1,65 +0,0 @@
from enum import IntEnum
import subprocess, os
from semver.logger import logging, logger, console_logger
try:
from configparser import ConfigParser
except ImportError:
# Python < 3
from ConfigParser import ConfigParser
def bump_version(version, index=2, tag_repo = True, update_files=True):
v = version.split('.')
# Bump version
v[index] = str(int(v[index]) + 1)
# Reset subversions
i = len(v) - 1
while i > index:
v[i] = '0'
i = i - 1
# Get new version
new_version = '.'.join(v)
# Tag new version
if tag_repo and version != new_version:
p = subprocess.Popen(['git', 'tag', new_version], cwd='.')
p.wait()
# Update local files
if update_files:
update_file_version(new_version, version)
return new_version
def update_file_version(new_version, version="0.0.0"):
# Open up config file
config = ConfigParser()
config.read('./.bumpversion.cfg')
for section in config.sections():
if len(section) > 17 and section[0:17] == "bumpversion:file:":
file_name = section[17:]
if os.path.isfile(file_name):
# Get search val from config
search_val = config.get(section, "search")
search_val = process_config_string(search_val, new_version, version)
# Get replace val from config
replace_val = config.get(section, "replace")
replace_val = process_config_string(replace_val, new_version, version)
# Update replace values in file
with open(file_name, 'r') as file:
filedata = file.read()
filedata =filedata.replace(search_val,replace_val)
with open(file_name, 'w') as file:
file.write(filedata)
else:
logger.warning("Tried to version file: `" + file_name + "` but it doesn't exist!")
def process_config_string(cfg_string, new_version, version):
return cfg_string.replace("{new_version}", new_version).replace("{current_version}", version)

18
semver/exceptions.py Normal file
View File

@ -0,0 +1,18 @@
class SemverException(Exception):
"""Semver base exception"""
pass
class NoMergeFoundException(SemverException):
"""No merge found in commit message"""
pass
class NotMainBranchException(SemverException):
"""Not on main branch"""
pass
class NoGitFlowException(SemverException):
"""No git flow branch found"""
pass

View File

@ -2,7 +2,7 @@ import logging
import subprocess
# create logger
logger = logging.getLogger('simple_example')
logger = logging.getLogger("simple_example")
logger.setLevel(logging.DEBUG)
# create console handler and set level to INFO
@ -10,10 +10,10 @@ console_logger = logging.StreamHandler()
console_logger.setLevel(logging.INFO)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
# add formatter to console_logger
console_logger.setFormatter(formatter)
# add console_logger to logger
logger.addHandler(console_logger)
logger.addHandler(console_logger)

76
semver/scm/__init__.py Normal file
View File

@ -0,0 +1,76 @@
from abc import ABC, abstractmethod
from typing import Union, List
from semver.version_type import VersionType
from semver.logger import logger
class SCM(ABC):
@abstractmethod
def get_tag_version(self) -> str:
raise NotImplementedError
@abstractmethod
def get_branch(self) -> str:
raise NotImplementedError
@abstractmethod
def get_merge_branch(self) -> Union[str, None]:
raise NotImplementedError
@abstractmethod
def commit_and_push(self, branch: str) -> None:
raise NotImplementedError
@abstractmethod
def tag_version(self, version: str) -> None:
raise NotImplementedError
def get_file_version(self, config: dict) -> str:
"""
:param config: The bumpversion config as a dict
:return: The current version from the config file
"""
bumpversion: Union[str, None] = config.get("bumpversion", None)
version: Union[str, None] = (
bumpversion.get("current_version", None) if bumpversion else None
)
if not bumpversion:
config["bumpversion"] = {}
version = "0.0.0"
if not version:
config["bumpversion"]["current_version"] = "0.0.0"
version = "0.0.0"
return version
def get_version_type(
self,
merged_branch: str,
major_branches: List[str],
minor_branches: List[str],
patch_branches: List[str],
) -> Union[VersionType, None]:
"""
Get the version type based on the branches involved in the merge
:param merged_branch: The branch that was merged
:param major_branches: List of prefixes for major branches
:param minor_branches: List of prefixes for minor branches
:param patch_branches: List of prefixes for patch branches
:return: The version type
"""
logger.info(f"Merged branch is {merged_branch}")
merged_prefix = merged_branch.split("/")[-1].rstrip("/")
version_type: Union[VersionType, None] = None
if merged_prefix:
if merged_prefix in major_branches:
version_type = VersionType.MAJOR
if merged_prefix in minor_branches:
version_type = VersionType.MINOR
if merged_prefix in patch_branches:
version_type = VersionType.PATCH
return version_type

137
semver/scm/git.py Normal file
View File

@ -0,0 +1,137 @@
import re
import subprocess
from typing import Union, List
from functools import cache
import toml
from semver.scm import SCM
from semver.logger import logger
from semver.version_type import VersionType
from semver.exceptions import SemverException
class Git(SCM):
def __init__(self, global_user: bool = False) -> None:
self.git_commit_pattern = re.compile(
r"Merge (branch|pull request) '?([^']+)'? (into|from) (?:'(.+)'|[^\/]+\/([^\n\\]+))"
)
self.git_bin = "git"
self.global_user: bool = global_user
self.git_email: str = "versioner@semver.com"
self.git_user: str = "Semantic Versioner"
self._setup_git_user()
super().__init__()
def _run_command(self, *args: str) -> subprocess.CompletedProcess[str]:
return subprocess.run(
args,
capture_output=True,
text=True,
check=True,
)
def _setup_git_user(self) -> None:
self._run_command(
self.git_bin,
"config",
"--global" if self.global_user else "--local",
"user.email",
f'"{self.git_email}"',
)
self._run_command(
self.git_bin,
"config",
"--global" if self.global_user else "--local",
"user.name",
f'"{self.git_user}"',
)
def get_tag_version(self) -> str:
"""
Get the latest tagged version from git tags
:return: The latest tagged version
"""
config: dict = toml.load("./.bumpversion.cfg")
tag_expression: str = config["bumpversion"]["tag_name"].replace(
"{new_version}", "[0-9]*.[0-9]*.[0-9]*"
)
logger.debug(f"Tag expression: {tag_expression}")
# Default version is `0.0.0` or what is found in
version = self.get_file_version(config)
# If a version is found in git tags, use that the latest tagged version
tagged_versions: Union[List[str], None] = None
try:
proc = self._run_command(
self.git_bin, "tag", "--sort=v:refname", "-l", tag_expression
)
tagged_versions = proc.stdout.rstrip().split("\n")
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Error getting latest tagged git version: {str(e.stderr).rstrip()}"
)
if len(tagged_versions) > 0 and tagged_versions[-1] != "":
version = tagged_versions[-1]
logger.debug(f"Tag Version: {version}")
return version
@cache
def get_branch(self) -> str:
"""
Get the main branch
"""
proc = self._run_command(self.git_bin, "rev-parse", "--abbrev-ref", "HEAD")
return proc.stdout.rstrip()
@cache
def get_merge_branch(self) -> Union[str, None]:
"""
Get the branches involved in the merge
:return: The branch involved in the merge
"""
proc = self._run_command(self.git_bin, "log", "-1")
message: str = proc.stdout
branch: str = self.get_branch()
logger.info(f"Main branch is {branch}")
matches = self.git_commit_pattern.search(
message.replace("\\n", "\n").replace("\\", "")
)
merged_branch: Union[str, None] = None
if matches:
merged_branch = str(
matches.group(2) if matches.group(4) == branch else matches.group(5)
)
return merged_branch
def commit_and_push(self, branch: str) -> None:
"""
Commit and push the versioning changes
:param branch: The branch to push
"""
proc = self._run_command(self.git_bin, "push", "origin", branch)
if proc.returncode != 0:
raise SemverException(
f"Error pushing versioning changes to {branch}: {proc.stderr}"
)
proc = self._run_command(self.git_bin, "push", "origin", "--tags")
if proc.returncode != 0:
raise SemverException(
f"Error pushing versioning changes to {branch}: {proc.stderr}"
)
def tag_version(self, version: str) -> None:
self._run_command(self.git_bin, "tag", version)

49
semver/scm/perforce.py Normal file
View File

@ -0,0 +1,49 @@
import subprocess
from typing import Union, List
import toml
from semver.scm import SCM
from semver.logger import logger
class Perforce(SCM):
def __init__(self) -> None:
super().__init__()
def get_tag_version(self) -> str:
"""
Get the latest tagged version from Perforce labels
:return: The latest tagged version
"""
config: dict = toml.load("./.bumpversion.cfg")
tag_expression: str = config["bumpversion"]["tag_name"].replace(
"{new_version}", "[0-9]*.[0-9]*.[0-9]*"
)
logger.debug("Tag expression: " + str(tag_expression))
# Default version is `0.0.0` or what is found in
version = self.get_file_version(config)
# If a version is found in Perforce labels, use that the latest labeled version
labeled_versions: Union[List[str], None] = None
try:
proc = subprocess.run(
["p4", "labels", "-e", tag_expression, "-m1"],
capture_output=True,
text=True,
check=True,
)
labeled_versions = proc.stdout.rstrip().split("\n")
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Error getting latest labeled Perforce version: {str(e.stderr).rstrip()}"
)
if len(labeled_versions) > 0 and labeled_versions[-1] != "":
version = labeled_versions[-1]
logger.debug("Label Version: " + str(version))
return version

170
semver/semver.py Normal file
View File

@ -0,0 +1,170 @@
from typing import List, Union
from pathlib import Path
import toml
from semver.logger import logger
from semver.scm import SCM
from semver.version_type import VersionType
from semver.exceptions import (
NoMergeFoundException,
NotMainBranchException,
NoGitFlowException,
)
class SemVer:
"""Primary class for handling auto-semver processing"""
def __init__(
self,
scm: SCM,
main_branches: Union[List[str], None] = None,
major_branches: Union[List[str], None] = None,
minor_branches: Union[List[str], None] = None,
patch_branches: Union[List[str], None] = None,
):
"""
Initialize the SemVer object
:param global_user: Toggles git user at a global level, useful for build servers
:param scm: The source control manager to use
:param main_branches: Branches to run versioning on
:param major_branches: List of prefixes for major branches
:param minor_branches: List of prefixes for minor branches
:param patch_branches: List of prefixes for patch branches
"""
self._merged_branch: Union[str, None] = None
self._branch: Union[str, None] = None
self._version_type: Union[VersionType, None] = None
self._main_branches: List[str] = main_branches if main_branches else []
self._major_branches: List[str] = major_branches if major_branches else []
self._minor_branches: List[str] = minor_branches if minor_branches else []
self._patch_branches: List[str] = patch_branches if patch_branches else []
self._scm: SCM = scm
def _version_repo(self) -> None:
"""
Use bump_version to update the repo version
"""
version = self._scm.get_tag_version()
if not self._version_type:
raise NoMergeFoundException()
logger.debug(f"Running bumpversion of type: {self._version_type.name}")
self._bump_version(version, self._version_type)
def _process_config_string(self, cfg_string, new_version, version):
return cfg_string.replace("{new_version}", new_version).replace(
"{current_version}", version
)
def _bump_version(
self,
version: str,
index: VersionType = VersionType.MINOR,
tag_repo: bool = True,
update_files: bool = True,
) -> str:
"""
Bump the version of the repo
:param version: The current version
:param index: The index of the version to bump
:param tag_repo: Whether or not to tag the repo
:param update_files: Whether or not to update the files
:return: The new version
"""
v: List[str] = version.split(".")
# Bump version
v[index] = str(int(v[index]) + 1)
# Reset subversions
i = len(v) - 1
while i > index:
v[i] = "0"
i = i - 1
# Get new version
new_version = ".".join(v)
# Tag new version
if tag_repo and version != new_version:
self._scm.tag_version(new_version)
# Update local files
if update_files:
self._update_file_version(new_version, version)
return new_version
def _update_file_version(self, new_version: str, version: str = "0.0.0"):
"""
Update the version in the config file
:param new_version: The new version
:param version: The current version
"""
# Open up config file
config = toml.load("./.bumpversion.cfg")
bump_version_file_prefix = "bumpversion:file:"
bump_version_file_prefix_len = len(bump_version_file_prefix)
for section in config:
if section.startswith(bump_version_file_prefix):
file_path = Path(section[bump_version_file_prefix_len:])
section_data = config[section]
if file_path.is_file():
# Get search val from config
search_val = section_data["search"]
search_val = self._process_config_string(
search_val, new_version, version
)
# Get replace val from config
replace_val = section_data["replace"]
replace_val = self._process_config_string(
replace_val, new_version, version
)
# Update replace values in file
with open(file_path, "r") as file:
filedata = file.read()
filedata = filedata.replace(search_val, replace_val)
with open(file_path, "w") as file:
file.write(filedata)
else:
logger.warning(
f"Tried to version file: '{file_path}' but it doesn't exist!"
)
def run(self, push=True):
"""
Run the versioning process
1) get branches from last commit message
2) see if we're merging into a main branch
3) see what type of versioning we should do
4) version the repo
:param push: Whether or not to push the changes
"""
self._branch = self._scm.get_branch()
self._merged_branch = self._scm.get_merge_branch()
if not self._merged_branch:
raise NoMergeFoundException()
if self._branch not in self._main_branches:
raise NotMainBranchException()
self._version_type = self._scm.get_version_type(
self._branch,
self._major_branches,
self._minor_branches,
self._patch_branches,
)
if not self._version_type:
raise NoGitFlowException()
self._version_repo()
if push:
self._scm.commit_and_push(self._branch)

View File

@ -1,72 +0,0 @@
import subprocess
from typing import Union, List
try:
from subprocess import DEVNULL # py3k
except ImportError:
import os
DEVNULL = open(os.devnull, "wb")
import toml
from semver.logger import logging, logger, console_logger
def get_tag_version() -> str:
"""
Get the latest tagged version from git tags
:return: The latest tagged version
"""
config: dict = toml.load("./.bumpversion.cfg")
tag_expression: str = config["bumpversion"]["tag_name"].replace(
"{new_version}", "[0-9]*.[0-9]*.[0-9]*"
)
logger.debug("Tag expression: " + str(tag_expression))
# Default version is `0.0.0` or what is found in
version = get_file_version(config)
# If a version is found in git tags, use that the latest tagged version
tagged_versions: Union[List[str], None] = None
try:
proc = subprocess.run(
["git", "tag", "--sort=v:refname", "-l", tag_expression],
capture_output=True,
text=True,
check=True,
)
tagged_versions = proc.stdout.rstrip().split("\n")
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Error getting latest tagged git version: {str(e.stderr).rstrip()}"
)
if len(tagged_versions) > 0 and tagged_versions[-1] != "":
version = tagged_versions[-1]
logger.debug("Tag Version: " + str(version))
return version
def get_file_version(config: dict) -> str:
"""
:param config: The bumpversion config as a dict
:return: The current version from the config file
"""
bumpversion: Union[str, None] = config.get("bumpversion", None)
version: Union[str, None] = (
bumpversion.get("current_version", None) if bumpversion else None
)
if not bumpversion:
config["bumpversion"] = {}
version = "0.0.0"
if not version:
config["bumpversion"]["current_version"] = "0.0.0"
version = "0.0.0"
return version

7
semver/version_type.py Normal file
View File

@ -0,0 +1,7 @@
from enum import IntEnum
class VersionType(IntEnum):
MAJOR = 0
MINOR = 1
PATCH = 2