Вступ до Apache Spark

Моделі та методи обробки великих даних

Ігор Мірошниченко

КНУ імені Тараса Шевченка, ФІТ

Від DuckDB/Polars до Apache Spark

Де ми зараз?

flowchart LR
    A["DuckDB<br/>Аналітика на<br/>одній машині"] --> B["Polars<br/>DataFrame<br/>на одній машині"]
    B --> C["Apache Spark<br/>Розподілена обробка<br/>на кластері"]
    style C fill:#f9b928,stroke:#333,stroke-width:3px

DuckDB Polars Spark
Масштаб 1 машина 1 машина Кластер
Дані ~100 GB ~100 GB Петабайти
Мова ядра C++ Rust Scala / Java
API SQL Python / Rust Python / Scala / SQL / R / Java
Lazy Так Так Так

Коли потрібен Spark?

DuckDB / Polars підходять, коли:

  • Дані поміщаються на одному сервері
  • Потрібна максимальна швидкість на невеликих обсягах
  • Працюєте локально (ноутбук, CI/CD)
  • Прототипування та EDA

Spark потрібен, коли:

  • Дані не поміщаються на одній машині
  • Потрібна горизонтальна масштабованість
  • Batch + Streaming обробка
  • Інтеграція з Hadoop-екосистемою
  • Потрібна відмовостійкість на рівні кластера

Великі дані та розподілена обробка

Що таке Big Data?

Великі дані — це дані, які при зберіганні та обробці не поміщаються в рамках одного комп’ютера або сервера.

Модель 3V:

  • Volume — обсяг: терабайти, петабайти
  • Velocity — швидкість: дані генеруються щосекунди
  • Variety — різноманітність: таблиці, JSON, логи, зображення

graph TD
    A["3V"] --> B["Volume<br/>Обсяг"]
    A --> C["Velocity<br/>Швидкість"]
    A --> D["Variety<br/>Різноманітність"]
    style A fill:#f9b928,stroke:#333

Приклади: пости соцмереж, замовлення Amazon, пошукові запити Google, транзакції банків, дані IoT-сенсорів.

Розподілена обробка

flowchart LR
    A["Великий датасет<br/>10 TB"] --> B["Частина 1<br/>2.5 TB"]
    A --> C["Частина 2<br/>2.5 TB"]
    A --> D["Частина 3<br/>2.5 TB"]
    A --> E["Частина 4<br/>2.5 TB"]
    B --> F["Результат 1"]
    C --> G["Результат 2"]
    D --> H["Результат 3"]
    E --> I["Результат 4"]
    F --> J["Фінальний<br/>результат"]
    G --> J
    H --> J
    I --> J
    style J fill:#d9f6ec,stroke:#28a87d

Принцип:

  1. Дані діляться на частини
  2. Кожна частина обробляється окремим сервером
  3. Результати об’єднуються

Переваги розподіленої обробки

Перевага Опис
Масштабованість Додаємо більше серверів при зростанні даних (горизонтальне масштабування)
Паралельність Задачі виконуються одночасно на багатьох серверах
Надійність Якщо один сервер виходить з ладу — решта продовжують роботу

Кластер

Кластер — множина серверів, об’єднаних мережею для координації та взаємодії:

flowchart TD
    M["Master Node<br/>(координатор)"]
    M --> W1["Worker 1<br/>CPU: 16, RAM: 64 GB"]
    M --> W2["Worker 2<br/>CPU: 16, RAM: 64 GB"]
    M --> W3["Worker 3<br/>CPU: 16, RAM: 64 GB"]
    M --> W4["Worker N<br/>CPU: 16, RAM: 64 GB"]
    style M fill:#f9b928,stroke:#333,stroke-width:2px

  • Worker (нода / вузол) — сервер, що зберігає та обробляє свою частину даних
  • Master — головний сервер, що координує роботу кластера та розподіляє задачі

Забезпечення надійності

flowchart LR
    subgraph Sub1 ["Резервний Master"]
        A1["Active Master"] -.->|failover| A2["Standby Master"]
    end
    
    subgraph Sub2 ["Синхронізатор"]
        B1["ZooKeeper"] --> B2["Node 1"]
        B1 --> B3["Node 2"]
        B1 --> B4["Node 3"]
    end
    
    subgraph Sub3 ["Без Master"]
        C1["Node A"] <--> C2["Node B"]
        C2 <--> C3["Node C"]
        C3 <--> C1
    end

    %% Невидимі зв'язки, які змушують блоки стати в один ряд зліва направо
    Sub1 ~~~ Sub2 ~~~ Sub3

Підхід Приклад Опис
Резервний Master HDFS (NameNode) Активний + Standby, автоматичний failover
Синхронізатор Apache ZooKeeper Зовнішній сервіс координації
Без Master Cassandra Усі вузли рівноправні

Hadoop: попередник Spark

Що таке Hadoop?

Hadoop — система для зберігання та обробки великих даних, створена у 2006 році.

Основні характеристики:

  • Розподілене зберігання та обробка
  • Відмовостійкість (автоматичне резервування)
  • Горизонтальна масштабованість
  • Відкритий код

flowchart TD
    H["Hadoop"] --> HDFS["HDFS<br/>Distributed<br/>File System"]
    H --> MR["MapReduce<br/>Processing<br/>Engine"]
    style H fill:#f9b928,stroke:#333,stroke-width:2px

HDFS: розподілена файлова система

flowchart TD
    NN["NameNode<br/>(метадані, структура файлів)"]
    NN --> DN1["DataNode 1<br/>Block A, Block B"]
    NN --> DN2["DataNode 2<br/>Block A, Block C"]
    NN --> DN3["DataNode 3<br/>Block B, Block C"]
    Client["Клієнт"] --> NN
    Client -.->|"дані напряму"| DN1
    Client -.->|"дані напряму"| DN2
    style NN fill:#f9b928,stroke:#333,stroke-width:2px

  • NameNode — зберігає метадані: структуру файлового дерева, розташування блоків
  • DataNode — зберігає блоки даних та виконує файлові операції
  • Реплікація — кожен блок зберігається у 3 копіях на різних нодах

Важливо

Дані передаються клієнту напряму від DataNode, минаючи NameNode, що знижує навантаження на Master.

MapReduce: принцип обробки

flowchart LR
    I["Вхідні дані"] --> S["Split"]
    S --> M1["Map:<br/>підрахунок<br/>на частині 1"]
    S --> M2["Map:<br/>підрахунок<br/>на частині 2"]
    S --> M3["Map:<br/>підрахунок<br/>на частині 3"]
    M1 --> Sh["Shuffle<br/>& Sort"]
    M2 --> Sh
    M3 --> Sh
    Sh --> R1["Reduce:<br/>об'єднання"]
    R1 --> O["Результат"]
    style Sh fill:#fde8e8,stroke:#c10000

Приклад: підрахунок слів у великому тексті

  1. Map — кожна нода рахує слова у своїй частині
  2. Shuffle & Sort — результати групуються за ключем (словом)
  3. Reduce — підсумовуються значення для кожного ключа

Обмеження MapReduce

Проблеми MapReduce:

  • Записує проміжні дані на диск між фазами
  • Повільний для ітеративних алгоритмів (ML)
  • Складний API
  • Лише batch-обробка
  • Висока латентність

Spark вирішує це:

  • Обробка в оперативній пам’яті
  • До 100x швидше для ітеративних задач
  • Зручний високорівневий API
  • Batch + Streaming
  • Низька латентність

flowchart LR
    A["MapReduce<br/>Disk I/O<br/>між фазами"] -->|"до 100x повільніше"| B["Spark<br/>In-Memory<br/>Processing"]
    style A fill:#fde8e8,stroke:#c10000
    style B fill:#d9f6ec,stroke:#28a87d

Що таке Apache Spark?

Apache Spark

Apache Spark — open source фреймворк для розподіленої обробки даних, широко використовуваний у задачах аналізу великих даних.

  • Обробка даних в оперативній пам’яті
  • Підтримка різних типів даних: структуровані, напівструктуровані, неструктуровані
  • Заміна MapReduce з набагато вищою продуктивністю
  • API для Python, Scala, Java, R та SQL
  • Batch та Streaming обробка

graph TD
    S["Apache Spark"]
    S --> P["Python<br/>PySpark"]
    S --> SC["Scala"]
    S --> J["Java"]
    S --> R["R<br/>SparkR"]
    S --> SQL["SQL<br/>Spark SQL"]
    style S fill:#f9b928,stroke:#333,stroke-width:2px

Хто використовує Spark?

Користувачі:

  • Data Engineers — ETL-пайплайни
  • Data Scientists — розподілений ML
  • Data Analysts — SQL-аналітика на великих даних
  • Великі корпорації та стартапи

Задачі:

  • Підключення до зовнішніх джерел (Postgres, S3, HDFS, Kafka)
  • ETL/ELT-процеси
  • Batch та real-time аналітика

Batch vs Streaming:

flowchart TD
    subgraph "Batch"
        B1["Накопичені дані"] --> B2["Обробка"] --> B3["Результат"]
    end
    subgraph "Streaming"
        S1["Потік подій<br/>(щосекунди)"] --> S2["Micro-batch<br/>обробка"] --> S3["Результат<br/>в реальному часі"]
    end

Де розгортають Spark?

Варіант Опис Приклади
Hadoop-дистрибутиви У складі повного Hadoop-стеку Cloudera CDP, Arenadata ADH
Хмарні провайдери Керований сервіс у хмарі Databricks, AWS EMR, GCP Dataproc
Standalone Автономний кластер Spark Для тестування та розробки
Kubernetes Запуск у контейнерах Продакшн-середовище

Структура Spark

Рівні архітектури

flowchart TD
    subgraph "Бібліотеки"
        L1["Spark SQL<br/>+ DataFrames"]
        L2["Spark<br/>Streaming"]
        L3["MLlib<br/>(ML)"]
        L4["GraphX<br/>(Графи)"]
    end
    subgraph "Core API"
        CA["Python · Scala · Java · R · SQL"]
    end
    subgraph "Processing Engine"
        E["Spark Core<br/>(управління пам'яттю, оптимізація, серіалізація)"]
    end
    subgraph "Resource Management"
        RM1["Standalone"]
        RM2["YARN"]
        RM3["Kubernetes"]
        RM4["Mesos"]
    end
    subgraph "Data Storage"
        ST1["HDFS"]
        ST2["S3 / GCS"]
        ST3["PostgreSQL"]
        ST4["Kafka"]
    end
    L1 --> CA
    L2 --> CA
    L3 --> CA
    L4 --> CA
    CA --> E
    E --> RM1
    E --> RM2
    E --> RM3
    E --> RM4
    E --> ST1
    E --> ST2
    E --> ST3
    E --> ST4
    style E fill:#f9b928,stroke:#333,stroke-width:2px

Бібліотеки Spark

Бібліотека Призначення Приклад задачі
Spark SQL + DataFrames Робота зі структурованими даними через API або SQL Агрегації, join’и, ETL
Spark Streaming Обробка даних у реальному часі (micro-batch) Моніторинг, аналітика подій
MLlib Розподілене навчання ML-моделей Класифікація, кластеризація на великих даних
GraphX Робота з графовими структурами Соціальні мережі, PageRank

Порада

У цьому курсі ми зосередимось на Spark SQL + DataFrames через PySpark.

Data Storage — конектори

Spark має конектори до різноманітних сховищ:

Файлові системи:

  • HDFS
  • AWS S3, GCS, Azure Blob
  • Локальна ФС

Формати файлів:

  • CSV, JSON
  • Parquet, ORC, Avro
  • Delta Lake, Iceberg

Бази даних (JDBC):

  • PostgreSQL, MySQL
  • ClickHouse, Greenplum
  • Oracle, SQL Server

Інші:

  • MongoDB, Cassandra, Elastic
  • Apache Kafka
  • Apache Hive

Архітектура Spark

Driver та Executors

flowchart TD
    subgraph "Driver (Master)"
        D["SparkContext<br/>(SparkSession)"]
        D --> P["Планування задач"]
        D --> M["Моніторинг"]
    end
    subgraph "Cluster Manager"
        CM["YARN / K8s / Standalone"]
    end
    subgraph "Worker Node 1"
        E1["Executor 1"]
        E1 --> T1["Task"]
        E1 --> T2["Task"]
        E1 --> C1["Cache"]
    end
    subgraph "Worker Node 2"
        E2["Executor 2"]
        E2 --> T3["Task"]
        E2 --> T4["Task"]
        E2 --> C2["Cache"]
    end
    D --> |"запит ресурсів"| CM
    CM --> |"виділення"| E1
    CM --> |"виділення"| E2
    D --> |"розподіл задач"| E1
    D --> |"розподіл задач"| E2
    E1 --> |"статус"| D
    E2 --> |"статус"| D
    style D fill:#f9b928,stroke:#333,stroke-width:2px

Ключові концепції

Концепція Опис
Cluster Група серверів (або ВМ) із встановленим Spark
Cluster Manager Зовнішній сервіс для надання ресурсів (YARN, K8s, Standalone)
Driver Master-процес: перетворює додаток на задачі, розподіляє їх
SparkContext Точка входу для взаємодії з кластером
Worker Node Сервер, на якому запускається Executor
Executor Процес на Worker, що виконує Tasks
Task Мінімальна одиниця роботи: 1 операція над 1 Partition
Partition Мінімальний логічний обсяг даних для обчислень
Application Spark-додаток: Driver + Executors

Deploy Mode

flowchart LR
    subgraph Sub1 ["Client Mode"]
        U1["Ваш ноутбук<br/>(Driver)"] --> |"задачі"| CL1["Кластер<br/>(тільки Executors)"]
    end
    
    subgraph Sub2 ["Cluster Mode"]
        CL2["Кластер<br/>(Driver + Executors)"]
    end

    %% Невидимий зв'язок для горизонтального вирівнювання блоків
    Sub1 ~~~ Sub2

Режим Driver знаходиться Коли використовувати
Client Mode На машині користувача (ноутбук, Jupyter) Розробка, дебаг, інтерактивна робота
Cluster Mode Усередині кластера Продакшн, автоматичні пайплайни

Задачі Driver та Executor

Driver (Master):

  1. Створення SparkContext
  2. Перетворення завдання на Job
  3. Запит ресурсів у Cluster Manager
  4. Розподіл Tasks між Executors
  5. Перезапуск Tasks при збоях
  6. Збір та повернення результатів
  7. Моніторинг через WebUI

Executor (Worker):

  1. Виконання Tasks, призначених Driver
  2. Повернення статусу та прогресу
  3. Зберігання даних у кеші

sequenceDiagram
    Driver->>ClusterMgr: Запит ресурсів
    ClusterMgr->>Executor: Виділення
    Driver->>Executor: Task
    Executor->>Driver: Результат

Основні концепції Spark

Job → Stage → Task

flowchart TD
    APP["Application"] --> J1["Job 1"]
    APP --> J2["Job 2"]
    J1 --> S1["Stage 1"]
    J1 --> S2["Stage 2"]
    J1 --> S3["Stage 3"]
    S1 --> T1["Task 1.1"]
    S1 --> T2["Task 1.2"]
    S1 --> T3["Task 1.3"]
    S2 --> T4["Task 2.1"]
    S2 --> T5["Task 2.2"]
    S2 --> T6["Task 2.3"]
    S3 --> T7["Task 3.1"]
    S3 --> T8["Task 3.2"]
    style APP fill:#f9b928,stroke:#333
    style S2 fill:#fde8e8,stroke:#c10000

  • Job — набір послідовних Stages, створюється при виклику Action
  • Stage — набір Tasks, які виконуються паралельно без Shuffle
  • Task — одна операція над однією Partition одним Executor
  • Shuffle — пересилання даних по мережі між Stages

Partition та паралелізм

flowchart LR
    subgraph "DataFrame: 1M рядків"
        P1["Partition 1<br/>5000 рядків"]
        P2["Partition 2<br/>5000 рядків"]
        P3["Partition 3<br/>5000 рядків"]
        PX["...<br/>200 partitions"]
    end
    P1 --> E1["Executor 1"]
    P2 --> E2["Executor 2"]
    P3 --> E3["Executor 3"]
    PX --> EX["Executor N"]
    E1 --> R["Результат"]
    E2 --> R
    E3 --> R
    EX --> R
    style R fill:#d9f6ec,stroke:#28a87d

Приклад: таблиця замовлень — 10 колонок, 1 мільйон рядків

  • Розбивається на 200 partitions (по ~5000 рядків)
  • Кожен Executor обробляє свою partition паралельно
  • Замість 1M послідовних операцій — 200 паралельних по 5K

Примітка

200 — кількість partitions за замовчуванням у Spark (spark.sql.shuffle.partitions).

Narrow vs Wide Transformations

flowchart LR
    subgraph "Narrow (без Shuffle)"
        A1["Partition 1"] --> B1["Partition 1'"]
        A2["Partition 2"] --> B2["Partition 2'"]
        A3["Partition 3"] --> B3["Partition 3'"]
    end
    subgraph "Wide (Shuffle!)"
        C1["Partition 1"] --> D1["Partition A"]
        C1 --> D2["Partition B"]
        C2["Partition 2"] --> D1
        C2 --> D2
        C3["Partition 3"] --> D1
        C3 --> D2
    end
    style D1 fill:#fde8e8,stroke:#c10000
    style D2 fill:#fde8e8,stroke:#c10000

Narrow Wide
Shuffle Ні Так
Приклади filter, select, withColumn, map groupBy, join, repartition, distinct
Швидкість Швидко Повільно (мережа + диск)

Shuffle — найдорожча операція

flowchart LR
    subgraph "Stage 1 (Map)"
        M1["Executor 1"]
        M2["Executor 2"]
        M3["Executor 3"]
    end
    M1 --> |"мережа"| SH["Shuffle<br/>(серіалізація → мережа → диск → десеріалізація)"]
    M2 --> |"мережа"| SH
    M3 --> |"мережа"| SH
    subgraph "Stage 2 (Reduce)"
        SH --> R1["Executor A"]
        SH --> R2["Executor B"]
    end
    style SH fill:#fde8e8,stroke:#c10000,stroke-width:3px

Під час Shuffle:

  • Дані серіалізуються та пересилаються по мережі
  • Проміжні результати записуються на диск
  • Дані десеріалізуються на приймаючому боці

Попередження

Кожен Shuffle створює новий Stage. Більше Stages = повільніший додаток.

Lazy Evaluation

Spark використовує принцип ледачих обчислень — нічого не виконується, доки не викликано Action:

flowchart LR
    T1["filter()"] --> T2["select()"] --> T3["groupBy()"]
    T3 --> |"Action!"| A["show() / count() / write()"]
    T1 -.->|"план"| T2
    T2 -.->|"план"| T3
    style A fill:#d9f6ec,stroke:#28a87d,stroke-width:2px

Transformations (ледачі):

  • filter(), select(), withColumn()
  • groupBy(), join()
  • Лише будують план виконання

Actions (запускають виконання):

  • show() — вивести дані
  • count() — підрахувати рядки
  • collect() — зібрати на Driver
  • write() — зберегти
  • toPandas() — конвертувати

Оптимізатор Catalyst

flowchart LR
    A["Ваш код<br/>(Transformations)"] --> B["Logical Plan"]
    B --> C["Analyzed Plan"]
    C --> D["Optimized Plan"]
    D --> E["Physical Plans"]
    E --> F["Best Plan"]
    F --> G["Виконання<br/>на кластері"]
    style D fill:#d9f6ec,stroke:#28a87d,stroke-width:2px
    style F fill:#d9f6ec,stroke:#28a87d

Catalyst автоматично:

  • Переміщує фільтри до джерел даних (Predicate Pushdown)
  • Обирає лише потрібні стовпці (Projection Pushdown)
  • Об’єднує суміжні операції
  • Обирає оптимальну стратегію Join

Базові структури даних

RDD → DataFrame → Dataset

flowchart LR
    RDD["RDD<br/>(2011)"] --> DF["DataFrame<br/>(2015)"]
    DF --> DS["Dataset<br/>(2016)"]
    style DF fill:#d9f6ec,stroke:#28a87d,stroke-width:2px

RDD DataFrame Dataset
Рівень API Низький Високий Високий
Оптимізація Catalyst Ні Так Так
Типізація Ні Частково Повна (Scala/Java)
Python Повільний Швидкий Недоступний
Використання Рідко Основний Scala/Java

Порада

У PySpark завжди використовуйте DataFrame — це основна абстракція для роботи з даними.

SparkSession

Створення SparkSession

SparkSession — центральний компонент для розробки Spark-додатків:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("MyApp")                              # назва додатку
    .master("local[*]")                            # URL кластера
    .config("spark.executor.memory", "4g")         # пам'ять executor
    .config("spark.executor.cores", "4")           # ядра executor
    .config("spark.driver.memory", "2g")           # пам'ять driver
    .config("spark.jars", "path/to/connector.jar") # додаткові JAR
    .getOrCreate()  # створити нову або повернути існуючу
)

Примітка

getOrCreate() — створює нову сесію або повертає існуючу. Один JVM-процес = одна SparkSession.

Параметри SparkSession

Параметр Опис Приклад
appName Назва додатку (для WebUI) "ETL Pipeline"
master URL кластера local[*], yarn, k8s://...
spark.executor.memory Пам’ять на Executor "4g", "8g"
spark.executor.cores Ядра на Executor "4"
spark.driver.memory Пам’ять Driver "2g"
spark.sql.shuffle.partitions К-сть partitions при Shuffle 200 (за замовч.)
spark.jars Додаткові JAR-файли "/path/to/connector.jar"
# Перевірка конфігурації
spark.sparkContext.getConf().getAll()

# Зупинка сесії (обов'язково після завершення!)
spark.stop()

Створення DataFrame

Способи створення

flowchart TD
    DF["DataFrame"]
    DF --> A["З колекцій Python<br/>(списки, кортежі, словники)"]
    DF --> B["З Pandas DataFrame"]
    DF --> C["З файлів<br/>(CSV, Parquet, JSON)"]
    DF --> D["З баз даних<br/>(JDBC)"]
    DF --> E["З SQL-запиту"]
    style DF fill:#f9b928,stroke:#333,stroke-width:2px

З Python-колекцій

# Зі списку кортежів
df = spark.createDataFrame(
    [
        ("Київ", 2967000),
        ("Львів", 717000),
        ("Одеса", 1015000),
    ],
    schema=["city", "population"],
)
df.show()
+-----+----------+
| city|population|
+-----+----------+
| Київ|   2967000|
|Львів|    717000|
|Одеса|   1015000|
+-----+----------+
# Зі списку словників
df = spark.createDataFrame([
    {"name": "Олена", "age": 28, "active": True},
    {"name": "Андрій", "age": 32, "active": False},
])
df.show()
+------+---+------+
|active|age|  name|
+------+---+------+
|  true| 28| Олена|
| false| 32|Андрій|
+------+---+------+

Визначення схеми (StructType)

schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("department", StringType(), nullable=True),
    StructField("salary", IntegerType(), nullable=False),
    StructField("start_date", DateType(), nullable=True),
])

data = [
    ("Олена", "DS", 45000, date(2022, 3, 15)),
    ("Андрій", "Eng", 52000, date(2021, 7, 1)),
    ("Марія", "DS", 48000, date(2023, 1, 10)),
]

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
root
 |-- name: string (nullable = false)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = false)
 |-- start_date: date (nullable = true)

Типи даних Spark

graph TD
    A["Spark Types"] --> B["Прості"]
    A --> C["Складні"]

    B --> B1["StringType"]
    B --> B2["IntegerType / LongType"]
    B --> B3["FloatType / DoubleType"]
    B --> B4["BooleanType"]
    B --> B5["DateType / TimestampType"]
    B --> B6["BinaryType / ByteType"]

    C --> C1["ArrayType<br/>(списки)"]
    C --> C2["MapType<br/>(словники)"]
    C --> C3["StructType<br/>(вкладені записи)"]

    style A fill:#f9b928,stroke:#333

# Складні типи
complex_schema = StructType([
    StructField("name", StringType()),
    StructField("skills", ArrayType(StringType())),
    StructField("scores", MapType(StringType(), IntegerType())),
])

df_complex = spark.createDataFrame([
    ("Олена", ["Python", "SQL"], {"math": 95, "code": 90}),
    ("Андрій", ["Scala", "Spark", "Java"], {"math": 88, "code": 95}),
], schema=complex_schema)
df_complex.show(truncate=False)
+------+--------------------+------------------------+
|name  |skills              |scores                  |
+------+--------------------+------------------------+
|Олена |[Python, SQL]       |{code -> 90, math -> 95}|
|Андрій|[Scala, Spark, Java]|{code -> 95, math -> 88}|
+------+--------------------+------------------------+

З Pandas DataFrame

pdf = pd.DataFrame({
    "product": ["Ноутбук", "Телефон", "Планшет"],
    "price": [25000.0, 15000.0, 12000.0],
    "in_stock": [True, True, False],
})

# Pandas → Spark
sdf = spark.createDataFrame(pdf)
sdf.show()

# Spark → Pandas
back_to_pandas = sdf.toPandas()
print(type(back_to_pandas))
back_to_pandas
+-------+-------+--------+
|product|  price|in_stock|
+-------+-------+--------+
|Ноутбук|25000.0|    true|
|Телефон|15000.0|    true|
|Планшет|12000.0|   false|
+-------+-------+--------+

<class 'pandas.DataFrame'>
product price in_stock
0 Ноутбук 25000.0 True
1 Телефон 15000.0 True
2 Планшет 12000.0 False

Попередження

toPandas() збирає ВСІ дані на Driver в оперативну пам’ять однієї машини! Для великих датасетів — OutOfMemory. Використовуйте тільки для невеликих результатів (агрегації, візуалізація).

З файлів

# Створимо тестові дані та збережемо у різні формати
test_data = spark.createDataFrame([
    ("Олена", "DS", 45000), ("Андрій", "Eng", 52000),
    ("Марія", "DS", 48000), ("Петро", "Eng", 55000),
], schema=["name", "department", "salary"])

test_data.write.mode("overwrite").csv("spark_demo/csv", header=True)
test_data.write.mode("overwrite").parquet("spark_demo/parquet")
test_data.write.mode("overwrite").json("spark_demo/json")
# Читання CSV
df_csv = spark.read.csv("spark_demo/csv", header=True, inferSchema=True)
df_csv.show()
# +------+----------+------+
# |  name|department|salary|
# +------+----------+------+
# | Олена|        DS| 45000|
# |Андрій|       Eng| 52000|
# | Марія|        DS| 48000|
# | Петро|       Eng| 55000|
# +------+----------+------+
# Parquet — зберігає схему автоматично
df_pq = spark.read.parquet("spark_demo/parquet")
df_pq.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- department: string (nullable = true)
#  |-- salary: long (nullable = true)
Формат Стиснення Схема Швидкість Рекомендація
CSV Немає Ні Повільно Тільки імпорт
JSON Немає Частково Повільно Напівструктуровані
Parquet Стовпчасте Так Швидко Основний формат
ORC Стовпчасте Так Швидко Hive-сумісність

Перші операції з DataFrame

select — вибір стовпців

# Вибір стовпців
df.select("name", "salary").show()
+------+------+
|  name|salary|
+------+------+
| Олена| 45000|
|Андрій| 52000|
| Марія| 48000|
| Петро| 55000|
| Ірина| 42000|
|Сергій| 51000|
+------+------+
# Вибір з трансформацією
df.select(
    F.col("name"),
    F.col("salary"),
    (F.col("salary") * 1.1).alias("salary_raised"),
).show()
+------+------+-----------------+
|  name|salary|    salary_raised|
+------+------+-----------------+
| Олена| 45000|49500.00000000001|
|Андрій| 52000|57200.00000000001|
| Марія| 48000|52800.00000000001|
| Петро| 55000|60500.00000000001|
| Ірина| 42000|46200.00000000001|
|Сергій| 51000|56100.00000000001|
+------+------+-----------------+

filter / where — фільтрація

# filter та where — синоніми
df.filter(F.col("salary") > 45000).show()
+------+----------+------+---+
|  name|department|salary|age|
+------+----------+------+---+
|Андрій|       Eng| 52000| 32|
| Марія|        DS| 48000| 25|
| Петро|       Eng| 55000| 35|
|Сергій|        DS| 51000| 31|
+------+----------+------+---+
# Комбіновані умови
df.filter(
    (F.col("salary") > 45000) & (F.col("department") == "DS")
).show()
+------+----------+------+---+
|  name|department|salary|age|
+------+----------+------+---+
| Марія|        DS| 48000| 25|
|Сергій|        DS| 51000| 31|
+------+----------+------+---+
# Через SQL-вираз (рядок)
df.filter("salary > 45000 AND department = 'DS'").show()
+------+----------+------+---+
|  name|department|salary|age|
+------+----------+------+---+
| Марія|        DS| 48000| 25|
|Сергій|        DS| 51000| 31|
+------+----------+------+---+
# is_in — аналог .is_in() у Polars
df.filter(F.col("department").isin(["DS", "Eng"])).show()
+------+----------+------+---+
|  name|department|salary|age|
+------+----------+------+---+
| Олена|        DS| 45000| 28|
|Андрій|       Eng| 52000| 32|
| Марія|        DS| 48000| 25|
| Петро|       Eng| 55000| 35|
|Сергій|        DS| 51000| 31|
+------+----------+------+---+

Примітка

filter та where — повні синоніми. Комбінуючи умови, кожну треба брати в дужки: (A) & (B), (A) | (B), ~(A).

withColumn — додавання стовпців

df_enriched = (
    df
    .withColumn("salary_k", F.col("salary") / 1000)
    .withColumn("is_senior", F.col("age") >= 30)
    .withColumn(
        "level",
        F.when(F.col("salary") >= 52000, "Senior")
         .when(F.col("salary") >= 45000, "Middle")
         .otherwise("Junior")
    )
)
df_enriched.show()
+------+----------+------+---+--------+---------+------+
|  name|department|salary|age|salary_k|is_senior| level|
+------+----------+------+---+--------+---------+------+
| Олена|        DS| 45000| 28|    45.0|    false|Middle|
|Андрій|       Eng| 52000| 32|    52.0|     true|Senior|
| Марія|        DS| 48000| 25|    48.0|    false|Middle|
| Петро|       Eng| 55000| 35|    55.0|     true|Senior|
| Ірина|       Mkt| 42000| 29|    42.0|    false|Junior|
|Сергій|        DS| 51000| 31|    51.0|     true|Middle|
+------+----------+------+---+--------+---------+------+

lit та cast

# lit — константне значення для кожного рядка
df.select(
    F.col("name"),
    F.lit("Україна").alias("country"),
    F.lit(2025).alias("year"),
).show()
+------+-------+----+
|  name|country|year|
+------+-------+----+
| Олена|Україна|2025|
|Андрій|Україна|2025|
| Марія|Україна|2025|
| Петро|Україна|2025|
| Ірина|Україна|2025|
|Сергій|Україна|2025|
+------+-------+----+
# cast — зміна типу даних
df.select(
    F.col("name"),
    F.col("salary").cast("double").alias("salary_double"),
    F.col("salary").cast(StringType()).alias("salary_str"),
).show()
+------+-------------+----------+
|  name|salary_double|salary_str|
+------+-------------+----------+
| Олена|      45000.0|     45000|
|Андрій|      52000.0|     52000|
| Марія|      48000.0|     48000|
| Петро|      55000.0|     55000|
| Ірина|      42000.0|     42000|
|Сергій|      51000.0|     51000|
+------+-------------+----------+
  • F.lit(value) — створює стовпець-константу
  • .cast(type) — змінює тип стовпця (аналог .cast() у Polars)

Рядкові функції

df_strings = spark.createDataFrame([
    ("  Олена Петрівна  ", "olena@example.com", "380-44-1234567", "Київ, Україна"),
    ("андрій іванович", "ANDRIY@EXAMPLE.COM", "380-32-9876543", "Львів, Україна"),
    ("МАРІЯ СЕРГІЇВНА", "maria@Example.Com", "380-48-1112233", "Одеса, Україна"),
], schema=["full_name", "email", "phone", "location"])
df_strings.select(
    F.trim(F.col("full_name")).alias("trimmed"),
    F.upper(F.col("full_name")).alias("upper"),
    F.lower(F.col("email")).alias("email_lower"),
    F.initcap(F.col("full_name")).alias("initcap"),
    F.length(F.col("full_name")).alias("len"),
).show(truncate=False)
+---------------+------------------+------------------+------------------+---+
|trimmed        |upper             |email_lower       |initcap           |len|
+---------------+------------------+------------------+------------------+---+
|Олена Петрівна |  ОЛЕНА ПЕТРІВНА  |olena@example.com |  Олена Петрівна  |18 |
|андрій іванович|АНДРІЙ ІВАНОВИЧ   |andriy@example.com|Андрій Іванович   |15 |
|МАРІЯ СЕРГІЇВНА|МАРІЯ СЕРГІЇВНА   |maria@example.com |Марія Сергіївна   |15 |
+---------------+------------------+------------------+------------------+---+
df_strings.select(
    F.substring(F.col("phone"), 1, 3).alias("code"),
    F.split(F.col("location"), ", ").getItem(0).alias("city"),
    F.concat(F.lit("+"), F.col("phone")).alias("full"),
    F.regexp_replace(F.col("phone"), "-", "").alias("clean"),
    F.regexp_extract(F.col("email"), r"@(.+)", 1).alias("domain"),
).show(truncate=False)
+----+-----+---------------+------------+-----------+
|code|city |full           |clean       |domain     |
+----+-----+---------------+------------+-----------+
|380 |Київ |+380-44-1234567|380441234567|example.com|
|380 |Львів|+380-32-9876543|380329876543|EXAMPLE.COM|
|380 |Одеса|+380-48-1112233|380481112233|Example.Com|
+----+-----+---------------+------------+-----------+

Функції дати та часу

df_dates = spark.createDataFrame([
    ("2024-03-15", "2024-06-20"),
    ("2023-11-01", "2024-01-15"),
    ("2024-01-20", "2024-12-31"),
], schema=["start", "end"])
df_dates.select(
    F.to_date("start").alias("date"),
    F.year(F.to_date("start")).alias("year"),
    F.month(F.to_date("start")).alias("month"),
    F.dayofmonth(F.to_date("start")).alias("day"),
    F.dayofweek(F.to_date("start")).alias("weekday"),
    F.datediff(F.to_date("end"), F.to_date("start")).alias("days"),
    F.add_months(F.to_date("start"), 3).alias("plus_3m"),
    F.date_format(F.to_date("start"), "dd.MM.yyyy").alias("fmt"),
    F.months_between(F.to_date("end"), F.to_date("start")).alias("months"),
).show(truncate=False)
+----------+----+-----+---+-------+----+----------+----------+-----------+
|date      |year|month|day|weekday|days|plus_3m   |fmt       |months     |
+----------+----+-----+---+-------+----+----------+----------+-----------+
|2024-03-15|2024|3    |15 |6      |97  |2024-06-15|15.03.2024|3.16129032 |
|2023-11-01|2023|11   |1  |4      |75  |2024-02-01|01.11.2023|2.4516129  |
|2024-01-20|2024|1    |20 |7      |346 |2024-04-20|20.01.2024|11.35483871|
+----------+----+-----+---+-------+----+----------+----------+-----------+

Агрегація та Group By

groupBy + agg

sales.groupBy("product").agg(
    F.sum("amount").alias("total_revenue"),
    F.avg("amount").alias("avg_order"),
    F.count("*").alias("n_orders"),
    F.max("amount").alias("max_order"),
).orderBy(F.desc("total_revenue")).show()
+---------+-----------------+------------------+--------+---------+
|  product|    total_revenue|         avg_order|n_orders|max_order|
+---------+-----------------+------------------+--------+---------+
|  Планшет|        541217.97|2027.0335955056178|     267| 12699.88|
|  Телефон|530014.0300000004|2015.2624714828912|     263| 13026.35|
|  Ноутбук|448507.4899999998| 2057.373807339449|     218| 11576.76|
|Навушники|447316.2600000003| 1775.064523809525|     252| 10369.71|
+---------+-----------------+------------------+--------+---------+

Агрегатні функції: sum, avg / mean, count, min, max, collect_list, collect_set, countDistinct, stddev, variance

Множинна агрегація

# Групування по кількох стовпцях
sales.groupBy("product", "region").agg(
    F.round(F.avg("amount"), 2).alias("avg_amount"),
    F.sum("quantity").alias("total_qty"),
    F.countDistinct("customer_id").alias("unique_customers"),
).orderBy("product", F.desc("avg_amount")).show()
+---------+------+----------+---------+----------------+
|  product|region|avg_amount|total_qty|unique_customers|
+---------+------+----------+---------+----------------+
|Навушники| Одеса|   2154.93|      241|              49|
|Навушники|  Київ|   1787.34|      224|              45|
|Навушники|Харків|   1682.48|      233|              43|
|Навушники|Дніпро|   1674.71|      277|              56|
|Навушники| Львів|   1566.18|      264|              47|
|  Ноутбук| Одеса|   2538.05|      207|              34|
|  Ноутбук| Львів|   2003.79|      250|              48|
|  Ноутбук|Харків|   1979.31|      196|              37|
|  Ноутбук|  Київ|   1966.22|      226|              46|
|  Ноутбук|Дніпро|    1905.6|      224|              41|
|  Планшет| Одеса|   2232.79|      216|              46|
|  Планшет| Львів|   2166.28|      268|              50|
|  Планшет|  Київ|   2125.65|      221|              48|
|  Планшет|Дніпро|   1941.08|      295|              57|
|  Планшет|Харків|   1722.12|      248|              49|
|  Телефон| Львів|   2416.02|      202|              38|
|  Телефон|  Київ|   2096.44|      260|              53|
|  Телефон|Дніпро|   1957.42|      302|              57|
|  Телефон| Одеса|   1942.78|      236|              47|
|  Телефон|Харків|   1759.45|      274|              52|
+---------+------+----------+---------+----------------+
# Агрегація без groupBy — по всьому DataFrame
sales.agg(
    F.count("*").alias("total_orders"),
    F.sum("amount").alias("total_revenue"),
    F.avg("amount").alias("avg_order"),
).show()
+------------+-----------------+-----------------+
|total_orders|    total_revenue|        avg_order|
+------------+-----------------+-----------------+
|        1000|1967055.750000003|1967.055750000003|
+------------+-----------------+-----------------+

Візуалізація: Порівняння інструментів

Сортування

# orderBy та sort — синоніми
df.orderBy(F.desc("salary")).show()
+------+----------+------+---+
|  name|department|salary|age|
+------+----------+------+---+
| Петро|       Eng| 55000| 35|
|Андрій|       Eng| 52000| 32|
|Сергій|        DS| 51000| 31|
| Марія|        DS| 48000| 25|
| Олена|        DS| 45000| 28|
| Ірина|       Mkt| 42000| 29|
+------+----------+------+---+
# Кілька стовпців
df.sort(
    F.col("department").asc(),
    F.col("salary").desc(),
).show()
+------+----------+------+---+
|  name|department|salary|age|
+------+----------+------+---+
|Сергій|        DS| 51000| 31|
| Марія|        DS| 48000| 25|
| Олена|        DS| 45000| 28|
| Петро|       Eng| 55000| 35|
|Андрій|       Eng| 52000| 32|
| Ірина|       Mkt| 42000| 29|
+------+----------+------+---+
# Топ-N з оптимізацією
df.orderBy(F.desc("salary")).limit(5).show()
+------+----------+------+---+
|  name|department|salary|age|
+------+----------+------+---+
| Петро|       Eng| 55000| 35|
|Андрій|       Eng| 52000| 32|
|Сергій|        DS| 51000| 31|
| Марія|        DS| 48000| 25|
| Олена|        DS| 45000| 28|
+------+----------+------+---+

Примітка

orderBy + limit(N) — Spark оптимізує це і не сортує весь DataFrame.

Joins

Типи Join у Spark

# Inner Join
customers.join(orders, customers.id == orders.customer_id, "inner").show()
+---+------+-----+--------+-----------+------+
| id|  name| city|order_id|customer_id|amount|
+---+------+-----+--------+-----------+------+
|  2|Андрій|Львів|     103|          2| 800.0|
|  2|Андрій|Львів|     102|          2|2300.0|
|  1| Олена| Київ|     101|          1|1500.0|
+---+------+-----+--------+-----------+------+
Тип Join Опис SQL
inner Лише збіги з обох таблиць INNER JOIN
left Усі з лівої + збіги з правої LEFT JOIN
right Усі з правої + збіги з лівої RIGHT JOIN
full / outer Усі з обох таблиць FULL OUTER JOIN
anti З лівої, що НЕМАЄ в правій NOT EXISTS
semi З лівої, що Є в правій (без стовпців правої) EXISTS
cross Декартів добуток CROSS JOIN

Приклади Join

customers.join(orders, customers.id == orders.customer_id, "inner").show()
# Тільки Олена (id=1) та Андрій (id=2) — мають замовлення
+---+------+-----+--------+-----------+------+
| id|  name| city|order_id|customer_id|amount|
+---+------+-----+--------+-----------+------+
|  2|Андрій|Львів|     103|          2| 800.0|
|  2|Андрій|Львів|     102|          2|2300.0|
|  1| Олена| Київ|     101|          1|1500.0|
+---+------+-----+--------+-----------+------+
customers.join(orders, customers.id == orders.customer_id, "left").show()
# Усі клієнти, Марія та Петро — з null у полях замовлення
+---+------+------+--------+-----------+------+
| id|  name|  city|order_id|customer_id|amount|
+---+------+------+--------+-----------+------+
|  2|Андрій| Львів|     103|          2| 800.0|
|  2|Андрій| Львів|     102|          2|2300.0|
|  4| Петро|Харків|    NULL|       NULL|  NULL|
|  1| Олена|  Київ|     101|          1|1500.0|
|  3| Марія| Одеса|    NULL|       NULL|  NULL|
+---+------+------+--------+-----------+------+
# Клієнти БЕЗ замовлень
customers.join(orders, customers.id == orders.customer_id, "anti").show()
# Марія (id=3) та Петро (id=4)
+---+-----+------+
| id| name|  city|
+---+-----+------+
|  4|Петро|Харків|
|  3|Марія| Одеса|
+---+-----+------+
# Клієнти, ЩО мають замовлення (без стовпців orders)
customers.join(orders, customers.id == orders.customer_id, "semi").show()
# Олена та Андрій — без дублювання
+---+------+-----+
| id|  name| city|
+---+------+-----+
|  2|Андрій|Львів|
|  1| Олена| Київ|
+---+------+-----+

Spark SQL

SQL як альтернативний інтерфейс

# Реєстрація DataFrame як тимчасової таблиці
df.createOrReplaceTempView("employees")

# Тепер можна писати SQL!
spark.sql("""
    SELECT department,
           ROUND(AVG(salary), 0) AS avg_salary,
           COUNT(*) AS n_employees
    FROM employees
    WHERE salary > 45000
    GROUP BY department
    ORDER BY avg_salary DESC
""").show()
+----------+----------+-----------+
|department|avg_salary|n_employees|
+----------+----------+-----------+
|       Eng|   53500.0|          2|
|        DS|   49500.0|          2|
+----------+----------+-----------+

Порада

DataFrame API та Spark SQL — взаємозамінні та мають однакову продуктивність (обидва оптимізуються Catalyst).

CTE та віконні функції в SQL

spark.sql("""
    WITH customer_stats AS (
        SELECT customer_id,
               COUNT(*) AS n_orders,
               ROUND(SUM(amount)) AS total_spent
        FROM orders
        GROUP BY customer_id
    ),
    ranked AS (
        SELECT *,
               RANK() OVER (ORDER BY total_spent DESC) AS spending_rank
        FROM customer_stats
    )
    SELECT *
    FROM ranked
    WHERE spending_rank <= 10
""").show()
+-----------+--------+-----------+-------------+
|customer_id|n_orders|total_spent|spending_rank|
+-----------+--------+-----------+-------------+
|        146|       9|    29600.0|            1|
|        248|       4|    25021.0|            2|
|        232|       3|    16523.0|            3|
|        361|       7|    16478.0|            4|
|        100|       3|    16408.0|            5|
|        141|       2|    16395.0|            6|
|        466|       3|    16251.0|            7|
|         98|       4|    15748.0|            8|
|        205|       3|    14285.0|            9|
|        211|       6|    14254.0|           10|
+-----------+--------+-----------+-------------+
  • CTE (WITH) — покрокове побудова запиту
  • Віконні функції (OVER) — обчислення в контексті групи
  • RANK, ROW_NUMBER, DENSE_RANK, LAG, LEAD — усі підтримуються

Порівняння синтаксису

PySpark vs Polars vs SQL

(
    df
    .filter(F.col("salary") > 45000)
    .groupBy("department")
    .agg(
        F.avg("salary").alias("avg_salary"),
        F.count("*").alias("count"),
    )
    .orderBy(F.desc("avg_salary"))
).show()
+----------+----------+-----+
|department|avg_salary|count|
+----------+----------+-----+
|       Eng|   53500.0|    2|
|        DS|   49500.0|    2|
+----------+----------+-----+
(
    df_pl
    .filter(pl.col("salary") > 45000)
    .group_by("department")
    .agg(
        pl.col("salary").mean().alias("avg_salary"),
        pl.len().alias("count"),
    )
    .sort("avg_salary", descending=True)
)
SELECT department,
       AVG(salary) AS avg_salary,
       COUNT(*) AS count
FROM employees
WHERE salary > 45000
GROUP BY department
ORDER BY avg_salary DESC

Словник: Polars → PySpark

Polars PySpark Коментар
pl.col("x") F.col("x") Звернення до стовпця
.filter() .filter() / .where() Ідентично
.select() .select() Ідентично
.with_columns() .withColumn() По одному стовпцю
.group_by().agg() .groupBy().agg() camelCase
.sort() .orderBy() / .sort() Синоніми
pl.when().then().otherwise() F.when().otherwise() Без .then()
.alias() .alias() Ідентично
.collect() .show() / .collect() / .toPandas() Різні Actions
.head(n) .limit(n) / .show(n) Різні назви
pl.len() F.count("*") Кількість рядків
.n_unique() F.countDistinct() Унікальні
.over("group") .over(Window.partitionBy("group")) Віконні функції

Віконні функції

Window Functions

# Визначення вікна
w = Window.partitionBy("department").orderBy(F.desc("salary"))

df.withColumn("rank", F.rank().over(w)) \
  .withColumn("row_num", F.row_number().over(w)) \
  .withColumn("dept_avg", F.avg("salary").over(Window.partitionBy("department"))) \
  .show()
+------+----------+------+---+----+-------+--------+
|  name|department|salary|age|rank|row_num|dept_avg|
+------+----------+------+---+----+-------+--------+
|Сергій|        DS| 51000| 31|   1|      1| 48000.0|
| Марія|        DS| 48000| 25|   2|      2| 48000.0|
| Олена|        DS| 45000| 28|   3|      3| 48000.0|
| Петро|       Eng| 55000| 35|   1|      1| 53500.0|
|Андрій|       Eng| 52000| 32|   2|      2| 53500.0|
| Ірина|       Mkt| 42000| 29|   1|      1| 42000.0|
+------+----------+------+---+----+-------+--------+

Приклади віконних функцій

w = Window.partitionBy("product").orderBy("order_date")

orders_df.withColumn(
    "running_total", F.sum("amount").over(w)    # кумулятивна сума
).withColumn(
    "prev_amount", F.lag("amount", 1).over(w)   # попереднє значення
).withColumn(
    "next_amount", F.lead("amount", 1).over(w)  # наступне значення
).withColumn(
    "moving_avg", F.avg("amount").over(
        w.rowsBetween(-2, 0)                    # ковзне середнє (3 рядки)
    )
).show()
+--------+-------+-------+----------+------------------+-----------+-----------+------------------+
|order_id|product| amount|order_date|     running_total|prev_amount|next_amount|        moving_avg|
+--------+-------+-------+----------+------------------+-----------+-----------+------------------+
|      85|Ноутбук| 341.97|2024-01-22|            341.97|       NULL|     230.15|            341.97|
|      81|Ноутбук| 230.15|2024-01-30|            572.12|     341.97|     235.18|            286.06|
|      29|Ноутбук| 235.18|2024-02-28|             807.3|     230.15|       0.48|269.09999999999997|
|      43|Ноутбук|   0.48|2024-03-10|            807.78|     235.18|      98.12|            155.27|
|      66|Ноутбук|  98.12|2024-03-13|             905.9|       0.48|    1238.09|111.25999999999999|
|      27|Ноутбук|1238.09|2024-04-09|           2143.99|      98.12|      437.8| 445.5633333333333|
|      98|Ноутбук|  437.8|2024-06-03|           2581.79|    1238.09|     753.81| 591.3366666666667|
|       2|Ноутбук| 753.81|2024-06-15|            3335.6|      437.8|     415.26|             809.9|
|      46|Ноутбук| 415.26|2024-06-17|3750.8599999999997|     753.81|     416.51| 535.6233333333333|
|      35|Ноутбук| 416.51|2024-07-06|           4167.37|     415.26|    1649.51| 528.5266666666666|
|      84|Ноутбук|1649.51|2024-07-08|           5816.88|     416.51|     405.34| 827.0933333333332|
|      83|Ноутбук| 405.34|2024-07-14|           6222.22|    1649.51|      47.81| 823.7866666666667|
|      59|Ноутбук|  47.81|2024-07-20| 6270.030000000001|     405.34|     423.51| 700.8866666666667|
|      64|Ноутбук| 423.51|2024-07-25| 6693.540000000001|      47.81|     608.42|292.21999999999997|
|      11|Ноутбук| 608.42|2024-07-28| 7301.960000000001|     423.51|     187.64|359.91333333333336|
|      61|Ноутбук| 187.64|2024-09-08| 7489.600000000001|     608.42|     151.83|406.52333333333326|
|      45|Ноутбук| 151.83|2024-09-19| 7641.430000000001|     187.64|     595.88| 315.9633333333333|
|      91|Ноутбук| 595.88|2024-09-28| 8237.310000000001|     151.83|     171.45|311.78333333333336|
|      75|Ноутбук| 171.45|2024-10-01| 8408.760000000002|     595.88|    2214.79| 306.3866666666667|
|      72|Ноутбук|2214.79|2024-10-04|10623.550000000003|     171.45|    1672.17|            994.04|
+--------+-------+-------+----------+------------------+-----------+-----------+------------------+
only showing top 20 rows
Функція Опис SQL
rank() Ранг з пропусками RANK()
row_number() Порядковий номер ROW_NUMBER()
dense_rank() Ранг без пропусків DENSE_RANK()
lag(col, n) Значення n рядків тому LAG(col, n)
lead(col, n) Значення через n рядків LEAD(col, n)
sum().over() Кумулятивна сума SUM() OVER()

Практичний приклад

Візуалізація: Структура кластера

Візуалізація: Пам’ять та формати

Візуалізація: Масштабування

Підсумок

Що ми вивчили

  1. Big Data та розподілена обробка
  2. Кластер, Master, Worker
  3. Hadoop: HDFS + MapReduce
  4. Архітектура Spark
  5. Driver, Executor, Partition
  6. Job → Stage → Task
  7. Narrow vs Wide Transformations
  8. Shuffle
  1. Lazy Evaluation + Catalyst
  2. RDD → DataFrame → Dataset
  3. SparkSession
  4. Створення DataFrame + StructType
  5. select, filter, withColumn
  6. groupBy + agg
  7. Joins (inner, left, anti, semi)
  8. Spark SQL + віконні функції

Ресурси

Домашнє завдання

  1. Встановіть PySpark: uv add pyspark
  2. Створіть SparkSession та DataFrame з >10K рядків
  3. Виконайте:
    • Фільтрацію з кількома умовами
    • Агрегацію з groupBy по 2+ стовпцях
    • Віконні функції (rank, row_number)
    • Мінімум 2 типи Join
    • Spark SQL з CTE
  4. Збережіть результати у Parquet
  5. Побудуйте 3+ візуалізації

Дякую за увагу!



Матеріали курсу

ihor.miroshnychenko@knu.ua

Data Mirosh

@ihormiroshnychenko

@aranaur

aranaur.rbind.io