Alexander Dunkel, TU Dresden; Institute of Cartography
TL;DR Visualization of 350 million geo-social media posts from Twitter (Europe, 2016-2018; USA, 2021-2022) with parquet (streamed processing) and datashader. The input here is a list of latitude and longitude coordinates (decimal degrees), with no other information, as a single CSV (10 GB). By using streamed processing and parquet, this notebook can be used to process any lat/lng CSV, regardless of how big the input data is.
Sources:
conda activate worker_env
conda install -c conda-forge pytables fastparquet
Use the lbsn Jupyterlab Docker container as a starting point.
Also, you need to install either pyarrow or fastparquet (recommended) from conda-forge.
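If you prefer the pyarrow engine over fastparquet, the corresponding conda-forge install (analogous to the command above, not part of the original notes) would be:
conda install -c conda-forge pyarrow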
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.diagnostics as diag
import datashader.transfer_functions as tf
import datashader as ds
from datashader.utils import lnglat_to_meters
from IPython.display import clear_output
from pathlib import Path
Preparations
OUTPUT = Path.cwd().parents[0] / "out"
OUTPUT.mkdir(exist_ok=True)
Prepare jupytext folders
(Path.cwd().parents[0] / "notebooks").mkdir(exist_ok=True)
(Path.cwd().parents[0] / "py").mkdir(exist_ok=True)
# number of CSV rows to read per chunk
CHUNK_SIZE = 5000000
filename = Path.cwd().parents[0] / 'data' / '2022-04-19_Twitter_EuropeUSALatLng.csv'
dtypes = {'latitude': float, 'longitude': float}
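Optionally, before starting the chunked conversion, a quick look at the first rows can confirm that the CSV header matches the expected latitude/longitude columns. This check is a suggestion, not part of the original workflow:
# peek at the first rows only; the full 10 GB file would not fit into memory at once
pd.read_csv(filename, nrows=5, dtype=dtypes, encoding='utf-8')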
Loop over chunks, project, and convert to parquet format
%%time
iter_csv = pd.read_csv(
    filename, iterator=True,
    dtype=dtypes, encoding='utf-8', chunksize=CHUNK_SIZE)
cnt = 0
parquet_output = OUTPUT / "twitter_proj.snappy.parq"
for ix, chunk in enumerate(iter_csv):
    # read
    append = True
    cnt += len(chunk)
    clear_output(wait=True)
    print(f"Processed {cnt:,.0f} coordinates..")
    if ix == 0:
        # skip conversion entirely if the parquet file already exists
        if parquet_output.exists():
            break
        # first chunk: create the parquet file instead of appending
        append = False
    # convert the pandas chunk to a dask dataframe
    dd_chunk = dd.from_pandas(chunk, npartitions=1)
    # project lat/lng (WGS 1984) to Web Mercator
    web_mercator_x, web_mercator_y = lnglat_to_meters(
        dd_chunk['longitude'], dd_chunk['latitude'])
    projected_coordinates = dd.concat(
        [web_mercator_x, web_mercator_y], axis=1)
    transformed = projected_coordinates.rename(
        columns={'longitude': 'x', 'latitude': 'y'})
    # store (append) the projected chunk in the parquet file
    dd.to_parquet(transformed, parquet_output, append=append, compression="SNAPPY")
Check the file size and whether it fits into memory
# total size of all parquet part files, in GB
datasize = sum(f.stat().st_size for f in parquet_output.glob('**/*') if f.is_file())/(1024*1024*1024)
print(
    f"Size: {datasize:,.1f} GB")
df = dd.read_parquet(parquet_output)
if datasize < 8:
    # if the data fits into memory, persist it to speed up repeated aggregations
    df = df.persist()
df.columns
df.head()
def bounds(x_range, y_range):
    """Project a lng/lat extent to Web Mercator ranges for the datashader Canvas."""
    x, y = lnglat_to_meters(x_range, y_range)
    return dict(x_range=x, y_range=y)
Earth = ((-180.00, 180.00), (-59.00, 74.00))
France = (( -12.00, 16.00), ( 41.26, 51.27))
Berlin = (( 12.843018, 14.149704), ( 52.274880, 52.684292))
USA = (( -126, -64), ( 24.92, 49.35))
Paris = (( 2.05, 2.65), ( 48.76, 48.97))
plot_width = 1000
plot_height = int(plot_width*0.5)
cvs = ds.Canvas(plot_width=plot_width, plot_height=plot_height, **bounds(*Earth))
with diag.ProgressBar(), diag.Profiler() as prof, diag.ResourceProfiler(0.5) as rprof:
    agg = cvs.points(df, x='x', y='y')
# only shade pixels with more than 15 posts, to suppress noise
tf.shade(agg.where(agg > 15), cmap=["lightblue", "darkblue"])
def plot(x_range, y_range):
    """Aggregate and shade all points within the given lng/lat extent."""
    cvs = ds.Canvas(plot_width=plot_width, plot_height=plot_height, **bounds(x_range, y_range))
    with diag.ProgressBar(), diag.Profiler() as prof, diag.ResourceProfiler(0.5) as rprof:
        agg = cvs.points(df, x='x', y='y')
    return tf.shade(agg, cmap=["lightblue", "darkblue"])
%time plot(*France)
%time plot(*Paris)
%time plot(*Berlin)
%time plot(*USA)
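To keep one of the rendered maps beyond the notebook session, the shaded image can be written to disk with datashader's export_image helper. A minimal sketch, assuming the OUTPUT folder defined above; the filename and white background are illustrative choices:
from datashader.utils import export_image

# render the France extent and store it as PNG in the output folder
img = plot(*France)
export_image(img, "twitter_france", export_path=OUTPUT, background="white")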
!jupyter nbconvert --to html_toc \
--output-dir=../resources/ ./00_Twitter_datashader.ipynb \
--template=../nbconvert.tpl \
--ExtractOutputPreprocessor.enabled=False >&- 2>&- # create single output file