Alexander Dunkel, TU Dresden, Institute of Cartography; Maximilian Hartmann, Universität Zürich (UZH), Geocomputation
Based on data from Instagram and Flickr, we'll explore global reactions to sunsets and sunrises in these notebooks.
This is the first notebook in a series of nine notebooks:
Use Shift+Enter to walk through the Notebook
Install dependencies
Use miniconda in WSL:
conda create -n sunset_env -c conda-forge
conda activate sunset_env
conda config --env --set channel_priority strict
conda config --show channel_priority # verify
# jupyter dependencies
conda install -c conda-forge 'ipywidgets=7.5.*' jupyterlab jupyterlab-git \
jupytext jupyter_contrib_nbextensions jupyter_nbextensions_configurator
# visualization dependencies
conda install -c conda-forge geopandas jupyterlab "geoviews-core=1.9.5" \
descartes mapclassify jupyter_contrib_nbextensions xarray \
colorcet memory_profiler seaborn
# privacy aware dependencies
conda install -c conda-forge python-dotenv psycopg2
conda upgrade -n sunset_env --all -c conda-forge
conda activate sunset_env
jupyter lab
import os
import csv
import sys
import colorcet
import psycopg2 # Postgres API
import geoviews as gv
import holoviews as hv
import mapclassify as mc
import geopandas as gp
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import geoviews.feature as gf
from pathlib import Path
from collections import namedtuple
from typing import List, Tuple, Dict, Optional
from pyproj import Transformer, CRS, Proj
from geoviews import opts
from shapely.geometry import shape, Point, Polygon
from shapely.ops import transform
from cartopy import crs
from matplotlib import colors
from IPython.display import clear_output, display, HTML
from bokeh.models import HoverTool, FuncTickFormatter, FixedTicker
# optionally, enable shapely.speedups
# which makes some of the spatial
# queries run faster
import shapely.speedups as speedups
from shapely import geometry
Load the helper module from ../py/modules/tools.py. This also allows us to import code from other jupyter notebooks, synced to *.py with jupytext.
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
sys.path.append(module_path)
from modules import tools, preparations
Initialize Bokeh and shapely.speedups
preparations.init_imports()
Define initial parameters that affect processing of data and graphics in all notebooks.
GRID_SIZE_METERS = 100000
"""The size of grid cells in meters
(spatial accuracy of worldwide measurement)"""
CHUNK_SIZE = 5000000
"""Process x number of hll records per chunk.
Increasing this number will consume more memory,
but reduce processing time because fewer SQL queries
are needed."""
EPSG_CODE = 54009
CRS_PROJ = f"esri:{EPSG_CODE}"
"""Target projection: Mollweide (epsg code).
note: Mollweide defined by _esri_
in epsg.io's database"""
CRS_WGS = "epsg:4326"
"""Input projection (Web Mercator)"""
OUTPUT = Path.cwd().parents[0] / "out"
"""Define path to output directory (figures etc.)""";
We want to modify intermediate file names if not using a 100 km grid:
km_size = GRID_SIZE_METERS/1000
if km_size == 100:
km_size_str = ""
else:
km_size_str = f"_{km_size:.0f}km"
Create paths
def create_paths(
        output: Path = OUTPUT,
        subfolders: List[str] = ["html", "pickles", "csv", "figures", "svg", "pdf"]):
    """Create subfolders for results to be stored"""
    output.mkdir(exist_ok=True)
    for subfolder in subfolders:
        Path(output / f'{subfolder}{km_size_str}').mkdir(exist_ok=True)
create_paths()
List the package versions used, for future reference:
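The original version-report cell is not shown here; the following is a minimal sketch, assuming the packages of interest are those imported above and that importlib.metadata (Python >= 3.8) is available:
# Hedged sketch: report installed versions of the main packages used above.
from importlib.metadata import version, PackageNotFoundError

for pkg in ["geopandas", "geoviews", "holoviews", "mapclassify",
            "pandas", "numpy", "shapely", "pyproj", "bokeh", "colorcet"]:
    try:
        print(f"{pkg}=={version(pkg)}")
    except PackageNotFoundError:
        print(f"{pkg}: not installed")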
The notebooks are synced to markdown and python files, through jupytext, which in turn are managed in a git repository.
The data is exported from hlldb and stored as CSV files.
This notebook is based on the following folder structure:
.
├── 00_hll_data/
| ├── flickr_sunrise_hll.csv
| ├── flickr_sunset_hll.csv
| ├── flickr_all_hll.csv
| ├── instagram_sunrise_hll.csv
| ├── instagram_sunset_hll.csv
| └── instagram_random_hll.csv
|
└── 01_analysis
├──notebooks/
| ├──01_grid_agg.ipynb
| ├──02_visualization.ipynb
| ├──03_chimaps.ipynb
| ├──04_combine.ipynb
| └──...
├──out/
| ├──csv/
| ├──figures/
| ├──html/
| ├──...
| └──pickles/
├──md/
├──py/
├──.git
└── README.md
To make this notebook cross-compatible with Linux and Windows, use pathlib to format paths (with forward slashes):
root = Path.cwd().parents[1] / "00_hll_data"
SUNRISE_FLICKR = root / "flickr_sunrise_hll.csv"
SUNSET_FLICKR = root / "flickr_sunset_hll.csv"
ALL_FLICKR = root / "flickr_all_hll.csv"
SUNRISE_INSTAGRAM = root / "instagram_sunrise_hll.csv"
SUNSET_INSTAGRAM = root / "instagram_sunset_hll.csv"
RANDOM_INSTAGRAM = root / "instagram_random_hll.csv"
First, some statistics for these files:
Note that the HLL data is already pre-aggregated using a GeoHash precision of 5. Thus, the actual number of records is lower than the total number of posts reflected in these HLL sets.
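Since the original statistics cell is not shown, here is a minimal sketch (assuming the CSV paths defined above exist) that reports file size and record count per file:
# Hedged sketch: basic file statistics for the HLL CSV exports
# (record count = line count minus header line).
for csv_path in [SUNRISE_FLICKR, SUNSET_FLICKR, ALL_FLICKR,
                 SUNRISE_INSTAGRAM, SUNSET_INSTAGRAM, RANDOM_INSTAGRAM]:
    size_mb = csv_path.stat().st_size / (1024 * 1024)
    with open(csv_path, encoding="utf-8") as f:
        record_count = sum(1 for _ in f) - 1
    print(f"{csv_path.name}: {size_mb:,.1f} MB, {record_count:,} records")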
The data structure for sunrise and sunset is the same; only the 'Flickr_All_world' dataset differs. Example structure of SUNRISE_FLICKR
(first 3 records):
pd.set_option('display.max_colwidth', 150)
tools.record_preview_hll(SUNRISE_FLICKR)
Define credentials as environment variables and load them here:
DB_USER = "postgres"
DB_PASS = os.getenv('POSTGRES_PASSWORD')
# set connection variables
DB_HOST = "lbsn-hlldb-sunset"
DB_PORT = "5432"
DB_NAME = "lbsn-hlldb"
Connect to the empty Postgres database running the HLL extension:
DB_CONN = psycopg2.connect(
host=DB_HOST,
    port=DB_PORT,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASS
)
DB_CONN.set_session(
readonly=True)
DB_CALC = tools.DbConn(
DB_CONN)
CUR_HLL = DB_CONN.cursor()
Test connection:
CUR_HLL.execute("SELECT 1;")
print(CUR_HLL.statusmessage)
# define Transformer ahead of time
# with xy-order of coordinates
PROJ_TRANSFORMER = Transformer.from_crs(
CRS_WGS, CRS_PROJ, always_xy=True)
# also define reverse projection
PROJ_TRANSFORMER_BACK = Transformer.from_crs(
CRS_PROJ, CRS_WGS, always_xy=True)
XMIN = PROJ_TRANSFORMER.transform(
-180, 0)[0]
XMAX = PROJ_TRANSFORMER.transform(
180, 0)[0]
YMAX = PROJ_TRANSFORMER.transform(
0, 90)[1]
YMIN = PROJ_TRANSFORMER.transform(
0, -90)[1]
print(f'Projected bounds: {[XMIN, YMIN, XMAX, YMAX]}')
def create_grid_df(
grid_size: int = GRID_SIZE_METERS,
xmin: float = XMIN, ymin: float = YMIN,
xmax: float = XMAX, ymax: float = YMAX,
report: bool = None,
return_rows_cols: bool = None):
"""Creates dataframe polygon grid based on width and length in Meters"""
width = grid_size
length = grid_size
cols = list(range(int(np.floor(xmin)), int(np.ceil(xmax)), width))
rows = list(range(int(np.floor(ymin)), int(np.ceil(ymax)), length))
if report:
print(len(cols))
print(len(rows))
rows.reverse()
polygons = []
for x in cols:
for y in rows:
# combine to tuple: (x,y, poly)
# and append to list
polygons.append(
(x, y,
Polygon([
(x,y),
(x+width, y),
(x+width, y-length),
(x, y-length)])) )
# create a pandas dataframe
# from list of tuples
grid = pd.DataFrame(polygons)
# name columns
col_labels=['xbin', 'ybin', 'bin_poly']
grid.columns = col_labels
# use x and y as index columns
grid.set_index(['xbin', 'ybin'], inplace=True)
if return_rows_cols:
return grid, rows, cols
return grid
grid, rows, cols = create_grid_df(
grid_size=GRID_SIZE_METERS,
report=True, return_rows_cols=True)
grid.head()
Create a geodataframe from dataframe:
def grid_to_gdf(
grid: pd.DataFrame, crs_proj: str = CRS_PROJ) -> gp.GeoDataFrame:
"""Convert grid pandas DataFrame to geopandas Geodataframe"""
grid = gp.GeoDataFrame(
grid.drop(
columns=["bin_poly"]),
geometry=grid.bin_poly)
grid.crs = crs_proj
return grid
grid = grid_to_gdf(grid)
Add columns for aggregation
def reset_metrics(
grid: gp.GeoDataFrame,
metrics: List[str] = [
"postcount_est", "usercount_est", "userdays_est",
"postcount_hll", "usercount_hll", "userdays_hll"],
setzero: bool = None):
"""Remove columns from GeoDataFrame and optionally fill with 0"""
for metric in metrics:
try:
grid.drop(metric, axis=1, inplace=True)
grid.drop(f'{metric}_cat', axis=1, inplace=True)
except KeyError:
pass
if setzero:
grid.loc[:, metric] = 0
reset_metrics(grid)
display(grid)
Read World geometries data
%%time
world = gp.read_file(gp.datasets.get_path('naturalearth_lowres'), crs=CRS_WGS)
world = world.to_crs(CRS_PROJ)
base = grid.plot(figsize=(22,28), color='white', edgecolor='black', linewidth=0.1)
# combine with world geometry
plot = world.plot(ax=base)
Aggregation speed is important here, so we should avoid polygon intersection. Since we're working with a regular grid and floating point numbers, a binary search is likely one of the fastest approaches in our context. numpy.digitize provides a binary search, but it must be adapted for the spatial context: a lat or lng value is assigned to the nearest matching bin. We get our lat and lng bins from our original Mollweide grid, which is regularly spaced at 100 km intervals. Note that we need two binary searches, one for the lat and one for the lng values.
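To illustrate the idea on toy values (not the actual grid), the following sketch snaps projected x values to the lower edge of regularly spaced 100 km bins with np.digitize:
# Toy illustration: snap projected x values to the lower edge of
# regularly spaced 100 km bins using a binary search (np.digitize).
import numpy as np

bin_edges = np.arange(0, 500_000, 100_000)        # 0, 100000, ..., 400000
x_values = np.array([123_456.0, 399_999.9, 50.0])
bin_idx = np.digitize(x_values, bin_edges) - 1    # index of the lower bin edge
print(bin_edges[bin_idx])                         # [100000 300000      0]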
testpoint = Point(8.546377, 47.392323)
testpoint2 = Point(13.726359, 51.028512)
testpoint3 = Point(13.71389, 46.52278) # Dreiländereck
gdf_testpoints = gp.GeoSeries([testpoint, testpoint2, testpoint3], crs=CRS_WGS)
# project geometries to Mollweide
gdf_testpoints_proj = gdf_testpoints.to_crs(CRS_PROJ)
display(gdf_testpoints_proj[0].x)
Preview map for the testpoints
base = world.plot(figsize=(22,28), color='white', edgecolor='black', linewidth=0.1)
plot = gdf_testpoints_proj.plot(ax=base, markersize=10)
np.digitize is implemented in terms of np.searchsorted. This means that a binary search is used to bin the values, which scales much better for a larger number of bins than the previous linear search. It also removes the requirement for the input array to be 1-dimensional.
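As a quick sanity check on toy values (a sketch, not part of the original workflow), the equivalence with np.searchsorted can be verified directly for monotonically increasing bins:
# For monotonically increasing bins, np.digitize(x, bins) is equivalent
# to np.searchsorted(bins, x, side='right').
import numpy as np

bins = np.array([0, 100_000, 200_000, 300_000])
x = np.array([50.0, 123_456.0, 250_000.0])
assert np.array_equal(
    np.digitize(x, bins),
    np.searchsorted(bins, x, side="right"))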
Create two bin arrays, one for each axis of the existing Mollweide rows/cols grid:
ybins = np.array(rows)
xbins = np.array(cols)
Create two arrays with the x and y coordinates of the testpoints:
test_point_list_x = np.array([p.x for p in gdf_testpoints_proj])
test_point_list_y = np.array([p.y for p in gdf_testpoints_proj])
Find the nearest bin for x coordinate (returns the bin-index):
x_bin = np.digitize(test_point_list_x, xbins) - 1
display(x_bin)
Check the value of the bin (the x coordinate) based on the returned index:
testpoint_xbin_idx = xbins[[x_bin[x] for x in range(0, len(x_bin))]]
display(testpoint_xbin_idx)
Repeat the same for the y coordinate:
y_bin = np.digitize(test_point_list_y, ybins) - 1
display(y_bin)
testpoint_ybin_idx = ybins[[y_bin[x] for x in range(0, len(y_bin))]]
display(testpoint_ybin_idx)
➡️ 759904 / 5579952 and 1059904 / 5979952 are indexes that we can use in our geodataframe index to return the matching grid-poly for each point
Get grid-poly by index from testpoint
grid.loc[testpoint_xbin_idx[0], testpoint_ybin_idx[0]]
Convert shapely bin poly to Geoseries and plot
testpoint_grids = gp.GeoSeries(
[grid.loc[testpoint_xbin_idx[x], testpoint_ybin_idx[x]].geometry for x in range(0,len(gdf_testpoints))])
testpoint_grids.plot()
Set auto zoom with buffer:
minx, miny, maxx, maxy = testpoint_grids.total_bounds
buf = 1000000
# a figure with a 1x1 grid of Axes
fig, ax = plt.subplots(1, 1,figsize=(10,8))
ax.set_xlim(minx-buf, maxx+buf)
ax.set_ylim(miny-buf, maxy+buf)
base = world.plot(ax=ax, color='white', edgecolor='black', linewidth=0.1)
grid_base = testpoint_grids.plot(ax=base, facecolor='red', linewidth=0.1)
plot = gdf_testpoints_proj.plot(ax=grid_base, markersize=8, color='blue')
Now that it has been visually verified that the algorithm works, let's create functions for the main processing job.
def get_best_bins(
search_values_x: np.array, search_values_y: np.array,
xbins: np.array, ybins: np.array) -> Tuple[np.ndarray, np.ndarray]:
"""Will return best bin for a lat and lng input
Note: prepare bins and values in correct matching projection
Args:
        search_values_y: Array of projected latitude (y) values
        search_values_x: Array of projected longitude (x) values
        xbins: 1-d array of bin edges for x values
        ybins: 1-d array of bin edges for y values
    Returns:
        Tuple[np.ndarray, np.ndarray]: Two arrays with the best
            matching bin (lower bin edge) for each x/y input value
"""
xbins_idx = np.digitize(search_values_x, xbins, right=False)
ybins_idx = np.digitize(search_values_y, ybins, right=False)
return (xbins[xbins_idx-1], ybins[ybins_idx-1])
We're going to test the binning of coordinates on a small subset (Europe).
Prepare lat/lng tuple of lower left corner and upper right corner to crop sample map:
# Part of Italy and Sicily
bbox_italy = [
7.8662109375, 36.84427318493909,
19.31396484375, 43.89320031385282]
bbox = bbox_italy
Calculate the bounding box with a 1000 km buffer. For that, project the bounding box to Mollweide, apply the buffer, and project back to WGS1984:
# convert to Mollweide
minx, miny = PROJ_TRANSFORMER.transform(
bbox_italy[0], bbox_italy[1])
maxx, maxy = PROJ_TRANSFORMER.transform(
bbox_italy[2], bbox_italy[3])
# apply buffer and convert back to WGS1984
min_buf = PROJ_TRANSFORMER_BACK.transform(minx-buf, miny-buf)
max_buf = PROJ_TRANSFORMER_BACK.transform(maxx+buf, maxy+buf)
bbox_italy_buf = [min_buf[0], min_buf[1], max_buf[0], max_buf[1]]
Select columns and dtypes to improve read speed:
usecols = ['latitude', 'longitude', 'post_hll']
dtypes = {'latitude': float, 'longitude': float}
reset_metrics(grid)
%%time
df = pd.read_csv(SUNRISE_FLICKR, usecols=usecols, dtype=dtypes, encoding='utf-8')
print(len(df))
Filter on bounding box (Italy)
def filter_df_bbox(
df: pd.DataFrame, bbox: Tuple[float, float, float, float],
inplace: bool = True):
"""Filter dataframe with bbox on latitude and longitude column"""
df.query(
f'({bbox[0]} < longitude) & '
f'(longitude < {bbox[2]}) & '
f'({bbox[1]} < latitude) & '
f'(latitude < {bbox[3]})',
inplace=True)
# set index to asc integers
if inplace:
df.reset_index(inplace=True, drop=True)
return
return df.reset_index(inplace=False, drop=True)
Execute and count number of posts in the bounding box:
%%time
filter_df_bbox(df=df, bbox=bbox_italy_buf)
print(f"There're {len(df):,.0f} distinct lat-lng coordinates located within the bounding box.")
display(df.head())
def proj_df(df, proj_transformer: Transformer = PROJ_TRANSFORMER):
"""Project pandas dataframe latitude and longitude decimal degrees
using predefined proj_transformer"""
if 'longitude' not in df.columns:
return
xx, yy = proj_transformer.transform(
df['longitude'].values, df['latitude'].values)
# assign projected coordinates to
# new columns x and y
# the ':' means: replace all values in-place
df.loc[:, "x"] = xx
df.loc[:, "y"] = yy
# Drop WGS coordinates
df.drop(columns=['longitude', 'latitude'], inplace=True)
%%time
proj_df(df)
print(f'Projected {len(df.values)} coordinates')
display(df.head())
%%time
xbins_match, ybins_match = get_best_bins(
search_values_x=df['x'].to_numpy(),
search_values_y=df['y'].to_numpy(),
xbins=xbins, ybins=ybins)
len(xbins_match)
xbins_match[:10]
ybins_match[:10]
Attach the target bins to the original dataframe. The colon (:) in df.loc[:, ...] means: modify all values in place.
df.loc[:, 'xbins_match'] = xbins_match
df.loc[:, 'ybins_match'] = ybins_match
# set new index column
df.set_index(['xbins_match', 'ybins_match'], inplace=True)
# drop x and y columns not needed anymore
df.drop(columns=['x', 'y'], inplace=True)
df.head()
The next step is to union hll sets and (optionally) return the cardinality (the number of distinct elements). This can only be done by connecting to a Postgres database with the HLL extension installed. We're using our hlldb here, but it is equally possible to connect to an empty Postgres DB such as the pg-hll-empty Docker container.
def union_hll(
hll_series: pd.Series, cardinality: bool = True,
db_calc: tools.DbConn = DB_CALC) -> pd.Series:
"""HLL Union and (optional) cardinality estimation from series of hll sets
based on group by composite index.
Args:
hll_series: Indexed series (bins) of hll sets.
cardinality: If True, returns cardinality (counts). Otherwise,
the unioned hll set will be returned.
The method will combine all groups of hll sets first,
    in a single SQL command. Union of hll sets belonging
to the same group (bin) and (optionally) returning the cardinality
(the estimated count) per group will be done in postgres.
    By utilizing Postgres' GROUP BY (instead of, e.g., doing
    the grouping with numpy), it is possible to reduce the number
of SQL calls to a single run, which saves overhead
(establishing the db connection, initializing the SQL query
etc.). Also note that ascending integers are used for groups,
instead of their full original bin-ids, which also reduces
transfer time.
cardinality = True should be used when calculating counts in
a single pass.
cardinality = False should be used when incrementally union
of hll sets is required, e.g. due to size of input data.
In the last run, set to cardinality = True.
"""
# group all hll-sets per index (bin-id)
series_grouped = hll_series.groupby(
hll_series.index).apply(list)
# From grouped hll-sets,
# construct a single SQL Value list;
# if the following nested list comprehension
# doesn't make sense to you, have a look at
# spapas.github.io/2016/04/27/python-nested-list-comprehensions/
    # with a description on how to 'unnest'
# nested list comprehensions to regular for-loops
hll_values_list = ",".join(
[f"({ix}::int,'{hll_item}'::hll)"
for ix, hll_items
in enumerate(series_grouped.values.tolist())
for hll_item in hll_items])
# Compilation of SQL query,
# depending on whether to return the cardinality
# of unioned hll or the unioned hll
return_col = "hll_union"
hll_calc_pre = ""
hll_calc_tail = "AS hll_union"
if cardinality:
# add sql syntax for cardinality
# estimation
# (get count distinct from hll)
return_col = "hll_cardinality"
hll_calc_pre = "hll_cardinality("
hll_calc_tail = ")::int"
db_query = f"""
SELECT sq.{return_col} FROM (
SELECT s.group_ix,
{hll_calc_pre}
hll_union_agg(s.hll_set)
{hll_calc_tail}
FROM (
VALUES {hll_values_list}
) s(group_ix, hll_set)
GROUP BY group_ix
ORDER BY group_ix ASC) sq
"""
df = db_calc.query(db_query)
# to merge values back to grouped dataframe,
# first reset index to ascending integers
# matching those of the returned df;
# this will turn series_grouped into a DataFrame;
# the previous index will still exist in column 'index'
df_grouped = series_grouped.reset_index()
# drop hll sets not needed anymore
df_grouped.drop(columns=[hll_series.name], inplace=True)
# append hll_cardinality counts
# using matching ascending integer indexes
df_grouped.loc[df.index, return_col] = df[return_col]
# set index back to original bin-ids
df_grouped.set_index("index", inplace=True)
# split tuple index to produce
# the multiindex of the original dataframe
# with xbin and ybin column names
df_grouped.index = pd.MultiIndex.from_tuples(
df_grouped.index, names=['xbin', 'ybin'])
# return column as indexed pd.Series
return df_grouped[return_col]
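For readers unfamiliar with nested list comprehensions, here is a minimal, self-contained sketch (with hypothetical hll strings) of how the comprehension in union_hll() unnests into regular for-loops:
# Toy illustration of the nested list comprehension in union_hll():
# build the SQL VALUES list from grouped hll strings with explicit for-loops.
series_grouped_values = [
    ["\\x138b40a1", "\\x138b40b2"],   # hypothetical hll strings of bin 0
    ["\\x138b40c3"],                  # hypothetical hll string of bin 1
]
value_tuples = []
for ix, hll_items in enumerate(series_grouped_values):
    for hll_item in hll_items:
        value_tuples.append(f"({ix}::int,'{hll_item}'::hll)")
hll_values_list = ",".join(value_tuples)
print(hll_values_list)
# (0::int,'\x138b40a1'::hll),(0::int,'\x138b40b2'::hll),(1::int,'\x138b40c3'::hll)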
Optionally, split the dataframe into chunks so we do not exceed the memory limit (e.g. use this if memory < 16 GB). A chunk size of 1 million records is suitable for a computer with about 8 GB of memory and the optional sparse HLL mode enabled. If sparse mode is disabled, decrease chunk_size accordingly to compensate for the increased memory use.
%%time
chunked_df = [
df[i:i+CHUNK_SIZE] for i in range(0, df.shape[0], CHUNK_SIZE)]
chunked_df[0].head()
To test, process the first chunk:
%%time
cardinality_series = union_hll(chunked_df[0]["post_hll"])
cardinality_series.head()
Remove possibly existing result column in grid from previous run:
reset_metrics(grid, ["postcount_est"], setzero=True)
Append Series with calculated counts to grid (as new column) based on index match:
grid.loc[cardinality_series.index, 'postcount_est'] = cardinality_series
grid[grid["postcount_est"] > 0].head()
Process all chunks:
The caveat here is to incrementally union hll sets until all records have been processed. On the last loop, instruct the hll worker to return the cardinality instead of the unioned hll set.
First, define method to join cardinality to grid
# reference metric names and column names
COLUMN_METRIC_REF = {
"postcount_est":"post_hll",
"usercount_est":"user_hll",
"userdays_est":"date_hll"}
def join_df_grid(
df: pd.DataFrame, grid: gp.GeoDataFrame,
metric: str = "postcount_est",
cardinality: bool = True,
column_metric_ref: Dict[str, str] = COLUMN_METRIC_REF):
"""Union HLL Sets and estimate postcount per
grid bin from lat/lng coordinates
Args:
df: A pandas dataframe with latitude and
longitude columns in WGS1984
grid: A geopandas geodataframe with indexes
x and y (projected coordinates) and grid polys
metric: target column for estimate aggregate.
Default: postcount_est.
cardinality: will compute cardinality of unioned
hll sets. Otherwise, unioned hll sets will be
returned for incremental updates.
"""
    # optionally, bin assignment of projected coordinates,
# make sure to not bin twice:
# x/y columns are removed after binning
if 'x' in df.columns:
bin_coordinates(df, xbins, ybins)
# set index column
df.set_index(
['xbins_match', 'ybins_match'], inplace=True)
# union hll sets and
# optional estimate count distincts (cardinality)
column = column_metric_ref.get(metric)
# get series with grouped hll sets
hll_series = df[column]
# union of hll sets:
# to allow incremental union of already merged data
# and new data, concatenate series from grid and new df
# only if column with previous hll sets already exists
if metric in grid.columns:
# remove nan values from grid and
# rename series to match names
hll_series = pd.concat(
[hll_series, grid[metric].dropna()]
).rename(column)
cardinality_series = union_hll(
hll_series, cardinality=cardinality)
# add unioned hll sets/computed cardinality to grid
grid.loc[
cardinality_series.index, metric] = cardinality_series
if cardinality:
# set all remaining grid cells
# with no data to zero and
# downcast column type from float to int
grid[metric] = grid[metric].fillna(0).astype(int)
Define method to process chunks:
def join_chunkeddf_grid(
chunked_df: List[pd.DataFrame], grid: gp.GeoDataFrame,
metric: str = "postcount_est", chunk_size: int = CHUNK_SIZE,
keep_hll: Optional[bool] = None,
column_metric_ref: Dict[str, str] = COLUMN_METRIC_REF):
"""Incremental union of HLL Sets and estimate postcount per
grid bin from chunked list of dataframe records. Results will
be stored in grid.
Args:
chunked_df: A list of (chunked) dataframes with latitude and
longitude columns in WGS1984
grid: A geopandas geodataframe with indexes
x and y (projected coordinates) and grid polys
metric: target column for estimate aggregate.
Default: postcount_est.
keep_hll: If True, will not remove HLL sketches after
final cardinality estimation. Will not reset metrics.
"""
reset_metrics(grid, [metric])
if keep_hll:
        # if existing hll sets are present,
        # rename column so it can be updated
metric_hll = metric.replace("_est", "_hll")
if metric_hll in grid.columns:
grid.rename(
columns={
metric_hll: metric.replace(
"_hll", "_est")},
inplace=True)
for ix, chunk_df in enumerate(chunked_df):
# compute cardinality only on last iteration
cardinality = False
if ix == len(chunked_df)-1:
cardinality = True
column = column_metric_ref.get(metric)
# get series with grouped hll sets
hll_series = chunk_df[column]
if metric in grid.columns:
# merge existing hll sets with new ones
# into one series (with duplicate indexes);
# remove nan values from grid and
# rename series to match names
hll_series = pd.concat(
[hll_series, grid[metric].dropna()]
).rename(column)
cardinality_series = union_hll(
hll_series, cardinality=cardinality)
if keep_hll:
# only if final hll sketches need to
# be kept (e.g. for benchmarking):
# do another union, without cardinality
# estimation, and store results
# in column "metric"_hll
hll_sketch_series = union_hll(
hll_series, cardinality=False)
grid.loc[
hll_sketch_series.index,
f'{metric.replace("_est","_hll")}'] = hll_sketch_series
# add unioned hll sets/computed cardinality to grid
grid.loc[
cardinality_series.index, metric] = cardinality_series
if cardinality:
# set all remaining grid cells
# with no data to zero and
# downcast column type from float to int
grid[metric] = grid[metric].fillna(0).astype(int)
clear_output(wait=True)
print(f'Mapped ~{(ix+1)*chunk_size} coordinates to bins')
Execute:
join_chunkeddf_grid(chunked_df, grid, chunk_size=CHUNK_SIZE)
All distinct counts are now attached to the bins of the grid:
grid[grid["postcount_est"]>10].head()
Use the headtail_breaks classification scheme because it is specifically suited to mapping long-tailed data, see Jiang 2013.
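As a minimal illustration on toy values (not the grid data), the same classifier is available directly in mapclassify, which geopandas uses under the hood for scheme='headtail_breaks':
# Toy illustration of the head/tail breaks scheme on long-tailed values.
import numpy as np
import mapclassify as mc

values = np.array([1, 1, 2, 2, 3, 5, 8, 13, 100, 1000])
classifier = mc.HeadTailBreaks(values)
print(classifier.bins)    # upper bounds of each class
print(classifier.yb)      # class assignment per value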
# global legend font size setting
plt.rc('legend', **{'fontsize': 16})
def leg_format(leg):
"Format matplotlib legend entries"
for lbl in leg.get_texts():
label_text = lbl.get_text()
lower = label_text.split(",")[0].lstrip("[(")
upper = label_text.split(",")[1].rstrip(")]")
new_text = f'{float(lower):,.0f} - {float(upper):,.0f}'
lbl.set_text(new_text)
def title_mod(title, km_size: Optional[float] = km_size):
"""Update title/output name if grid size is not 100km"""
if not km_size or km_size == 100:
return title
return f'{title} ({km_size:.0f} km grid)'
def save_plot(
grid: gp.GeoDataFrame, title: str, column: str, save_fig: str = None,
output: Path = OUTPUT):
"""Plot GeoDataFrame with matplotlib backend, optionaly export as png"""
fig, ax = plt.subplots(1, 1,figsize=(10,12))
ax.set_xlim(minx-buf, maxx+buf)
ax.set_ylim(miny-buf, maxy+buf)
title = title_mod(title)
ax.set_title(title, fontsize=20)
base = grid.plot(
ax=ax, column=column, cmap='OrRd', scheme='headtail_breaks',
legend=True, legend_kwds={'loc': 'lower right'})
# combine with world geometry
plot = world.plot(
ax=base, color='none', edgecolor='black', linewidth=0.1)
leg = ax.get_legend()
leg_format(leg)
if not save_fig:
return
fig.savefig(output / f"figures{km_size_str}" / save_fig, dpi=300, format='PNG',
bbox_inches='tight', pad_inches=1)
save_plot(
grid=grid, title='Estimated Post Count',
column='postcount_est', save_fig=f'postcount_sample_est.png')
Note: the data anomaly seen in Germany is located in the grid cell for Berlin. It is related to a single user who uploaded 53,961 sunset and sunrise photos. This anomaly will disappear once aggregation is based on the user_guid count.
When using HLL, aggregation of user_guids or user_days takes the same amount of time (unlike when working with original data, where memory consumption increases significantly). We'll only need to update the columns that are loaded from the database:
usecols = ['latitude', 'longitude', 'user_hll']
Adjust method for stream-reading from CSV in chunks:
iter_csv = pd.read_csv(
SUNSET_FLICKR, usecols=usecols, iterator=True,
dtype=dtypes, encoding='utf-8', chunksize=CHUNK_SIZE)
def proj_report(df, cnt, inplace: bool = False):
"""Project df with progress report"""
proj_df(df)
clear_output(wait=True)
print(f'Projected {cnt:,.0f} coordinates')
if inplace:
return
return df
%%time
# filter
chunked_df = [
filter_df_bbox(
df=chunk_df, bbox=bbox_italy_buf, inplace=False)
for chunk_df in iter_csv]
# project
projected_cnt = 0
for chunk_df in chunked_df:
projected_cnt += len(chunk_df)
proj_report(
chunk_df, projected_cnt, inplace=True)
display(chunked_df[0].head())
def bin_coordinates(
        df: pd.DataFrame, xbins: np.ndarray,
        ybins: np.ndarray) -> None:
"""Bin coordinates using binary search and append to df as new index"""
xbins_match, ybins_match = get_best_bins(
search_values_x=df['x'].to_numpy(),
search_values_y=df['y'].to_numpy(),
xbins=xbins, ybins=ybins)
# append target bins to original dataframe
# use .loc to avoid chained indexing
df.loc[:, 'xbins_match'] = xbins_match
df.loc[:, 'ybins_match'] = ybins_match
# drop x and y columns not needed anymore
df.drop(columns=['x', 'y'], inplace=True)
def bin_chunked_coordinates(
chunked_df: List[pd.DataFrame]):
"""Bin coordinates of chunked dataframe"""
binned_cnt = 0
for ix, df in enumerate(chunked_df):
bin_coordinates(df, xbins, ybins)
df.set_index(['xbins_match', 'ybins_match'], inplace=True)
clear_output(wait=True)
binned_cnt += len(df)
print(f"Binned {binned_cnt:,.0f} coordinates..")
%%time
bin_chunked_coordinates(chunked_df)
display(chunked_df[0].head())
Union HLL Sets per grid-id and calculate cardinality (estimated distinct user count):
join_chunkeddf_grid(
chunked_df=chunked_df, grid=grid, metric="usercount_est")
grid[grid["usercount_est"]> 0].head()
Look at this: there are many polygons where thousands of photos have been created by only a few users. Let's see how this affects our test map.
save_plot(
grid=grid, title='Estimated User Count',
column='usercount_est', save_fig='usercount_sample_est.png')
The sequence of commands for userdays is exactly the same as for postcount and usercount above.
usecols = ['latitude', 'longitude', 'date_hll']
def read_project_chunked(filename: str,
usecols: List[str], chunk_size: int = CHUNK_SIZE,
bbox: Tuple[float, float, float, float] = None) -> List[pd.DataFrame]:
"""Read data from csv, optionally clip to bbox and projet"""
iter_csv = pd.read_csv(
filename, usecols=usecols, iterator=True,
dtype=dtypes, encoding='utf-8', chunksize=chunk_size)
if bbox:
chunked_df = [filter_df_bbox(
df=chunk_df, bbox=bbox, inplace=False)
for chunk_df in iter_csv]
else:
chunked_df = [chunk_df for chunk_df in iter_csv]
# project
projected_cnt = 0
for chunk_df in chunked_df:
projected_cnt += len(chunk_df)
proj_report(
chunk_df, projected_cnt, inplace=True)
return chunked_df
Run:
%%time
chunked_df = read_project_chunked(
filename=SUNRISE_FLICKR,
usecols=usecols,
bbox=bbox_italy_buf)
display(chunked_df[0].head())
%%time
bin_chunked_coordinates(chunked_df)
join_chunkeddf_grid(
chunked_df=chunked_df, grid=grid, metric="userdays_est")
grid[grid["userdays_est"]> 0].head()
Note below that our very active Berlin user doesn't affect the map as strongly as with postcount. Of all three metrics, userdays per grid perhaps provides the most balanced capture of behaviour patterns.
save_plot(
grid=grid, title='Estimated User Days',
column='userdays_est', save_fig='userdays_sample_est.png')
There are other approaches for further reducing noise. For example, to reduce the impact of automatic capturing devices (such as webcams uploading x pictures per day), one possibility is to count distinct userlocations. For the userlocations metric, a user would be counted multiple times per grid bin only for pictures with different lat/lng. The same goes for the number of distinct userlocationdays (etc.). These metrics are easy to implement using hll, but would be quite difficult to compute using raw data.
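To make the idea concrete, here is a hypothetical sketch (toy column names and values; the actual HLL sets are built upstream, e.g. in hlldb) of how distinct 'userlocation' keys could be composed from raw data before being added to an HLL set:
# Hypothetical sketch: compose 'userlocation' keys from raw post data.
# Each distinct user_guid + lat/lng pair would be added once to the bin's
# hll set, dampening webcams that upload many photos from a single spot.
import pandas as pd

raw_posts = pd.DataFrame({            # toy raw data, not the actual input
    "user_guid": ["u1", "u1", "u1", "u2"],
    "latitude":  [51.05, 51.05, 51.06, 51.05],
    "longitude": [13.74, 13.74, 13.74, 13.74],
})
raw_posts["userlocation"] = (
    raw_posts["user_guid"] + ":"
    + raw_posts["latitude"].astype(str) + ":"
    + raw_posts["longitude"].astype(str))
print(raw_posts["userlocation"].nunique())  # -> 3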
Let's summarize the above code in a few methods:
Plotting preparation
The below methods contain combined code from above, plus final plot style improvements.
def format_legend(
leg, bounds: List[str], inverse: bool = None,
metric: str = "postcount_est"):
"""Formats legend (numbers rounded, colors etc.)"""
leg.set_bbox_to_anchor((0., 0.3, 0.2, 0.3))
# get all the legend labels
legend_labels = leg.get_texts()
plt.setp(legend_labels, fontsize='12')
lcolor = 'black'
if inverse:
frame = leg.get_frame()
frame.set_facecolor('black')
frame.set_edgecolor('grey')
lcolor = "white"
plt.setp(legend_labels, color = lcolor)
if metric == "postcount_est":
leg.set_title("Estimated Post Count")
elif metric == "usercount_est":
leg.set_title("Estimated User Count")
else:
leg.set_title("Estimated User Days")
plt.setp(leg.get_title(),