Module tagmaps.classes.load_data
Module for loading data
Returns
cleanedPost
- a subset of the originally available post attributes that is needed for Tag Maps clustering
# -*- coding: utf-8 -*-
"""Module for loading data
Returns:
cleanedPost: a subset of the originally available
post attributes that is needed
for Tag Maps clustering
"""
from __future__ import absolute_import
import csv
import json
import logging
import sys
from decimal import Decimal
from typing import IO, Any, Dict, Iterable, Iterator, Optional, OrderedDict, Set, Tuple
from shapely.geometry import Point
from tagmaps.classes.shared_structure import AnalysisBounds, PostStructure
from tagmaps.classes.utils import Utils
class LoadData:
"""Main Class for ingesting data
- will apply basic filters (based on stoplists etc.)
- Returns CleanedPost
"""
def __init__(self, cfg, user_variety_input=None, console_reporting=None):
"""Initializes Load Data structure"""
if user_variety_input is None:
user_variety_input = False
if console_reporting is None:
console_reporting = False
self.filelist = self._read_local_files(cfg)
self.guid_hash = set() # global set of post guids
self.append_to_already_exist = False # unused?
self.shape_exclude_locid_hash = set()
self.shape_included_locid_hash = set()
self.filter_origin = cfg.filter_origin
self.cfg = cfg
self.console_reporting = console_reporting
self.log = logging.getLogger("tagmaps")
self.bounds = AnalysisBounds()
self.distinct_locations_set = set()
self.ignore_empty_latlng = False
self.current_file = None
# basic statistics collection
self.stats = DataStats()
if user_variety_input:
# get user input for max tags to process
# this is combined here with output reporting
# of how many files to process
# the user can start loading data with enter, or
# by adding a number (e.g. 100), which will
# later be used to remove the long tail for tags/emoji
self._get_imax()
def __enter__(self):
"""Main pipeline for reading posts from file
Combines multiple generators into a single pipeline
that is returned for processing by the
with-statement.
"""
post_pipeline = self._parse_postlist(
self._process_inputfile(self._parse_input_files(count=True))
)
return post_pipeline
def __exit__(self, c_type, value, traceback):
"""Contextmanager exit: nothing to do here"""
return False
def _parse_input_files(self, count: bool = False) -> Iterator[IO[str]]:
"""Loops over the input filelist and
returns opened file handles
"""
for file_name in self.filelist:
if count:
self.stats.partcount += 1
self.current_file = file_name.stem
yield open(file_name, "r", newline="", encoding="utf8")
def is_intermediate(self):
"""Auto test if intermediate data is present"""
post_reader = next(
self._process_inputfile(self._parse_input_files(count=False))
)
for post in post_reader:
pguid = post.get(self.cfg.source_map.post_guid_col)
if pguid is None and post.get("guid") is not None:
# if column name is "guid",
# data is likely of type intermediate
self.log.info("Intermediate data detected.. skipping filtering step.\n")
return True
return False
def _process_inputfile(self, file_handles: Iterator[IO[str]]) -> Iterator[Any]:
"""File parse for CSV or JSON from open file handle
Output: produces a list of post that can be parsed
"""
post_reader = []
for file_handle in file_handles:
if self.cfg.source_map.file_extension == "csv":
post_reader = csv.DictReader(
file_handle,
delimiter=self.cfg.source_map.delimiter,
quotechar=self.cfg.source_map.quote_char,
quoting=self.cfg.source_map.quoting,
)
# next(post_list, None) # skip headerline
elif self.cfg.source_map.file_extension == "json":
# parse only the current file's posts; accumulating across
# files would re-yield earlier posts for every file handle
post_reader = json.loads(file_handle.read())
yield post_reader
def _parse_postlist(self, post_readers: Iterable[OrderedDict[str, Optional[str]]]):
"""Process posts according to specifications
Returns generator for single record
"""
# row_num = 0
msg = None
for post_reader in post_readers:
Utils.check_fileheader(
post_reader.fieldnames, self.cfg.source_map, self.current_file
)
for post in post_reader:
# row_num += 1
lbsn_post = self._parse_post(post)
if lbsn_post is None:
continue
else:
self.stats.count_glob += 1
msg = self._report_progress()
# if (row_num % 10 == 0):
# modulo: print only once every 10 iterations
if self.console_reporting:
print(msg, end="\r")
yield lbsn_post
# log last message to file, clean stdout
if msg and self.console_reporting:
print(" " * len(msg), end="\r")
sys.stdout.flush()
if self.stats.count_glob == 0:
raise ValueError(
f"No posts found in input data. "
f"First file: {next(iter(self.filelist or []), None)}."
)
self.log.info(msg)
def _report_progress(self):
"""Status report"""
msg = (
f"Cleaned input to {len(self.distinct_locations_set):02d} "
f"distinct locations from "
f"{self.stats.count_glob:02d} posts "
f"(File {self.stats.partcount} of {len(self.filelist)}) - "
f"Skipped posts: {self.stats.skipped_count} - skipped tags: "
f"{self.stats.count_tags_skipped} of "
f"{self.stats.count_tags_global}"
)
return msg
def _parse_post(self, post: Dict[str, str]) -> Optional[PostStructure]:
"""Process single post and attach to common structure"""
# skip duplicates and erroneous entries
post_guid = post.get(self.cfg.source_map.post_guid_col)
if post_guid in self.guid_hash:
self.stats.skipped_count += 1
return None
if post_guid is None:
raise ValueError(f"Post guid is None: {post}")
self.guid_hash.add(post_guid)
origin_id = post.get(self.cfg.source_map.originid_col)
if origin_id is None:
origin_id = 0
if self.filter_origin and origin_id != self.filter_origin:
# optional exclude origin
self.stats.skipped_count += 1
return None
user_guid = post.get(self.cfg.source_map.user_guid_col)
if user_guid is None:
raise ValueError(f"User guid is None: {post}")
if (
not self.cfg.ignore_stoplists
and self.cfg.sort_out_user_set is not None
and user_guid in self.cfg.sort_out_user_set
):
return None
# Continue Parse Post
lbsn_post = PostStructure(
origin_id=int(origin_id), guid=post_guid, user_guid=user_guid
)
lbsn_post.post_url = post.get(self.cfg.source_map.post_url_col)
lbsn_post.post_publish_date = post.get(
self.cfg.source_map.post_publish_date_col
)
# Process Spatial Query first (if skipping necessary)
if self.cfg.sort_out_places and self._is_sortout_place(post):
return None
lat = None
lng = None
if self._is_empty_latlng(post):
if self.ignore_empty_latlng:
pass
else:
return None
else:
# assign lat/lng coordinates from dict
lat, lng = self._correct_placelatlng(
post.get(self.cfg.source_map.place_guid_col),
post.get(self.cfg.source_map.latitude_col),
post.get(self.cfg.source_map.longitude_col),
)
# update boundary
self.bounds.upd_latlng_bounds(lat, lng)
lbsn_post.latitude = lat
lbsn_post.longitude = lng
if lat is None or lng is None:
# Try to substitute place_guid
# if self.ignore_empty_latlng has been set to True
lbsn_post.loc_id = post.get(self.cfg.source_map.place_guid_col)
if not lbsn_post.loc_id:
self.log.warning("Neither coordinates nor place guid found.")
else:
# Note: loc_id not loaded from file
# create loc_id from lat/lng
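# (e.g. "51.0504:13.7373")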
lbsn_post.loc_id = f"{lat}:{lng}"
# counting of distinct loc ids
self.distinct_locations_set.add(lbsn_post.loc_id)
lbsn_post.loc_name = post.get(self.cfg.source_map.place_name_col)
# exclude posts outside boundary
if (
self.cfg.shapefile_intersect or self.cfg.shapefile_exclude
) and self._is_outside_shapebounds(lbsn_post) is True:
return None
if self.cfg.cluster_tags or self.cfg.cluster_emoji or self.cfg.topic_modeling:
post_body = post.get(self.cfg.source_map.post_body_col)
post_title = post.get(self.cfg.source_map.post_title_col)
if self.cfg.ignore_stoplists:
lbsn_post.post_body = post_body
lbsn_post.post_title = post_title
else:
if self.cfg.select_tags_set is not None:
# if positive filterlist available
lbsn_post.post_body = Utils.select_words(
post_body, self.cfg.select_tags_set
)
lbsn_post.post_title = Utils.select_words(
post_title, self.cfg.select_tags_set
)
else:
# check against stoplists
lbsn_post.post_body = Utils.remove_stopwords(
post_body, self.cfg.sort_out_always_set
)
lbsn_post.post_title = Utils.remove_stopwords(
post_title, self.cfg.sort_out_always_set
)
else:
lbsn_post.post_title = ""
lbsn_post.post_body = ""
lbsn_post.post_like_count = self._get_count_frompost(
post.get(self.cfg.source_map.post_like_count_col)
)
lbsn_post.hashtags = set()
if self.cfg.cluster_tags or self.cfg.topic_modeling:
lbsn_post.hashtags = self._get_tags(post.get(self.cfg.source_map.tags_col))
if self.cfg.cluster_emoji:
lbsn_post.emoji = self._get_emoji(post)
lbsn_post.post_create_date = post.get(self.cfg.source_map.post_create_date_col)
lbsn_post.post_views_count = self._get_count_frompost(
post.get(self.cfg.source_map.post_views_count_col)
)
# return parsed post object
return lbsn_post
@staticmethod
def _read_local_files(config):
"""Read Local Files according to config parameters
- returns list of file-paths
"""
input_path = config.input_folder
filelist = list(input_path.glob(f"*.{config.source_map.file_extension}"))
input_count = len(filelist)
if input_count == 0:
raise ValueError(
f"No input files *."
f"{config.source_map.file_extension} "
f"in ./{input_path.name}/ found."
)
return filelist
@staticmethod
def _get_count_frompost(count_string: Optional[str]) -> int:
"""Parse post like count field"""
if count_string:
try:
photo_likes_int = int(count_string)
return photo_likes_int
except TypeError:
logging.getLogger("tagmaps").debug(
f"\nPost like count parser: Type Error: "
f"{type(count_string)} not a valid number format "
f"Returning 0."
)
except ValueError:
logging.getLogger("tagmaps").debug(
f"\nPost like count parser: Value Error: "
f"{count_string} not a valid number. "
f"Returning 0."
)
return 0
def _get_emoji(self, post: Dict[str, str]) -> Set[str]:
"""Extract emoji from post_body and emoji col,
use selection list if available
"""
emoji_body = Utils.select_emoji(
Utils.extract_emoji(post.get(self.cfg.source_map.post_body_col)),
self.cfg.select_emoji_set,
)
emoji_col = Utils.select_emoji(
Utils.extract_emoji(post.get(self.cfg.source_map.emoji_col)),
self.cfg.select_emoji_set,
)
emoji_filtered = set.union(emoji_body, emoji_col)
if emoji_filtered:
self.stats.count_emojis_global += len(emoji_filtered)
return emoji_filtered
def _get_tags(self, tags_string: Optional[str]) -> Set[str]:
"""Extract tags, apply filter lists if available"""
if not tags_string:
# no tags column or empty value: nothing to extract
return set()
# base str conversion to set
tags = set(filter(None, tags_string.lower().split(";")))
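# e.g. "Nature;Hiking;;Sunset" -> {"nature", "hiking", "sunset"}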
# Filter tags based on two stoplists
if self.cfg.ignore_stoplists:
count_tags = len(tags)
count_skipped = 0
else:
tags, count_tags, count_skipped = Utils.filter_tags(
tags,
self.cfg.sort_out_always_set,
self.cfg.sort_out_always_instr_set,
self.cfg.select_tags_set,
)
# update global stats
self.stats.count_tags_global += count_tags
self.stats.count_tags_skipped += count_skipped
return tags
def _correct_placelatlng(
self, place_guid_string: Optional[str], lat, lng
) -> Tuple[Decimal, Decimal]:
"""If place corrections available, update lat/lng coordinates
Needs test: not place_guid_string
"""
if (
self.cfg.correct_places
and not place_guid_string
and place_guid_string in self.cfg.correct_place_latlng_dict
):
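# correct_place_latlng_dict maps place_guid -> (lat, lng) pairs;
# both values are converted to Decimal below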
lat = Decimal(
# correct lat
self.cfg.correct_place_latlng_dict[place_guid_string][0]
)
lng = Decimal(
# correct lng
self.cfg.correct_place_latlng_dict[place_guid_string][1]
)
else:
# return original lat/lng
lat = Decimal(lat) # original lat
lng = Decimal(lng) # original lng
return lat, lng
def _is_outside_shapebounds(self, post):
"""Skip all posts outside shapefile"""
# avoid doing the expensive spatial check twice:
if post.loc_id in self.shape_exclude_locid_hash:
self.stats.skipped_count += 1
return True
if post.loc_id not in self.shape_included_locid_hash:
lng_lat_point = Point(post.longitude, post.latitude)
if (
Utils.check_intersect_polylist(
lng_lat_point,
self.cfg.shapefile_intersect,
self.cfg.shapefile_exclude,
)
is False
):
self.stats.skipped_count += 1
self.shape_exclude_locid_hash.add(post.loc_id)
return True
self.shape_included_locid_hash.add(post.loc_id)
return False
def _is_empty_latlng(self, post):
"""skip non-geotagged medias"""
latitude = post.get(self.cfg.source_map.latitude_col)
longitude = post.get(self.cfg.source_map.longitude_col)
if not latitude or not longitude:
self.stats.count_non_geotagged += 1
return True
return False
def _is_sortout_place(self, post):
"""Returns False if place of post is in ignore list"""
place_guid = post.get(self.cfg.source_map.place_guid_col)
if place_guid:
if place_guid in self.cfg.sort_out_places_set:
self.stats.skipped_count += 1
return True
return False
def _get_imax(self):
"""User Input to get number of tags to process"""
if self.cfg.auto_mode:
return
if self.cfg.cluster_tags or self.cfg.cluster_emoji:
inputtext = input(
f"Files to process: {len(self.filelist)}. \nOptional: "
f"Enter a Number for the variety of tags to process "
f"(default is 1000)\nPress Enter to proceed.. \n"
)
if inputtext is None or inputtext == "" or not inputtext.isdigit():
return
self.cfg.max_items = int(inputtext)
def input_stats_report(self):
"""Return input stats"""
self.log.info(f"\nTotal post count (PC): " f"{self.stats.count_glob:02d}")
self.log.info(f"Total tag count (PTC): " f"{self.stats.count_tags_global}")
self.log.info(f"Total emoji count (PEC): " f"{self.stats.count_emojis_global}")
class DataStats:
"""Class storing basic data stats"""
def __init__(self):
"""Initialize stats."""
self.count_glob = 0
self.partcount = 0
self.skipped_count = 0
self.count_non_geotagged = 0
self.count_outside_shape = 0
self.count_tags_global = 0
self.count_emojis_global = 0
self.count_tags_skipped = 0
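Usage example: a minimal sketch of the intended context-manager pattern, not part of the module. It assumes `cfg` is a fully prepared tagmaps configuration object exposing the attributes read in `__init__` (e.g. `cfg.input_folder`, `cfg.source_map` and the various filter settings); how such an object is built is not shown here.

from tagmaps.classes.load_data import LoadData

def count_cleaned_posts(cfg):
    """Consume the LoadData pipeline and log basic input stats."""
    loader = LoadData(cfg)
    with loader as post_pipeline:
        # __enter__ returns the lazy generator pipeline; records are
        # read, filtered and yielded one by one as PostStructure objects
        total = sum(1 for _ in post_pipeline)
    loader.input_stats_report()  # logs post/tag/emoji counts
    return total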
Classes
class DataStats
- Class storing basic data stats
class LoadData (cfg, user_variety_input=None, console_reporting=None)
- Main Class for ingesting data
- will apply basic filters (based on stoplists etc.)
- Returns CleanedPost
Methods
def input_stats_report(self)
- Log input statistics
def is_intermediate(self)
- Auto test if intermediate data is present
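A minimal usage sketch, assuming the same hypothetical prepared `cfg` object as in the module-level example above. Intermediate data is detected by the presence of a plain "guid" column instead of the configured post guid column:

loader = LoadData(cfg)
already_cleaned = loader.is_intermediate()  # True if a plain "guid" column is found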