Module tagmaps.classes.cluster

Module for tag maps clustering methods

Expand source code
# -*- coding: utf-8 -*-

"""
Module for tag maps clustering methods
"""

from __future__ import absolute_import

import logging
import queue
import sys
import threading
from collections import defaultdict
from dataclasses import astuple, dataclass
from functools import wraps
from typing import Dict, List, Optional, Set, Tuple

import hdbscan
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import shapely.geometry as geometry
from pyproj import Transformer  # pylint: disable=C0412
from shapely.ops import transform  # pylint: disable=C0412
from tagmaps.classes.alpha_shapes import (AlphaShapes, AlphaShapesAndMeta,
                                          AlphaShapesArea)
from tagmaps.classes.plotting import TPLT
from tagmaps.classes.prepare_data import PreparedStats
from tagmaps.classes.shared_structure import (EMOJI, LOCATIONS, POST_FIELDS,
                                              TAGS, TOPICS, AnalysisBounds,
                                              CleanedPost, ItemCounter)
from tagmaps.classes.utils import Utils

# init threaded cluster queue:
# shared by the whole module; results of functions wrapped with
# store_in_queue are put here and are fetched again with
# cluster_queue.get() in ClusterGen._cluster_points
cluster_queue = queue.Queue()

# seaborn plotting defaults, applied once on import of this module
sns.set_context('poster')
sns.set_style('white')


@dataclass
class SelectedItems:
    """Post guids selected for an item, with the number of distinct
    locations these posts refer to (see ClusterGen._select_postguids)"""
    # guids of all posts that matched the item
    guids: List[str]
    # number of distinct loc_id values among the matched posts
    location_count: int


@dataclass
class Guids:
    """Post guids split into clustered and non-clustered ones
    (see ClusterGen._get_cluster_guids)"""
    # one entry per cluster, each entry holding the guids of that cluster
    # NOTE(review): _get_cluster_guids stores numpy arrays of guids here,
    # not plain strings - the annotation may need an update
    clustered: List[str]
    # guids of posts that were not assigned to any cluster
    nonclustered: List[str]


@dataclass
class SelItems:
    """List of coordinates (points) with related post_guids"""
    # either an empty list (fewer than two posts selected) or the
    # numpy array of the 'lng' and 'lat' columns of the selected posts,
    # as produced in ClusterGen.get_np_points_guids
    points: List[Optional[np.ndarray]]
    # guids of the selected posts
    guids: List[str]


@dataclass
class ClusterResults:
    """List of post guids and assigned cluster labels (from HDBSCAN)

    Only clusters and guids are always set. ClusterGen.cluster_item()
    additionally sets points; colors, mask_noisy and cluster_count
    are only filled in preview mode and are None otherwise.
    """
    # cluster labels as returned by ClusterGen._cluster_points
    # NOTE(review): used like a numpy array of labels (-1 = noise)
    # in ClusterGen._get_cluster_guids - annotation may be outdated
    clusters: Tuple[np.ndarray, Tuple[int, Optional[int]]]
    # guids of the posts that were clustered
    guids: List[str]
    # coordinates of the posts that were clustered
    points: Optional[List[np.ndarray]] = None
    # one color per point, for plotting (grey for noise points)
    colors: Optional[List[Tuple[float, float, float]]] = None
    # boolean mask, True where the cluster label is -1
    mask_noisy: Optional[np.ndarray] = None
    # number of distinct cluster labels, excluding -1
    cluster_count: Optional[int] = None

    def __iter__(self):
        """Iterate over the field values in declaration order

        Values are taken from dataclasses.astuple(), which allows
        to unpack an instance like a tuple.
        """
        return iter(astuple(self))


def store_in_queue(f):
    """Decorator that stores the result of a function in the
    module-level cluster_queue

    The wrapper itself returns None; the return value of the wrapped
    function is put into cluster_queue and must be fetched from there
    (cluster_queue.get()). This allows to use the wrapped function
    as a thread target and still get hold of its result.

    Args:
        f: the function to wrap

    Returns:
        wrapper: function with the signature of f that returns None
    """
    # keep name, docstring etc. of the wrapped function
    @wraps(f)
    def wrapper(*args, **kwargs):
        cluster_queue.put(f(*args, **kwargs))
    return wrapper


@dataclass
class ClusterShapes:
    """User count per cluster centroid

    data: List of Tuples with
          (1) Point = cluster centroid and
          (2) int = user count
    cls_type: cluster type (TAGS, EMOJI, ..)
    itemized: bool
              False: Overall Location clusters
              True: Itemized clusters (TAGS, EMOJI)
    """
    data: List[Tuple[geometry.Point, int]]
    cls_type: str
    itemized: bool

    def __iter__(self):
        """Iterate over the field values in declaration order

        Values are taken from dataclasses.astuple(), which allows
        to unpack an instance like a tuple.
        """
        return iter(astuple(self))


class ClusterGen():
    """Cluster methods for tags, emoji and post locations

    Note that there are three different projections used here:

    1. Input: Original Data in Decimal Degrees (WGS1984)
    2. Intermediate: Radians data converted from Decimal Degrees with
    np.radians(points) for use in HDBSCAN clustering
    3. Output: Projected coordinates based on auto-selected UTM Zone,
    for calculating Alpha Shapes and writing results to
    shapefile
    """
    class CGDec():
        """Decorators for class CG methods"""
        @staticmethod
        def input_topic_format(func):
            """Normalize the item argument for topic clusterers

            Only has an effect if the cluster type of the instance
            (self.cls_type) is TOPICS:

            - a list of terms is concatenated to a single
              topic string with Utils.concat_topic()
            - any other item is expected to contain the
              '-' character already

            For all other cluster types, item is handed through
            unchanged.

            Raises:
                ValueError: if cluster type is TOPICS and item is not
                    a list and does not contain '-'
            """
            @wraps(func)
            def _wrapper(self, item, *args, **kwargs):
                if self.cls_type == TOPICS:
                    if isinstance(item, list):
                        item = Utils.concat_topic(item)
                    elif '-' not in item:
                        # note the trailing space: both literals
                        # are joined to a single message
                        raise ValueError(
                            "Please supply either list of terms, or "
                            "concatenate terms with '-' character.")
                return func(self, item, *args, **kwargs)
            return _wrapper

    def __init__(self, bounds: AnalysisBounds,
                 cleaned_post_dict: Optional[Dict[str, CleanedPost]],
                 cleaned_post_list: Optional[List[CleanedPost]],
                 top_list: List[ItemCounter],
                 total_distinct_locations: int,
                 cluster_type: str = TAGS,
                 local_saturation_check: bool = False):
        """Initialize clusterer state, analysis bounds and projections

        Args:
            bounds: analysis boundary; updated in place based on
                the coordinates in cleaned_post_list
            cleaned_post_dict: cleaned posts, by post guid
            cleaned_post_list: cleaned posts as list
            top_list: items to cluster; the first entry is
                stored as top item
            total_distinct_locations: total count of distinct locations,
                used for console reporting
            cluster_type: cluster type (TAGS, EMOJI, ..)
            local_saturation_check: whether to exclude items based on
                the cluster area of the top item
        """
        # input data
        self.cls_type = cluster_type
        self.bounds = bounds
        self.cleaned_post_dict = cleaned_post_dict
        self.cleaned_post_list = cleaned_post_list
        self.top_list = top_list
        self.total_distinct_locations = total_distinct_locations
        self.local_saturation_check = local_saturation_check
        # first entry of top list, None if list is empty
        # TODO: check 2nd [0]
        self.top_item: Optional[ItemCounter] = (
            top_list[0] if self.top_list else None)
        # the initial cluster distance is based on the bounds
        # as handed in, i.e. before these are updated below
        self.cluster_distance: float = ClusterGen._init_cluster_dist(
            self.bounds, self.cls_type)
        # False: cluster labels are cut at cluster_distance
        self.autoselect_clusters = False
        self.clusterer = None
        # containers for cluster results
        self.single_items_dict = defaultdict(list)
        self.clustered_items_dict = defaultdict(list)
        self.clustered_guids_all: List[str] = []
        self.none_clustered_guids: List[str] = []
        # analysis bounds in Decimal Degrees, required
        # to select the output UTM Zone projection
        self._update_bounds()
        self.bound_points_shapely = Utils.get_shapely_bounds(
            self.bounds)
        # PROJ_LIB check is only necessary for pyproj < 2.0.0:
        # Utils.set_proj_dir()
        # input data is always in lat/lng WGS1984,
        # output projection is selected based on bounds
        self.crs_wgs = "epsg:4326"
        self.crs_proj, _ = Utils.get_best_utmzone(
            self.bound_points_shapely)
        # transformers are created once, for reasons of speed;
        # always_xy ensures traditional order of
        # coordinates (lng, lat), see also:
        # https://gis.stackexchange.com/a/326919/33092
        self.proj_transformer = Transformer.from_crs(
            self.crs_wgs, self.crs_proj, always_xy=True)
        self.proj_transformer_back = Transformer.from_crs(
            self.crs_proj, self.crs_wgs, always_xy=True)

    @classmethod
    def new_clusterer(cls,
                      cls_type: str,
                      bounds: AnalysisBounds,
                      cleaned_post_dict: Optional[Dict[str, CleanedPost]],
                      cleaned_post_list: Optional[List[CleanedPost]],
                      cleaned_stats: Optional[Dict[str, PreparedStats]],
                      local_saturation_check: bool):
        """Create new clusterer from type and input data

        Args:
            cls_type (ClusterType): Either TAGS,
                LOCATIONS, TOPICS or EMOJI
            bounds (LoadData.AnalysisBounds): Analaysis spatial boundary
            cleaned_post_dict (Dict[str, CleanedPost]): Dict of cleaned posts
            prepared_data (LoadData.PreparedData): Statistics data

        Returns:
            clusterer (ClusterGen): A new clusterer of ClusterType
        """
        cls_cleaned_stats = cleaned_stats.get(cls_type)
        if not cls_cleaned_stats:
            raise ValueError("Cleaned_stats not initialized")
        clusterer = cls(
            bounds=bounds,
            cleaned_post_dict=cleaned_post_dict,
            cleaned_post_list=cleaned_post_list,
            top_list=cls_cleaned_stats.top_items_list,
            total_distinct_locations=cleaned_stats[
                LOCATIONS].total_unique_items,
            cluster_type=cls_type,
            local_saturation_check=local_saturation_check)
        return clusterer

    @staticmethod
    def _init_cluster_dist(bounds: AnalysisBounds,
                           cls_type: str) -> float:
        """Derive the initial cluster distance from analysis bounds

        The distance is 7% of the smaller of the two extents
        (height, width) of the analysis rectangle, both measured
        with Utils.haversine() from the min lng/min lat corner.
        For LOCATIONS, an eighth of this value is used.

        Args:
            bounds: analysis boundary (lim_lng_min, lim_lng_max,
                lim_lat_min, lim_lat_max)
            cls_type: cluster type (TAGS, EMOJI, ..)

        Returns:
            The initial cluster distance
        """
        lng_min = bounds.lim_lng_min
        lat_min = bounds.lim_lat_min
        # extent along latitude (height of rectangle)
        height = Utils.haversine(
            lng_min, lat_min, lng_min, bounds.lim_lat_max)
        # extent along longitude (width of rectangle)
        width = Utils.haversine(
            lng_min, lat_min, bounds.lim_lng_max, lat_min)
        distance = (min(width, height)/100)*7
        if cls_type != LOCATIONS:
            return distance
        # location clustering includes all data,
        # which is why a reduced distance is used
        return distance/8

    def _update_bounds(self):
        """Update analysis rectangle boundary (self.bounds)
        based on the coordinates of the cleaned posts list."""
        posts_df = pd.DataFrame(
            self.cleaned_post_list, columns=POST_FIELDS)
        # numpy ndarray with one [lng, lat] row per post
        lnglat_points = posts_df.loc[:, ['lng', 'lat']].to_numpy()
        lat_min, lat_max, lng_min, lng_max = Utils.get_rectangle_bounds(
            lnglat_points)
        bounds = self.bounds
        bounds.lim_lat_min = lat_min
        bounds.lim_lat_max = lat_max
        bounds.lim_lng_min = lng_min
        bounds.lim_lng_max = lng_max

    def _select_postguids(self, item: Optional[str]) -> SelectedItems:
        """Select all posts that have a specific item

        Each post in self.cleaned_post_list is checked with the
        filter method that belongs to the cluster type of self.

        Args:
            item: tag, emoji, location or topic

        Returns:
            selected_items: list of post_guids and
                            number of distinct locations

        Raises:
            ValueError: if there is no filter for self.cls_type
                (only raised if there are posts to check)
        """
        filters_by_type = {
            TAGS: self._filter_tags,
            EMOJI: self._filter_emoji,
            LOCATIONS: self._filter_locations,
            TOPICS: self._filter_topics,
        }
        item_filter = filters_by_type.get(self.cls_type)
        post_guids: List[str] = []
        location_ids: Set[str] = set()
        for post in self.cleaned_post_list:
            if item_filter is None:
                raise ValueError(f"Clusterer {self.cls_type} unknown.")
            # filters append to both collections in place
            item_filter(item, post, post_guids, location_ids)
        return SelectedItems(post_guids, len(location_ids))

    @staticmethod
    def _filter_tags(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        if (item in (cleaned_photo_location.hashtags) or
                (item in cleaned_photo_location.post_body)):
            selected_postguids_list.append(
                cleaned_photo_location.guid)
            distinct_localloc_count.add(
                cleaned_photo_location.loc_id)

    @staticmethod
    def _filter_topics(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        """Record post if any term of the topic is found in
        its hashtags, post body or emoji

        The topic is split into terms with Utils.split_topic().
        On a match, the guid of the post is appended to
        selected_postguids_list and its loc_id is added to
        distinct_localloc_count (both modified in place).
        """
        topic_terms = Utils.split_topic(item)
        post = cleaned_photo_location
        # fields are checked in this order, stops at first match
        is_match = any(
            ClusterGen._compare_anyinlist(
                topic_terms, getattr(post, field_name))
            for field_name in ("hashtags", "post_body", "emoji"))
        if is_match:
            selected_postguids_list.append(post.guid)
            distinct_localloc_count.add(post.loc_id)

    @staticmethod
    def _compare_anyinlist(items, item_list):
        """Check if any term of topic is in list"""
        if any(x in items for x in item_list):
            return True
        return False

    @staticmethod
    def _filter_emoji(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        if item in cleaned_photo_location.emoji:
            selected_postguids_list.append(
                cleaned_photo_location.guid)
            distinct_localloc_count.add(
                cleaned_photo_location.loc_id)

    @staticmethod
    def _filter_locations(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        if item == cleaned_photo_location.loc_id:
            selected_postguids_list.append(
                cleaned_photo_location.guid)
            distinct_localloc_count.add(
                cleaned_photo_location.loc_id)

    def _getselect_postguids(self, item: Optional[str],
                             silent: bool = True) -> List[str]:
        """Get list of post guids with specific item

        Unless silent, a summary for the item is printed to console
        (position in top list, number of posts found and, from 1%
        upwards, the share of distinct locations in the area).

        Args:
            item: tag, emoji, location
            silent: if True, no console output

        Returns:
            List of guids of all posts selected for item
        """
        sel_items = self._select_postguids(item)
        if silent:
            return sel_items.guids
        # console reporting
        if self.cls_type == EMOJI:
            item_text = Utils.get_emojiname(item)
        else:
            item_text = item
        type_text = self.cls_type.rstrip('s')
        if self.total_distinct_locations:
            perc_oftotal_locations = (
                sel_items.location_count /
                (self.total_distinct_locations/100)
            )
        else:
            # no locations known: avoid division by zero,
            # the percentage is not reported in this case
            perc_oftotal_locations = 0
        perc_text = ""
        if perc_oftotal_locations >= 1:
            perc_text = (f'(found in {perc_oftotal_locations:.0f}% '
                         f'of DLC in area)')
        # index is zero-based, report position starting from 1
        item_index_pos = self._get_toplist_index(item) + 1
        print(f"({item_index_pos} of {len(self.top_list)}) "
              f"Found {len(sel_items.guids)} posts (UPL) "
              f"for {type_text} '{item_text}' "
              f"{perc_text}", end=" ")
        return sel_items.guids

    def _get_toplist_index(self, item_text: Optional[str]) -> int:
        """Get position of item in top list

        Returns 0 if Utils.get_index_of_item() raises
        a ValueError for the item.
        """
        try:
            return Utils.get_index_of_item(self.top_list, item_text)
        except ValueError:
            return 0

    def _getselect_posts(self,
                         selected_postguids_list: List[str]
                         ) -> List[CleanedPost]:
        selected_posts_list = [self.cleaned_post_dict[x]
                               for x in selected_postguids_list]
        return selected_posts_list

    def get_np_points_guids(self, item: Optional[str] = None,
                            silent: bool = None, sel_all: bool = None
                            ) -> SelItems:
        """Get coordinates and guids of the posts selected for item

        Args:
            item: tag, emoji, location; or topic (list of terms)
            silent: if true, no console output (interface mode).
                Defaults to False.
            sel_all: if true, item is ignored and all posts of
                self.cleaned_post_list are selected. Defaults to False.

        Returns:
            SelItems with
            points: numpy array with the lng and lat columns of the
                selected posts; an empty list if an item was selected
                that has fewer than two posts
            guids: List of selected post guids
        """
        silent = False if silent is None else silent
        sel_all = False if sel_all is None else sel_all
        if sel_all:
            # all posts, no filtering by item
            posts = self.cleaned_post_list
            post_guids = [post.guid for post in posts]
        else:
            post_guids = self._getselect_postguids(
                item, silent=silent)
            if len(post_guids) < 2:
                # nothing to cluster: return empty list of points
                return SelItems([], post_guids)
            posts = self._getselect_posts(post_guids)
        posts_df = pd.DataFrame(posts, columns=POST_FIELDS)
        # pandas data to numpy array, limited to lng and lat columns
        lnglat_points = posts_df.loc[:, ['lng', 'lat']].to_numpy()
        return SelItems(lnglat_points, post_guids)

    def get_np_points(self, item: str = None, silent: bool = None
                      ) -> np.ndarray:
        """Wrapper that only returns points of get_np_points_guids

        If no item is given, the points of all posts are selected.

        Returns:
            The array of points, or None if there are no points
        """
        # select all if no specific item is given
        sel_items = self.get_np_points_guids(item, silent, item is None)
        points = sel_items.points
        if len(points) == 0:
            return None
        # ndarray.size: Number of elements in the array
        if not points.size:
            return None
        return points

    def _cluster_points(self, points,
                        min_span_tree: bool = None,
                        preview_mode: bool = None,
                        min_cluster_size: int = None,
                        allow_single_cluster: bool = True):
        """Cluster points using HDBSCAN

        Args:
            points: coordinates to cluster; converted with
                np.radians() before clustering
            min_span_tree: handed to HDBSCAN as gen_min_span_tree.
                Defaults to False.
            preview_mode: if True, plotting information is returned
                in addition to the cluster labels. Defaults to False.
            min_cluster_size: handed to HDBSCAN. Defaults to 5% of the
                number of points, but at least 2.
            allow_single_cluster: handed to HDBSCAN. Defaults to True.

        Returns:
            Tuple of (cluster_labels, sel_colors, mask_noisy,
            number_of_clusters). The last three entries are None
            if not in preview mode.
        """
        # None is accepted for all optional arguments
        # and replaced by the respective default
        if min_span_tree is None:
            min_span_tree = False
        if preview_mode is None:
            preview_mode = False
        if allow_single_cluster is None:
            allow_single_cluster = True
        # conversion to radians for HDBSCAN
        tag_radians_data = np.radians(points)  # pylint: disable=E1111
        if min_cluster_size is None:
            # 5% of number of points, at least 2
            min_cluster_size = max(
                2, int(((len(points))/100)*5))
        # init hdbscan clusterer
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            gen_min_span_tree=min_span_tree,
            allow_single_cluster=allow_single_cluster,
            min_samples=1)
        # Start clusterer on different thread
        # to prevent GUI from freezing
        t = threading.Thread(
            target=ClusterGen._fit_cluster,
            args=(clusterer, tag_radians_data),
            group=None,
            name="tm-clustering",
        )
        t.start()
        # blocks until a result is available in the queue
        # NOTE(review): _fit_cluster is defined outside of this part
        # of the file; presumably it puts the fitted clusterer into
        # cluster_queue (see store_in_queue). If it fails without
        # putting anything, this call would wait forever - confirm
        self.clusterer = cluster_queue.get()

        if self.autoselect_clusters:
            # use labels as selected by HDBSCAN
            cluster_labels = self.clusterer.labels_
        else:
            # cut the single linkage tree at the cluster distance,
            # which is converted from meters to radians first
            cluster_labels = self.clusterer.single_linkage_tree_.get_clusters(
                Utils.get_radians_from_meters(
                    self.cluster_distance), min_cluster_size=2)
        # exit function in case of
        # final processing loop (no figure generating)
        if not preview_mode:
            return cluster_labels, None, None, None
        # verbose reporting if preview mode
        # True for all points that are labeled as noise (-1)
        mask_noisy = (cluster_labels == -1)
        number_of_clusters = len(
            np.unique(cluster_labels[~mask_noisy]))  # nopep8 false positive? pylint: disable=E1130
        palette = sns.color_palette("husl", number_of_clusters+1)
        # one color per point: palette color of its cluster,
        # grey for noise points
        sel_colors = [palette[x] if x >= 0
                      else (0.5, 0.5, 0.5)
                      # for x in clusterer.labels_ ]
                      for x in cluster_labels]
        # return additional information in preview mode
        # for plotting
        return cluster_labels, sel_colors, mask_noisy, number_of_clusters

    def cluster_item(
            self, item: Optional[str],
            preview_mode=None) -> Optional[ClusterResults]:
        """Select the posts of a specific item and cluster them

        Args:
            item (str): The item to select and cluster
            preview_mode (bool, optional): Defaults to None (= False).
                If True, colors, mask_noisy and cluster_count are
                part of the results, which can be used as additional
                information during plot. Console output for the
                selection is turned off in preview mode.

        Returns:
            None if fewer than two posts were selected for item,
            otherwise ClusterResults with
            clusters: The cluster labels from clustering
            guids: All selected post guids for item
            points: numpy.ndarray of selected post coordinates
                (lng, lat columns of posts)
            colors: color codes assigned to points for plotting clusters
            mask_noisy: mask of points with cluster label -1
            cluster_count: number of identified clusters
        """
        preview_mode = False if preview_mode is None else preview_mode
        selection = self.get_np_points_guids(
            item=item, silent=preview_mode)
        if len(selection.guids) < 2:
            # no need to cluster
            return None
        labels, colors, noise_mask, cluster_count = self._cluster_points(
            points=selection.points, preview_mode=preview_mode)
        return ClusterResults(
            clusters=labels,
            guids=selection.guids,
            points=selection.points,
            colors=colors,
            mask_noisy=noise_mask,
            cluster_count=cluster_count)

    def _cluster_all_items(self):
        """Cluster all items (e.g. all locations)

        Returns:
            ClusterResults with cluster labels and post guids,
            or None if there are fewer than two posts
        """
        selection = self.get_np_points_guids(
            silent=False, sel_all=True)
        if len(selection.guids) < 2:
            return None
        # min_cluster_size = 2 (LOCATIONS)
        # do not allow clusters with one item
        labels, _colors, _noise_mask, _count = self._cluster_points(
            points=selection.points,
            preview_mode=False,
            min_cluster_size=2,
            allow_single_cluster=False)
        return ClusterResults(labels, selection.guids)

    @staticmethod
    def _get_cluster_guids(clusters, selected_post_guids) -> Guids:
        """Returns two lists: clustered and non clustered guids

        Args:
            clusters: cluster label per post, -1 for posts
                that do not belong to any cluster
            selected_post_guids: post guids, in the order of clusters

        Returns:
            Guids with
            clustered: one array of guids per cluster,
                sorted descending by size of cluster
            nonclustered: list of guids with label -1; all guids
                if there is no cluster
        """
        np_selected_post_guids = np.asarray(selected_post_guids)
        # also accept labels given as plain list
        clusters = np.asarray(clusters)
        mask_noisy = (clusters == -1)
        if len(selected_post_guids) == 1:
            # a single post cannot form a cluster
            cluster_labels = []
        else:
            # use the labels that are actually present (sorted),
            # instead of assuming consecutive labels 0..n-1:
            # otherwise clusters would be lost if labels have gaps
            cluster_labels = np.unique(clusters[~mask_noisy])
        number_of_clusters = len(cluster_labels)
        if number_of_clusters == 0:
            print("--> No cluster.")
            return Guids([], list(np_selected_post_guids))
        print(f'--> {number_of_clusters} cluster.')
        clustered_guids = [
            np_selected_post_guids[clusters == cluster_label]
            for cluster_label in cluster_labels]
        # Sort descending based on size of cluster
        # this is needed to later compute HImp Value (1 or 0)
        clustered_guids.sort(key=len, reverse=True)
        none_clustered_guids = list(np_selected_post_guids[mask_noisy])
        return Guids(clustered_guids, none_clustered_guids)

    def _get_update_clusters(self, item: Optional[str] = None,
                             single_items_dict=None,
                             cluster_items_dict=None,
                             itemized: bool = None):
        """Get clusters for items and write results to dicts

        Args:
            item: the item to cluster (only used if itemized)
            single_items_dict: dict to store non-clustered guids per
                item, modified in place.
                Defaults to self.single_items_dict.
            cluster_items_dict: dict to store clustered guids per
                item, modified in place.
                Defaults to self.clustered_items_dict.
            itemized: Defaults to True. If False, all posts are
                clustered and results are stored in
                self.clustered_guids_all and self.none_clustered_guids
        """
        # compare to None explicitly: an empty dict handed in by the
        # caller must be filled, not replaced by the dict of self
        if single_items_dict is None:
            single_items_dict = self.single_items_dict
        if cluster_items_dict is None:
            cluster_items_dict = self.clustered_items_dict
        if itemized is None:
            # default
            itemized = True
        if itemized:
            # clusters guids points colors mask_noisy cluster_count
            cluster = self.cluster_item(item)
        else:
            cluster = self._cluster_all_items()
        if not cluster:
            print("--> No cluster (all locations removed).")
            return
        # get clustered guids/ non-clustered guids
        guids = self._get_cluster_guids(cluster.clusters, cluster.guids)
        if not itemized:
            self.clustered_guids_all = guids.clustered
            self.none_clustered_guids = guids.nonclustered
            return
        # dicts modified in place, no need to return
        single_items_dict[item] = guids.nonclustered
        if guids.clustered:
            cluster_items_dict[item] = guids.clustered

    def get_overall_clusters(self):
        """Get clusters for all items attached to self

        All posts are clustered at once (not itemized).

        Updates results as two lists:
            self.clustered_guids_all
            self.none_clustered_guids
        """
        # update in case of locations removed
        # self.cleaned_post_list = list(
        #     self.cleaned_post_dict.values())
        self._get_update_clusters(itemized=False)

    def get_itemized_clusters(self):
        """Get itemized clusters for top_list attached to self

        Updates results as two Dict of Lists:
            self.single_items_dict
            self.clustered_items_dict
        """
        # get clusters for top item
        if self.local_saturation_check:
            self._get_update_clusters(
                item=self.top_item.name)  # TODO: test .name
        tnum = 0
        # get remaining clusters
        for item in self.top_list:
            if (self.local_saturation_check and
                    tnum == 0):
                # skip topitem if already
                # clustered due to local saturation
                continue
            tnum += 1
            self._get_update_clusters(
                item=item.name)
        # logging.getLogger("tagmaps").info(
        #    f'{len(self.clustered_items)} '
        #    f'{self.cls_type.rstrip("s")} clusters.\n'
        #    f'{len(self.single_items)} without neighbors.')
        # flush console output once
        sys.stdout.flush()

    def get_all_cluster_centroids(self) -> ClusterShapes:
        """Get all centroids for the overall (not itemized) clusters

        Centroids are calculated for self.clustered_guids_all and
        self.none_clustered_guids.

        Returns:
            ClusterShapes:
                data: centroids and user count
                cls_type: cluster type of self (EMOJI, TAGS etc.)
                itemized: always False
        """
        centroids_and_counts = self.get_cluster_centroids(
            self.clustered_guids_all, self.none_clustered_guids)
        return ClusterShapes(
            data=centroids_and_counts,
            cls_type=self.cls_type,
            itemized=False)

    def get_item_cluster_centroids(self, item, single_clusters=None):
        """Cluster item and get centroids for its clustered data

        Args:
            item: the item to cluster
            single_clusters: Defaults to True. If True, non-clustered
                posts of item are included as centroids, too.

        Returns:
            List of tuples with centroid and user count
        """
        if single_clusters is None:
            single_clusters = True
        # update cluster results for item
        self._get_update_clusters(item=item)
        clustered = self.clustered_items_dict[item]
        singles = self.single_items_dict[item] if single_clusters else None
        return self.get_cluster_centroids(clustered, singles)

    def _proj_coords(self, lng: float, lat: float):
        """Project coordinates based on available packages

        pyproj.transformer needs pyproj > 2.0.0,
        which provides a more convenient and faster way to
        project many coordinates.
        """
        lng_proj, lat_proj = self.proj_transformer.transform(
            lng, lat)
        return lng_proj, lat_proj

    def get_cluster_centroids(
            self, clustered_guids,
            none_clustered_guids=None) -> List[Tuple[geometry.Point, int]]:
        """Get centroids for clustered data

        For each cluster, the centroid of the convex hull of its
        projected post coordinates is stored together with the number
        of distinct users. Non-clustered posts (if given) are added as
        projected points with a user count of 1.

        This method needs refactor, since it produces a sparse version of
        AlphaShapesAndMeta (only geometry and user_count) -> create specific
        dataclass

        Args:
            clustered_guids: one collection of post guids per cluster
            none_clustered_guids: guids of non-clustered posts (optional)

        Returns:
            List of tuples with point and user count
        """
        centroids = []
        for guid_group in clustered_guids:
            cluster_posts = [
                self.cleaned_post_dict[guid] for guid in guid_group]
            user_count = len({post.user_guid for post in cluster_posts})
            # project coordinates to suitable UTM
            projected_points = [
                geometry.Point(self._proj_coords(post.lng, post.lat))
                for post in cluster_posts]
            # convex hull enough for calculating centroid
            hull = geometry.MultiPoint(projected_points).convex_hull
            centroid = hull.centroid
            if centroid is None or centroid.is_empty:
                continue
            centroids.append((centroid, user_count))
        if not none_clustered_guids:
            return centroids
        for single_guid in none_clustered_guids:
            single_post = self.cleaned_post_dict[single_guid]
            x_proj, y_proj = self._proj_coords(
                single_post.lng, single_post.lat)
            single_point = geometry.Point(x_proj, y_proj)
            if single_point is None or single_point.is_empty:
                continue
            centroids.append((single_point, 1))
        sys.stdout.flush()
        return centroids

    def _get_item_clustershapes(
            self,
            item: ItemCounter,
            cluster_guids=None) -> AlphaShapesArea:
        """Get cluster shapes for the clustered posts of a given item

        Args:
            item: the item to get shapes for
            cluster_guids: clustered post guids. Defaults to the
                entry for item in self.clustered_items_dict.

        Returns:
            Result of AlphaShapes.get_cluster_shape(), or
            AlphaShapesArea(None, 0) if there are no clustered guids
        """
        guids = cluster_guids
        if guids is None:
            guids = self.clustered_items_dict.get(item.name)
        if not guids:
            # nothing clustered for this item
            return AlphaShapesArea(None, 0)
        return AlphaShapes.get_cluster_shape(
            item=item,
            clustered_post_guids=guids,
            cleaned_post_dict=self.cleaned_post_dict,
            cluster_distance=self.cluster_distance,
            local_saturation_check=self.local_saturation_check,
            proj_coords=self._proj_coords)

    def _get_item_clusterarea(
            self,
            item: ItemCounter) -> float:
        """Wrapper: get the item_area of the cluster shapes for item"""
        return self._get_item_clustershapes(item).item_area

    @staticmethod
    def _is_saturated_item(
            item_area: float,
            topitem_area: float):
        """Skip item entirely if saturated, i.e.
        if total area > 80%
        of top item cluster area

        Args:
            item_area: item cluster area
            topitem_area: top item cluster area
        """
        local_saturation = item_area/(topitem_area/100)
        # print("Local Saturation for Tag " + self.top_item "
        #       "+ ": " + str(round(localSaturation,0)))
        if local_saturation > 60:
            return True
        else:
            return False

    def _get_item_shapeslist(
            self, item, topitem_area,
            tnum) -> Optional[List[List[AlphaShapesAndMeta]]]:
        """Get all shapes for an item (clustered and single posts)

        Note: A function ref to self._proj_coords is handed
        to AlphaShapes.get_single_cluster_shape(). Coordinates are then
        projected inside AlphaShapes Class.

        Args:
            item: the item to get shapes for
            topitem_area: cluster area of the top item, used
                for local saturation check
            tnum: position of item, starting from 1; the saturation
                check is skipped for position 1

        Returns:
            List of shapes; None if the item is excluded
            by the local saturation check
        """
        cluster_result = self._get_item_clustershapes(item)
        cluster_shapes = cluster_result.alphashape
        item_area = cluster_result.item_area
        needs_saturation_check = (
            self.local_saturation_check
            and item_area != 0
            and tnum != 1)
        if needs_saturation_check and self._is_saturated_item(
                item_area, topitem_area):
            # item is skipped entirely
            return None
        item_shapes = []
        if cluster_shapes:
            item_shapes.extend(cluster_shapes)
        # shapes for single items (non-clustered)
        single_guids = self.single_items_dict.get(item.name)
        if not single_guids:
            return item_shapes
        single_posts = [
            self.cleaned_post_dict[guid] for guid in single_guids]
        for single_post in single_posts:
            single_shape = AlphaShapes.get_single_cluster_shape(
                item, single_post, self.cluster_distance,
                self._proj_coords)
            if single_shape:
                # append, since always single Tuple
                item_shapes.append(single_shape)
        return item_shapes

    def get_cluster_shapes(self):
        """For each cluster of points,
        calculate boundary shape and
        add statistics (HImpTag etc.)

        Returns results as shapes_and_meta
        = list(), ClusterType, itemized = bool
        """
        itemized = True
        saturation_exclude_count = 0
        shapes_and_meta = list()
        tnum = 0
        topitem_area = None
        if self.local_saturation_check and self.top_item:
            # calculate total area of Top1-Tag
            # for 80% saturation check for lower level tags
            topitem_area = self._get_item_clusterarea(
                self.top_item)
            if topitem_area == 0:
                raise ValueError(
                    f'Something went wrong: '
                    f'Could not get area for Top item '
                    f'{self.top_item}')
        for item in self.top_list:
            tnum += 1
            shapes_tmp = self._get_item_shapeslist(
                item, topitem_area, tnum)
            if shapes_tmp is None:
                saturation_exclude_count += 1
                continue
            if not shapes_tmp:
                continue
            shapes_and_meta.extend(shapes_tmp)
        logging.getLogger("tagmaps").info(
            f'{len(shapes_and_meta)} '
            f'alpha shapes. Done.')
        if saturation_exclude_count > 0:
            logging.getLogger("tagmaps").info(
                f'Excluded {saturation_exclude_count} '
                f'{self.cls_type.rstrip("s")} on local saturation check.')
        return shapes_and_meta, self.cls_type, itemized

    @staticmethod
    @store_in_queue
    def _fit_cluster(clusterer, data):
        """Perform HDBSCAN clustering from features or distance matrix.

        Note that this function is wrapped with ``store_in_queue``:
        the fitted clusterer returned below is put on ``cluster_queue``
        by the decorator, and the decorated call itself returns None.

        Args:
            clusterer: HDBSCAN clusterer, its ``fit`` method is called
            data: A feature array (points) handed to ``fit``

        Returns:
            The same clusterer object after fitting (delivered
            through the queue, see above).
        """

        clusterer.fit(data)
        return clusterer

    @CGDec.input_topic_format
    def get_sel_preview(self, item):
        """Return the plot figure previewing all points selected for item.

        Points are selected silently (no console output) and handed to
        ``TPLT.get_sel_preview`` together with the analysis bounds and
        the cluster type.
        """
        selected_points = self.get_np_points(item=item, silent=True)
        return TPLT.get_sel_preview(
            selected_points, item, self.bounds, self.cls_type)

    @CGDec.input_topic_format
    def get_cluster_centroid_data(
            self, item, zipped=None, projected=None, single_clusters=None):
        """Return cluster centroids and user counts for an item.

        Args:
            item (str or list of str): Item to be selected
            zipped (bool, optional): If True, centroids and user_count
                are merged into one array of (x, y, user_count) rows,
                defaults to False
            projected (bool, optional): If True, centroids are returned
                as delivered by get_item_cluster_centroids; otherwise
                they are passed through _project_centroids_back first,
                defaults to False
            single_clusters: Passed on to get_item_cluster_centroids,
                defaults to True

        Returns:
            If zipped: numpy array with one (x, y, user_count) row
            per centroid. Otherwise a tuple:
                [0] points (numpy array of coordinate pairs),
                [1] user_count (list, one entry per centroid)
        """
        zipped = False if zipped is None else zipped
        projected = False if projected is None else projected
        single_clusters = True if single_clusters is None else single_clusters
        shapes = self.get_item_cluster_centroids(
            item=item, single_clusters=single_clusters)
        centroids = [meta[0] for meta in shapes]
        user_count = [meta[1] for meta in shapes]
        if not projected:
            # shapes were computed on projected data;
            # convert centroids back before returning
            centroids = self._project_centroids_back(centroids)
        # coordinate pairs from shapely points, as numpy array
        coords = np.array(
            [[centroid.x, centroid.y] for centroid in centroids])
        if not zipped:
            return (coords, user_count)
        return np.asarray(
            [(xy[0], xy[1], count)
             for xy, count in zip(coords, user_count)])

    @CGDec.input_topic_format
    def get_cluster_centroid_preview(
            self, item, single_clusters=None) -> plt.figure:
        """Return the plot figure showing cluster centroids for item.

        Args:
            item: Item to be selected
            single_clusters: Passed on to get_cluster_centroid_data,
                defaults to True
        """
        single_clusters = True if single_clusters is None else single_clusters
        centroids, user_count = self.get_cluster_centroid_data(
            item=item, single_clusters=single_clusters)
        return TPLT.get_centroid_preview(
            centroids, item, self.bounds, self.cls_type, user_count)

    @CGDec.input_topic_format
    def get_cluster_preview(self, item) -> plt.figure:
        """Returns plt map for item cluster preview

        The item is clustered once via ``cluster_item`` in preview
        mode, which also selects the points and stores the fitted
        clusterer on ``self.clusterer``. No separate clustering pass
        is run up front: its result would be discarded, and it would
        fail for items with fewer than two posts, where no points
        array is available.

        Args:
            item: Item to select and cluster

        Returns:
            The figure from TPLT.get_cluster_preview, or None if
            cluster_item found nothing to cluster.
        """
        clusters = self.cluster_item(
            item=item,
            preview_mode=True)
        if clusters is None:
            # fewer than two posts for item: nothing to plot
            return None
        fig = TPLT.get_cluster_preview(
            points=clusters.points, sel_colors=clusters.colors, item_text=item,
            bounds=self.bounds, mask_noisy=clusters.mask_noisy,
            cluster_distance=self.cluster_distance,
            number_of_clusters=clusters.cluster_count,
            auto_select_clusters=self.autoselect_clusters,
            cls_type=self.cls_type)
        return fig

    @CGDec.input_topic_format
    def get_clustershapes_preview(self, item) -> plt.figure:
        """Return the plot figure showing clusters and their shapes.

        Clusters the item in preview mode, computes the shapes for
        the clustered guids, converts the shapes with
        _project_centroids_back and plots them on top of the points.

        Returns:
            The figure from TPLT.get_cluster_preview, or None (after
            printing a notice) if there was nothing to cluster.
        """
        item_counter = ItemCounter(item, 0)
        cluster_result = self.cluster_item(
            item=item_counter.name,
            preview_mode=True)
        if cluster_result is None:
            print("No items found.")
            return None
        # split selected guids in clustered / non-clustered
        guids = self._get_cluster_guids(
            cluster_result.clusters, cluster_result.guids)
        shapes_and_area = self._get_item_clustershapes(
            item_counter, guids.clustered)
        # keep only the geometries, drop all meta information
        geometries = [meta.shape for meta in shapes_and_area.alphashape]
        return TPLT.get_cluster_preview(
            points=cluster_result.points,
            sel_colors=cluster_result.colors,
            item_text=item_counter.name,
            bounds=self.bounds,
            mask_noisy=cluster_result.mask_noisy,
            cluster_distance=self.cluster_distance,
            number_of_clusters=cluster_result.cluster_count,
            auto_select_clusters=self.autoselect_clusters,
            shapes=self._project_centroids_back(geometries),
            cls_type=self.cls_type)

    def _project_centroids_back(self, shapes):
        """Proj shapes back to WGS1984 for plotting in matplotlib

        simple list comprehension with projection:
        """
        project = self.proj_transformer_back
        shapes_wgs = [(ClusterGen._project_geometry(
            shape, project)) for shape in shapes]
        return shapes_wgs

    @staticmethod
    def _project_geometry(geom_shape, project):
        """Apply ``project.transform`` to a geometry via ``transform``.

        Args:
            geom_shape: geometry to convert
            project: object providing a ``transform`` callable

        Returns:
            The geometry returned by ``transform``.
        """
        return transform(project.transform, geom_shape)

    def get_singlelinkagetree_preview(self, item):
        """Returns figure for single linkage tree from HDBSCAN clustering

        The item is clustered in preview mode first, so that
        ``self.clusterer`` holds the clusterer fitted for this item.

        Args:
            item: Item to cluster; for topics, the item is passed
                through Utils.concat_topic first

        Returns:
            The figure from TPLT.get_single_linkage_tree_preview, or
            None if cluster_item found nothing to cluster.
        """
        if self.cls_type == TOPICS:
            item = Utils.concat_topic(item)
        cluster_results = self.cluster_item(
            item=item,
            preview_mode=True)
        if cluster_results is None:
            # nothing was clustered, so there is no tree to plot
            # (same guard as in the other preview methods)
            return None
        axis = self.clusterer.single_linkage_tree_.plot(
            truncate_mode='lastp',
            p=max(50, min(cluster_results.cluster_count*10, 256)))
        fig = TPLT.get_single_linkage_tree_preview(
            item, axis.figure, self.cluster_distance,
            self.cls_type)
        return fig

Functions

def store_in_queue(f)

Decorator that puts the decorated function's return value on the shared threaded queue (cluster_queue)

Expand source code
def store_in_queue(f):
    """Decorator that puts the function's result on ``cluster_queue``.

    The decorated callable returns None; the wrapped function's
    return value must be fetched from ``cluster_queue`` instead.

    Args:
        f: the function whose result is to be queued

    Returns:
        Wrapper accepting the same positional and keyword arguments
        as ``f`` and carrying its name and docstring.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        cluster_queue.put(f(*args, **kwargs))
    return wrapper

Classes

class ClusterGen (bounds: AnalysisBounds, cleaned_post_dict: Optional[Dict[str, CleanedPost]], cleaned_post_list: Optional[List[CleanedPost]], top_list: List[ItemCounter], total_distinct_locations: int, cluster_type: str = 'Tags', local_saturation_check: bool = False)

Cluster methods for tags, emoji and post locations

Note that there are three different projections used here:

  1. Input: Original Data in Decimal Degrees (WGS1984)
  2. Intermediate: Radians data converted from Decimal Degrees with np.radians(points) for use in HDBSCAN clustering
  3. Output: Projected coordinates based on auto-selected UTM Zone, for calculating Alpha Shapes and writing results to shapefile
Expand source code
class ClusterGen():
    """Cluster methods for tags, emoji and post locations

    Note that there are three different projections used here:

    1. Input: Original Data in Decimal Degrees (WGS1984)
    2. Intermediate: Radians data converted from Decimal Degrees with
    np.radians(points) for use in HDBSCAN clustering
    3. Output: Projected coordinates based on auto-selected UTM Zone,
    for calculating Alpha Shapes and writing results to
    shapefile
    """
    class CGDec():
        """Decorators for class CG methods"""
        @staticmethod
        def input_topic_format(func):
            """Normalize the ``item`` argument for topic clusterers.

            If the instance's cluster type is TOPICS, a list of terms
            is concatenated with Utils.concat_topic; a non-list item
            must already contain the '-' character. For all other
            cluster types the item is passed through unchanged.

            Raises:
                ValueError: for TOPICS, if item is not a list and
                    does not contain '-'.
            """
            @wraps(func)
            def _wrapper(self, item, *args, **kwargs):
                if self.cls_type == TOPICS:
                    if isinstance(item, list):
                        item = Utils.concat_topic(item)
                    elif '-' not in item:
                        raise ValueError(
                            "Please supply either list of terms, or "
                            "concatenate terms with '-' character.")
                # forward any further arguments untouched
                return func(self, item, *args, **kwargs)
            return _wrapper

    def __init__(self, bounds: AnalysisBounds,
                 cleaned_post_dict: Optional[Dict[str, CleanedPost]],
                 cleaned_post_list: Optional[List[CleanedPost]],
                 top_list: List[ItemCounter],
                 total_distinct_locations: int,
                 cluster_type: str = TAGS,
                 local_saturation_check: bool = False):
        """Initialize clusterer state, bounds and projections.

        Args:
            bounds: Analysis boundary; its lim_* attributes are
                overwritten by _update_bounds() during init
            cleaned_post_dict: Cleaned posts, looked up by guid
            cleaned_post_list: Cleaned posts as list
            top_list: Items to cluster; the first entry becomes
                self.top_item
            total_distinct_locations: Total count of distinct
                locations, used for percentage reporting
            cluster_type: Type of items to cluster, defaults to TAGS
            local_saturation_check: Whether to apply the local
                saturation check, defaults to False
        """
        self.cls_type = cluster_type
        self.bounds = bounds
        # NOTE(review): the cluster distance is derived from the bounds
        # as passed in, i.e. before _update_bounds() below modifies
        # them - confirm that this order is intended
        self.cluster_distance: float = ClusterGen._init_cluster_dist(
            self.bounds, self.cls_type)
        self.cleaned_post_dict = cleaned_post_dict
        self.cleaned_post_list = cleaned_post_list
        self.top_list = top_list
        self.top_item: Optional[ItemCounter]
        if self.top_list:
            self.top_item = top_list[0]  # TODO: check 2nd [0]
        else:
            self.top_item = None
        self.total_distinct_locations = total_distinct_locations
        # if False, _cluster_points cuts the single linkage tree at
        # cluster_distance instead of using the clusterer's labels_
        self.autoselect_clusters = False
        # set by _cluster_points after fitting
        self.clusterer = None
        self.local_saturation_check = local_saturation_check
        # storing cluster results:
        self.single_items_dict = defaultdict(list)
        self.clustered_items_dict = defaultdict(list)
        self.clustered_guids_all: List[str] = list()
        self.none_clustered_guids: List[str] = list()
        # get initial analysis bounds in Decimal Degrees
        # for calculating output UTM Zone Projection
        self._update_bounds()
        self.bound_points_shapely = Utils.get_shapely_bounds(
            self.bounds)
        # verify that PROJ_LIB exists,
        # only necessary for pyproj < 2.0.0
        # Utils.set_proj_dir()
        # input data always in lat/lng WGS1984
        # define input and UTM projections
        self.crs_wgs = "epsg:4326"
        self.crs_proj, __ = Utils.get_best_utmzone(
            self.bound_points_shapely)
        # define projection function ahead
        # for reasons of speed
        # always_xy ensures traditional order of
        # coordinates (lng, lat), see also:
        # https://gis.stackexchange.com/a/326919/33092
        self.proj_transformer = Transformer.from_crs(
            self.crs_wgs, self.crs_proj, always_xy=True)
        self.proj_transformer_back = Transformer.from_crs(
            self.crs_proj, self.crs_wgs, always_xy=True)

    @classmethod
    def new_clusterer(cls,
                      cls_type: str,
                      bounds: AnalysisBounds,
                      cleaned_post_dict: Optional[Dict[str, CleanedPost]],
                      cleaned_post_list: Optional[List[CleanedPost]],
                      cleaned_stats: Optional[Dict[str, PreparedStats]],
                      local_saturation_check: bool):
        """Create new clusterer from type and input data

        Args:
            cls_type (ClusterType): Either TAGS,
                LOCATIONS, TOPICS or EMOJI
            bounds (LoadData.AnalysisBounds): Analaysis spatial boundary
            cleaned_post_dict (Dict[str, CleanedPost]): Dict of cleaned posts
            prepared_data (LoadData.PreparedData): Statistics data

        Returns:
            clusterer (ClusterGen): A new clusterer of ClusterType
        """
        cls_cleaned_stats = cleaned_stats.get(cls_type)
        if not cls_cleaned_stats:
            raise ValueError("Cleaned_stats not initialized")
        clusterer = cls(
            bounds=bounds,
            cleaned_post_dict=cleaned_post_dict,
            cleaned_post_list=cleaned_post_list,
            top_list=cls_cleaned_stats.top_items_list,
            total_distinct_locations=cleaned_stats[
                LOCATIONS].total_unique_items,
            cluster_type=cls_type,
            local_saturation_check=local_saturation_check)
        return clusterer

    @staticmethod
    def _init_cluster_dist(bounds: AnalysisBounds,
                           cls_type: str) -> float:
        """Derive the initial cluster distance from the analysis bounds.

        The distance is 7% of the smaller of the two extents
        (height and width) of the bounds, each measured with
        Utils.haversine starting at the (lng_min, lat_min) corner.
        For LOCATIONS, this value is further divided by 8.

        Args:
            bounds: analysis bounds (lim_lng_min, lim_lng_max,
                lim_lat_min, lim_lat_max are read)
            cls_type: cluster type

        Returns:
            The initial cluster distance.
        """
        lng_min = bounds.lim_lng_min
        lat_min = bounds.lim_lat_min
        height = Utils.haversine(
            lng_min, lat_min, lng_min, bounds.lim_lat_max)
        width = Utils.haversine(
            lng_min, lat_min, bounds.lim_lng_max, lat_min)
        distance = (min(width, height)/100)*7
        if cls_type == LOCATIONS:
            # location clustering includes all data,
            # therefore a reduced default distance is used
            distance = distance/8
        return distance

    def _update_bounds(self):
        """Overwrite the analysis rectangle limits in self.bounds.

        The limits are computed by Utils.get_rectangle_bounds from
        the lng/lat columns of the cleaned posts list.
        """
        post_frame = pd.DataFrame(
            self.cleaned_post_list, columns=POST_FIELDS)
        # numpy array restricted to the columns lng, lat
        lng_lat = post_frame.loc[:, ['lng', 'lat']].to_numpy()
        lat_min, lat_max, lng_min, lng_max = Utils.get_rectangle_bounds(
            lng_lat)
        self.bounds.lim_lat_min = lat_min
        self.bounds.lim_lat_max = lat_max
        self.bounds.lim_lng_min = lng_min
        self.bounds.lim_lng_max = lng_max

    def _select_postguids(self, item: Optional[str]) -> SelectedItems:
        """Select all posts that match a specific item

        Each cleaned post is run through the filter method that
        belongs to the clusterer's type (tags, emoji, locations or
        topics); the filter collects matching guids and location ids.

        Args:
            item: tag, emoji, location

        Returns:
            selected_items: list of post_guids and
                            number of distinct locations

        Raises:
            ValueError: if the cluster type is unknown (raised when
                the first post is processed).
        """
        filters_by_type = {
            TAGS: self._filter_tags,
            EMOJI: self._filter_emoji,
            LOCATIONS: self._filter_locations,
            TOPICS: self._filter_topics,
        }
        post_filter = filters_by_type.get(self.cls_type)
        location_ids = set()
        post_guids = list()
        for cleaned_post in self.cleaned_post_list:
            if post_filter is None:
                raise ValueError(f"Clusterer {self.cls_type} unknown.")
            # filter appends to post_guids / location_ids in place
            post_filter(item, cleaned_post, post_guids, location_ids)
        return SelectedItems(post_guids, len(location_ids))

    @staticmethod
    def _filter_tags(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        if (item in (cleaned_photo_location.hashtags) or
                (item in cleaned_photo_location.post_body)):
            selected_postguids_list.append(
                cleaned_photo_location.guid)
            distinct_localloc_count.add(
                cleaned_photo_location.loc_id)

    @staticmethod
    def _filter_topics(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        """Record the post if any topic term matches tags, body or emoji.

        The topic is split into its terms with Utils.split_topic.
        The post's hashtags, post_body and emoji are checked in this
        order, stopping at the first match. On a match, guid and
        loc_id are stored in the two collections (modified in place).
        """
        terms = Utils.split_topic(item)
        post = cleaned_photo_location
        matched = any(
            ClusterGen._compare_anyinlist(terms, getattr(post, field))
            for field in ("hashtags", "post_body", "emoji"))
        if not matched:
            return
        selected_postguids_list.append(post.guid)
        distinct_localloc_count.add(post.loc_id)

    @staticmethod
    def _compare_anyinlist(items, item_list):
        """Check if any term of topic is in list"""
        if any(x in items for x in item_list):
            return True
        return False

    @staticmethod
    def _filter_emoji(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        if item in cleaned_photo_location.emoji:
            selected_postguids_list.append(
                cleaned_photo_location.guid)
            distinct_localloc_count.add(
                cleaned_photo_location.loc_id)

    @staticmethod
    def _filter_locations(
            item: Optional[str],
            cleaned_photo_location: CleanedPost,
            selected_postguids_list: List[str],
            distinct_localloc_count: Set[str]):
        if item == cleaned_photo_location.loc_id:
            selected_postguids_list.append(
                cleaned_photo_location.guid)
            distinct_localloc_count.add(
                cleaned_photo_location.loc_id)

    def _getselect_postguids(self, item: Optional[str],
                             silent: bool = True) -> List[str]:
        """Get list of post guids with specific item

        Unless silent, a one-line summary (position in top list,
        number of posts found, share of distinct locations) is
        printed to the console, ending with a space instead of a
        newline.

        Args:
            item: tag, emoji, location
            silent: if True, nothing is printed

        Returns:
            List of guids of the selected posts.
        """
        selection = self._select_postguids(item)
        if not silent:
            # console reporting
            item_text = (Utils.get_emojiname(item)
                         if self.cls_type == EMOJI else item)
            type_text = self.cls_type.rstrip('s')
            location_share = (
                selection.location_count /
                (self.total_distinct_locations/100)
            )
            # share is only mentioned from 1 percent upwards
            perc_text = (
                (f'(found in {location_share:.0f}% '
                 f'of DLC in area)')
                if location_share >= 1 else "")
            position = self._get_toplist_index(item) + 1
            print(f"({position} of {len(self.top_list)}) "
                  f"Found {len(selection.guids)} posts (UPL) "
                  f"for {type_text} '{item_text}' "
                  f"{perc_text}", end=" ")
        return selection.guids

    def _get_toplist_index(self, item_text: Optional[str]) -> int:
        """Return the index of the item in the top list.

        The lookup is done by Utils.get_index_of_item; if that raises
        a ValueError, 0 is returned instead.
        """
        try:
            return Utils.get_index_of_item(self.top_list, item_text)
        except ValueError:
            return 0

    def _getselect_posts(self,
                         selected_postguids_list: List[str]
                         ) -> List[CleanedPost]:
        selected_posts_list = [self.cleaned_post_dict[x]
                               for x in selected_postguids_list]
        return selected_posts_list

    def get_np_points_guids(self, item: Optional[str] = None,
                            silent: bool = None, sel_all: bool = None
                            ) -> SelItems:
        """Get points and guids of the posts selected for an item

        Args:
            item: tag, emoji, location; or topic (list of terms)
            silent: if true, no console output (interface mode),
                defaults to False
            sel_all: if true, all cleaned posts are selected and
                item is ignored, defaults to False

        Returns:
            SelItems with
            points: numpy array with the columns lng, lat of the
                selected posts; an empty list if an item was given
                and fewer than two posts were found for it
            guids: List of selected post guids
        """
        silent = False if silent is None else silent
        sel_all = False if sel_all is None else sel_all
        if sel_all:
            # every cleaned post is selected
            guids = [post.guid for post in self.cleaned_post_list]
            posts = self.cleaned_post_list
        else:
            guids = self._getselect_postguids(item, silent=silent)
            if len(guids) < 2:
                # not enough posts: hand back empty list of points
                return SelItems([], guids)
            posts = self._getselect_posts(guids)
        post_frame = pd.DataFrame(posts, columns=POST_FIELDS)
        # numpy array restricted to the columns lng, lat
        lng_lat = post_frame.loc[:, ['lng', 'lat']].to_numpy()
        return SelItems(lng_lat, guids)

    def get_np_points(self, item: str = None, silent: bool = None
                      ) -> np.ndarray:
        """Return only the points from get_np_points_guids.

        All posts are selected if item is None, otherwise only the
        posts for the given item.

        Returns:
            The points array, or None if no points were selected.
        """
        selection = self.get_np_points_guids(item, silent, item is None)
        points = selection.points
        # ndarray.size: Number of elements in the array
        if len(points) == 0 or not points.size:
            return None
        return points

    def _cluster_points(self, points,
                        min_span_tree: bool = None,
                        preview_mode: bool = None,
                        min_cluster_size: int = None,
                        allow_single_cluster: bool = True):
        """Cluster points using HDBSCAN

        The points are converted with np.radians and fitted by a
        new HDBSCAN clusterer (min_samples=1) on a separate thread;
        the fitted clusterer is fetched from cluster_queue and
        stored in self.clusterer.

        Args:
            points: coordinates to cluster
            min_span_tree: passed to HDBSCAN as gen_min_span_tree,
                defaults to False
            preview_mode: if True, additional information for
                plotting is returned, defaults to False
            min_cluster_size: passed to HDBSCAN; defaults to 5% of
                the number of points, but at least 2
            allow_single_cluster: passed to HDBSCAN, defaults to True

        Returns:
            Tuple of (cluster_labels, sel_colors, mask_noisy,
            number_of_clusters). The last three are None unless
            preview_mode is True.
        """
        if min_span_tree is None:
            min_span_tree = False
        if preview_mode is None:
            preview_mode = False
        if allow_single_cluster is None:
            allow_single_cluster = True
        # conversion to radians for HDBSCAN
        tag_radians_data = np.radians(points)  # pylint: disable=E1111
        if min_cluster_size is None:
            min_cluster_size = max(
                2, int(((len(points))/100)*5))
        # init hdbscan clusterer
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            gen_min_span_tree=min_span_tree,
            allow_single_cluster=allow_single_cluster,
            min_samples=1)
        # Start clusterer on different thread
        # to prevent GUI from freezing
        t = threading.Thread(
            target=ClusterGen._fit_cluster,
            args=(clusterer, tag_radians_data),
            group=None,
            name="tm-clustering",
        )
        t.start()
        # blocks until the fitted clusterer was put on the queue
        # NOTE(review): if fitting raises inside the thread, nothing
        # is put on the queue and this get() would presumably wait
        # forever - confirm and consider a timeout
        self.clusterer = cluster_queue.get()

        if self.autoselect_clusters:
            # use the labels selected by HDBSCAN itself
            cluster_labels = self.clusterer.labels_
        else:
            # cut the single linkage tree at the cluster distance
            # (converted from meters to radians)
            cluster_labels = self.clusterer.single_linkage_tree_.get_clusters(
                Utils.get_radians_from_meters(
                    self.cluster_distance), min_cluster_size=2)
        # exit function in case of
        # final processing loop (no figure generating)
        if not preview_mode:
            return cluster_labels, None, None, None
        # verbose reporting if preview mode
        # label -1 marks points that belong to no cluster
        mask_noisy = (cluster_labels == -1)
        number_of_clusters = len(
            np.unique(cluster_labels[~mask_noisy]))  # nopep8 false positive? pylint: disable=E1130
        palette = sns.color_palette("husl", number_of_clusters+1)
        # one palette color per cluster label, grey for noise
        sel_colors = [palette[x] if x >= 0
                      else (0.5, 0.5, 0.5)
                      # for x in clusterer.labels_ ]
                      for x in cluster_labels]
        # return additional information in preview mode
        # for plotting
        return cluster_labels, sel_colors, mask_noisy, number_of_clusters

    def cluster_item(
            self, item: Optional[str],
            preview_mode=None) -> Optional[ClusterResults]:
        """Select the posts for a specific item and cluster them

        Args:
            item (str): The item to select and cluster
            preview_mode (bool, optional): Defaults to False. If True,
                console output of the selection is suppressed and
                colors, mask_noisy and cluster_count are filled in
                the result, for use as additional plot information

        Returns:
            None if fewer than two posts were selected, otherwise
            ClusterResults with
            clusters: The cluster labels from _cluster_points
            guids: All selected post guids for item
            points: selected post coordinates, as returned by
                get_np_points_guids
            colors: colors assigned to points for plotting clusters
            mask_noisy: mask marking the points with label -1
            cluster_count: number of identified clusters
        """
        preview_mode = False if preview_mode is None else preview_mode
        selection = self.get_np_points_guids(
            item=item, silent=preview_mode)
        if len(selection.guids) < 2:
            # a single post cannot be clustered
            return None
        labels, colors, noise_mask, cluster_count = self._cluster_points(
            points=selection.points, preview_mode=preview_mode)
        return ClusterResults(
            clusters=labels,
            guids=selection.guids,
            points=selection.points,
            colors=colors,
            mask_noisy=noise_mask,
            cluster_count=cluster_count)

    def _cluster_all_items(self):
        """Cluster all cleaned posts at once (e.g. all locations)

        Clustering is run with min_cluster_size=2 and without
        allowing a single cluster.

        Returns:
            ClusterResults with cluster labels and guids, or None if
            fewer than two posts are available.
        """
        selection = self.get_np_points_guids(
            silent=False, sel_all=True)
        if len(selection.guids) < 2:
            return None
        # only the labels are needed outside of preview mode
        labels = self._cluster_points(
            points=selection.points, preview_mode=False,
            min_cluster_size=2, allow_single_cluster=False)[0]
        return ClusterResults(labels, selection.guids)

    @staticmethod
    def _get_cluster_guids(clusters, selected_post_guids) -> Guids:
        """Split guids in clustered and non clustered, based on labels

        Prints the number of clusters found to the console.

        Args:
            clusters: array of cluster labels, one per guid;
                -1 marks guids without cluster
            selected_post_guids: the guids that were clustered

        Returns:
            Guids with
            clustered: one array of guids per cluster label
                0..(number of clusters - 1), largest cluster first
            nonclustered: list of guids with label -1 (all guids,
                if there is no cluster)
        """
        guid_array = np.asarray(selected_post_guids)
        noise_mask = (clusters == -1)
        cluster_count = (
            0 if len(selected_post_guids) == 1
            else len(np.unique(clusters[~noise_mask])))
        if cluster_count == 0:
            print("--> No cluster.")
            return Guids(list(), list(guid_array))
        print(f'--> {cluster_count} cluster.')
        per_cluster = [guid_array[clusters == label]
                       for label in range(cluster_count)]
        # Descending by cluster size:
        # needed to later compute HImp Value (1 or 0)
        return Guids(
            sorted(per_cluster, key=len, reverse=True),
            list(guid_array[noise_mask]))

    def _get_update_clusters(self, item: Optional[str] = None,
                             single_items_dict=None,
                             cluster_items_dict=None,
                             itemized: bool = None):
        """Get clusters for items and write results to dicts"""
        if not single_items_dict:
            single_items_dict = self.single_items_dict
        if not cluster_items_dict:
            cluster_items_dict = self.clustered_items_dict
        if itemized is None:
            # default
            itemized = True
        if itemized:
            # clusters guids points colors mask_noisy cluster_count
            cluster = self.cluster_item(item)
        else:
            cluster = self._cluster_all_items()
        if not cluster:
            print("--> No cluster (all locations removed).")
            return
        # get clustered guids/ non-clustered guids
        guids = self._get_cluster_guids(cluster.clusters, cluster.guids)
        if itemized:
            single_items_dict[item] = guids.nonclustered
            if guids.clustered:
                cluster_items_dict[item] = guids.clustered
            # dicts modified in place, no need to return
            return
        else:
            self.clustered_guids_all = guids.clustered
            self.none_clustered_guids = guids.nonclustered

    def get_overall_clusters(self):
        """Get clusters for all items attached to self

        Runs _get_update_clusters in non-itemized mode, which
        clusters all posts at once and updates two lists:
            self.clustered_guids_all
            self.none_clustered_guids
        """
        # update in case of locations removed
        # self.cleaned_post_list = list(
        #     self.cleaned_post_dict.values())
        self._get_update_clusters(itemized=False)

    def get_itemized_clusters(self):
        """Get itemized clusters for top_list attached to self

        If self.local_saturation_check is set, the top item is clustered
        first and is not clustered a second time when it is met again
        in self.top_list.

        Updates results as two Dict of Lists:
            self.single_items_dict
            self.clustered_items_dict
        """
        top_item_name = None
        if self.local_saturation_check:
            # get clusters for top item first
            top_item_name = self.top_item.name
            self._get_update_clusters(item=top_item_name)
        # get remaining clusters
        for item in self.top_list:
            if self.local_saturation_check and item.name == top_item_name:
                # Skip only the top item (already clustered above).
                # The former counter-based check never advanced past 0,
                # which skipped every item in the list.
                continue
            self._get_update_clusters(item=item.name)
        # flush console output once
        sys.stdout.flush()

    def get_all_cluster_centroids(self) -> ClusterShapes:
        """Get centroids for the overall (non-itemized) clusters

        Reads self.clustered_guids_all and self.none_clustered_guids
        and hands them to self.get_cluster_centroids.

        Returns:
            ClusterShapes: constructed from
                        data: list returned by get_cluster_centroids
                        cls_type: self.cls_type (EMOJI, TAGS etc.)
                        itemized: bool, always False here
        """
        centroids_and_meta = self.get_cluster_centroids(
            self.clustered_guids_all, self.none_clustered_guids)
        return ClusterShapes(centroids_and_meta, self.cls_type, False)

    def get_item_cluster_centroids(self, item, single_clusters=None):
        """Get centroids for item clustered data

        Args:
            item: Name of the item to cluster.
            single_clusters: If True (default), also include the
                non-clustered posts stored for the item.

        Returns:
            The list returned by self.get_cluster_centroids.
        """
        if single_clusters is None:
            single_clusters = True
        self._get_update_clusters(item=item)
        # _get_update_clusters only stores an entry for the item if clusters
        # were found; fall back to "no clusters" instead of raising KeyError
        cluster_guids = self.clustered_items_dict.get(item, [])
        none_clustered_guids = None
        if single_clusters:
            none_clustered_guids = self.single_items_dict.get(item)
        return self.get_cluster_centroids(
            cluster_guids, none_clustered_guids)

    def _proj_coords(self, lng: float, lat: float):
        """Project coordinates based on available packages

        pyproj.transformer needs pyproj > 2.0.0,
        which provides a more convenient and faster way to
        project many coordinates.
        """
        lng_proj, lat_proj = self.proj_transformer.transform(
            lng, lat)
        return lng_proj, lat_proj

    def get_cluster_centroids(
            self, clustered_guids,
            none_clustered_guids=None) -> List[Tuple[geometry.Point, int]]:
        """Get centroids (with user counts) for clustered data

        Args:
            clustered_guids: Iterable of guid groups, one group per cluster.
            none_clustered_guids: Optional guids of posts without cluster;
                each is added as its own point with a count of 1.

        Returns:
            List of (point, user_count) tuples. For clusters, the point is
            the centroid of the convex hull of the projected post
            coordinates and user_count the number of distinct user_guid.

        Note: this is a sparse version of AlphaShapesAndMeta (only geometry
        and user_count) and a candidate for a specific dataclass.
        """
        centroids_and_counts = []
        for guid_group in clustered_guids:
            group_posts = [self.cleaned_post_dict[guid] for guid in guid_group]
            distinct_users = {post.user_guid for post in group_posts}
            # project coordinates before building the geometry
            projected_points = [
                geometry.Point(self._proj_coords(post.lng, post.lat))
                for post in group_posts]
            # convex hull is enough for calculating the centroid
            hull = geometry.MultiPoint(projected_points).convex_hull
            centroid = hull.centroid
            if centroid is None or centroid.is_empty:
                continue
            centroids_and_counts.append((centroid, len(distinct_users)))
        if not none_clustered_guids:
            return centroids_and_counts
        for single_guid in none_clustered_guids:
            single_post = self.cleaned_post_dict[single_guid]
            x_proj, y_proj = self._proj_coords(
                single_post.lng, single_post.lat)
            single_point = geometry.Point(x_proj, y_proj)
            if single_point is None or single_point.is_empty:
                continue
            centroids_and_counts.append((single_point, 1))
        sys.stdout.flush()
        return centroids_and_counts

    def _get_item_clustershapes(
            self,
            item: ItemCounter,
            cluster_guids=None) -> AlphaShapesArea:
        """Get cluster shapes and area for a given item

        Args:
            item: The item to get shapes for.
            cluster_guids: Clustered guid groups to use; if None, they
                are looked up in self.clustered_items_dict by item.name.

        Returns:
            Result of AlphaShapes.get_cluster_shape, or
            AlphaShapesArea(None, 0) if there are no clustered guids.
        """
        guid_groups = cluster_guids
        if guid_groups is None:
            guid_groups = self.clustered_items_dict.get(item.name)
        if guid_groups:
            return AlphaShapes.get_cluster_shape(
                item=item,
                clustered_post_guids=guid_groups,
                cleaned_post_dict=self.cleaned_post_dict,
                cluster_distance=self.cluster_distance,
                local_saturation_check=self.local_saturation_check,
                proj_coords=self._proj_coords)
        return AlphaShapesArea(None, 0)

    def _get_item_clusterarea(
            self,
            item: ItemCounter) -> float:
        """Wrapper: return only the item_area of the item's cluster shapes"""
        return self._get_item_clustershapes(item).item_area

    @staticmethod
    def _is_saturated_item(
            item_area: float,
            topitem_area: float):
        """Skip item entirely if saturated, i.e.
        if total area > 80%
        of top item cluster area

        Args:
            item_area: item cluster area
            topitem_area: top item cluster area
        """
        local_saturation = item_area/(topitem_area/100)
        # print("Local Saturation for Tag " + self.top_item "
        #       "+ ": " + str(round(localSaturation,0)))
        if local_saturation > 60:
            return True
        else:
            return False

    def _get_item_shapeslist(
            self, item, topitem_area,
            tnum) -> Optional[List[List[AlphaShapesAndMeta]]]:
        """Get all shapes (clustered and single posts) for an item

        Args:
            item: The item to get shapes for.
            topitem_area: Cluster area of the top item, handed to the
                saturation check.
            tnum: Counter of the item; the saturation check is not
                applied for tnum == 1.

        Returns:
            List of shapes for the item, or None if the item was
            excluded by the local saturation check.

        Note: A function ref to self._proj_coords is handed
        to AlphaShapes.get_single_cluster_shape(), which is then
        responsible for projecting the coordinates.
        """
        cluster_result = self._get_item_clustershapes(item)
        cluster_shapes = cluster_result.alphashape
        item_area = cluster_result.item_area
        check_saturation = (
            self.local_saturation_check and item_area != 0 and tnum != 1)
        if check_saturation and self._is_saturated_item(
                item_area, topitem_area):
            # saturated: signal the caller to move on to the next item
            return None
        shapes_and_meta = list(cluster_shapes) if cluster_shapes else []
        # shapes for single (non-clustered) posts of this item
        single_guids = self.single_items_dict.get(item.name)
        if not single_guids:
            return shapes_and_meta
        single_posts = [self.cleaned_post_dict[guid] for guid in single_guids]
        for single_post in single_posts:
            single_shape = AlphaShapes.get_single_cluster_shape(
                item, single_post, self.cluster_distance,
                self._proj_coords)
            if single_shape:
                # append (not extend): one entry per single post
                shapes_and_meta.append(single_shape)
        return shapes_and_meta

    def get_cluster_shapes(self):
        """Collect the shapes of all items in self.top_list

        If self.local_saturation_check is set and a top item exists,
        the top item cluster area is computed first and handed to
        self._get_item_shapeslist for the saturation check.

        Returns:
            Tuple of (list of shapes and meta, self.cls_type, itemized),
            where itemized is always True.

        Raises:
            ValueError: if the top item cluster area is 0.
        """
        topitem_area = None
        if self.local_saturation_check and self.top_item:
            # total area of the top item, reference for saturation check
            topitem_area = self._get_item_clusterarea(self.top_item)
            if topitem_area == 0:
                raise ValueError(
                    f'Something went wrong: '
                    f'Could not get area for Top item '
                    f'{self.top_item}')
        collected_shapes = []
        excluded_items = 0
        for rank, item in enumerate(self.top_list, start=1):
            item_shapes = self._get_item_shapeslist(
                item, topitem_area, rank)
            if item_shapes is None:
                # None marks an item excluded on saturation
                excluded_items += 1
            elif item_shapes:
                collected_shapes.extend(item_shapes)
        log = logging.getLogger("tagmaps")
        log.info(f'{len(collected_shapes)} alpha shapes. Done.')
        if excluded_items > 0:
            log.info(
                f'Excluded {excluded_items} '
                f'{self.cls_type.rstrip("s")} on local saturation check.')
        return collected_shapes, self.cls_type, True

    @staticmethod
    @store_in_queue
    def _fit_cluster(clusterer, data):
        """Fit the given clusterer on data and return it.

        NOTE(review): wrapped by store_in_queue, which is defined outside
        this block; presumably the return value ends up in cluster_queue
        instead of being returned directly. Confirm against the decorator.

        Args:
            clusterer: Object with a fit() method (HDBSCAN clusterer)
            data: Input handed to clusterer.fit() (feature array of
                points, or distance matrix)

        Returns:
            The same clusterer object, after fit() was called
        """

        clusterer.fit(data)
        return clusterer

    @CGDec.input_topic_format
    def get_sel_preview(self, item):
        """Returns plt map for item selection preview

        Selects the points for item (without console output) and
        hands them to TPLT.get_sel_preview.
        """
        selected_points = self.get_np_points(item=item, silent=True)
        return TPLT.get_sel_preview(
            selected_points, item, self.bounds, self.cls_type)

    @CGDec.input_topic_format
    def get_cluster_centroid_data(
            self, item, zipped=None, projected=None, single_clusters=None):
        """Returns centroids for cluster selection based on item

        Args:
            item (str or list of str): Item to be selected
            zipped (bool, optional): Merge centroid coordinates and
                                     user_count into a single array,
                                     defaults to False
            projected (bool, optional): Return centroids as projected by
                                        get_item_cluster_centroids;
                                        otherwise they are projected back
                                        (WGS1984 decimal degrees),
                                        defaults to False
            single_clusters (bool, optional): Include single item
                                        cluster centroids,
                                        defaults to True

        Returns:
            If zipped: numpy array with one row
                (x, y, user_count) per centroid.
            Otherwise: Tuple of
                [0] numpy array of coordinate pairs,
                [1] list of user_count per centroid
        """
        zipped = False if zipped is None else zipped
        projected = False if projected is None else projected
        single_clusters = True if single_clusters is None else single_clusters
        centroids_meta = self.get_item_cluster_centroids(
            item=item, single_clusters=single_clusters)
        centroid_geoms = [entry[0] for entry in centroids_meta]
        user_count = [entry[1] for entry in centroids_meta]
        if not projected:
            # centroids arrive projected; convert them back to the
            # original WGS1984 decimal degrees if no projection is wanted
            centroid_geoms = self._project_centroids_back(centroid_geoms)
        # extract coordinates from shapely points into a numpy array
        coords = np.array([[geom.x, geom.y] for geom in centroid_geoms])
        if not zipped:
            return coords, user_count
        rows = [
            (coord[0], coord[1], user_count[idx])
            for idx, coord in enumerate(coords)]
        return np.asarray(rows)

    @CGDec.input_topic_format
    def get_cluster_centroid_preview(
            self, item, single_clusters=None) -> plt.figure:
        """Returns plt map for item selection cluster centroids

        Args:
            item: Item to be selected
            single_clusters: Include single item cluster centroids,
                defaults to True
        """
        include_singles = True if single_clusters is None else single_clusters
        centroid_coords, user_count = self.get_cluster_centroid_data(
            item=item, single_clusters=include_singles)
        return TPLT.get_centroid_preview(
            centroid_coords, item, self.bounds, self.cls_type, user_count)

    @CGDec.input_topic_format
    def get_cluster_preview(self, item) -> plt.figure:
        """Returns plt map for item cluster preview

        Args:
            item: Item to select and cluster

        Returns:
            Figure from TPLT.get_cluster_preview, or None if
            self.cluster_item returned no result.
        """
        # cluster_item selects the points and runs the clustering itself.
        # The former extra get_np_points/_cluster_points pass was discarded
        # unused and ran before the empty-selection guard below.
        clusters = self.cluster_item(
            item=item,
            preview_mode=True)
        if clusters is None:
            return None
        return TPLT.get_cluster_preview(
            points=clusters.points, sel_colors=clusters.colors, item_text=item,
            bounds=self.bounds, mask_noisy=clusters.mask_noisy,
            cluster_distance=self.cluster_distance,
            number_of_clusters=clusters.cluster_count,
            auto_select_clusters=self.autoselect_clusters,
            cls_type=self.cls_type)

    @CGDec.input_topic_format
    def get_clustershapes_preview(self, item) -> plt.figure:
        """Returns plt map for item cluster preview, including shapes

        Args:
            item: Name of the item to select and cluster

        Returns:
            Figure from TPLT.get_cluster_preview, or None (after printing
            a message) if self.cluster_item returned no result.
        """
        item = ItemCounter(item, 0)
        result = self.cluster_item(
            item=item.name,
            preview_mode=True)
        if result is None:
            print("No items found.")
            return None
        # cluster_guids: those guids that are clustered
        cluster_guids = self._get_cluster_guids(
            result.clusters, result.guids)
        shapes_and_area = self._get_item_clustershapes(
            item, cluster_guids.clustered)
        # get only shapely shapes, not usercount and other info;
        # alphashape is None if nothing was clustered -> plot without shapes
        shapes = [meta.shape for meta in shapes_and_area.alphashape or []]
        shapes_wgs = self._project_centroids_back(shapes)
        return TPLT.get_cluster_preview(
            points=result.points, sel_colors=result.colors,
            item_text=item.name,
            bounds=self.bounds, mask_noisy=result.mask_noisy,
            cluster_distance=self.cluster_distance,
            number_of_clusters=result.cluster_count,
            auto_select_clusters=self.autoselect_clusters,
            shapes=shapes_wgs, cls_type=self.cls_type)

    def _project_centroids_back(self, shapes):
        """Project shapes with self.proj_transformer_back

        Used to get shapes back to WGS1984 for plotting in matplotlib.

        Args:
            shapes: Iterable of geometries to project

        Returns:
            List of projected geometries, in input order
        """
        back_projection = self.proj_transformer_back
        projected_shapes = []
        for geom in shapes:
            projected_shapes.append(
                ClusterGen._project_geometry(geom, back_projection))
        return projected_shapes

    @staticmethod
    def _project_geometry(geom_shape, project):
        """Apply project.transform to geom_shape via shapely.ops.transform"""
        return transform(project.transform, geom_shape)

    def get_singlelinkagetree_preview(self, item):
        """Returns figure for single linkage tree from HDBSCAN clustering

        Args:
            item: Item to cluster; for cls_type TOPICS it is first
                passed through Utils.concat_topic.

        Returns:
            Figure from TPLT.get_single_linkage_tree_preview, or None
            if self.cluster_item returned no result.
        """
        if self.cls_type == TOPICS:
            item = Utils.concat_topic(item)
        cluster_results = self.cluster_item(
            item=item,
            preview_mode=True)
        if cluster_results is None:
            # nothing was clustered: there is no cluster_count to read and
            # self.clusterer would not belong to this item
            return None
        # number of leaf nodes shown: 10 per cluster, kept within [50, 256]
        axis = self.clusterer.single_linkage_tree_.plot(
            truncate_mode='lastp',
            p=max(50, min(cluster_results.cluster_count*10, 256)))
        return TPLT.get_single_linkage_tree_preview(
            item, axis.figure, self.cluster_distance,
            self.cls_type)

Class variables

var CGDec

Decorators for class CG methods

Static methods

def new_clusterer(cls_type: str, bounds: AnalysisBounds, cleaned_post_dict: Optional[Dict[str, CleanedPost]], cleaned_post_list: Optional[List[CleanedPost]], cleaned_stats: Optional[Dict[str, PreparedStats]], local_saturation_check: bool)

Create new clusterer from type and input data

Args

cls_type : ClusterType
Either TAGS, LOCATIONS, TOPICS or EMOJI
bounds : LoadData.AnalysisBounds
Analysis spatial boundary
cleaned_post_dict : Dict[str, CleanedPost]
Dict of cleaned posts
cleaned_stats : Dict[str, PreparedStats]
Statistics data, keyed by cluster type

Returns

clusterer (ClusterGen): A new clusterer of ClusterType

Expand source code
@classmethod
def new_clusterer(cls,
                  cls_type: str,
                  bounds: AnalysisBounds,
                  cleaned_post_dict: Optional[Dict[str, CleanedPost]],
                  cleaned_post_list: Optional[List[CleanedPost]],
                  cleaned_stats: Optional[Dict[str, PreparedStats]],
                  local_saturation_check: bool):
    """Create new clusterer from type and input data

    Args:
        cls_type (ClusterType): Either TAGS,
            LOCATIONS, TOPICS or EMOJI
        bounds (LoadData.AnalysisBounds): Analysis spatial boundary
        cleaned_post_dict (Dict[str, CleanedPost]): Dict of cleaned posts
        cleaned_post_list (List[CleanedPost]): List of cleaned posts
        cleaned_stats (Dict[str, PreparedStats]): Statistics data, keyed
            by cluster type; needs entries for cls_type and LOCATIONS
        local_saturation_check (bool): Handed on to the clusterer

    Returns:
        clusterer (ClusterGen): A new clusterer of ClusterType

    Raises:
        ValueError: if cleaned_stats is missing or has no
            statistics for cls_type
    """
    # cleaned_stats is Optional: treat None like a missing entry, so that
    # callers get the ValueError below instead of an AttributeError
    cls_cleaned_stats = cleaned_stats.get(cls_type) if cleaned_stats else None
    if not cls_cleaned_stats:
        raise ValueError("Cleaned_stats not initialized")
    clusterer = cls(
        bounds=bounds,
        cleaned_post_dict=cleaned_post_dict,
        cleaned_post_list=cleaned_post_list,
        top_list=cls_cleaned_stats.top_items_list,
        total_distinct_locations=cleaned_stats[
            LOCATIONS].total_unique_items,
        cluster_type=cls_type,
        local_saturation_check=local_saturation_check)
    return clusterer

Methods

def cluster_item(self, item: Optional[str], preview_mode=None) ‑> Optional[ClusterResults]

Cluster specific item

Args

item : str
The item to select and cluster
preview_mode : [type], optional
Defaults to None. If True, sel_colors, mask_noisy, number_of_clusters will be returned, which can be used as additional information during plot

Returns

clusters
The cluster labels returned from HDBSCAN
selected_post_guids
All selected post guids for item
points
numpy.ndarray of selected post coordinates (radians)
sel_colors
color codes assigned to points for plotting clusters
mask_noisy
number of clusters that were ambiguous (from HDBSCAN)
number_of_clusters
number of identified clusters (from HDBSCAN)
Expand source code
def cluster_item(
        self, item: Optional[str],
        preview_mode=None) -> Optional[ClusterResults]:
    """Select the posts for an item and cluster them

    Args:
        item (str): The item to select and cluster
        preview_mode (bool, optional): Defaults to False. Handed to
            self._cluster_points and used to silence the selection.
            If True, colors, mask_noisy and cluster_count are meant
            to be used as additional information during plot

    Returns:
        None if fewer than 2 posts were selected (no need to cluster),
        otherwise ClusterResults with:
            clusters: first value returned by self._cluster_points
                (cluster labels from HDBSCAN)
            guids: all selected post guids for item
            points: the selected post coordinates
            colors, mask_noisy, cluster_count: remaining values
                returned by self._cluster_points
    """
    preview_mode = False if preview_mode is None else preview_mode
    selection = self.get_np_points_guids(
        item=item, silent=preview_mode)
    if len(selection.guids) < 2:
        # no need to cluster
        return None
    labels, colors, noisy_mask, cluster_count = self._cluster_points(
        points=selection.points, preview_mode=preview_mode)
    return ClusterResults(
        clusters=labels,
        guids=selection.guids,
        points=selection.points,
        colors=colors,
        mask_noisy=noisy_mask,
        cluster_count=cluster_count)
def get_all_cluster_centroids(self) ‑> ClusterShapes

Get all centroids for clustered data

Returns

ClusterShapes
Results with the fields data (shapes and meta information), cls_type (cluster type: EMOJI, TAGS etc.) and itemized (bool)
Expand source code
def get_all_cluster_centroids(self) -> ClusterShapes:
    """Get centroids for the overall (non-itemized) clusters

    Reads self.clustered_guids_all and self.none_clustered_guids
    and hands them to self.get_cluster_centroids.

    Returns:
        ClusterShapes: constructed from
                    data: list returned by get_cluster_centroids
                    cls_type: self.cls_type (EMOJI, TAGS etc.)
                    itemized: bool, always False here
    """
    centroids_and_meta = self.get_cluster_centroids(
        self.clustered_guids_all, self.none_clustered_guids)
    return ClusterShapes(centroids_and_meta, self.cls_type, False)
def get_cluster_centroid_data(self, item, zipped=None, projected=None, single_clusters=None)

Returns centroids for cluster selection based on item

Args

item : str or list of str
Item to be selected
zipped : [type], optional
Will merge centroids and user_count, defaults to False
projected : bool, optional
Will return projected data (UTM), otherwise, centroids are returned in decimal degrees (WGS1984), defaults to False
single_clusters
Return single item cluster centroids, defaults to True

Returns

Tuple
[0] point (List of coordinate pairs), [1] user_count (count of user_count per centroid)
Expand source code
@CGDec.input_topic_format
def get_cluster_centroid_data(
        self, item, zipped=None, projected=None, single_clusters=None):
    """Returns centroids for cluster selection based on item

    Args:
        item (str or list of str): Item to be selected
        zipped (bool, optional): Merge centroid coordinates and
                                 user_count into a single array,
                                 defaults to False
        projected (bool, optional): Return centroids as projected by
                                    get_item_cluster_centroids;
                                    otherwise they are projected back
                                    (WGS1984 decimal degrees),
                                    defaults to False
        single_clusters (bool, optional): Include single item
                                    cluster centroids,
                                    defaults to True

    Returns:
        If zipped: numpy array with one row
            (x, y, user_count) per centroid.
        Otherwise: Tuple of
            [0] numpy array of coordinate pairs,
            [1] list of user_count per centroid
    """
    zipped = False if zipped is None else zipped
    projected = False if projected is None else projected
    single_clusters = True if single_clusters is None else single_clusters
    centroids_meta = self.get_item_cluster_centroids(
        item=item, single_clusters=single_clusters)
    centroid_geoms = [entry[0] for entry in centroids_meta]
    user_count = [entry[1] for entry in centroids_meta]
    if not projected:
        # centroids arrive projected; convert them back to the
        # original WGS1984 decimal degrees if no projection is wanted
        centroid_geoms = self._project_centroids_back(centroid_geoms)
    # extract coordinates from shapely points into a numpy array
    coords = np.array([[geom.x, geom.y] for geom in centroid_geoms])
    if not zipped:
        return coords, user_count
    rows = [
        (coord[0], coord[1], user_count[idx])
        for idx, coord in enumerate(coords)]
    return np.asarray(rows)
def get_cluster_centroid_preview(self, item, single_clusters=None) ‑> 

Returns plt map for item selection cluster centroids

Expand source code
@CGDec.input_topic_format
def get_cluster_centroid_preview(
        self, item, single_clusters=None) -> plt.figure:
    """Returns plt map for item selection cluster centroids

    Args:
        item: Item to be selected
        single_clusters: Include single item cluster centroids,
            defaults to True
    """
    include_singles = True if single_clusters is None else single_clusters
    centroid_coords, user_count = self.get_cluster_centroid_data(
        item=item, single_clusters=include_singles)
    return TPLT.get_centroid_preview(
        centroid_coords, item, self.bounds, self.cls_type, user_count)
def get_cluster_centroids(self, clustered_guids, none_clustered_guids=None) ‑> List[Tuple[shapely.geometry.point.Point, int]]

Get centroids for clustered data

This method needs refactor, since it produces as sparse version of AlphaShapesAndMeta (only geometry and user_count) -> create specific dataclass

Expand source code
def get_cluster_centroids(
        self, clustered_guids,
        none_clustered_guids=None) -> List[Tuple[geometry.Point, int]]:
    """Get centroids (with user counts) for clustered data

    Args:
        clustered_guids: Iterable of guid groups, one group per cluster.
        none_clustered_guids: Optional guids of posts without cluster;
            each is added as its own point with a count of 1.

    Returns:
        List of (point, user_count) tuples. For clusters, the point is
        the centroid of the convex hull of the projected post
        coordinates and user_count the number of distinct user_guid.

    Note: this is a sparse version of AlphaShapesAndMeta (only geometry
    and user_count) and a candidate for a specific dataclass.
    """
    centroids_and_counts = []
    for guid_group in clustered_guids:
        group_posts = [self.cleaned_post_dict[guid] for guid in guid_group]
        distinct_users = {post.user_guid for post in group_posts}
        # project coordinates before building the geometry
        projected_points = [
            geometry.Point(self._proj_coords(post.lng, post.lat))
            for post in group_posts]
        # convex hull is enough for calculating the centroid
        hull = geometry.MultiPoint(projected_points).convex_hull
        centroid = hull.centroid
        if centroid is None or centroid.is_empty:
            continue
        centroids_and_counts.append((centroid, len(distinct_users)))
    if not none_clustered_guids:
        return centroids_and_counts
    for single_guid in none_clustered_guids:
        single_post = self.cleaned_post_dict[single_guid]
        x_proj, y_proj = self._proj_coords(
            single_post.lng, single_post.lat)
        single_point = geometry.Point(x_proj, y_proj)
        if single_point is None or single_point.is_empty:
            continue
        centroids_and_counts.append((single_point, 1))
    sys.stdout.flush()
    return centroids_and_counts
def get_cluster_preview(self, item) ‑> 

Returns plt map for item cluster preview

Expand source code
@CGDec.input_topic_format
def get_cluster_preview(self, item) -> plt.figure:
    """Returns plt map for item cluster preview

    Args:
        item: Item to select and cluster

    Returns:
        Figure from TPLT.get_cluster_preview, or None if
        self.cluster_item returned no result.
    """
    # cluster_item selects the points and runs the clustering itself.
    # The former extra get_np_points/_cluster_points pass was discarded
    # unused and ran before the empty-selection guard below.
    clusters = self.cluster_item(
        item=item,
        preview_mode=True)
    if clusters is None:
        return None
    return TPLT.get_cluster_preview(
        points=clusters.points, sel_colors=clusters.colors, item_text=item,
        bounds=self.bounds, mask_noisy=clusters.mask_noisy,
        cluster_distance=self.cluster_distance,
        number_of_clusters=clusters.cluster_count,
        auto_select_clusters=self.autoselect_clusters,
        cls_type=self.cls_type)
def get_cluster_shapes(self)

For each cluster of points, calculate boundary shape and add statistics (HImpTag etc.)

Returns results as shapes_and_meta = list(), ClusterType, itemized = bool

Expand source code
def get_cluster_shapes(self):
    """Collect the shapes of all items in self.top_list

    If self.local_saturation_check is set and a top item exists,
    the top item cluster area is computed first and handed to
    self._get_item_shapeslist for the saturation check.

    Returns:
        Tuple of (list of shapes and meta, self.cls_type, itemized),
        where itemized is always True.

    Raises:
        ValueError: if the top item cluster area is 0.
    """
    topitem_area = None
    if self.local_saturation_check and self.top_item:
        # total area of the top item, reference for saturation check
        topitem_area = self._get_item_clusterarea(self.top_item)
        if topitem_area == 0:
            raise ValueError(
                f'Something went wrong: '
                f'Could not get area for Top item '
                f'{self.top_item}')
    collected_shapes = []
    excluded_items = 0
    for rank, item in enumerate(self.top_list, start=1):
        item_shapes = self._get_item_shapeslist(
            item, topitem_area, rank)
        if item_shapes is None:
            # None marks an item excluded on saturation
            excluded_items += 1
        elif item_shapes:
            collected_shapes.extend(item_shapes)
    log = logging.getLogger("tagmaps")
    log.info(f'{len(collected_shapes)} alpha shapes. Done.')
    if excluded_items > 0:
        log.info(
            f'Excluded {excluded_items} '
            f'{self.cls_type.rstrip("s")} on local saturation check.')
    return collected_shapes, self.cls_type, True
def get_clustershapes_preview(self, item) ‑> 

Returns plt map for item cluster preview

Expand source code
@CGDec.input_topic_format
def get_clustershapes_preview(self, item) -> plt.figure:
    """Returns plt map for item cluster preview, including shapes

    Args:
        item: Name of the item to select and cluster

    Returns:
        Figure from TPLT.get_cluster_preview, or None (after printing
        a message) if self.cluster_item returned no result.
    """
    item = ItemCounter(item, 0)
    result = self.cluster_item(
        item=item.name,
        preview_mode=True)
    if result is None:
        print("No items found.")
        return None
    # cluster_guids: those guids that are clustered
    cluster_guids = self._get_cluster_guids(
        result.clusters, result.guids)
    shapes_and_area = self._get_item_clustershapes(
        item, cluster_guids.clustered)
    # get only shapely shapes, not usercount and other info;
    # alphashape is None if nothing was clustered -> plot without shapes
    shapes = [meta.shape for meta in shapes_and_area.alphashape or []]
    shapes_wgs = self._project_centroids_back(shapes)
    return TPLT.get_cluster_preview(
        points=result.points, sel_colors=result.colors,
        item_text=item.name,
        bounds=self.bounds, mask_noisy=result.mask_noisy,
        cluster_distance=self.cluster_distance,
        number_of_clusters=result.cluster_count,
        auto_select_clusters=self.autoselect_clusters,
        shapes=shapes_wgs, cls_type=self.cls_type)
def get_item_cluster_centroids(self, item, single_clusters=None)

Get centroids for item clustered data

Expand source code
def get_item_cluster_centroids(self, item, single_clusters=None):
    """Get centroids for item clustered data

    Args:
        item: Name of the item to cluster.
        single_clusters: If True (default), also include the
            non-clustered posts stored for the item.

    Returns:
        The list returned by self.get_cluster_centroids.
    """
    if single_clusters is None:
        single_clusters = True
    self._get_update_clusters(item=item)
    # _get_update_clusters only stores an entry for the item if clusters
    # were found; fall back to "no clusters" instead of raising KeyError
    cluster_guids = self.clustered_items_dict.get(item, [])
    none_clustered_guids = None
    if single_clusters:
        none_clustered_guids = self.single_items_dict.get(item)
    return self.get_cluster_centroids(
        cluster_guids, none_clustered_guids)
def get_itemized_clusters(self)

Get itemized clusters for top_list attached to self

Updates results as two Dict of Lists: self.single_items_dict self.clustered_items_dict

Expand source code
def get_itemized_clusters(self):
    """Get itemized clusters for top_list attached to self

    If self.local_saturation_check is set, the top item is clustered
    first and is not clustered a second time when it is met again
    in self.top_list.

    Updates results as two Dict of Lists:
        self.single_items_dict
        self.clustered_items_dict
    """
    top_item_name = None
    if self.local_saturation_check:
        # get clusters for top item first
        top_item_name = self.top_item.name
        self._get_update_clusters(item=top_item_name)
    # get remaining clusters
    for item in self.top_list:
        if self.local_saturation_check and item.name == top_item_name:
            # Skip only the top item (already clustered above).
            # The former counter-based check never advanced past 0,
            # which skipped every item in the list.
            continue
        self._get_update_clusters(item=item.name)
    # flush console output once
    sys.stdout.flush()
def get_np_points(self, item: str = None, silent: bool = None) ‑> numpy.ndarray

Wrapper around get_np_points_guids that returns only the points (None if no points were selected)

Expand source code
def get_np_points(self, item: Optional[str] = None,
                  silent: Optional[bool] = None
                  ) -> Optional[np.ndarray]:
    """Wrapper that only returns points from get_np_points_guids

    Args:
        item: item to select points for; if None, all posts are selected
        silent: passed through to get_np_points_guids

    Returns:
        The selected points, or None if the selection is empty
    """
    # decide if select all or specific item
    sel_all = item is None
    points = self.get_np_points_guids(item, silent, sel_all).points
    # an empty selection may be a plain list (checked with len) or an
    # array without elements (ndarray.size: number of elements)
    if len(points) == 0 or not points.size:
        return None
    return points
def get_np_points_guids(self, item: Optional[str] = None, silent: bool = None, sel_all: bool = None) ‑> SelItems

Gets numpy array of selected points with latlng containing _item

Args

item
tag, emoji, location; or topic (list of terms)
silent
if true, no console output (interface mode)

Returns

points
A list of lat/lng points to map
selected_postguids_list
List of selected post guids
Expand source code
def get_np_points_guids(self, item: Optional[str] = None,
                        silent: bool = None, sel_all: bool = None
                        ) -> SelItems:
    """Select posts for an item and return their coordinates and guids

    Args:
        item: tag, emoji, location; or topic (list of terms)
        silent: if true, no console output (interface mode);
            defaults to False
        sel_all: if true, all posts in self.cleaned_post_list are
            selected and item is ignored; defaults to False

    Returns:
        SelItems with
        points: numpy array built from the 'lng' and 'lat' columns
            (in this order) of the selected posts, or an empty list
            if fewer than two posts were selected for the item
        guids: List of selected post guids
    """
    quiet = False if silent is None else silent
    select_everything = False if sel_all is None else sel_all
    if select_everything:
        # every cleaned post takes part in the selection
        posts = self.cleaned_post_list
        guids = [post.guid for post in posts]
    else:
        guids = self._getselect_postguids(item, silent=quiet)
        if len(guids) < 2:
            # fewer than two posts: hand back an empty list of points
            return SelItems([], guids)
        posts = self._getselect_posts(guids)
    # tabulate the posts, then reduce to the
    # two coordinate columns as a numpy array
    frame = pd.DataFrame(posts, columns=POST_FIELDS)
    coordinates = frame.loc[:, ['lng', 'lat']].to_numpy()
    return SelItems(coordinates, guids)
def get_overall_clusters(self)

Get clusters for all items attached to self

Updates results as two lists: self.clustered_guids_all and self.none_clustered_guids

Expand source code
def get_overall_clusters(self):
    """Compute non-itemized (overall) clusters for the data attached to self

    Delegates to self._get_update_clusters with itemized=False.
    Results are expected to be stored as two lists:
        self.clustered_guids_all
        self.none_clustered_guids
    """
    self._get_update_clusters(itemized=False)
def get_sel_preview(self, item)

Returns plt map for item selection preview

Expand source code
@CGDec.input_topic_format
def get_sel_preview(self, item):
    """Build the selection preview map (plt figure) for an item

    The points are fetched silently via self.get_np_points and handed
    to TPLT.get_sel_preview together with the item, self.bounds and
    self.cls_type.
    """
    preview_points = self.get_np_points(item=item, silent=True)
    return TPLT.get_sel_preview(
        preview_points, item, self.bounds, self.cls_type)
def get_singlelinkagetree_preview(self, item)

Returns figure for single linkage tree from HDBSCAN clustering

Expand source code
def get_singlelinkagetree_preview(self, item):
    """Build the single linkage tree figure of the HDBSCAN clustering

    The item is clustered in preview mode, then the single linkage tree
    of self.clusterer is plotted (truncated to the last p merges) and
    passed to TPLT.get_single_linkage_tree_preview.
    """
    if self.cls_type == TOPICS:
        # topics are concatenated into a single item first
        item = Utils.concat_topic(item)
    results = self.cluster_item(item=item, preview_mode=True)
    plot_tree = self.clusterer.single_linkage_tree_.plot
    # ten per detected cluster, clamped to the range 50..256
    # NOTE(review): cluster_count is declared Optional - a None value
    # would raise here; confirm preview mode always sets it
    shown_merges = max(50, min(results.cluster_count*10, 256))
    axis = plot_tree(truncate_mode='lastp', p=shown_merges)
    return TPLT.get_single_linkage_tree_preview(
        item, axis.figure, self.cluster_distance, self.cls_type)
class ClusterResults (clusters: Tuple[numpy.ndarray, Tuple[int, Optional[int]]], guids: List[str], points: Optional[List[numpy.ndarray]] = None, colors: Optional[List[Tuple[float, float, float]]] = None, mask_noisy: Optional[numpy.ndarray] = None, cluster_count: Optional[int] = None)

List of post guids and assigned cluster labels (from HDBSCAN)

Expand source code
@dataclass
class ClusterResults:
    """Post guids with their assigned cluster labels (from HDBSCAN)

    Only clusters and guids are required; all other fields default
    to None. Instances can be unpacked like a tuple, in field order.
    """
    # cluster labels (from HDBSCAN)
    clusters: Tuple[np.ndarray, Tuple[int, Optional[int]]]
    # post guids
    guids: List[str]
    # NOTE(review): presumably the clustered coordinates - confirm
    points: Optional[List[np.ndarray]] = None
    # float triples, presumably colors for plotting - confirm
    colors: Optional[List[Tuple[float, float, float]]] = None
    # NOTE(review): presumably marks noise points - confirm
    mask_noisy: Optional[np.ndarray] = None
    cluster_count: Optional[int] = None

    def __iter__(self):
        """Iterate over the field values in declaration order"""
        field_values = astuple(self)
        return iter(field_values)

Class variables

var cluster_count : Optional[int]
var clusters : Tuple[numpy.ndarray, Tuple[int, Optional[int]]]
var colors : Optional[List[Tuple[float, float, float]]]
var guids : List[str]
var mask_noisy : Optional[numpy.ndarray]
var points : Optional[List[numpy.ndarray]]
class ClusterShapes (data: List[Tuple[shapely.geometry.point.Point, int]], cls_type: str, itemized: bool)

Count of user per cluster centroid

data: List of tuples with (1) Point = cluster centroid and (2) int = user count. cls_type: cluster type (TAGS, EMOJI, ..). itemized: bool (False: overall location clusters; True: itemized clusters (TAGS, EMOJI)).

Expand source code
@dataclass
class ClusterShapes:
    """User counts per cluster centroid

    data: List of Tuples with
          (1) Point = cluster centroid and
          (2) int = user count
    cls_type: cluster type (TAGS, EMOJI, ..)
    itemized: bool
              False: Overall Location clusters
              True: Itemized clusters (TAGS, EMOJI)

    Instances can be unpacked like a tuple, in field order.
    """
    data: List[Tuple[geometry.Point, int]]
    cls_type: str
    itemized: bool

    def __iter__(self):
        """Iterate over the field values in declaration order"""
        field_values = astuple(self)
        return iter(field_values)

Class variables

var cls_type : str
var data : List[Tuple[shapely.geometry.point.Point, int]]
var itemized : bool
class Guids (clustered: List[str], nonclustered: List[str])

Guids(clustered: List[str], nonclustered: List[str])

Expand source code
@dataclass
class Guids:
    """Pair of guid lists, split by cluster membership

    NOTE(review): presumably post guids as produced by the clustering
    step - confirm against the code that constructs Guids.
    """
    # guids that were assigned to a cluster (per field name)
    clustered: List[str]
    # guids that were not assigned to a cluster (per field name)
    nonclustered: List[str]

Class variables

var clustered : List[str]
var nonclustered : List[str]
class SelItems (points: List[Optional[numpy.ndarray]], guids: List[str])

List of coordinates (points) with related post_guids

Expand source code
@dataclass
class SelItems:
    """List of coordinates (points) with related post_guids"""
    # coordinates of the selected posts; get_np_points_guids passes
    # either an empty list or a numpy array of the lng/lat columns
    points: List[Optional[np.ndarray]]
    # post guids belonging to the selection
    guids: List[str]

Class variables

var guids : List[str]
var points : List[Optional[numpy.ndarray]]
class SelectedItems (guids: List[str], location_count: int)

SelectedItems(guids: List[str], location_count: int)

Expand source code
@dataclass
class SelectedItems:
    """List of selected guids together with a location count

    NOTE(review): presumably post guids and the number of distinct
    locations among them - confirm against the code that constructs
    SelectedItems.
    """
    # selected guids
    guids: List[str]
    # number of locations (per field name)
    location_count: int

Class variables

var guids : List[str]
var location_count : int