Alexander Dunkel, TU Dresden, Institute of Cartography; Maximilian Hartmann, Universität Zürich (UZH), Geocomputation
In this notebook, aggregate data per grid bin is used to generate summary data (chi square) per country. We'll use the 50 km grid data to reduce errors from the Modifiable Areal Unit Problem (MAUP). Our goal is to see whether some countries feature a bias towards either sunset or sunrise, to support discussion of possible context factors.
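For orientation, below is a minimal, illustrative sketch of the per-country summary we are aiming at: a signed chi-square value computed from observed and expected counts (positive = more than expected, negative = less). The exact observed/expected definitions and corrections are those from the preceding notebooks; the helper name here is hypothetical.
def signed_chi_sketch(observed: float, expected: float) -> float:
    # illustrative only: signed chi-square contribution per country,
    # assuming observed (e.g. sunset user days) and expected counts
    if expected == 0:
        return 0.0
    chi = (observed - expected) ** 2 / expected
    return chi if observed >= expected else -chi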
Import code from other jupyter notebooks, synced to *.py with jupytext:
import sys
from pathlib import Path
module_path = str(Path.cwd().parents[0] / "py")
if module_path not in sys.path:
sys.path.append(module_path)
# import all previous chained notebooks
from _04_combine import *
Load additional dependencies
import requests, zipfile, io
Activate autoreload of changed python files:
%load_ext autoreload
%autoreload 2
Via gp.datasets.get_path('naturalearth_lowres'), country geometries are easily available. However, these do not include separate spatial region subunits; all overseas regions of e.g. France would be combined into a single geometry. The Natural Earth admin-0 map subunits dataset provides these subunit areas separately.
NE_PATH = Path.cwd().parents[0] / "resources" / "naturalearth"
NE_URI = "https://www.naturalearthdata.com/http//www.naturalearthdata.com/download/50m/cultural/"
NE_FILENAME = "ne_50m_admin_0_map_subunits.zip"
grid_empty = create_grid_df(
grid_size=50000)
grid_empty = grid_to_gdf(grid_empty)
def get_zip_extract(
uri: str, filename: str, output_path: Path,
create_path: bool = True, skip_exists: bool = True,
report: bool = False):
"""Get Zip file and extract to output_path.
Create Path if not exists."""
if create_path:
output_path.mkdir(
exist_ok=True)
if skip_exists and Path(
output_path / filename.replace(".zip", ".shp")).exists():
if report:
print("File already exists.. skipping download..")
return
r = requests.get(f'{uri}{filename}', stream=True)
z = zipfile.ZipFile(io.BytesIO(r.content))
z.extractall(output_path)
get_zip_extract(
uri=NE_URI,
filename=NE_FILENAME,
output_path=NE_PATH,
report=True)
Read country shapefile to GeoDataFrame:
world = gp.read_file(
NE_PATH / NE_FILENAME.replace(".zip", ".shp"))
world = world.to_crs(CRS_PROJ)
COLUMNS_KEEP = ['geometry','ADM0_A3','SOV_A3','ADMIN','SOVEREIGNT', 'ISO_A3', 'SU_A3']
def drop_cols_except(df: pd.DataFrame, columns_keep: List[str] = COLUMNS_KEEP):
"""Drop all columns from DataFrame except those specified in cols_except"""
df.drop(
df.columns.difference(columns_keep), axis=1, inplace=True)
drop_cols_except(world)
world.head()
world[world["SOVEREIGNT"] == "France"].head(10)
world.plot()
Define column to use for country-aggregation:
COUNTRY_COL = 'SU_A3'
These SU_A3 country codes are extended ISO codes; see this ref table.
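For illustration, compare SU_A3 with the plain ISO_A3 codes for the French subunits (all columns referenced were kept above):
# SU_A3 distinguishes subunits that share a single ISO_A3 code,
# e.g. metropolitan France and its overseas subunits
world[world["SOVEREIGNT"] == "France"][["ADMIN", "ISO_A3", "SU_A3"]].head(10)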
Only keep COUNTRY_COL column, SOVEREIGNT, and geometry:
columns_keep = ['geometry', 'SOVEREIGNT', COUNTRY_COL]
drop_cols_except(world, columns_keep)
First, write multi-index to columns, to later re-create the index:
grid_empty['xbin'] = grid_empty.index.get_level_values(0)
grid_empty['ybin'] = grid_empty.index.get_level_values(1)
Create an overlay, only storing the country-grid intersections:
%%time
grid_overlay = gp.overlay(
grid_empty, world,
how='intersection')
grid_overlay[grid_overlay[COUNTRY_COL] == "DEU"].head()
grid_overlay[
grid_overlay[COUNTRY_COL].isin(["DEU", "FXX"])].plot(
edgecolor='white', column=COUNTRY_COL, linewidth=0.3)
Calculate area:
grid_overlay["area"] = grid_overlay.area
grid_overlay[grid_overlay[COUNTRY_COL] == "DEU"].head()
grid_overlay.groupby(["xbin", "ybin"], sort=False).head()
Next, group the intersections by grid bin and select, for each bin, the country with the largest overlap area:
idx_maxarea = grid_overlay.groupby(
["xbin", "ybin"], sort=False)['area'].idxmax()
idx_maxarea.head()
bin_adm_maxarea = grid_overlay.loc[
idx_maxarea, ["xbin", "ybin", COUNTRY_COL]]
Recreate index (Note: duplicate xbin/ybin indexes exist):
bin_adm_maxarea.set_index(
['xbin', 'ybin'], inplace=True)
bin_adm_maxarea.head()
Assign back to grid:
grid_empty.loc[
bin_adm_maxarea.index,
COUNTRY_COL] = bin_adm_maxarea[COUNTRY_COL]
Set NaN values to an "Empty" class:
grid_empty.loc[
grid_empty[COUNTRY_COL].isna(),
COUNTRY_COL] = "Empty"
grid_empty[grid_empty[COUNTRY_COL] != "Empty"].head()
Check assignment
fig, ax = plt.subplots(1, 1, figsize=(10,12))
bbox_italy = (
7.8662109375, 36.24427318493909,
19.31396484375, 43.29320031385282)
buf = 1000000
# create bounds from WGS1984 italy and project to Mollweide
minx, miny = PROJ_TRANSFORMER.transform(
bbox_italy[0], bbox_italy[1])
maxx, maxy = PROJ_TRANSFORMER.transform(
bbox_italy[2], bbox_italy[3])
ax.set_xlim(minx-buf, maxx+buf)
ax.set_ylim(miny-buf, maxy+buf)
empty = grid_empty[grid_empty[COUNTRY_COL] == "Empty"].plot(
ax=ax, edgecolor=None, facecolor='white', linewidth=0)
base = grid_empty[grid_empty[COUNTRY_COL] != "Empty"].plot(
ax=ax, edgecolor='white', column=COUNTRY_COL, linewidth=0.3)
world.plot(
ax=base, color='none', edgecolor='black', linewidth=0.2)
grid_empty.head()
Combine in a single method:
def grid_assign_country(
grid: gp.GeoDataFrame, countries: gp.GeoDataFrame,
country_col: Optional[str] = COUNTRY_COL):
"""Assign countries code based on max area overlay to grid"""
# get index as column
grid['xbin'] = grid.index.get_level_values(0)
grid['ybin'] = grid.index.get_level_values(1)
# intersect
grid_overlay = gp.overlay(
grid, countries,
how='intersection')
# calculate area
grid_overlay["area"] = grid_overlay.area
# select indexes based on area overlay
idx_maxarea = grid_overlay.groupby(
["xbin", "ybin"], sort=False)["area"].idxmax()
bin_country_maxarea = grid_overlay.loc[
idx_maxarea, ["xbin", "ybin", country_col]]
# recreate grid index
bin_country_maxarea.set_index(
['xbin', 'ybin'], inplace=True)
# assign country back to grid
grid.loc[
bin_country_maxarea.index,
country_col] = bin_country_maxarea[country_col]
# drop index columns not needed anymore
grid.drop(
["xbin", "ybin"], axis=1, inplace=True)
For exploring TF-IDF per country, we'll also export the country data for PostGIS here.
Grid to PostGIS
For further work, we'll export the SQL syntax to import the 100 km grid to PostGIS (optional). First, create the target table:
CREATE TABLE spatial. "grid100km" (
xbin int,
ybin int,
PRIMARY KEY (xbin, ybin),
su_a3 char(3),
geometry geometry(Polygon, 54009)
);
grid_empty[grid_empty[COUNTRY_COL] == "USB"].plot()
from shapely.wkt import dumps as wkt_dumps
def add_wkt_col(gdf: gp.GeoDataFrame):
"""Converts gdf.geometry to WKT (Well-Known-Text) as new column
Requires `from shapely.wkt import dumps as wkt_dumps`"""
gdf.loc[gdf.index, "geom_wkt"] = gdf["geometry"].apply(lambda x: wkt_dumps(x))
add_wkt_col(grid_empty)
value_list = ',\n'.join(
f"({index[0]},{index[1]}, '{row[COUNTRY_COL]}', 'SRID=54009;{row.geom_wkt}')"
for index, row in grid_empty[grid_empty[COUNTRY_COL] != "Empty"].iterrows())
with open(OUTPUT / "csv" / "grid_100km_Mollweide_WKT.sql", 'w') as wkt_file:
wkt_file.write(
f'''
INSERT INTO spatial."grid100km" (
xbin,ybin,su_a3,geometry)
VALUES
{value_list};
''')
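The generated file can then be executed against the target database. A minimal sketch, assuming a separate writable psycopg2 connection conn_rw to the PostGIS database (not the read-only HLL worker connection used later):
# sketch: conn_rw is an assumed writable psycopg2 connection to PostGIS
with open(OUTPUT / "csv" / "grid_100km_Mollweide_WKT.sql") as sql_file:
    with conn_rw.cursor() as cur:
        cur.execute(sql_file.read())
conn_rw.commit()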
Country (su_a3) to PostGIS
CREATE TABLE spatial. "country_sua3" (
su_a3 char(3),
PRIMARY KEY (su_a3),
geometry geometry(Geometry, 54009)
);
world_empty = world.copy().set_index("SU_A3").drop(columns=['SOVEREIGNT'])
add_wkt_col(world_empty)
world_empty.head()
value_list = ',\n'.join(
f"('{index}', 'SRID=54009;{row.geom_wkt}')"
for index, row in world_empty.iterrows())
with open(OUTPUT / "csv" / "country_sua3_Mollweide_WKT.sql", 'w') as wkt_file:
wkt_file.write(
f'''
INSERT INTO spatial. "country_sua3" (
su_a3,geometry)
VALUES
{value_list};
''')
grid = pd.read_pickle(
OUTPUT / f"pickles_50km" / "flickr_userdays_all_est_hll.pkl")
grid["userdays_hll"].dropna().head()
grid.dropna().head()
The actual assignment takes pretty long. We will later store the assignment (bin-idx, country-idx) in a pickle that can be loaded.
%%time
grid_assign_country(
grid, world, country_col=COUNTRY_COL)
grid.head()
grid.plot(edgecolor='white', column=COUNTRY_COL, figsize=(22,28), linewidth=0.3)
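As noted above, the intermediate (bin, country) assignment can be cached to a pickle and re-loaded on later runs to skip the slow overlay. A minimal sketch (the file name is an assumption; the combined method defined later handles this automatically):
# cache the per-bin country assignment (file name is an assumption)
country_assignment = OUTPUT / "pickles_50km" / "grid_country_assignment.pkl"
grid[COUNTRY_COL].to_pickle(country_assignment)
# on later runs, load and assign instead of re-running the overlay:
# grid[COUNTRY_COL] = pd.read_pickle(country_assignment)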
Merge hll sets per country id, connect to hll worker db:
DB_CONN = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASS
)
DB_CONN.set_session(
readonly=True)
DB_CALC = tools.DbConn(
DB_CONN)
CUR_HLL = DB_CONN.cursor()
db_conn = tools.DbConn(DB_CONN)
db_conn.query("SELECT 1;")
def union_hll(
hll_series: pd.Series, db_conn: tools.DbConn, cardinality: bool = True,
group_by: Optional[pd.Series] = None) -> pd.Series:
"""HLL Union and (optional) cardinality estimation from series of hll sets
based on group by composite index.
Args:
hll_series: Indexed series (bins) of hll sets.
cardinality: If True, returns cardinality (counts). Otherwise,
the unioned hll set will be returned.
group_by: Optionally provide a Series to group hll sets by. If None,
the index will be used.
The method first combines all groups of hll sets
into a single SQL command. The union of hll sets belonging
to the same group (bin) and, optionally, the cardinality
(the estimated count) per group are then computed in Postgres.
By utilizing Postgres' GROUP BY (instead of, e.g., doing
the grouping with numpy), the number of SQL calls is reduced
to a single run, which saves overhead
(establishing the db connection, initializing the SQL query,
etc.). Also note that ascending integers are used for groups,
instead of their full original bin-ids, which further reduces
transfer time.
cardinality = True should be used when calculating counts in
a single pass.
cardinality = False should be used when incremental union
of hll sets is required, e.g. due to the size of the input data.
In the last run, set cardinality = True.
"""
if group_by is None:
group_by_series = hll_series.index
else:
group_by_series = group_by
# group all hll-sets per index (bin-id)
series_grouped = hll_series.groupby(
group_by_series).apply(list)
# From grouped hll-sets,
# construct a single SQL Value list;
# if the following nested list comprehension
# doesn't make sense to you, have a look at
# spapas.github.io/2016/04/27/python-nested-list-comprehensions/
# with a description of how to 'unnest'
# nested list comprehensions to regular for-loops
hll_values_list = ",".join(
[f"({ix}::int,'{hll_item}'::hll)"
for ix, hll_items
in enumerate(series_grouped.values.tolist())
for hll_item in hll_items])
# Compilation of SQL query,
# depending on whether to return the cardinality
# of unioned hll or the unioned hll
return_col = "hll_union"
hll_calc_pre = ""
hll_calc_tail = "AS hll_union"
if cardinality:
# add sql syntax for cardinality
# estimation
# (get count distinct from hll)
return_col = "hll_cardinality"
hll_calc_pre = "hll_cardinality("
hll_calc_tail = ")::int"
db_query = f"""
SELECT sq.{return_col} FROM (
SELECT s.group_ix,
{hll_calc_pre}
hll_union_agg(s.hll_set)
{hll_calc_tail}
FROM (
VALUES {hll_values_list}
) s(group_ix, hll_set)
GROUP BY group_ix
ORDER BY group_ix ASC) sq
"""
df = db_conn.query(db_query)
# to merge values back to grouped dataframe,
# first reset index to ascending integers
# matching those of the returned df;
# this will turn series_grouped into a DataFrame;
# the previous index will still exist in column 'index'
df_grouped = series_grouped.reset_index()
# drop hll sets not needed anymore
df_grouped.drop(columns=[hll_series.name], inplace=True)
# append hll_cardinality counts
# using matching ascending integer indexes
df_grouped.loc[df.index, return_col] = df[return_col]
# set index back to original bin-ids
df_grouped.set_index(group_by_series.name, inplace=True)
# return column as indexed pd.Series
return df_grouped[return_col]
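The docstring above mentions incremental unioning for large inputs. A minimal sketch of that pattern (the chunk count is an assumption; the single-pass call below is what is actually used here):
import numpy as np

# sketch: union hll sets in chunks; intermediate runs keep
# cardinality=False, the final run estimates the counts per country
partial_hlls = []
for chunk in np.array_split(grid["userdays_hll"].dropna(), 4):
    partial_hlls.append(
        union_hll(
            hll_series=chunk, db_conn=db_conn, cardinality=False,
            group_by=grid.loc[chunk.index, COUNTRY_COL]))
cardinality_series_inc = union_hll(
    hll_series=pd.concat(partial_hlls), db_conn=db_conn, cardinality=True)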
%%time
cardinality_series = union_hll(
hll_series=grid["userdays_hll"].dropna(),
group_by=grid[COUNTRY_COL],
db_conn=db_conn)
cardinality_series.head()
world.set_index(COUNTRY_COL, inplace=True)
world.loc[
cardinality_series.index,
"userdays_est"] = cardinality_series
Calculate area and normalize:
world["area"] = world.area
world["userdays_est_norm"] = ((world.userdays_est ** 2) / world.area)
world.head()
Preview plot:
fig, ax = plt.subplots(1, 1, figsize=(22,28))
world.plot(
column='userdays_est_norm',
cmap='OrRd',
ax=ax,
linewidth=0.2,
edgecolor='grey',
legend=True,
scheme='headtail_breaks')
Prepare methods:
def merge_countries_grid(
grid_countries: gp.GeoDataFrame, grid: gp.GeoDataFrame,
usecols: List[str], mask: Optional[pd.Series] = None):
"""Merge temporary country assigned grid data to grid geodataframe
Args:
grid_countries: Indexed GeoDataFrame with country data
grid: Indexed GeoDataFrame target
usecols: Col names to merge from countries
mask: Optional boolean mask (pd.Series)
to partially merge country data
"""
for col in usecols:
if mask is None:
grid.loc[grid_countries.index, col] = grid_countries[col]
continue
grid_countries_mask = grid_countries.loc[mask, col]
grid.loc[grid_countries_mask.index, col] = grid_countries_mask
def group_union_cardinality(
countries: gp.GeoDataFrame, grid: gp.GeoDataFrame,
db_conn: tools.DbConn,
metric_hll: str = "usercount_hll",
country_col: str = COUNTRY_COL,
grid_countries_pickle: Optional[Path] = None):
"""Group hll sets per country and assign cardinality to countries
Args:
grid: Indexed GeoDataFrame with hll data to use in union
countries: Country GeoDataFrame to group grid-bins and assign
cardinality.
country_col: Name/ID of column in country to use in group by
metric_hll: the name of HLL column that is used in hll union
db_conn: A (read-only) DB connection to PG HLL Worker, for HLL
calculation.
grid_countries_pickle: Optional path to store and load intermediate
grid-country assignment
"""
if grid_countries_pickle and grid_countries_pickle.exists():
grid_countries_tmp = pd.read_pickle(
grid_countries_pickle).to_frame()
merge_countries_grid(
grid_countries=grid_countries_tmp,
grid=grid,
usecols=[country_col])
else:
grid_assign_country(
grid, countries, country_col=country_col)
# store intermediate, to speed up later runs
if grid_countries_pickle:
grid[country_col].to_pickle(
grid_countries_pickle)
print("Intermediate country-grid assignment written..")
# calculate cardinality by hll union
cardinality_series = union_hll(
hll_series=grid[metric_hll].dropna(),
group_by=grid[country_col],
db_conn=db_conn)
# set index
if not countries.index.name == country_col:
countries.set_index(country_col, inplace=True)
# assign cardinality
countries.loc[
cardinality_series.index,
metric_hll.replace("_hll", "_est")] = cardinality_series
def country_agg_frompickle(
grid_pickle: Path, db_conn: tools.DbConn,
ne_path: Path = NE_PATH, ne_filename: str = NE_FILENAME,
ne_uri: str = NE_URI, country_col: str = COUNTRY_COL,
metric_hll: str = "usercount_hll") -> gp.GeoDataFrame:
"""Load grid pickle, load country shapefile, join cardinality to country
and return country GeoDataFrame"""
# prepare country gdf
get_zip_extract(
uri=ne_uri,
filename=ne_filename,
output_path=ne_path)
world = gp.read_file(
ne_path / ne_filename.replace(".zip", ".shp"))
world = world.to_crs(CRS_PROJ)
columns_keep = ['geometry', country_col]
drop_cols_except(world, columns_keep)
# prepare grid gdf
grid = pd.read_pickle(
grid_pickle)
# check if intermediate country agg file already exists
grid_countries_pickle = Path(
ne_path / f'{km_size_str}_{ne_filename.replace(".zip", ".pickle")}')
group_union_cardinality(
world, grid, country_col=country_col, db_conn=db_conn,
grid_countries_pickle=grid_countries_pickle, metric_hll=metric_hll)
return world
Test:
metrics = ["userdays", "usercount"]
%%time
for metric in metrics:
world = country_agg_frompickle(
grid_pickle=OUTPUT / f"pickles_50km" / f"flickr_{metric}_all_est_hll.pkl",
db_conn=db_conn, metric_hll=f"{metric}_hll")
fig, ax = plt.subplots(1, 1, figsize=(22,28))
ax.set_title(metric.capitalize())
world.plot(
column=f'{metric}_est',
cmap='OrRd',
ax=ax,
linewidth=0.2,
edgecolor='grey',
legend=True,
scheme='headtail_breaks')