Table of Contents:
- Installation
- ToolKit Utilities
- ToolKit Dataclasses
- Maniuplating Dictionaries
- Utilities
- Internet Utilities
- Splunk
- Upload
Python General tools
Installation
Package found at PyPi
Code found at GitHub
>>> python -m pip install pytoolkit928
string_or_list
function allows you to interpret a string and return a list. It provides you the option of adding a delimiter using an OR function to return a possible string that you may be expecting possible common delimiters. Such as: ",|:|\||.| "
.
Example:
>>> from pytoolkit.utils import string_or_list
>>> test = 'string1,string2 string3|string4'
>>> string_or_list(test)
['string1,string2 string3|string4']
>>> string_or_list(test,delimeters=',| ')
['string1', 'string2', 'string3|string4']
>>> string_or_list(test,delimeters=',| |\|')
['string1', 'string2', 'string3', 'string4']
Used for basic extended functionality for data class declarations. Includes the ability to create a dataclass from a dictionary
or from **kwargs
. Also, includes a conversion from Dataclass
to a Python dictionary
.
Usage:
from typing import Optional
from dataclasses import dataclass
from pytoolkit.utilities import BaseMonitor, NONETYPE
@dataclass
class Sample(BaseMonitor):
key1: str
key2: str
key3: int
key5: Optional[str] = NONETYPE
# create a sample module
_dict = {"key1": "value1", "key2": "value2", "key3": 3}
sample1 = Sample.create_from_dict(_dict)
sample2 = Sample.create_from_kwargs(**_dict)
print(sample1)
print(sample2)
print(sample1.to_dict())
OUTPUT:
>>> print(sample1)
Sample(key1='value1', key2='value2', key3=3, key5=<object object at 0x10c8e8b70>)
>>> print(sample2)
Sample(key1='value1', key2='value2', key3=3, key5=<object object at 0x10c8e8b70>)
>>> print(sample1.to_dict())
{'key1': 'value1', 'key2': 'value2', 'key3': 3}
Maniuplating Dictionaries
Flatten a Dictionary:
import json
from pytoolkit import utilities
sample_dict = {"key1":"value","key2": "value2", "metadata": {"key1": "meta_value1","key2":"meta_value2"}}
# Convert dictionary into a flat dictionary
flat_dict = utilities.flatten_dict(sample_dict)
# Convert dictionary back into a nested ditionary
nest_dict = utilities.nested_dict(flat_dict)
print(f"This is a Flattened Dictionary:\n{json.dumps(flat_dict,indent=1)}")
print(f"This is a Nested Dictionary:\n{json.dumps(nest_dict,indent=1)}")
OUTPUT:
This is a Flattened Dictionary:
{
"key1": "value",
"key2": "value2",
"metadata.key1": "meta_value1",
"metadata.key2": "meta_value2"
}
This is a Nested Dictionary:
{
"key1": "value",
"key2": "value2",
"metadata": {
"key1": "meta_value1",
"key2": "meta_value2"
}
}
The above uses the default '.'
separator value. A mix of commands can be passed to adjust how the dictionary is expressed. This is useful for expressing data in other formats that do not allow for nested dictionaries but need a way to recreate the original structured data structure.
Nested Dictionary:
Utilities
Sanatize dictionary data
from pytoolkit.utils import sanatize_data
test_dict = {
"value1": "one",
"value2": "two",
"subvalue01": {
"password": "welcome123",
"username": "testuser",
}
}
sanatized_dict = sanatize_data(data=test_dict)
print(sanatize_dict)
# {"value1": "one", "value2": "two", "subvalue01": { "password": "[MASKED]", "username": "testuser"}}
Internet Utilities
Convert MAC Values:
def convert_mac(
mac: str,
mac_format: str = ":",
remove: bool = False,
to_lower: bool = True,
split_by: int = 2,
) -> str:
"""
Convert a Mac address into regular string or use seperators such as `:` or `-` for every 2 values.
:param mac: Mac Address
:type mac: str
:param mac_format: _description_, defaults to `':'`
:type mac_format: _type_, optional
:param remove: Remove formater, must be split by 2., defaults to `False`
:type remove: bool, optional
:param to_lower: _description_, defaults to True
:type to_lower: bool, optional
:param to_lower: Return all values in lower case; if set to `False` will return in uppercase, defaults to `True`
:return: Formated Mac Address
:rtype: str
:param split_by: Split MAC Address by 2 or 4, defaults to `2`
:type split_by: bool, optional
:raises ValueError: Invalid MAC format.
:return: _description_
:rtype: str
"""
mac_addr = ""
if split_by not in [2, 4]:
raise ValueError(f"Split by {str(split_by)} invalid must be 2 or 4")
if remove:
pattern = re.compile(
"^[a-f0-9]{2}"
+ f"{mac_format}"
+ "[a-f0-9]{2}"
+ f"{mac_format}"
+ "[a-f0-9]{2}"
+ f"{mac_format}"
+ "[a-f0-9]{2}"
+ f"{mac_format}"
+ "[a-z0-9]{2}"
+ f"{mac_format}"
+ "[a-f0-9]{2}$",
re.IGNORECASE,
)
if not pattern.match(mac):
raise ValueError(f"Unable to remove {mac_format} due to invalid MAC {mac}")
mac_addr = re.sub(rf"{mac_format}", "", mac)
else:
pattern = re.compile("^[a-f0-9]{12}$", re.IGNORECASE)
if not pattern.match(mac):
raise ValueError(
f"Unable to reformat MAC using {mac_format} due to invalid MAC {mac}"
)
mac_addr = f"{mac_format}".join(
mac[i : i + split_by] for i in range(0, 12, split_by)
)
return mac_addr.lower() if to_lower else mac_addr.upper()
Splunk
Use to create a header for Splunk to upload HEC formatted data.
Hec Formatter:
from pytoolkit.py_splunk import SplunkHecHeader
Format data based on passed **kwargs
:
def splunk_hec_format(
host: str,
source: str,
sourcetype: str,
metrics_list: Union[list[str], None] = None,
**kwargs: Any,
) -> dict[str, Any]:
"""Create a JSON style hec format.
:param host: hostname
:type host: str
:param source_name: source of dataa
:type source_name: str
:param metrics_list: list of metrics type fileds found in arguments
:type metrics_list: list[str]
:param kwargs: key:value pairs to extract and format data structure
:return: Splunk Hec Datastructure
:rtype: dict[str,Any]
"""
hec_json: dict[str, Any] = {
"time": kwargs.pop("time", datetime.datetime.now().timestamp()),
"host": host,
"source": source,
"sourcetype": sourcetype,
"event": {},
}
if metrics_list:
# Build HEC Style Metrics
hec_json["fields"] = {
f"metric_name:{metric}": kwargs.pop(metric, None) for metric in metrics_list
}
hec_json["fields"] = dict(sorted(hec_json["fields"].items()))
hec_json["event"] = {**hec_json["event"], **kwargs}
hec_json["event"] = dict(sorted(hec_json["event"].items()))
return hec_json
Upload
splunk_hec_upload
is a function to upload Splunk data. It defaults to a list of 100 items that can be changed and adjusted as needed.
def splunk_hec_upload( # pylint: disable=too-many-arguments,too-many-locals
server: str,
token: str,
hec_data: list[dict[str, Any]],
timeout: float = 15.0,
verify: Union[str, bool] = True,
port: int = 8088,
chunk_size: int = 100,
log: Any = splunk_log,
):
"""
Upload Splunk Data.
:param server: _description_
:type server: str
:param token: _description_
:type token: str
:param hec_data: List of dictionary events.
:type hec_data: list[str,Any]
:param verify: Validation of Rest call
:type verify: [str|bool]
:param port: Port to use, defaults to 8088
:type port: int, optional
:param chunk_size: Set size to split up data into if too large;
hec_data must be a list of json entries, defaults to 100 (0 will indicate all)
:type chunk_size: int, optional
"""
if not verify:
log.error(f'msg="SSL Verficiation is off recommended this be fixed"|{verify=}')
urllib3.disable_warnings # pylint: disable=pointless-statement
url: str = f"https://{server}:{port}/{SPLUNK_HEC_EVENTPATH}"
headers: dict[str, str] = {
"Content-Type": "application/json",
"Authorization": f"Splunk {token}",
"X-Splunk-Request-Channel": token,
}
chunk_data: list[list[dict[str,Any]]] = (
chunk(hec_data, chunk_size) if len(hec_data) > chunk_size > 0 else [hec_data]
)
resp_list: list[dict[str, Any]] = []
for payload in chunk_data:
response: requests.Response = requests.post(
url, headers=headers, json=payload, verify=verify, timeout=timeout
)
splunk_log.info(
f'msg="uploaded splunk data response"|status_code={response.status_code}, response={response.json()}'
)
resp_list.append(
{
"status_code": response.status_code,
"payload_len": len(payload),
"message": response.json() if response.json() else "",
}
)
try:
response.raise_for_status()
except Exception as err:
error = reformat_exception(err)
splunk_log.error(
f'msg="Unable to upload data to splunk server"|splunk_server={server}, {error=}'
)
return resp_list