Module tagmaps.classes.utils
Module with general utility methods used in tag maps package.
Expand source code
# -*- coding: utf-8 -*-
"""Module with general utility methods used in tag maps package."""
# delay evaluation of annotations at runtime (PEP 563)
from __future__ import absolute_import, annotations
import argparse
import hashlib
import io
import logging
import math
import os
import platform
import re
import sys
import unicodedata
import warnings
from collections import namedtuple
from datetime import timedelta
from importlib import reload
from math import asin, cos, radians, sin, sqrt
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
from emoji.core import distinct_emoji_list, demojize
import numpy as np
import pyproj
import shapely.geometry as geometry
from ..classes.shared_structure import AnalysisBounds, ConfigMap, ItemCounter
class Utils:
    """Collection of various tools and helper functions

    Primarily @classmethods and @staticmethods
    """

    @staticmethod
    def check_folder_file(folder_file: Path, create_folder: bool = None) -> Path:
        """Return ``folder_file`` if it exists, optionally creating it as a folder.

        Args:
            folder_file: Path to check.
            create_folder: If truthy, a missing path is created as a
                directory through ``Utils.init_dir`` instead of raising.

        Returns:
            The unchanged ``folder_file``.

        Raises:
            ValueError: If the path is missing and ``create_folder`` is falsy.
        """
        if folder_file.exists():
            return folder_file
        if create_folder:
            Utils.init_dir(folder_file)
            return folder_file
        # is_dir() is always False for a missing path, so it cannot be
        # classified as file or folder here; the label is therefore fixed.
        raise ValueError(f"Folder {folder_file} not found.")

    @staticmethod
    def check_fileheader(
        fieldnames: Iterable[str], source_map: ConfigMap, filename: Optional[str]
    ):
        """Check that all required columns are present in a file header.

        Args:
            fieldnames: Column names found in the file (None counts as empty).
            source_map (ConfigMap): Custom headers definition
            filename: File name, only used in the error message.

        Raises:
            Warning: For the first required column missing in ``fieldnames``.

        Note:
            the type hint above makes use of forward-references
            defined in PEP 484, as a means to avoid cyclic imports
        """
        # Materialize once: repeated ``in`` tests would consume a one-shot
        # iterator and are linear on lists.
        available = set() if fieldnames is None else set(fieldnames)
        header_req = [
            source_map.post_guid_col,
            source_map.user_guid_col,
            source_map.latitude_col,
            source_map.longitude_col,
            source_map.tags_col,
        ]
        for header in header_req:
            if header not in available:
                raise Warning(
                    f'File header is missing "{header}"-column, ' f"file: {filename}"
                )

    @staticmethod
    def _count_none(str_list: Union[Dict[str, str], List[str]]) -> int:
        """Return count of list, returns zero for None"""
        return 0 if str_list is None else len(str_list)

    @staticmethod
    def report_listload(list_ref, str_text):
        """Report only if list_ref contains at least 1 entry."""
        entry_count = Utils._count_none(list_ref)
        if entry_count > 0:
            logging.getLogger("tagmaps").info(f"Loaded {entry_count} {str_text}.")

    @staticmethod
    def concat_topic(term_list):
        """Concatenate list of terms (e.g. TOPIC) to a '-'-separated string

        Raises:
            ValueError: If any term contains the separator character '-'.
        """
        # Copy first, so a generator is not exhausted by the check below
        terms = list(term_list)
        if any("-" in term for term in terms):
            raise ValueError("No '-' characters supported in topic list terms")
        return "-".join(terms)

    @staticmethod
    def split_topic(term_concat):
        """Split concat topic at '-' into its list of terms"""
        return term_concat.split("-")

    @staticmethod
    def set_proj_dir():
        """Update PROJ_LIB location if not found

        Leftover from pyproj < 2.0.0 compatibility,
        PROJ_LIB not needed anymore

        Raises:
            ValueError: If PROJ_LIB is unset and no local 'proj' folder exists
                in the current working directory.
        """
        if os.environ.get("PROJ_LIB"):
            return
        local_proj_path = Path.cwd() / "proj"
        if not local_proj_path.exists():
            raise ValueError(
                "Pyproj 'proj' datadir not found. Either specify "
                "PROJ_LIB environmental variable or copy 'proj' "
                "folder to local path of executable"
            )
        os.environ["PROJ_LIB"] = str(local_proj_path)
        pyproj.datadir.set_data_dir(str(local_proj_path))

    @staticmethod
    def get_shapely_bounds(bounds: AnalysisBounds) -> geometry.MultiPoint:
        """Returns boundary shape (min and max corner) from 4 coordinates"""
        return geometry.MultiPoint(
            [
                (bounds.lim_lng_min, bounds.lim_lat_min),
                (bounds.lim_lng_max, bounds.lim_lat_max),
            ]
        )

    @staticmethod
    def get_best_utmzone(bound_points_shapely: geometry.MultiPoint):
        """Calculate best UTM Zone SRID/EPSG Code for the centroid of a shape

        Returns:
            Tuple of the CRS string ("epsg:<code>") and the EPSG code.
        """
        lon_center, lat_center = bound_points_shapely.centroid.coords[0][:2]
        epsg_code = Utils._convert_wgs_to_utm(lon_center, lat_center)
        return f"epsg:{epsg_code}", epsg_code

    @staticmethod
    def _convert_wgs_to_utm(lon: float, lat: float):
        """Return best epsg code for pair of WGS coordinates

        Args:
            lon: longitude in decimal degrees
            lat: latitude in decimal degrees

        Returns:
            str: "326" (lat >= 0) or "327" (lat < 0) followed by the
            zero-padded two-digit UTM zone number.
        """
        utm_zone = (math.floor((lon + 180) / 6) % 60) + 1
        prefix = "326" if lat >= 0 else "327"
        return f"{prefix}{utm_zone:02d}"

    @staticmethod
    def encode_string(text_s):
        """Hash string with SHA3-256 (hashlib.sha3_256), produce hex

        - returns a string of 64 hexadecimal digits
        """
        return hashlib.sha3_256(text_s.encode("utf8")).hexdigest()

    @staticmethod
    def remove_special_chars(text_s):
        """Replaces a list of special chars in string with spaces"""
        special_chars = "?.!/;:,[]()'-&#|<>=\""
        return text_s.translate({ord(c): " " for c in special_chars})

    @staticmethod
    def select_words(text_s, selection_list: List[str]) -> str:
        """Filters a string based on a provided
        positive filter list of terms

        - removes duplicate terms (order of result is not defined)
        """
        # first remove hyperlinks, then split at whitespace
        querywords = Utils.remove_hyperlinks(text_s).split()
        resultwords = {word for word in querywords if word.lower() in selection_list}
        return " ".join(resultwords)

    @staticmethod
    def select_emoji(
        input_emoji_set: Set[str], selection_emoji_set: Set[str] = None
    ) -> Set[str]:
        """Filters a set of emoji based on another set"""
        if selection_emoji_set is None:
            # no filter on empty selection list
            return input_emoji_set
        return {emoji for emoji in input_emoji_set if emoji in selection_emoji_set}

    @staticmethod
    def remove_stopwords(text_s, stopwords: List[str]) -> str:
        """Removes a list of words from string,
        including hyperlinks (<a></a>) and
        integer numbers (e.g. 2012)
        """
        querywords = Utils.remove_hyperlinks(text_s).split()
        resultwords = [
            word
            for word in querywords
            if word.lower() not in stopwords and not word.isdigit()
        ]
        return " ".join(resultwords)

    @staticmethod
    def remove_hyperlinks(text_s):
        """Regex to remove any hyperlink tags from string

        Note:
            - anything between <a>xxx</a> will be kept
        """
        return re.sub(r"<(a|/a).*?>", "", text_s)

    @staticmethod
    def _is_number(number_s):
        """Check if variable is number (convertible to float)"""
        try:
            float(number_s)
        except (TypeError, ValueError):
            # TypeError: e.g. None, which is not a number either
            return False
        return True

    @staticmethod
    def check_intersect_polylist(latlng_point, polylist, poly_exclude_list=None):
        """Checks intersection of point against lists of polygons.

        Arguments:
            latlng_point -- object with a ``within(poly)`` method
            polylist {list} -- polys to include; None means no restriction
            poly_exclude_list {list} -- list of polys to exclude

        Returns:
            True if the point is within any poly of ``polylist`` (or
            ``polylist`` is None) and within no poly of ``poly_exclude_list``.
        """
        if polylist is not None and not any(
            latlng_point.within(poly) for poly in polylist
        ):
            return False
        if poly_exclude_list is not None and any(
            latlng_point.within(poly) for poly in poly_exclude_list
        ):
            return False
        return True

    @staticmethod
    def init_main():
        """Initializing main procedure
        if package is executed directly

        TODO:
            - disables fiona logging, as it (somehow)
              interferes with tag maps logging
              (find better solution)
        """
        # stretch console
        if platform.system() == "Windows":
            os.system("mode con: cols=197 lines=500")
        logging.getLogger("fiona.collection").disabled = True

    @staticmethod
    def set_logger(output_folder: Path = None, logging_level=None):
        """Set logging handler manually,
        so we can also print to console while logging to file

        Args:
            output_folder: If given, it is created when missing and, in
                console mode, "log.txt" inside it receives the log.
            logging_level: Logging level, defaults to logging.INFO.

        Returns:
            The "tagmaps" logger.
        """
        # reset logging in case Jupyter Notebook has
        # captured stdout
        reload(logging)
        log = logging.getLogger("tagmaps")
        if not log:
            # no logging handler found for tagmaps
            # indicates package import mode
            return
        if log.handlers:
            # only add log handlers once
            return log
        if logging_level is None:
            logging_level = logging.INFO
        log_file = None
        if output_folder is not None:
            if not output_folder.exists():
                Utils.init_dir(output_folder)
            log_file = output_folder / "log.txt"
        log.format = "%(message)s"  # type: ignore
        log.datefmt = ""  # type: ignore
        log.setLevel(logging_level)
        # Set Output to Replace in case of
        # encoding issues (console/windows)
        if isinstance(sys.stdout, io.TextIOWrapper):
            # only for console output (not Juypter Notebook stream)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), sys.stdout.encoding, "replace"
            )  # type: ignore
            log.addHandler(logging.StreamHandler())
            if log_file is not None:
                # only log to file in console mode
                log.addHandler(logging.FileHandler(log_file, "w", "utf-8"))
        else:
            # log to stdout, not stderr in Jupyter Mode to prevent
            # log.Info messages appear as red boxes
            logging.basicConfig(
                stream=sys.stdout,
                format=log.format,  # type: ignore
                level=logging_level,
                datefmt=None,
            )
        return log

    @staticmethod
    def init_dir(path_folder: Path):
        """Creates local dir (including missing parents) if not exists"""
        if not path_folder.exists():
            path_folder.mkdir(parents=True, exist_ok=True)
            print(f"Folder {path_folder.name}/ was created")

    @staticmethod
    def query_yes_no(question, default="yes"):
        """Ask a yes/no question via input() and return the answer.

        "question" is a string that is presented to the user.
        "default" is the presumed answer if the user just hits <Enter>.
            It must be "yes" (the default), "no" or None (meaning
            an answer is required of the user).

        The "answer" return value is True for "yes" or False for "no".

        Raises:
            ValueError: If ``default`` is not "yes", "no" or None.
        """
        valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
        if default is None:
            prompt = " [y/n] "
        elif default == "yes":
            prompt = " [Y/n] "
        elif default == "no":
            prompt = " [y/N] "
        else:
            raise ValueError("invalid default answer: '%s'" % default)
        while True:
            sys.stdout.write(question + prompt)
            choice = input().lower()
            if default is not None and choice == "":
                return valid[default]
            if choice in valid:
                return valid[choice]
            sys.stdout.write("'yes' or 'no' (or 'y' or 'n').\n")

    @staticmethod
    def daterange(start_date, end_date):
        """Yield each date from start_date (inclusive) to
        end_date (exclusive), in steps of one day"""
        for n_val in range(int((end_date - start_date).days)):
            yield start_date + timedelta(n_val)

    @staticmethod
    def haversine(lon1, lat1, lon2, lat2):
        """
        Calculate the great circle distance in meters between two points
        on the earth (specified in decimal degrees)
        """
        # convert decimal degrees to radians
        lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
        # haversine formula
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a_value = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # clamp: rounding can push the root just above 1 for antipodal
        # points, which would be outside the domain of asin
        c_value = 2 * asin(min(1.0, sqrt(a_value)))
        # Radius of earth in kilometers is 6371
        return 6371 * c_value * 1000

    @staticmethod
    def get_radians_from_meters(dist):
        """Get approx. distance in radians from meters

        Args:
            dist (float): Distance in meters

        Returns:
            float: Distance in radians

        - 1 Radian is about 57.2958 degrees.
        - 1 degree is taken as 111.325 km, so 2 degrees
          are about 222.65 km
        """
        km_dist = dist / 1000
        degrees_dist = km_dist / 111.325
        return degrees_dist / 57.2958

    @staticmethod
    def get_meters_from_radians(dist):
        """Get approx. distance in meters from radians distance

        Args:
            dist (float): Distance in radians

        Returns:
            float: Distance in meters, rounded to one decimal

        - 1 Radian is about 57.2958 degrees.
        - 1 degree is taken as 111.325 km, so 2 degrees
          are about 222.65 km
        """
        degrees_dist = dist * 57.2958
        km_dist = degrees_dist * 111.325
        return round(km_dist * 1000, 1)

    @staticmethod
    def get_emojiname(emoji_string):
        """Tries to get a name representation for emoji.

        Emoji can either be a single character, or a number of characters
        that construct a grapheme cluster, so the unicode name lookup
        cannot always be applied to the whole string directly.

        Returns:
            The unicode name of the string (single character) or of its
            first character that has a name, else the demojize() result,
            else "" (with a warning).
        """
        emoji_name = None
        # first try to get name for whole str
        if len(emoji_string) == 1:
            emoji_name = Utils._get_unicode_name(emoji_string)
        if not emoji_name:
            # fall back to first named character of the cluster
            for char_s in emoji_string:
                emoji_name = Utils._get_unicode_name(char_s)
                if emoji_name:
                    break
        if not emoji_name:
            emoji_name = demojize(emoji_string)
        if not emoji_name:
            warnings.warn(f"No name found for {emoji_string}")
            return ""
        return emoji_name

    @staticmethod
    def _get_unicode_name(emoji_string_or_char):
        """Return unicode name of a character, False if it has none"""
        try:
            return unicodedata.name(emoji_string_or_char)
        except ValueError:
            return False

    @staticmethod
    def _check_emoji_type(char_unicode):
        """Checks if emoji code point is of specific type,
        e.g.: EMOJI MODIFIER, VARIATION SELECTOR
        or ZERO WIDTH modifier

        Returns False for these types, True otherwise (also for
        characters without a unicode name).

        To check: Is this method really needed?
        """
        try:
            if unicodedata.name(char_unicode).startswith(
                ("EMOJI MODIFIER", "VARIATION SELECTOR", "ZERO WIDTH")
            ):
                return False
            return True
        except ValueError:
            print(char_unicode)
            return True

    @staticmethod
    def extract_emoji(string_with_emoji: Optional[str]) -> Set[str]:
        """Extract emoji and flags using emoji package"""
        if not string_with_emoji:
            # empty
            return set()
        return set(distinct_emoji_list(string_with_emoji))

    @staticmethod
    def str2bool(str_text):
        """Convert any type of yes no string to
        bool representation

        Raises:
            argparse.ArgumentTypeError: If the string is not recognized.
        """
        normalized = str_text.lower()
        if normalized in ("yes", "true", "t", "y", "1"):
            return True
        if normalized in ("no", "false", "f", "n", "0"):
            return False
        raise argparse.ArgumentTypeError("Boolean value expected.")

    @staticmethod
    def get_rectangle_bounds(points):
        """Get rectangle bounds for numpy.ndarray of point coordinates

        Column 0 of ``points`` is used for the lng (x) limits,
        column 1 for the lat (y) limits.
        """
        RectangleBounds = namedtuple(
            "RectangleBounds", "lim_lat_min lim_lat_max lim_lng_min lim_lng_max"
        )
        x_values, y_values = points.T[0], points.T[1]
        return RectangleBounds(
            np.min(y_values), np.max(y_values), np.min(x_values), np.max(x_values)
        )

    @staticmethod
    def filter_tags(
        taglist: Set[str],
        sort_out_always_set: Set[str],
        sort_out_always_instr_set: Set[str],
        select_tags_set: Set[str] = None,
    ) -> Tuple[Set[str], int, int]:
        """Filter list of tags based on two stoplists

        - also removes numeric items and duplicates
        - extracts only tags with len(s) > 1
        - optionally filters list according to positive selection list
          (the stoplists are not applied in this case)

        Args:
            taglist (Iterable[str]): List/Set of input tags to filter
            sort_out_always_set (Set[str]): Filter complete match
            sort_out_always_instr_set (Set[str]): Filter partial match
            select_tags_set (Set[str]): Positive filter list

        Returns:
            Tuple[Set[str], int, int]: Filtered list,
                length of list and
                skipped number of items
        """
        count_tags = 0
        count_skipped = 0
        tags_filtered = set()
        for tag in taglist:
            count_tags += 1
            if select_tags_set is not None:
                # positive list available,
                # return only selected items
                if tag in select_tags_set:
                    tags_filtered.add(tag)
                else:
                    count_skipped += 1
                continue
            # exclude empty/one-char tags, numbers and those tags
            # that are in sort_out_always_set
            # or contain an entry of sort_out_always_instr_set
            if (
                len(tag) <= 1
                or tag == '""'
                or tag.isdigit()
                or tag in sort_out_always_set
                or any(partial in tag for partial in sort_out_always_instr_set)
            ):
                count_skipped += 1
                continue
            tags_filtered.add(tag)
        return (tags_filtered, count_tags, count_skipped)

    @staticmethod
    def get_index_of_item(l_tuple_str: List[ItemCounter], value: Optional[str]) -> int:
        """Get index pos of first item whose ``name`` equals value.

        Stops iterating through the list as
        soon as it finds the value

        Raises:
            ValueError: If no item matches.
        """
        for pos, tuple_str in enumerate(l_tuple_str):
            if tuple_str.name == value:
                return pos
        # Matches behavior of list.index
        raise ValueError(f"{value}: x not in list")

    @staticmethod
    def get_locname(item: str, loc_name_dict: Optional[Dict[str, str]]):
        """Gets location name from ID, if available"""
        if loc_name_dict:
            # default to item if dict key not found
            return loc_name_dict.get(item, item)
        return item
class Utils
Collection of various tools and helper functions
Primarily @classmethods and @staticmethods
Expand source code
class Utils:
    """Collection of various tools and helper functions

    Primarily @classmethods and @staticmethods
    """

    @staticmethod
    def check_folder_file(folder_file: Path, create_folder: bool = None) -> Path:
        """Return ``folder_file`` if it exists, optionally creating it as a folder.

        Args:
            folder_file: Path to check.
            create_folder: If truthy, a missing path is created as a
                directory through ``Utils.init_dir`` instead of raising.

        Returns:
            The unchanged ``folder_file``.

        Raises:
            ValueError: If the path is missing and ``create_folder`` is falsy.
        """
        if folder_file.exists():
            return folder_file
        if create_folder:
            Utils.init_dir(folder_file)
            return folder_file
        # is_dir() is always False for a missing path, so it cannot be
        # classified as file or folder here; the label is therefore fixed.
        raise ValueError(f"Folder {folder_file} not found.")

    @staticmethod
    def check_fileheader(
        fieldnames: Iterable[str], source_map: ConfigMap, filename: Optional[str]
    ):
        """Check that all required columns are present in a file header.

        Args:
            fieldnames: Column names found in the file (None counts as empty).
            source_map (ConfigMap): Custom headers definition
            filename: File name, only used in the error message.

        Raises:
            Warning: For the first required column missing in ``fieldnames``.

        Note:
            the type hint above makes use of forward-references
            defined in PEP 484, as a means to avoid cyclic imports
        """
        # Materialize once: repeated ``in`` tests would consume a one-shot
        # iterator and are linear on lists.
        available = set() if fieldnames is None else set(fieldnames)
        header_req = [
            source_map.post_guid_col,
            source_map.user_guid_col,
            source_map.latitude_col,
            source_map.longitude_col,
            source_map.tags_col,
        ]
        for header in header_req:
            if header not in available:
                raise Warning(
                    f'File header is missing "{header}"-column, ' f"file: {filename}"
                )

    @staticmethod
    def _count_none(str_list: Union[Dict[str, str], List[str]]) -> int:
        """Return count of list, returns zero for None"""
        return 0 if str_list is None else len(str_list)

    @staticmethod
    def report_listload(list_ref, str_text):
        """Report only if list_ref contains at least 1 entry."""
        entry_count = Utils._count_none(list_ref)
        if entry_count > 0:
            logging.getLogger("tagmaps").info(f"Loaded {entry_count} {str_text}.")

    @staticmethod
    def concat_topic(term_list):
        """Concatenate list of terms (e.g. TOPIC) to a '-'-separated string

        Raises:
            ValueError: If any term contains the separator character '-'.
        """
        # Copy first, so a generator is not exhausted by the check below
        terms = list(term_list)
        if any("-" in term for term in terms):
            raise ValueError("No '-' characters supported in topic list terms")
        return "-".join(terms)

    @staticmethod
    def split_topic(term_concat):
        """Split concat topic at '-' into its list of terms"""
        return term_concat.split("-")

    @staticmethod
    def set_proj_dir():
        """Update PROJ_LIB location if not found

        Leftover from pyproj < 2.0.0 compatibility,
        PROJ_LIB not needed anymore

        Raises:
            ValueError: If PROJ_LIB is unset and no local 'proj' folder exists
                in the current working directory.
        """
        if os.environ.get("PROJ_LIB"):
            return
        local_proj_path = Path.cwd() / "proj"
        if not local_proj_path.exists():
            raise ValueError(
                "Pyproj 'proj' datadir not found. Either specify "
                "PROJ_LIB environmental variable or copy 'proj' "
                "folder to local path of executable"
            )
        os.environ["PROJ_LIB"] = str(local_proj_path)
        pyproj.datadir.set_data_dir(str(local_proj_path))

    @staticmethod
    def get_shapely_bounds(bounds: AnalysisBounds) -> geometry.MultiPoint:
        """Returns boundary shape (min and max corner) from 4 coordinates"""
        return geometry.MultiPoint(
            [
                (bounds.lim_lng_min, bounds.lim_lat_min),
                (bounds.lim_lng_max, bounds.lim_lat_max),
            ]
        )

    @staticmethod
    def get_best_utmzone(bound_points_shapely: geometry.MultiPoint):
        """Calculate best UTM Zone SRID/EPSG Code for the centroid of a shape

        Returns:
            Tuple of the CRS string ("epsg:<code>") and the EPSG code.
        """
        lon_center, lat_center = bound_points_shapely.centroid.coords[0][:2]
        epsg_code = Utils._convert_wgs_to_utm(lon_center, lat_center)
        return f"epsg:{epsg_code}", epsg_code

    @staticmethod
    def _convert_wgs_to_utm(lon: float, lat: float):
        """Return best epsg code for pair of WGS coordinates

        Args:
            lon: longitude in decimal degrees
            lat: latitude in decimal degrees

        Returns:
            str: "326" (lat >= 0) or "327" (lat < 0) followed by the
            zero-padded two-digit UTM zone number.
        """
        utm_zone = (math.floor((lon + 180) / 6) % 60) + 1
        prefix = "326" if lat >= 0 else "327"
        return f"{prefix}{utm_zone:02d}"

    @staticmethod
    def encode_string(text_s):
        """Hash string with SHA3-256 (hashlib.sha3_256), produce hex

        - returns a string of 64 hexadecimal digits
        """
        return hashlib.sha3_256(text_s.encode("utf8")).hexdigest()

    @staticmethod
    def remove_special_chars(text_s):
        """Replaces a list of special chars in string with spaces"""
        special_chars = "?.!/;:,[]()'-&#|<>=\""
        return text_s.translate({ord(c): " " for c in special_chars})

    @staticmethod
    def select_words(text_s, selection_list: List[str]) -> str:
        """Filters a string based on a provided
        positive filter list of terms

        - removes duplicate terms (order of result is not defined)
        """
        # first remove hyperlinks, then split at whitespace
        querywords = Utils.remove_hyperlinks(text_s).split()
        resultwords = {word for word in querywords if word.lower() in selection_list}
        return " ".join(resultwords)

    @staticmethod
    def select_emoji(
        input_emoji_set: Set[str], selection_emoji_set: Set[str] = None
    ) -> Set[str]:
        """Filters a set of emoji based on another set"""
        if selection_emoji_set is None:
            # no filter on empty selection list
            return input_emoji_set
        return {emoji for emoji in input_emoji_set if emoji in selection_emoji_set}

    @staticmethod
    def remove_stopwords(text_s, stopwords: List[str]) -> str:
        """Removes a list of words from string,
        including hyperlinks (<a></a>) and
        integer numbers (e.g. 2012)
        """
        querywords = Utils.remove_hyperlinks(text_s).split()
        resultwords = [
            word
            for word in querywords
            if word.lower() not in stopwords and not word.isdigit()
        ]
        return " ".join(resultwords)

    @staticmethod
    def remove_hyperlinks(text_s):
        """Regex to remove any hyperlink tags from string

        Note:
            - anything between <a>xxx</a> will be kept
        """
        return re.sub(r"<(a|/a).*?>", "", text_s)

    @staticmethod
    def _is_number(number_s):
        """Check if variable is number (convertible to float)"""
        try:
            float(number_s)
        except (TypeError, ValueError):
            # TypeError: e.g. None, which is not a number either
            return False
        return True

    @staticmethod
    def check_intersect_polylist(latlng_point, polylist, poly_exclude_list=None):
        """Checks intersection of point against lists of polygons.

        Arguments:
            latlng_point -- object with a ``within(poly)`` method
            polylist {list} -- polys to include; None means no restriction
            poly_exclude_list {list} -- list of polys to exclude

        Returns:
            True if the point is within any poly of ``polylist`` (or
            ``polylist`` is None) and within no poly of ``poly_exclude_list``.
        """
        if polylist is not None and not any(
            latlng_point.within(poly) for poly in polylist
        ):
            return False
        if poly_exclude_list is not None and any(
            latlng_point.within(poly) for poly in poly_exclude_list
        ):
            return False
        return True

    @staticmethod
    def init_main():
        """Initializing main procedure
        if package is executed directly

        TODO:
            - disables fiona logging, as it (somehow)
              interferes with tag maps logging
              (find better solution)
        """
        # stretch console
        if platform.system() == "Windows":
            os.system("mode con: cols=197 lines=500")
        logging.getLogger("fiona.collection").disabled = True

    @staticmethod
    def set_logger(output_folder: Path = None, logging_level=None):
        """Set logging handler manually,
        so we can also print to console while logging to file

        Args:
            output_folder: If given, it is created when missing and, in
                console mode, "log.txt" inside it receives the log.
            logging_level: Logging level, defaults to logging.INFO.

        Returns:
            The "tagmaps" logger.
        """
        # reset logging in case Jupyter Notebook has
        # captured stdout
        reload(logging)
        log = logging.getLogger("tagmaps")
        if not log:
            # no logging handler found for tagmaps
            # indicates package import mode
            return
        if log.handlers:
            # only add log handlers once
            return log
        if logging_level is None:
            logging_level = logging.INFO
        log_file = None
        if output_folder is not None:
            if not output_folder.exists():
                Utils.init_dir(output_folder)
            log_file = output_folder / "log.txt"
        log.format = "%(message)s"  # type: ignore
        log.datefmt = ""  # type: ignore
        log.setLevel(logging_level)
        # Set Output to Replace in case of
        # encoding issues (console/windows)
        if isinstance(sys.stdout, io.TextIOWrapper):
            # only for console output (not Juypter Notebook stream)
            sys.stdout = io.TextIOWrapper(
                sys.stdout.detach(), sys.stdout.encoding, "replace"
            )  # type: ignore
            log.addHandler(logging.StreamHandler())
            if log_file is not None:
                # only log to file in console mode
                log.addHandler(logging.FileHandler(log_file, "w", "utf-8"))
        else:
            # log to stdout, not stderr in Jupyter Mode to prevent
            # log.Info messages appear as red boxes
            logging.basicConfig(
                stream=sys.stdout,
                format=log.format,  # type: ignore
                level=logging_level,
                datefmt=None,
            )
        return log

    @staticmethod
    def init_dir(path_folder: Path):
        """Creates local dir (including missing parents) if not exists"""
        if not path_folder.exists():
            path_folder.mkdir(parents=True, exist_ok=True)
            print(f"Folder {path_folder.name}/ was created")

    @staticmethod
    def query_yes_no(question, default="yes"):
        """Ask a yes/no question via input() and return the answer.

        "question" is a string that is presented to the user.
        "default" is the presumed answer if the user just hits <Enter>.
            It must be "yes" (the default), "no" or None (meaning
            an answer is required of the user).

        The "answer" return value is True for "yes" or False for "no".

        Raises:
            ValueError: If ``default`` is not "yes", "no" or None.
        """
        valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False}
        if default is None:
            prompt = " [y/n] "
        elif default == "yes":
            prompt = " [Y/n] "
        elif default == "no":
            prompt = " [y/N] "
        else:
            raise ValueError("invalid default answer: '%s'" % default)
        while True:
            sys.stdout.write(question + prompt)
            choice = input().lower()
            if default is not None and choice == "":
                return valid[default]
            if choice in valid:
                return valid[choice]
            sys.stdout.write("'yes' or 'no' (or 'y' or 'n').\n")

    @staticmethod
    def daterange(start_date, end_date):
        """Yield each date from start_date (inclusive) to
        end_date (exclusive), in steps of one day"""
        for n_val in range(int((end_date - start_date).days)):
            yield start_date + timedelta(n_val)

    @staticmethod
    def haversine(lon1, lat1, lon2, lat2):
        """
        Calculate the great circle distance in meters between two points
        on the earth (specified in decimal degrees)
        """
        # convert decimal degrees to radians
        lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
        # haversine formula
        dlon = lon2 - lon1
        dlat = lat2 - lat1
        a_value = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
        # clamp: rounding can push the root just above 1 for antipodal
        # points, which would be outside the domain of asin
        c_value = 2 * asin(min(1.0, sqrt(a_value)))
        # Radius of earth in kilometers is 6371
        return 6371 * c_value * 1000

    @staticmethod
    def get_radians_from_meters(dist):
        """Get approx. distance in radians from meters

        Args:
            dist (float): Distance in meters

        Returns:
            float: Distance in radians

        - 1 Radian is about 57.2958 degrees.
        - 1 degree is taken as 111.325 km, so 2 degrees
          are about 222.65 km
        """
        km_dist = dist / 1000
        degrees_dist = km_dist / 111.325
        return degrees_dist / 57.2958

    @staticmethod
    def get_meters_from_radians(dist):
        """Get approx. distance in meters from radians distance

        Args:
            dist (float): Distance in radians

        Returns:
            float: Distance in meters, rounded to one decimal

        - 1 Radian is about 57.2958 degrees.
        - 1 degree is taken as 111.325 km, so 2 degrees
          are about 222.65 km
        """
        degrees_dist = dist * 57.2958
        km_dist = degrees_dist * 111.325
        return round(km_dist * 1000, 1)

    @staticmethod
    def get_emojiname(emoji_string):
        """Tries to get a name representation for emoji.

        Emoji can either be a single character, or a number of characters
        that construct a grapheme cluster, so the unicode name lookup
        cannot always be applied to the whole string directly.

        Returns:
            The unicode name of the string (single character) or of its
            first character that has a name, else the demojize() result,
            else "" (with a warning).
        """
        emoji_name = None
        # first try to get name for whole str
        if len(emoji_string) == 1:
            emoji_name = Utils._get_unicode_name(emoji_string)
        if not emoji_name:
            # fall back to first named character of the cluster
            for char_s in emoji_string:
                emoji_name = Utils._get_unicode_name(char_s)
                if emoji_name:
                    break
        if not emoji_name:
            emoji_name = demojize(emoji_string)
        if not emoji_name:
            warnings.warn(f"No name found for {emoji_string}")
            return ""
        return emoji_name

    @staticmethod
    def _get_unicode_name(emoji_string_or_char):
        """Return unicode name of a character, False if it has none"""
        try:
            return unicodedata.name(emoji_string_or_char)
        except ValueError:
            return False

    @staticmethod
    def _check_emoji_type(char_unicode):
        """Checks if emoji code point is of specific type,
        e.g.: EMOJI MODIFIER, VARIATION SELECTOR
        or ZERO WIDTH modifier

        Returns False for these types, True otherwise (also for
        characters without a unicode name).

        To check: Is this method really needed?
        """
        try:
            if unicodedata.name(char_unicode).startswith(
                ("EMOJI MODIFIER", "VARIATION SELECTOR", "ZERO WIDTH")
            ):
                return False
            return True
        except ValueError:
            print(char_unicode)
            return True

    @staticmethod
    def extract_emoji(string_with_emoji: Optional[str]) -> Set[str]:
        """Extract emoji and flags using emoji package"""
        if not string_with_emoji:
            # empty
            return set()
        return set(distinct_emoji_list(string_with_emoji))

    @staticmethod
    def str2bool(str_text):
        """Convert any type of yes no string to
        bool representation

        Raises:
            argparse.ArgumentTypeError: If the string is not recognized.
        """
        normalized = str_text.lower()
        if normalized in ("yes", "true", "t", "y", "1"):
            return True
        if normalized in ("no", "false", "f", "n", "0"):
            return False
        raise argparse.ArgumentTypeError("Boolean value expected.")

    @staticmethod
    def get_rectangle_bounds(points):
        """Get rectangle bounds for numpy.ndarray of point coordinates

        Column 0 of ``points`` is used for the lng (x) limits,
        column 1 for the lat (y) limits.
        """
        RectangleBounds = namedtuple(
            "RectangleBounds", "lim_lat_min lim_lat_max lim_lng_min lim_lng_max"
        )
        x_values, y_values = points.T[0], points.T[1]
        return RectangleBounds(
            np.min(y_values), np.max(y_values), np.min(x_values), np.max(x_values)
        )

    @staticmethod
    def filter_tags(
        taglist: Set[str],
        sort_out_always_set: Set[str],
        sort_out_always_instr_set: Set[str],
        select_tags_set: Set[str] = None,
    ) -> Tuple[Set[str], int, int]:
        """Filter list of tags based on two stoplists

        - also removes numeric items and duplicates
        - extracts only tags with len(s) > 1
        - optionally filters list according to positive selection list
          (the stoplists are not applied in this case)

        Args:
            taglist (Iterable[str]): List/Set of input tags to filter
            sort_out_always_set (Set[str]): Filter complete match
            sort_out_always_instr_set (Set[str]): Filter partial match
            select_tags_set (Set[str]): Positive filter list

        Returns:
            Tuple[Set[str], int, int]: Filtered list,
                length of list and
                skipped number of items
        """
        count_tags = 0
        count_skipped = 0
        tags_filtered = set()
        for tag in taglist:
            count_tags += 1
            if select_tags_set is not None:
                # positive list available,
                # return only selected items
                if tag in select_tags_set:
                    tags_filtered.add(tag)
                else:
                    count_skipped += 1
                continue
            # exclude empty/one-char tags, numbers and those tags
            # that are in sort_out_always_set
            # or contain an entry of sort_out_always_instr_set
            if (
                len(tag) <= 1
                or tag == '""'
                or tag.isdigit()
                or tag in sort_out_always_set
                or any(partial in tag for partial in sort_out_always_instr_set)
            ):
                count_skipped += 1
                continue
            tags_filtered.add(tag)
        return (tags_filtered, count_tags, count_skipped)

    @staticmethod
    def get_index_of_item(l_tuple_str: List[ItemCounter], value: Optional[str]) -> int:
        """Get index pos of first item whose ``name`` equals value.

        Stops iterating through the list as
        soon as it finds the value

        Raises:
            ValueError: If no item matches.
        """
        for pos, tuple_str in enumerate(l_tuple_str):
            if tuple_str.name == value:
                return pos
        # Matches behavior of list.index
        raise ValueError(f"{value}: x not in list")

    @staticmethod
    def get_locname(item: str, loc_name_dict: Optional[Dict[str, str]]):
        """Gets location name from ID, if available"""
        if loc_name_dict:
            # default to item if dict key not found
            return loc_name_dict.get(item, item)
        return item
Static methods
def check_fileheader(fieldnames: Iterable[str], source_map: ConfigMap, filename: Optional[str])
Checks the required keys against the existing file columns and raises a Warning if any of them is missing
- Custom headers definition
- the type hint above makes use of forward-references
defined in PEP 484, as a means to avoid cyclic imports
Expand source code
@staticmethod def check_fileheader( fieldnames: Iterable[str], source_map: ConfigMap, filename: Optional[str] ): """Checks against existing file columns warns if required keys are missing Args: source_map (ConfigMap): Custom headers definition Note: the type hint above makes use of forward-references defined in PEP 484, as a means to avoid cyclic imports """ header_req = [ source_map.post_guid_col, source_map.user_guid_col, source_map.latitude_col, source_map.longitude_col, source_map.tags_col, ] for header in header_req: if header not in fieldnames: raise Warning( f'File header is missing "{header}"-column, ' f"file: {filename}" )
def check_folder_file(folder_file: Path, create_folder: bool = None) ‑> pathlib.Path
Check if folder exists, optionally create it
Expand source code
@staticmethod def check_folder_file(folder_file: Path, create_folder: bool = None) -> Path: """Check if folder exists, optionally create it""" if not folder_file.exists(): pname = "File" if folder_file.is_dir(): return folder_file pname = "Folder" if create_folder: Utils.init_dir(folder_file) return folder_file raise ValueError(f"{pname} {folder_file} not found.") return folder_file
def check_intersect_polylist(latlng_point, polylist, poly_exclude_list=None)
Checks intersection of Point(lat, lng) against list of polygons.
latlng_point {Fiona Point} – coordinate polylist {list} – list of polys poly_exclude_list {list} – list of polys to exclude
Expand source code
@staticmethod def check_intersect_polylist(latlng_point, polylist, poly_exclude_list=None): """Checks intersection of Point(lat, lng) against list of polygons. Arguments: latlng_point {Fiona Point} -- coordinate polylist {list} -- list of polys poly_exclude_list {list} -- list of polys to exclude """ include = False if polylist is not None: for poly in polylist: if latlng_point.within(poly): include = True break if include is False: return False else: include = True if poly_exclude_list is not None: for poly in poly_exclude_list: if latlng_point.within(poly): return False if include is True: return True return False
def concat_topic(term_list)
Concatenate list of terms (e.g. TOPIC) to string
Expand source code
@staticmethod def concat_topic(term_list): """Concatenate list of terms (e.g. TOPIC) to string""" if any("-" in s for s in term_list): raise ValueError("No '-' characters supported in topic list terms") topic_name = "-".join(term_list) return topic_name
def daterange(start_date, end_date)
Yield every date from start_date up to, but not including, end_date, in one-day steps
Expand source code
@staticmethod def daterange(start_date, end_date): """Return time difference between two dates""" for n_val in range(int((end_date - start_date).days)): yield start_date + timedelta(n_val)
def encode_string(text_s)
Hash string with SHA3-256, produce hex digest
- returns a string of double length, containing only hexadecimal digits
Expand source code
@staticmethod def encode_string(text_s): """Encode string in Sha256, produce hex - returns a string of double length, containing only hexadecimal digits""" encoded_string = hashlib.sha3_256(text_s.encode("utf8")).hexdigest() return encoded_string
def extract_emoji(string_with_emoji: Optional[str]) ‑> Set[str]
Extract emoji and flags using emoji package
Expand source code
@staticmethod def extract_emoji(string_with_emoji: Optional[str]) -> Set[str]: """Extract emoji and flags using emoji package""" if not string_with_emoji: # empty return set() return set(distinct_emoji_list(string_with_emoji))
Filter list of tags based on two stoplists
- also removes numeric items and duplicates
- extracts only tags with len(s) > 1
- optionally filters list according to positive selection list
- List/Set of input tags to filter
- Filter complete match
- Filter partial match
- Positive filter list
Tuple[Set[str], int, int]
- Filtered list, length of list and skipped number of items
Expand source code
@staticmethod def filter_tags( taglist: Set[str], sort_out_always_set: Set[str], sort_out_always_instr_set: Set[str], select_tags_set: Set[str] = None, ) -> Tuple[Set[str], int, int]: """Filter list of tags based on two stoplists - also removes numeric items and duplicates - extracts only tags with len(s) > 1 - optionally filters list according to positive selection list Args: taglist (Iterable[str]): List/Set of input tags to filter sort_out_always_set (Set[str]): Filter complete match sort_out_always_instr_set (Set[str]): Filter partial match select_tags_set (Set[str]): Positive filter list Returns: Tuple[Set[str], int, int]: Filtered list, length of list and skipped number of items """ count_tags = 0 count_skipped = 0 tags_filtered = set() for tag in taglist: count_tags += 1 if select_tags_set is not None: # positive list available, # return only selected items if tag in select_tags_set: tags_filtered.add(tag) else: count_skipped += 1 else: # exclude numbers and those tags # that are in sort_out_always_set # or sort_out_always_instr_set if ( len(tag) == 1 or tag == '""' or tag.isdigit() or tag in sort_out_always_set ): count_skipped += 1 continue for in_str_partial in sort_out_always_instr_set: if in_str_partial in tag: count_skipped += 1 break else: # final else Clause on loop statement tags_filtered.add(tag) return (tags_filtered, count_tags, count_skipped)
def get_best_utmzone(bound_points_shapely: geometry.MultiPoint)
Calculate best UTM Zone SRID/EPSG Code Args: True centroid (coords may be multipoint)
Expand source code
@staticmethod
def get_best_utmzone(bound_points_shapely: geometry.MultiPoint):
    """Calculate best UTM Zone SRID/EPSG Code

    Args:
        bound_points_shapely (geometry.MultiPoint): Points whose
            centroid is used as the reference location

    Returns:
        Tuple of the projection string ``"epsg:<code>"`` and the
        EPSG code as returned by Utils._convert_wgs_to_utm
    """
    centroid_coords = bound_points_shapely.centroid.coords[0]
    # coordinate order is (longitude, latitude)
    lon_center, lat_center = centroid_coords[0], centroid_coords[1]
    utm_code = Utils._convert_wgs_to_utm(lon_center, lat_center)
    return f"epsg:{utm_code}", utm_code
def get_emojiname(emoji_string)
Tries to get a name representation for an emoji. An emoji can be either a single character or a number of characters that form a grapheme cluster. Therefore, a single-character Unicode name lookup cannot be applied directly. For some grapheme clusters, no Unicode name exists.
Expand source code
@staticmethod
def get_emojiname(emoji_string):
    """Try to get a name representation for an emoji.

    An emoji can be either a single character or several characters
    that form a grapheme cluster, so a single name lookup cannot
    always be applied. The following is tried, in order:

    - Utils._get_unicode_name on the string, if it is one character
    - Utils._get_unicode_name on each character, first hit wins
    - demojize on the whole string

    Returns:
        The first non-empty name found. If nothing is found,
        a warning is issued and an empty string is returned.
    """
    name = None
    if len(emoji_string) == 1:
        # single character: look up the name directly
        name = Utils._get_unicode_name(emoji_string)
    if not name:
        # grapheme cluster: take the name of the first character that has one
        name = next(
            (
                candidate
                for candidate in map(Utils._get_unicode_name, emoji_string)
                if candidate
            ),
            None,
        )
    if not name:
        name = demojize(emoji_string)
    if name:
        return name
    warnings.warn(f"No name found for {emoji_string}")
    return ""
def get_index_of_item(l_tuple_str: List[ItemCounter], value: Optional[str]) ‑> int
Get index pos from list of tuples.
Stops iterating through the list as soon as it finds the value
Expand source code
@staticmethod def get_index_of_item(l_tuple_str: List[ItemCounter], value: Optional[str]) -> int: """Get index pos from list of tuples. Stops iterating through the list as soon as it finds the value """ for pos, tuple_str in enumerate(l_tuple_str): if == value: return pos # Matches behavior of list.index raise ValueError(f"{value}: x not in list")
def get_locname(item: str, loc_name_dict: Optional[Dict[str, str]])
Gets location name from ID, if available
Expand source code
@staticmethod def get_locname(item: str, loc_name_dict: Optional[Dict[str, str]]): """Gets location name from ID, if available""" if loc_name_dict: item_name = loc_name_dict.get(item, item) else: # default to item if dict key not found item_name = item return item_name
def get_meters_from_radians(dist)
Get approx. distance in meters from radians distance
- Distance in radians
- Distance in meters
- 1 Radian is about 57.2958 degrees.
- then see
- Multiply the number of degrees by 111.325
- To convert this to meters, multiply by 1,000. So, 2 degrees is about 222,650 meters.
Expand source code
@staticmethod def get_meters_from_radians(dist): """Get approx. distance in meters from radians distance Args: dist (float): Distance in radians Returns: float: Distance in meters - 1 Radian is about 57.2958 degrees. - then see - Multiply the number of degrees by 111.325 - To convert this to meters, multiply by 1,000. So, 2 degrees is about 222,65 meters. """ dist = dist * 57.2958 dist = dist * 111.325 meters_dist = round(dist * 1000, 1) return meters_dist
def get_radians_from_meters(dist)
Get approx. distance in radians from meters
- Distance in meters
- Distance in radians - 1 Radian is about 57.2958 degrees. - then see: - Multiply the number of degrees by 111.325 - To convert this to meters, multiply by 1,000. So, 2 degrees is about 222,650 meters.
Expand source code
@staticmethod def get_radians_from_meters(dist): """Get approx. distance in radians from meters Args: dist (float): Distance in meters Returns: float: Distance in radians - 1 Radian is about 57.2958 degrees. - then see: - Multiply the number of degrees by 111.325 - To convert this to meters, multiply by 1,000. So, 2 degrees is about 222,65 meters. """ dist = dist / 1000 degrees_dist = dist / 111.325 radians_dist = degrees_dist / 57.2958 return radians_dist
def get_rectangle_bounds(points)
Get rectangle bounds for numpy.ndarray of point coordinates
Expand source code
@staticmethod def get_rectangle_bounds(points): """Get rectangle bounds for numpy.ndarray of point coordinates""" RectangleBounds = namedtuple( "RectangleBounds", "lim_lat_min lim_lat_max lim_lng_min lim_lng_max" ) lim_y_min = np.min(points.T[1]) lim_y_max = np.max(points.T[1]) lim_x_min = np.min(points.T[0]) lim_x_max = np.max(points.T[0]) return RectangleBounds(lim_y_min, lim_y_max, lim_x_min, lim_x_max)
def get_shapely_bounds(bounds: AnalysisBounds) ‑> shapely.geometry.multipoint.MultiPoint
Returns boundary shape from 4 coordinates
Expand source code
@staticmethod
def get_shapely_bounds(bounds: AnalysisBounds) -> geometry.MultiPoint:
    """Return boundary shape built from the four limit coordinates

    The shape consists of two points in (lng, lat) order:
    (lim_lng_min, lim_lat_min) and (lim_lng_max, lim_lat_max).
    """
    min_corner = (bounds.lim_lng_min, bounds.lim_lat_min)
    max_corner = (bounds.lim_lng_max, bounds.lim_lat_max)
    return geometry.MultiPoint([min_corner, max_corner])
def haversine(lon1, lat1, lon2, lat2)
Calculate the great circle distance between two points on the earth (specified in decimal degrees)
Expand source code
@staticmethod def haversine(lon1, lat1, lon2, lat2): """ Calculate the great circle distance between two points on the earth (specified in decimal degrees) """ # convert decimal degrees to radians lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2]) # haversine formula dlon = lon2 - lon1 dlat = lat2 - lat1 a_value = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2 c_value = 2 * asin(sqrt(a_value)) # Radius of earth in kilometers is 6371 km_dist = 6371 * c_value m_dist = km_dist * 1000 return m_dist
def init_dir(path_folder: Path)
Creates local dir if not exists
Expand source code
@staticmethod def init_dir(path_folder: Path): """Creates local dir if not exists""" if not path_folder.exists(): path_folder.mkdir() print(f"Folder {}/ was created")
def init_main()
Initializing main procedure if package is executed directly
TODO: - disables fiona logging, as it (somehow) interferes with tag maps logging (find better solution)
Expand source code
@staticmethod def init_main(): """Initializing main procedure if package is executed directly TODO: - disables fiona logging, as it (somehow) interferes with tag maps logging (find better solution) """ # set console view parameters # stretch console if platform.system() == "Windows": os.system("mode con: cols=197 lines=500") logging.getLogger("fiona.collection").disabled = True
def query_yes_no(question, default='yes')
Ask a yes/no question via input() and return the user's answer.
"question" is a string that is presented to the user. "default" is the presumed answer if the user just hits <Enter>.
It must be "yes" (the default), "no" or None (meaning an answer is required of the user). The "answer" return value is True for "yes" or False for "no".
Expand source code
@staticmethod def query_yes_no(question, default="yes"): """Ask a yes/no question via raw_input() and return their answer. "question" is a string that is presented to the user. "default" is the presumed answer if the user just hits <Enter>. It must be "yes" (the default), "no" or None (meaning an answer is required of the user). The "answer" return value is True for "yes" or False for "no". """ valid = {"yes": True, "y": True, "ye": True, "no": False, "n": False} if default is None: prompt = " [y/n] " elif default == "yes": prompt = " [Y/n] " elif default == "no": prompt = " [y/N] " else: raise ValueError("invalid default answer: '%s'" % default) while True: sys.stdout.write(question + prompt) choice = input().lower() if default is not None and choice == "": return valid[default] elif choice in valid: return valid[choice] sys.stdout.write("'yes' or 'no' (or 'y' or 'n').\n")
def remove_hyperlinks(text_s)
Regex to remove any hyperlinks from string
Note: - anything between <a>xxx</a> will be kept
Expand source code
@staticmethod def remove_hyperlinks(text_s): """Regex to remove any hyperlinks from string Note: - anything between <a>xxx</a> will be kept """ pattern = r"<(a|/a).*?>" result = re.sub(pattern, "", text_s) return result
def remove_special_chars(text_s)
Removes a list of special chars from string
Expand source code
@staticmethod def remove_special_chars(text_s): """Removes a list of special chars from string""" special_chars = "?.!/;:,[]()'-&#|<>=\"" s_cleaned = text_s.translate({ord(c): " " for c in special_chars}) return s_cleaned
def remove_stopwords(text_s, stopwords: List[str]) ‑> str
Expand source code
@staticmethod
def remove_stopwords(text_s, stopwords: List[str]) -> str:
    """Remove a list of words from string,
    including hyperlinks (<a></a>) and integer numbers (e.g. 2012)

    - words are compared in lowercase against stopwords
    - the remaining words are joined by single spaces
    """
    # first remove hyperlinks
    without_links = Utils.remove_hyperlinks(text_s)
    kept_words = []
    # split string by whitespace and drop stopwords and numbers
    for word in without_links.split():
        if word.lower() in stopwords or word.isdigit():
            continue
        kept_words.append(word)
    return " ".join(kept_words)
def report_listload(list_ref, str_text)
Report only if list_ref contains at least 1 entry.
Expand source code
@staticmethod
def report_listload(list_ref, str_text):
    """Log the number of loaded entries to the "tagmaps" logger.

    Nothing is logged if Utils._count_none reports
    no entries for list_ref.
    """
    loaded = Utils._count_none(list_ref)
    if loaded <= 0:
        return
    logging.getLogger("tagmaps").info(f"Loaded {loaded} {str_text}.")
def select_emoji(input_emoji_set: Set[str], selection_emoji_set: Set[str] = None) ‑> Set[str]
Filters a set of emoji based on another set
Expand source code
@staticmethod def select_emoji( input_emoji_set: Set[str], selection_emoji_set: Set[str] = None ) -> Set[str]: """Filters a set of emoji based on another set""" if selection_emoji_set is None: # no filter on empty selection list return input_emoji_set filtered_emoji_set = { emoji for emoji in input_emoji_set if emoji in selection_emoji_set } return filtered_emoji_set
def select_words(text_s, selection_list: List[str]) ‑> str
Filters a string based on a provided positive filter list of terms
- removes duplicate terms
Expand source code
@staticmethod
def select_words(text_s, selection_list: List[str]) -> str:
    """Filters a string based on a provided
    positive filter list of terms

    - hyperlink tags are removed first
    - words are compared in lowercase against selection_list
    - removes duplicate terms (exact, case-sensitive duplicates)
    - selected words keep the order of their first occurrence

    Returns:
        str: Selected words, joined by single spaces
    """
    # first remove hyperlinks
    text_s = Utils.remove_hyperlinks(text_s)
    # split string by space character into list
    querywords = text_s.split()
    # dict keys drop duplicates like a set, but keep insertion order,
    # so the result does not vary between interpreter runs
    resultwords = dict.fromkeys(
        word for word in querywords if word.lower() in selection_list
    )
    return " ".join(resultwords)
def set_logger(output_folder: Path = None, logging_level=None)
Set logging handler manually, so we can also print to console while logging to file
Expand source code
@staticmethod
def set_logger(output_folder: Path = None, logging_level=None):
    """Set logging handler manually,
    so we can also print to console while logging to file

    Args:
        output_folder (Path): If given, the folder is created when
            missing and, in console mode, the log is also written
            to "log.txt" inside it
        logging_level: Level for the "tagmaps" logger,
            defaults to logging.INFO

    Returns:
        The "tagmaps" logger. If the logger already has handlers,
        it is returned without any changes.
    """
    # reset logging in case Jupyter Notebook has
    # captured stdout
    reload(logging)
    # Create or get logger with specific name
    log = logging.getLogger("tagmaps")
    # NOTE(review): getLogger returns a Logger object - confirm
    # whether this branch can ever be taken
    if not log:
        # no logging handler found for tagmaps
        # indicates package import mode
        return
    if log.handlers:
        # only add log handlers once
        return log
    if logging_level is None:
        logging_level = logging.INFO
    if output_folder is not None:
        if not output_folder.exists():
            Utils.init_dir(output_folder)
        # input(f'{type(output_folder)}')
        __log_file = output_folder / "log.txt"
    # format/datefmt are stored as plain attributes on the logger;
    # format is read again below for basicConfig
    log.format = "%(message)s"  # type: ignore
    log.datefmt = ""  # type: ignore
    log.setLevel(logging_level)
    # Set Output to Replace in case of
    # encoding issues (console/windows)
    if isinstance(sys.stdout, io.TextIOWrapper):
        # only for console output (not Juypter Notebook stream)
        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), sys.stdout.encoding, "replace"
        )  # type: ignore
        log.addHandler(logging.StreamHandler())
        if output_folder is not None:
            # only log to file in console mode;
            # mode "w" overwrites an existing log file
            log.addHandler(
                logging.FileHandler(__log_file, "w", "utf-8")
            )  # type: ignore
    else:
        # log to stdout, not stderr in Jupyter Mode to prevent
        # log.Info messages appear as red boxes
        logging.basicConfig(
            stream=sys.stdout,
            format=log.format,  # type: ignore
            level=logging_level,
            datefmt=None,
        )
    # flush once to clear console
    # sys.stdout.flush()
    return log
def set_proj_dir()
Update PROJ_LIB location if not found
Leftover from pyproj < 2.0.0 compatibility, PROJ_LIB not needed anymore
Expand source code
@staticmethod def set_proj_dir(): """Update PROJ_LIB location if not found Leftover from pyproj < 2.0.0 compatibility, PROJ_LIB not needed anymore """ if not os.environ.get("PROJ_LIB"): local_proj_path = Path.cwd() / "proj" if not local_proj_path.exists(): raise ValueError( "Pyproj 'proj' datadir not found. Either specify " "PROJ_LIB environmental variable or copy 'proj' " "folder to local path of executable" ) os.environ["PROJ_LIB"] = str(local_proj_path) pyproj.datadir.set_data_dir(str(local_proj_path))
def split_topic(term_concat)
Split concat topic
Expand source code
@staticmethod def split_topic(term_concat): """Split concat topic""" topic_terms = term_concat.split("-") return topic_terms
def str2bool(str_text)
Convert any type of yes no string to bool representation
Expand source code
@staticmethod def str2bool(str_text): """Convert any type of yes no string to bool representation""" if str_text.lower() in ("yes", "true", "t", "y", "1"): return True elif str_text.lower() in ("no", "false", "f", "n", "0"): return False raise argparse.ArgumentTypeError("Boolean value expected.")