Source code for autoflow.utils.dict

from collections import defaultdict
from copy import deepcopy
from typing import Dict, Any, List, Callable

from autoflow.utils.list import remove_suffix_in_list
from autoflow.utils.logging_ import get_logger

logger = get_logger(__name__)


[docs]def add_prefix_in_dict_keys(dict_: Dict[str, Any], prefix: str) -> Dict[str, Any]: result = {} for key, value in dict_.items(): result[f"{prefix}{key}"] = value return result
[docs]def group_dict_items_before_first_token(dict_: Dict[str, Any], token: str) -> Dict[str, Dict[str, Any]]: result = defaultdict(dict) for packages, value in dict_.items(): if token in packages: split_list = packages.split(token) key1 = token.join(split_list[:-1]) key2 = split_list[-1] else: key1 = "single" key2 = packages result[key1][key2] = value return result
[docs]def replace_kv(dict_: Dict, rk, rv): for k, v in dict_.items(): if isinstance(v, dict): replace_kv(v, rk, rv) elif k == rk: dict_[k] = rv
[docs]def sort_dict(obj): if isinstance(obj, dict): for k, v in obj.items(): obj[k] = sort_dict(v) return dict(sorted(obj.items(), key=lambda x: str(x[0]))) elif isinstance(obj, list): for i, elem in enumerate(obj): obj[i] = sort_dict(elem) return list(sorted(obj, key=str)) else: return obj
[docs]def remove_None_value(dict_: dict) -> dict: result = {} for k, v in dict_.items(): if v is not None: result[k] = v return result
[docs]class GlobalList: data = []
def _find_token_in_dict(dict_: dict, token, path=[]): # -> Iterator[Tuple[List[str], str]]: for key, value in dict_.items(): if key == token: # yield list(path), "key" GlobalList.data.append([list(path), "key"]) elif isinstance(value, str) and value == token: # yield list(path) + [key], "value" GlobalList.data.append([list(path) + [key], "value"]) elif isinstance(value, dict): _find_token_in_dict(value, token, list(path) + [key])
[docs]def find_token_in_dict(dict_: dict, token): GlobalList.data = [] _find_token_in_dict(dict_, token) return GlobalList.data
[docs]def read_dict_in_path_mode(dict_: dict, path: List[str]): for i in path: dict_ = dict_[i] return dict_
[docs]def write_dict_in_path_mode(dict_: dict, path: List[str], value): for i in path[:-1]: dict_ = dict_[i] dict_[path[-1]] = value
[docs]def update_mask_from_other_dict(should_update: Dict, template: Dict, ignored_suffix: str = "(choice)"): token = "<mask>" updated = deepcopy(should_update) for path, type_ in find_token_in_dict(updated, token): origin_path = path template_path = remove_suffix_in_list(path, ignored_suffix) if type_ == "key": # origin_value = updated[origin_path][token] origin_value = read_dict_in_path_mode(updated, origin_path)[token] template_item = read_dict_in_path_mode(template, template_path) assert isinstance(template_item, dict) template_keys = list(template_item.keys()) assert len(template_keys) == 1 template_key = template_keys[0] template_value: dict = template_item[template_key] template_value.update(origin_value) write_dict_in_path_mode(updated, origin_path, {template_key: template_value}) elif type_ == "value": write_dict_in_path_mode(updated, origin_path, read_dict_in_path_mode(template, template_path)) else: raise NotImplementedError return updated
[docs]def filter_item_by_key_condition(dict_: dict, func: Callable) -> dict: available_keys = [key for key in dict_ if func(key)] result = {key: dict_[key] for key in available_keys} return result
[docs]def update_data_structure(old_dict: dict, additional_dict: dict) -> dict: ''' >>> update_data_structure( ... {"key1": "a", "key2": [1, True, {"a", (2, )}], "key3": {"a":1, "b":2}}, ... {"key1": "b", "key2": [1, False], "key3": {"a":{"b":2}}, "key4": "new_value"} ... ) {"key1": "b", "key2": [1, True, {"a", (2, )}, 1, False], "key3": {"a":{"b":2, "b":2}}, "key4": "new_value"} Returns ------- updated_dict: dict ''' updated_dict = deepcopy(old_dict) for k, new_v in additional_dict.items(): if k in updated_dict: old_v = deepcopy(updated_dict[k]) new_v = deepcopy(new_v) if isinstance(old_v, dict): if isinstance(new_v, dict): old_v.update(new_v) else: logger.warning(f"In 'update_data_structure', old_v = {old_v}, but new_v = " f"{new_v} is not dict, cannot update.") elif isinstance(old_v, list): if not isinstance(new_v, list): new_v = [new_v] old_v += new_v else: old_v = new_v updated_dict[k] = old_v else: updated_dict[k] = additional_dict[k] return updated_dict
if __name__ == '__main__': updated_dict = update_data_structure( {"key1": "a", "key2": [1, True, {"a", (2,)}], "key3": {"a": 1, "b": 2}}, {"key1": "b", "key2": [1, False], "key3": {"a": {"b": 2}}, "key4": "new_value"} ) wanted = {"key1": "b", "key2": [1, True, {"a", (2,)}, 1, False], "key3": {"a": {"b": 2}, "b": 2}, "key4": "new_value"} print(updated_dict) print(wanted) print(updated_dict == wanted)