Twitter 395 Million reactions visualized with datashader and parquet

Alexander Dunkel, TU Dresden; Institute of Cartography


TL;DR Visualization of 395 Million Geo-Social Media posts from Twitter (Europe, 2016-2018; USA, 2021-2022) with parquet (streamed processing) and datashader. The input here is a list of latitude and longitude coordinates (decimal degrees), with no other information, as a single CSV (10 GB). By using streamed processing and parquet, this notebook can be used to process any lat/lng CSV, regardless of how large the input data is.

Sources:


Last updated: Apr-21-2022

Preparations

conda activate worker_env
conda install -c conda-forge pytables fastparquet

Use the lbsn Jupyterlab Docker container as a starting point.

Also, you need to install either pyarrow or fastparquet (recommended) from conda-forge.

In [4]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.diagnostics as diag
import datashader.transfer_functions as tf
import datashader as ds
from datashader.utils import lnglat_to_meters
from IPython.display import clear_output
from pathlib import Path

Prepare output folder

In [7]:
OUTPUT = Path.cwd().parents[0] / "out"
OUTPUT.mkdir(exist_ok=True)

Prepare jupytext folders

In [5]:
(Path.cwd().parents[0] / "notebooks").mkdir(exist_ok=True)
(Path.cwd().parents[0] / "py").mkdir(exist_ok=True)

Convert data to Parquet Format

The parquet format stores data into individual files (partitions) and allows streamed processing in, e.g., dask. See a comparison of supported data types for datashader here.
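To illustrate the streamed processing idea (a minimal sketch with a hypothetical file name, separate from the actual conversion below): dask reads a partitioned parquet dataset lazily, so computations run partition by partition instead of loading the full dataset into memory at once.

# hypothetical partitioned parquet dataset, for illustration only
ddf = dd.read_parquet("coordinates.snappy.parq")
# evaluated lazily, one partition at a time
rows_per_partition = ddf.map_partitions(len).compute()
print(f"{ddf.npartitions} partitions, {rows_per_partition.sum():,} rows")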

In [11]:
CHUNK_SIZE = 5000000 

filename = Path.cwd().parents[0] / 'data' / '2022-04-19_Twitter_EuropeUSALatLng.csv'
dtypes = {'latitude': float, 'longitude': float}

Loop over chunks, project coordinates, and convert to parquet format

In [22]:
%%time
iter_csv = pd.read_csv(
    filename, iterator=True,
    dtype=dtypes, encoding='utf-8', chunksize=CHUNK_SIZE)

cnt = 0
parquet_output = OUTPUT / "twitter_proj.snappy.parq"
for ix, chunk in enumerate(iter_csv):
    # skip processing if the parquet output already exists
    if ix == 0 and parquet_output.exists():
        break
    # append all chunks after the first one
    append = ix > 0
    cnt += CHUNK_SIZE
    clear_output(wait=True)
    print(f"Processed {cnt:,.0f} coordinates..")
    # project lat/lng (WGS84 decimal degrees) to Web Mercator (meters)
    web_mercator_x, web_mercator_y = lnglat_to_meters(
        chunk['longitude'], chunk['latitude'])
    # dd.concat converts the pandas Series to a (single-partition) dask DataFrame
    projected_coordinates = dd.concat(
        [web_mercator_x, web_mercator_y], axis=1)
    transformed = projected_coordinates.rename(
        columns={'longitude': 'x', 'latitude': 'y'})
    # store: each chunk is appended as a new partition
    dd.to_parquet(transformed, parquet_output, append=append, compression="SNAPPY")
Processed 395,000,000 coordinates..
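Optionally, the partitioned output can be verified lazily, without loading the full dataset into memory (a quick sanity-check sketch):

# optional sanity check: count partitions and rows of the written dataset
df_check = dd.read_parquet(parquet_output)
print(f"{df_check.npartitions} partitions, {len(df_check):,} rows written")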

Visualize with datashader

Check the file size to see whether the data fits in memory

In [8]:
datasize = sum(f.stat().st_size for f in parquet_output.glob('**/*') if f.is_file())/(1024*1024*1024)
print(
    f"Size: {datasize:,.1f} GB")
Size: 5.4 GB
In [24]:
df = dd.read_parquet(parquet_output)
if datasize < 8:
    df = df.persist()
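persist() only pays off if the data actually fits into memory. Note that the size above is the snappy-compressed size on disk; the in-memory footprint will be larger. If in doubt, it can be estimated from the dask DataFrame itself (optional sketch):

# rough estimate of the uncompressed in-memory size
mem_gb = df.memory_usage(deep=True).sum().compute() / (1024**3)
print(f"Estimated in-memory size: {mem_gb:,.1f} GB")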
In [25]:
df.columns
Out[25]:
Index(['x', 'y'], dtype='object')
In [26]:
df.head()
Out[26]:
              x             y
0  3.224762e+06  5.017829e+06
1  3.224724e+06  5.017872e+06
2  3.224781e+06  5.017860e+06
3  3.224731e+06  5.017901e+06
4  3.224784e+06  5.017908e+06
In [38]:
def bounds(x_range, y_range):
    x,y = lnglat_to_meters(x_range, y_range)
    return dict(x_range=x, y_range=y)

Earth       = ((-180.00, 180.00), (-59.00, 74.00))
France      = (( -12.00,  16.00), ( 41.26, 51.27))
Berlin      = (( 12.843018,  14.149704), ( 52.274880, 52.684292))
USA         = (( -126,  -64), ( 24.92, 49.35))
Paris       = ((   2.05,   2.65), ( 48.76, 48.97))

plot_width = 1000
plot_height = int(plot_width*0.5)
In [28]:
cvs = ds.Canvas(plot_width=plot_width, plot_height=plot_height, **bounds(*Earth))
In [29]:
with diag.ProgressBar(), diag.Profiler() as prof, diag.ResourceProfiler(0.5) as rprof:
    agg = cvs.points(df, x='x', y='y')
[########################################] | 100% Completed | 40.7s
In [30]:
tf.shade(agg.where(agg > 15), cmap=["lightblue", "darkblue"])
Out[30]:
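The shaded image can also be written to disk, e.g. with datashader's export_image utility (a sketch; the file name is chosen here for illustration):

from datashader.utils import export_image
img = tf.shade(agg.where(agg > 15), cmap=["lightblue", "darkblue"])
# "twitter_world" is an illustrative file name; written as PNG to the out folder
export_image(img, "twitter_world", export_path=str(OUTPUT), background="white")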
In [31]:
def plot(x_range, y_range):
    cvs = ds.Canvas(plot_width=plot_width, plot_height=plot_height, **bounds(x_range, y_range))
    with diag.ProgressBar(), diag.Profiler() as prof, diag.ResourceProfiler(0.5) as rprof:
        agg = cvs.points(df, x='x', y='y')
    return tf.shade(agg, cmap=["lightblue","darkblue"])
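Any other lat/lng extent can be rendered the same way, e.g. with a custom bounding box (illustrative coordinates, roughly covering Dresden):

# illustrative extent: (lng range, lat range)
Dresden = ((13.55, 13.90), (50.95, 51.15))
plot(*Dresden)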
In [32]:
%time plot(*France)
[########################################] | 100% Completed | 38.4s
CPU times: user 22.7 s, sys: 20.5 s, total: 43.3 s
Wall time: 38.8 s
Out[32]:
In [36]:
%time plot(*Paris)
[########################################] | 100% Completed | 39.8s
CPU times: user 21.4 s, sys: 21.3 s, total: 42.8 s
Wall time: 40.3 s
Out[36]:
In [39]:
%time plot(*Berlin)
[########################################] | 100% Completed | 39.6s
CPU times: user 20.1 s, sys: 21.8 s, total: 41.9 s
Wall time: 40.3 s
Out[39]:
In [34]:
%time plot(*USA)
[########################################] | 100% Completed | 41.4s
CPU times: user 23.6 s, sys: 24 s, total: 47.5 s
Wall time: 41.8 s
Out[34]:

Create Notebook HTML

In [5]:
!jupyter nbconvert --to html_toc \
    --output-dir=../resources/ ./00_Twitter_datashader.ipynb \
    --template=../nbconvert.tpl \
    --ExtractOutputPreprocessor.enabled=False >&- 2>&- # create single output file