flowchart LR
A["DuckDB<br/>Аналітика на<br/>одній машині"] --> B["Polars<br/>DataFrame<br/>на одній машині"]
B --> C["Apache Spark<br/>Розподілена обробка<br/>на кластері"]
style C fill:#f9b928,stroke:#333,stroke-width:3px
Моделі та методи обробки великих даних
КНУ імені Тараса Шевченка, ФІТ
flowchart LR
A["DuckDB<br/>Аналітика на<br/>одній машині"] --> B["Polars<br/>DataFrame<br/>на одній машині"]
B --> C["Apache Spark<br/>Розподілена обробка<br/>на кластері"]
style C fill:#f9b928,stroke:#333,stroke-width:3px
| DuckDB | Polars | Spark | |
|---|---|---|---|
| Масштаб | 1 машина | 1 машина | Кластер |
| Дані | ~100 GB | ~100 GB | Петабайти |
| Мова ядра | C++ | Rust | Scala / Java |
| API | SQL | Python / Rust | Python / Scala / SQL / R / Java |
| Lazy | Так | Так | Так |
DuckDB / Polars підходять, коли:
Spark потрібен, коли:
Великі дані — це дані, які при зберіганні та обробці не поміщаються в рамках одного комп’ютера або сервера.
Модель 3V:
graph TD
A["3V"] --> B["Volume<br/>Обсяг"]
A --> C["Velocity<br/>Швидкість"]
A --> D["Variety<br/>Різноманітність"]
style A fill:#f9b928,stroke:#333
Приклади: пости соцмереж, замовлення Amazon, пошукові запити Google, транзакції банків, дані IoT-сенсорів.
flowchart LR
A["Великий датасет<br/>10 TB"] --> B["Частина 1<br/>2.5 TB"]
A --> C["Частина 2<br/>2.5 TB"]
A --> D["Частина 3<br/>2.5 TB"]
A --> E["Частина 4<br/>2.5 TB"]
B --> F["Результат 1"]
C --> G["Результат 2"]
D --> H["Результат 3"]
E --> I["Результат 4"]
F --> J["Фінальний<br/>результат"]
G --> J
H --> J
I --> J
style J fill:#d9f6ec,stroke:#28a87d
Принцип:
| Перевага | Опис |
|---|---|
| Масштабованість | Додаємо більше серверів при зростанні даних (горизонтальне масштабування) |
| Паралельність | Задачі виконуються одночасно на багатьох серверах |
| Надійність | Якщо один сервер виходить з ладу — решта продовжують роботу |
Кластер — множина серверів, об’єднаних мережею для координації та взаємодії:
flowchart TD
M["Master Node<br/>(координатор)"]
M --> W1["Worker 1<br/>CPU: 16, RAM: 64 GB"]
M --> W2["Worker 2<br/>CPU: 16, RAM: 64 GB"]
M --> W3["Worker 3<br/>CPU: 16, RAM: 64 GB"]
M --> W4["Worker N<br/>CPU: 16, RAM: 64 GB"]
style M fill:#f9b928,stroke:#333,stroke-width:2px
flowchart LR
subgraph Sub1 ["Резервний Master"]
A1["Active Master"] -.->|failover| A2["Standby Master"]
end
subgraph Sub2 ["Синхронізатор"]
B1["ZooKeeper"] --> B2["Node 1"]
B1 --> B3["Node 2"]
B1 --> B4["Node 3"]
end
subgraph Sub3 ["Без Master"]
C1["Node A"] <--> C2["Node B"]
C2 <--> C3["Node C"]
C3 <--> C1
end
%% Невидимі зв'язки, які змушують блоки стати в один ряд зліва направо
Sub1 ~~~ Sub2 ~~~ Sub3
| Підхід | Приклад | Опис |
|---|---|---|
| Резервний Master | HDFS (NameNode) | Активний + Standby, автоматичний failover |
| Синхронізатор | Apache ZooKeeper | Зовнішній сервіс координації |
| Без Master | Cassandra | Усі вузли рівноправні |
Hadoop — система для зберігання та обробки великих даних, створена у 2006 році.
Основні характеристики:
flowchart TD
H["Hadoop"] --> HDFS["HDFS<br/>Distributed<br/>File System"]
H --> MR["MapReduce<br/>Processing<br/>Engine"]
style H fill:#f9b928,stroke:#333,stroke-width:2px
flowchart TD
NN["NameNode<br/>(метадані, структура файлів)"]
NN --> DN1["DataNode 1<br/>Block A, Block B"]
NN --> DN2["DataNode 2<br/>Block A, Block C"]
NN --> DN3["DataNode 3<br/>Block B, Block C"]
Client["Клієнт"] --> NN
Client -.->|"дані напряму"| DN1
Client -.->|"дані напряму"| DN2
style NN fill:#f9b928,stroke:#333,stroke-width:2px
Важливо
Дані передаються клієнту напряму від DataNode, минаючи NameNode, що знижує навантаження на Master.
flowchart LR
I["Вхідні дані"] --> S["Split"]
S --> M1["Map:<br/>підрахунок<br/>на частині 1"]
S --> M2["Map:<br/>підрахунок<br/>на частині 2"]
S --> M3["Map:<br/>підрахунок<br/>на частині 3"]
M1 --> Sh["Shuffle<br/>& Sort"]
M2 --> Sh
M3 --> Sh
Sh --> R1["Reduce:<br/>об'єднання"]
R1 --> O["Результат"]
style Sh fill:#fde8e8,stroke:#c10000
Приклад: підрахунок слів у великому тексті
Проблеми MapReduce:
Spark вирішує це:
flowchart LR
A["MapReduce<br/>Disk I/O<br/>між фазами"] -->|"до 100x повільніше"| B["Spark<br/>In-Memory<br/>Processing"]
style A fill:#fde8e8,stroke:#c10000
style B fill:#d9f6ec,stroke:#28a87d
Apache Spark — open source фреймворк для розподіленої обробки даних, широко використовуваний у задачах аналізу великих даних.
graph TD
S["Apache Spark"]
S --> P["Python<br/>PySpark"]
S --> SC["Scala"]
S --> J["Java"]
S --> R["R<br/>SparkR"]
S --> SQL["SQL<br/>Spark SQL"]
style S fill:#f9b928,stroke:#333,stroke-width:2px
Користувачі:
Задачі:
Batch vs Streaming:
flowchart TD
subgraph "Batch"
B1["Накопичені дані"] --> B2["Обробка"] --> B3["Результат"]
end
subgraph "Streaming"
S1["Потік подій<br/>(щосекунди)"] --> S2["Micro-batch<br/>обробка"] --> S3["Результат<br/>в реальному часі"]
end
| Варіант | Опис | Приклади |
|---|---|---|
| Hadoop-дистрибутиви | У складі повного Hadoop-стеку | Cloudera CDP, Arenadata ADH |
| Хмарні провайдери | Керований сервіс у хмарі | Databricks, AWS EMR, GCP Dataproc |
| Standalone | Автономний кластер Spark | Для тестування та розробки |
| Kubernetes | Запуск у контейнерах | Продакшн-середовище |
flowchart TD
subgraph "Бібліотеки"
L1["Spark SQL<br/>+ DataFrames"]
L2["Spark<br/>Streaming"]
L3["MLlib<br/>(ML)"]
L4["GraphX<br/>(Графи)"]
end
subgraph "Core API"
CA["Python · Scala · Java · R · SQL"]
end
subgraph "Processing Engine"
E["Spark Core<br/>(управління пам'яттю, оптимізація, серіалізація)"]
end
subgraph "Resource Management"
RM1["Standalone"]
RM2["YARN"]
RM3["Kubernetes"]
RM4["Mesos"]
end
subgraph "Data Storage"
ST1["HDFS"]
ST2["S3 / GCS"]
ST3["PostgreSQL"]
ST4["Kafka"]
end
L1 --> CA
L2 --> CA
L3 --> CA
L4 --> CA
CA --> E
E --> RM1
E --> RM2
E --> RM3
E --> RM4
E --> ST1
E --> ST2
E --> ST3
E --> ST4
style E fill:#f9b928,stroke:#333,stroke-width:2px
| Бібліотека | Призначення | Приклад задачі |
|---|---|---|
| Spark SQL + DataFrames | Робота зі структурованими даними через API або SQL | Агрегації, join’и, ETL |
| Spark Streaming | Обробка даних у реальному часі (micro-batch) | Моніторинг, аналітика подій |
| MLlib | Розподілене навчання ML-моделей | Класифікація, кластеризація на великих даних |
| GraphX | Робота з графовими структурами | Соціальні мережі, PageRank |
Порада
У цьому курсі ми зосередимось на Spark SQL + DataFrames через PySpark.
Spark має конектори до різноманітних сховищ:
Файлові системи:
Формати файлів:
Бази даних (JDBC):
Інші:
flowchart TD
subgraph "Driver (Master)"
D["SparkContext<br/>(SparkSession)"]
D --> P["Планування задач"]
D --> M["Моніторинг"]
end
subgraph "Cluster Manager"
CM["YARN / K8s / Standalone"]
end
subgraph "Worker Node 1"
E1["Executor 1"]
E1 --> T1["Task"]
E1 --> T2["Task"]
E1 --> C1["Cache"]
end
subgraph "Worker Node 2"
E2["Executor 2"]
E2 --> T3["Task"]
E2 --> T4["Task"]
E2 --> C2["Cache"]
end
D --> |"запит ресурсів"| CM
CM --> |"виділення"| E1
CM --> |"виділення"| E2
D --> |"розподіл задач"| E1
D --> |"розподіл задач"| E2
E1 --> |"статус"| D
E2 --> |"статус"| D
style D fill:#f9b928,stroke:#333,stroke-width:2px
| Концепція | Опис |
|---|---|
| Cluster | Група серверів (або ВМ) із встановленим Spark |
| Cluster Manager | Зовнішній сервіс для надання ресурсів (YARN, K8s, Standalone) |
| Driver | Master-процес: перетворює додаток на задачі, розподіляє їх |
| SparkContext | Точка входу для взаємодії з кластером |
| Worker Node | Сервер, на якому запускається Executor |
| Executor | Процес на Worker, що виконує Tasks |
| Task | Мінімальна одиниця роботи: 1 операція над 1 Partition |
| Partition | Мінімальний логічний обсяг даних для обчислень |
| Application | Spark-додаток: Driver + Executors |
flowchart LR
subgraph Sub1 ["Client Mode"]
U1["Ваш ноутбук<br/>(Driver)"] --> |"задачі"| CL1["Кластер<br/>(тільки Executors)"]
end
subgraph Sub2 ["Cluster Mode"]
CL2["Кластер<br/>(Driver + Executors)"]
end
%% Невидимий зв'язок для горизонтального вирівнювання блоків
Sub1 ~~~ Sub2
| Режим | Driver знаходиться | Коли використовувати |
|---|---|---|
| Client Mode | На машині користувача (ноутбук, Jupyter) | Розробка, дебаг, інтерактивна робота |
| Cluster Mode | Усередині кластера | Продакшн, автоматичні пайплайни |
Driver (Master):
Executor (Worker):
sequenceDiagram
Driver->>ClusterMgr: Запит ресурсів
ClusterMgr->>Executor: Виділення
Driver->>Executor: Task
Executor->>Driver: Результат
flowchart TD
APP["Application"] --> J1["Job 1"]
APP --> J2["Job 2"]
J1 --> S1["Stage 1"]
J1 --> S2["Stage 2"]
J1 --> S3["Stage 3"]
S1 --> T1["Task 1.1"]
S1 --> T2["Task 1.2"]
S1 --> T3["Task 1.3"]
S2 --> T4["Task 2.1"]
S2 --> T5["Task 2.2"]
S2 --> T6["Task 2.3"]
S3 --> T7["Task 3.1"]
S3 --> T8["Task 3.2"]
style APP fill:#f9b928,stroke:#333
style S2 fill:#fde8e8,stroke:#c10000
flowchart LR
subgraph "DataFrame: 1M рядків"
P1["Partition 1<br/>5000 рядків"]
P2["Partition 2<br/>5000 рядків"]
P3["Partition 3<br/>5000 рядків"]
PX["...<br/>200 partitions"]
end
P1 --> E1["Executor 1"]
P2 --> E2["Executor 2"]
P3 --> E3["Executor 3"]
PX --> EX["Executor N"]
E1 --> R["Результат"]
E2 --> R
E3 --> R
EX --> R
style R fill:#d9f6ec,stroke:#28a87d
Приклад: таблиця замовлень — 10 колонок, 1 мільйон рядків
Примітка
200 — кількість partitions за замовчуванням у Spark (spark.sql.shuffle.partitions).
flowchart LR
subgraph "Narrow (без Shuffle)"
A1["Partition 1"] --> B1["Partition 1'"]
A2["Partition 2"] --> B2["Partition 2'"]
A3["Partition 3"] --> B3["Partition 3'"]
end
subgraph "Wide (Shuffle!)"
C1["Partition 1"] --> D1["Partition A"]
C1 --> D2["Partition B"]
C2["Partition 2"] --> D1
C2 --> D2
C3["Partition 3"] --> D1
C3 --> D2
end
style D1 fill:#fde8e8,stroke:#c10000
style D2 fill:#fde8e8,stroke:#c10000
| Narrow | Wide | |
|---|---|---|
| Shuffle | Ні | Так |
| Приклади | filter, select, withColumn, map |
groupBy, join, repartition, distinct |
| Швидкість | Швидко | Повільно (мережа + диск) |
flowchart LR
subgraph "Stage 1 (Map)"
M1["Executor 1"]
M2["Executor 2"]
M3["Executor 3"]
end
M1 --> |"мережа"| SH["Shuffle<br/>(серіалізація → мережа → диск → десеріалізація)"]
M2 --> |"мережа"| SH
M3 --> |"мережа"| SH
subgraph "Stage 2 (Reduce)"
SH --> R1["Executor A"]
SH --> R2["Executor B"]
end
style SH fill:#fde8e8,stroke:#c10000,stroke-width:3px
Під час Shuffle:
Попередження
Кожен Shuffle створює новий Stage. Більше Stages = повільніший додаток.
Spark використовує принцип ледачих обчислень — нічого не виконується, доки не викликано Action:
flowchart LR
T1["filter()"] --> T2["select()"] --> T3["groupBy()"]
T3 --> |"Action!"| A["show() / count() / write()"]
T1 -.->|"план"| T2
T2 -.->|"план"| T3
style A fill:#d9f6ec,stroke:#28a87d,stroke-width:2px
Transformations (ледачі):
filter(), select(), withColumn()groupBy(), join()Actions (запускають виконання):
show() — вивести даніcount() — підрахувати рядкиcollect() — зібрати на Driverwrite() — зберегтиtoPandas() — конвертуватиflowchart LR
A["Ваш код<br/>(Transformations)"] --> B["Logical Plan"]
B --> C["Analyzed Plan"]
C --> D["Optimized Plan"]
D --> E["Physical Plans"]
E --> F["Best Plan"]
F --> G["Виконання<br/>на кластері"]
style D fill:#d9f6ec,stroke:#28a87d,stroke-width:2px
style F fill:#d9f6ec,stroke:#28a87d
Catalyst автоматично:
flowchart LR
RDD["RDD<br/>(2011)"] --> DF["DataFrame<br/>(2015)"]
DF --> DS["Dataset<br/>(2016)"]
style DF fill:#d9f6ec,stroke:#28a87d,stroke-width:2px
| RDD | DataFrame | Dataset | |
|---|---|---|---|
| Рівень API | Низький | Високий | Високий |
| Оптимізація Catalyst | Ні | Так | Так |
| Типізація | Ні | Частково | Повна (Scala/Java) |
| Python | Повільний | Швидкий | Недоступний |
| Використання | Рідко | Основний | Scala/Java |
Порада
У PySpark завжди використовуйте DataFrame — це основна абстракція для роботи з даними.
SparkSession — центральний компонент для розробки Spark-додатків:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("MyApp") # назва додатку
.master("local[*]") # URL кластера
.config("spark.executor.memory", "4g") # пам'ять executor
.config("spark.executor.cores", "4") # ядра executor
.config("spark.driver.memory", "2g") # пам'ять driver
.config("spark.jars", "path/to/connector.jar") # додаткові JAR
.getOrCreate() # створити нову або повернути існуючу
)Примітка
getOrCreate() — створює нову сесію або повертає існуючу. Один JVM-процес = одна SparkSession.
| Параметр | Опис | Приклад |
|---|---|---|
appName |
Назва додатку (для WebUI) | "ETL Pipeline" |
master |
URL кластера | local[*], yarn, k8s://... |
spark.executor.memory |
Пам’ять на Executor | "4g", "8g" |
spark.executor.cores |
Ядра на Executor | "4" |
spark.driver.memory |
Пам’ять Driver | "2g" |
spark.sql.shuffle.partitions |
К-сть partitions при Shuffle | 200 (за замовч.) |
spark.jars |
Додаткові JAR-файли | "/path/to/connector.jar" |
flowchart TD
DF["DataFrame"]
DF --> A["З колекцій Python<br/>(списки, кортежі, словники)"]
DF --> B["З Pandas DataFrame"]
DF --> C["З файлів<br/>(CSV, Parquet, JSON)"]
DF --> D["З баз даних<br/>(JDBC)"]
DF --> E["З SQL-запиту"]
style DF fill:#f9b928,stroke:#333,stroke-width:2px
+-----+----------+
| city|population|
+-----+----------+
| Київ| 2967000|
|Львів| 717000|
|Одеса| 1015000|
+-----+----------+
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("department", StringType(), nullable=True),
StructField("salary", IntegerType(), nullable=False),
StructField("start_date", DateType(), nullable=True),
])
data = [
("Олена", "DS", 45000, date(2022, 3, 15)),
("Андрій", "Eng", 52000, date(2021, 7, 1)),
("Марія", "DS", 48000, date(2023, 1, 10)),
]
df = spark.createDataFrame(data, schema=schema)
df.printSchema()root
|-- name: string (nullable = false)
|-- department: string (nullable = true)
|-- salary: integer (nullable = false)
|-- start_date: date (nullable = true)
graph TD
A["Spark Types"] --> B["Прості"]
A --> C["Складні"]
B --> B1["StringType"]
B --> B2["IntegerType / LongType"]
B --> B3["FloatType / DoubleType"]
B --> B4["BooleanType"]
B --> B5["DateType / TimestampType"]
B --> B6["BinaryType / ByteType"]
C --> C1["ArrayType<br/>(списки)"]
C --> C2["MapType<br/>(словники)"]
C --> C3["StructType<br/>(вкладені записи)"]
style A fill:#f9b928,stroke:#333
# Складні типи
complex_schema = StructType([
StructField("name", StringType()),
StructField("skills", ArrayType(StringType())),
StructField("scores", MapType(StringType(), IntegerType())),
])
df_complex = spark.createDataFrame([
("Олена", ["Python", "SQL"], {"math": 95, "code": 90}),
("Андрій", ["Scala", "Spark", "Java"], {"math": 88, "code": 95}),
], schema=complex_schema)
df_complex.show(truncate=False)+------+--------------------+------------------------+
|name |skills |scores |
+------+--------------------+------------------------+
|Олена |[Python, SQL] |{code -> 90, math -> 95}|
|Андрій|[Scala, Spark, Java]|{code -> 95, math -> 88}|
+------+--------------------+------------------------+
+-------+-------+--------+
|product| price|in_stock|
+-------+-------+--------+
|Ноутбук|25000.0| true|
|Телефон|15000.0| true|
|Планшет|12000.0| false|
+-------+-------+--------+
<class 'pandas.DataFrame'>
| product | price | in_stock | |
|---|---|---|---|
| 0 | Ноутбук | 25000.0 | True |
| 1 | Телефон | 15000.0 | True |
| 2 | Планшет | 12000.0 | False |
Попередження
toPandas() збирає ВСІ дані на Driver в оперативну пам’ять однієї машини! Для великих датасетів — OutOfMemory. Використовуйте тільки для невеликих результатів (агрегації, візуалізація).
# Створимо тестові дані та збережемо у різні формати
test_data = spark.createDataFrame([
("Олена", "DS", 45000), ("Андрій", "Eng", 52000),
("Марія", "DS", 48000), ("Петро", "Eng", 55000),
], schema=["name", "department", "salary"])
test_data.write.mode("overwrite").csv("spark_demo/csv", header=True)
test_data.write.mode("overwrite").parquet("spark_demo/parquet")
test_data.write.mode("overwrite").json("spark_demo/json")# Читання CSV
df_csv = spark.read.csv("spark_demo/csv", header=True, inferSchema=True)
df_csv.show()
# +------+----------+------+
# | name|department|salary|
# +------+----------+------+
# | Олена| DS| 45000|
# |Андрій| Eng| 52000|
# | Марія| DS| 48000|
# | Петро| Eng| 55000|
# +------+----------+------+| Формат | Стиснення | Схема | Швидкість | Рекомендація |
|---|---|---|---|---|
| CSV | Немає | Ні | Повільно | Тільки імпорт |
| JSON | Немає | Частково | Повільно | Напівструктуровані |
| Parquet | Стовпчасте | Так | Швидко | Основний формат |
| ORC | Стовпчасте | Так | Швидко | Hive-сумісність |
+------+------+
| name|salary|
+------+------+
| Олена| 45000|
|Андрій| 52000|
| Марія| 48000|
| Петро| 55000|
| Ірина| 42000|
|Сергій| 51000|
+------+------+
+------+------+-----------------+
| name|salary| salary_raised|
+------+------+-----------------+
| Олена| 45000|49500.00000000001|
|Андрій| 52000|57200.00000000001|
| Марія| 48000|52800.00000000001|
| Петро| 55000|60500.00000000001|
| Ірина| 42000|46200.00000000001|
|Сергій| 51000|56100.00000000001|
+------+------+-----------------+
+------+----------+------+---+
| name|department|salary|age|
+------+----------+------+---+
|Андрій| Eng| 52000| 32|
| Марія| DS| 48000| 25|
| Петро| Eng| 55000| 35|
|Сергій| DS| 51000| 31|
+------+----------+------+---+
+------+----------+------+---+
| name|department|salary|age|
+------+----------+------+---+
| Марія| DS| 48000| 25|
|Сергій| DS| 51000| 31|
+------+----------+------+---+
+------+----------+------+---+
| name|department|salary|age|
+------+----------+------+---+
| Марія| DS| 48000| 25|
|Сергій| DS| 51000| 31|
+------+----------+------+---+
+------+----------+------+---+
| name|department|salary|age|
+------+----------+------+---+
| Олена| DS| 45000| 28|
|Андрій| Eng| 52000| 32|
| Марія| DS| 48000| 25|
| Петро| Eng| 55000| 35|
|Сергій| DS| 51000| 31|
+------+----------+------+---+
Примітка
filter та where — повні синоніми. Комбінуючи умови, кожну треба брати в дужки: (A) & (B), (A) | (B), ~(A).
+------+----------+------+---+--------+---------+------+
| name|department|salary|age|salary_k|is_senior| level|
+------+----------+------+---+--------+---------+------+
| Олена| DS| 45000| 28| 45.0| false|Middle|
|Андрій| Eng| 52000| 32| 52.0| true|Senior|
| Марія| DS| 48000| 25| 48.0| false|Middle|
| Петро| Eng| 55000| 35| 55.0| true|Senior|
| Ірина| Mkt| 42000| 29| 42.0| false|Junior|
|Сергій| DS| 51000| 31| 51.0| true|Middle|
+------+----------+------+---+--------+---------+------+
+------+-------+----+
| name|country|year|
+------+-------+----+
| Олена|Україна|2025|
|Андрій|Україна|2025|
| Марія|Україна|2025|
| Петро|Україна|2025|
| Ірина|Україна|2025|
|Сергій|Україна|2025|
+------+-------+----+
+------+-------------+----------+
| name|salary_double|salary_str|
+------+-------------+----------+
| Олена| 45000.0| 45000|
|Андрій| 52000.0| 52000|
| Марія| 48000.0| 48000|
| Петро| 55000.0| 55000|
| Ірина| 42000.0| 42000|
|Сергій| 51000.0| 51000|
+------+-------------+----------+
F.lit(value) — створює стовпець-константу.cast(type) — змінює тип стовпця (аналог .cast() у Polars)df_strings = spark.createDataFrame([
(" Олена Петрівна ", "olena@example.com", "380-44-1234567", "Київ, Україна"),
("андрій іванович", "ANDRIY@EXAMPLE.COM", "380-32-9876543", "Львів, Україна"),
("МАРІЯ СЕРГІЇВНА", "maria@Example.Com", "380-48-1112233", "Одеса, Україна"),
], schema=["full_name", "email", "phone", "location"])+---------------+------------------+------------------+------------------+---+
|trimmed |upper |email_lower |initcap |len|
+---------------+------------------+------------------+------------------+---+
|Олена Петрівна | ОЛЕНА ПЕТРІВНА |olena@example.com | Олена Петрівна |18 |
|андрій іванович|АНДРІЙ ІВАНОВИЧ |andriy@example.com|Андрій Іванович |15 |
|МАРІЯ СЕРГІЇВНА|МАРІЯ СЕРГІЇВНА |maria@example.com |Марія Сергіївна |15 |
+---------------+------------------+------------------+------------------+---+
df_strings.select(
F.substring(F.col("phone"), 1, 3).alias("code"),
F.split(F.col("location"), ", ").getItem(0).alias("city"),
F.concat(F.lit("+"), F.col("phone")).alias("full"),
F.regexp_replace(F.col("phone"), "-", "").alias("clean"),
F.regexp_extract(F.col("email"), r"@(.+)", 1).alias("domain"),
).show(truncate=False)+----+-----+---------------+------------+-----------+
|code|city |full |clean |domain |
+----+-----+---------------+------------+-----------+
|380 |Київ |+380-44-1234567|380441234567|example.com|
|380 |Львів|+380-32-9876543|380329876543|EXAMPLE.COM|
|380 |Одеса|+380-48-1112233|380481112233|Example.Com|
+----+-----+---------------+------------+-----------+
df_dates.select(
F.to_date("start").alias("date"),
F.year(F.to_date("start")).alias("year"),
F.month(F.to_date("start")).alias("month"),
F.dayofmonth(F.to_date("start")).alias("day"),
F.dayofweek(F.to_date("start")).alias("weekday"),
F.datediff(F.to_date("end"), F.to_date("start")).alias("days"),
F.add_months(F.to_date("start"), 3).alias("plus_3m"),
F.date_format(F.to_date("start"), "dd.MM.yyyy").alias("fmt"),
F.months_between(F.to_date("end"), F.to_date("start")).alias("months"),
).show(truncate=False)+----------+----+-----+---+-------+----+----------+----------+-----------+
|date |year|month|day|weekday|days|plus_3m |fmt |months |
+----------+----+-----+---+-------+----+----------+----------+-----------+
|2024-03-15|2024|3 |15 |6 |97 |2024-06-15|15.03.2024|3.16129032 |
|2023-11-01|2023|11 |1 |4 |75 |2024-02-01|01.11.2023|2.4516129 |
|2024-01-20|2024|1 |20 |7 |346 |2024-04-20|20.01.2024|11.35483871|
+----------+----+-----+---+-------+----+----------+----------+-----------+
+---------+-----------------+------------------+--------+---------+
| product| total_revenue| avg_order|n_orders|max_order|
+---------+-----------------+------------------+--------+---------+
| Планшет| 541217.97|2027.0335955056178| 267| 12699.88|
| Телефон|530014.0300000004|2015.2624714828912| 263| 13026.35|
| Ноутбук|448507.4899999998| 2057.373807339449| 218| 11576.76|
|Навушники|447316.2600000003| 1775.064523809525| 252| 10369.71|
+---------+-----------------+------------------+--------+---------+
Агрегатні функції: sum, avg / mean, count, min, max, collect_list, collect_set, countDistinct, stddev, variance
+---------+------+----------+---------+----------------+
| product|region|avg_amount|total_qty|unique_customers|
+---------+------+----------+---------+----------------+
|Навушники| Одеса| 2154.93| 241| 49|
|Навушники| Київ| 1787.34| 224| 45|
|Навушники|Харків| 1682.48| 233| 43|
|Навушники|Дніпро| 1674.71| 277| 56|
|Навушники| Львів| 1566.18| 264| 47|
| Ноутбук| Одеса| 2538.05| 207| 34|
| Ноутбук| Львів| 2003.79| 250| 48|
| Ноутбук|Харків| 1979.31| 196| 37|
| Ноутбук| Київ| 1966.22| 226| 46|
| Ноутбук|Дніпро| 1905.6| 224| 41|
| Планшет| Одеса| 2232.79| 216| 46|
| Планшет| Львів| 2166.28| 268| 50|
| Планшет| Київ| 2125.65| 221| 48|
| Планшет|Дніпро| 1941.08| 295| 57|
| Планшет|Харків| 1722.12| 248| 49|
| Телефон| Львів| 2416.02| 202| 38|
| Телефон| Київ| 2096.44| 260| 53|
| Телефон|Дніпро| 1957.42| 302| 57|
| Телефон| Одеса| 1942.78| 236| 47|
| Телефон|Харків| 1759.45| 274| 52|
+---------+------+----------+---------+----------------+
+------------+-----------------+-----------------+
|total_orders| total_revenue| avg_order|
+------------+-----------------+-----------------+
| 1000|1967055.750000003|1967.055750000003|
+------------+-----------------+-----------------+
+------+----------+------+---+
| name|department|salary|age|
+------+----------+------+---+
| Петро| Eng| 55000| 35|
|Андрій| Eng| 52000| 32|
|Сергій| DS| 51000| 31|
| Марія| DS| 48000| 25|
| Олена| DS| 45000| 28|
| Ірина| Mkt| 42000| 29|
+------+----------+------+---+
+------+----------+------+---+
| name|department|salary|age|
+------+----------+------+---+
|Сергій| DS| 51000| 31|
| Марія| DS| 48000| 25|
| Олена| DS| 45000| 28|
| Петро| Eng| 55000| 35|
|Андрій| Eng| 52000| 32|
| Ірина| Mkt| 42000| 29|
+------+----------+------+---+
+------+----------+------+---+
| name|department|salary|age|
+------+----------+------+---+
| Петро| Eng| 55000| 35|
|Андрій| Eng| 52000| 32|
|Сергій| DS| 51000| 31|
| Марія| DS| 48000| 25|
| Олена| DS| 45000| 28|
+------+----------+------+---+
Примітка
orderBy + limit(N) — Spark оптимізує це і не сортує весь DataFrame.
+---+------+-----+--------+-----------+------+
| id| name| city|order_id|customer_id|amount|
+---+------+-----+--------+-----------+------+
| 2|Андрій|Львів| 103| 2| 800.0|
| 2|Андрій|Львів| 102| 2|2300.0|
| 1| Олена| Київ| 101| 1|1500.0|
+---+------+-----+--------+-----------+------+
| Тип Join | Опис | SQL |
|---|---|---|
inner |
Лише збіги з обох таблиць | INNER JOIN |
left |
Усі з лівої + збіги з правої | LEFT JOIN |
right |
Усі з правої + збіги з лівої | RIGHT JOIN |
full / outer |
Усі з обох таблиць | FULL OUTER JOIN |
anti |
З лівої, що НЕМАЄ в правій | NOT EXISTS |
semi |
З лівої, що Є в правій (без стовпців правої) | EXISTS |
cross |
Декартів добуток | CROSS JOIN |
+---+------+-----+--------+-----------+------+
| id| name| city|order_id|customer_id|amount|
+---+------+-----+--------+-----------+------+
| 2|Андрій|Львів| 103| 2| 800.0|
| 2|Андрій|Львів| 102| 2|2300.0|
| 1| Олена| Київ| 101| 1|1500.0|
+---+------+-----+--------+-----------+------+
+---+------+------+--------+-----------+------+
| id| name| city|order_id|customer_id|amount|
+---+------+------+--------+-----------+------+
| 2|Андрій| Львів| 103| 2| 800.0|
| 2|Андрій| Львів| 102| 2|2300.0|
| 4| Петро|Харків| NULL| NULL| NULL|
| 1| Олена| Київ| 101| 1|1500.0|
| 3| Марія| Одеса| NULL| NULL| NULL|
+---+------+------+--------+-----------+------+
+----------+----------+-----------+
|department|avg_salary|n_employees|
+----------+----------+-----------+
| Eng| 53500.0| 2|
| DS| 49500.0| 2|
+----------+----------+-----------+
Порада
DataFrame API та Spark SQL — взаємозамінні та мають однакову продуктивність (обидва оптимізуються Catalyst).
spark.sql("""
WITH customer_stats AS (
SELECT customer_id,
COUNT(*) AS n_orders,
ROUND(SUM(amount)) AS total_spent
FROM orders
GROUP BY customer_id
),
ranked AS (
SELECT *,
RANK() OVER (ORDER BY total_spent DESC) AS spending_rank
FROM customer_stats
)
SELECT *
FROM ranked
WHERE spending_rank <= 10
""").show()+-----------+--------+-----------+-------------+
|customer_id|n_orders|total_spent|spending_rank|
+-----------+--------+-----------+-------------+
| 146| 9| 29600.0| 1|
| 248| 4| 25021.0| 2|
| 232| 3| 16523.0| 3|
| 361| 7| 16478.0| 4|
| 100| 3| 16408.0| 5|
| 141| 2| 16395.0| 6|
| 466| 3| 16251.0| 7|
| 98| 4| 15748.0| 8|
| 205| 3| 14285.0| 9|
| 211| 6| 14254.0| 10|
+-----------+--------+-----------+-------------+
WITH) — покрокове побудова запитуOVER) — обчислення в контексті групи+----------+----------+-----+
|department|avg_salary|count|
+----------+----------+-----+
| Eng| 53500.0| 2|
| DS| 49500.0| 2|
+----------+----------+-----+
| Polars | PySpark | Коментар |
|---|---|---|
pl.col("x") |
F.col("x") |
Звернення до стовпця |
.filter() |
.filter() / .where() |
Ідентично |
.select() |
.select() |
Ідентично |
.with_columns() |
.withColumn() |
По одному стовпцю |
.group_by().agg() |
.groupBy().agg() |
camelCase |
.sort() |
.orderBy() / .sort() |
Синоніми |
pl.when().then().otherwise() |
F.when().otherwise() |
Без .then() |
.alias() |
.alias() |
Ідентично |
.collect() |
.show() / .collect() / .toPandas() |
Різні Actions |
.head(n) |
.limit(n) / .show(n) |
Різні назви |
pl.len() |
F.count("*") |
Кількість рядків |
.n_unique() |
F.countDistinct() |
Унікальні |
.over("group") |
.over(Window.partitionBy("group")) |
Віконні функції |
+------+----------+------+---+----+-------+--------+
| name|department|salary|age|rank|row_num|dept_avg|
+------+----------+------+---+----+-------+--------+
|Сергій| DS| 51000| 31| 1| 1| 48000.0|
| Марія| DS| 48000| 25| 2| 2| 48000.0|
| Олена| DS| 45000| 28| 3| 3| 48000.0|
| Петро| Eng| 55000| 35| 1| 1| 53500.0|
|Андрій| Eng| 52000| 32| 2| 2| 53500.0|
| Ірина| Mkt| 42000| 29| 1| 1| 42000.0|
+------+----------+------+---+----+-------+--------+
w = Window.partitionBy("product").orderBy("order_date")
orders_df.withColumn(
"running_total", F.sum("amount").over(w) # кумулятивна сума
).withColumn(
"prev_amount", F.lag("amount", 1).over(w) # попереднє значення
).withColumn(
"next_amount", F.lead("amount", 1).over(w) # наступне значення
).withColumn(
"moving_avg", F.avg("amount").over(
w.rowsBetween(-2, 0) # ковзне середнє (3 рядки)
)
).show()+--------+-------+-------+----------+------------------+-----------+-----------+------------------+
|order_id|product| amount|order_date| running_total|prev_amount|next_amount| moving_avg|
+--------+-------+-------+----------+------------------+-----------+-----------+------------------+
| 85|Ноутбук| 341.97|2024-01-22| 341.97| NULL| 230.15| 341.97|
| 81|Ноутбук| 230.15|2024-01-30| 572.12| 341.97| 235.18| 286.06|
| 29|Ноутбук| 235.18|2024-02-28| 807.3| 230.15| 0.48|269.09999999999997|
| 43|Ноутбук| 0.48|2024-03-10| 807.78| 235.18| 98.12| 155.27|
| 66|Ноутбук| 98.12|2024-03-13| 905.9| 0.48| 1238.09|111.25999999999999|
| 27|Ноутбук|1238.09|2024-04-09| 2143.99| 98.12| 437.8| 445.5633333333333|
| 98|Ноутбук| 437.8|2024-06-03| 2581.79| 1238.09| 753.81| 591.3366666666667|
| 2|Ноутбук| 753.81|2024-06-15| 3335.6| 437.8| 415.26| 809.9|
| 46|Ноутбук| 415.26|2024-06-17|3750.8599999999997| 753.81| 416.51| 535.6233333333333|
| 35|Ноутбук| 416.51|2024-07-06| 4167.37| 415.26| 1649.51| 528.5266666666666|
| 84|Ноутбук|1649.51|2024-07-08| 5816.88| 416.51| 405.34| 827.0933333333332|
| 83|Ноутбук| 405.34|2024-07-14| 6222.22| 1649.51| 47.81| 823.7866666666667|
| 59|Ноутбук| 47.81|2024-07-20| 6270.030000000001| 405.34| 423.51| 700.8866666666667|
| 64|Ноутбук| 423.51|2024-07-25| 6693.540000000001| 47.81| 608.42|292.21999999999997|
| 11|Ноутбук| 608.42|2024-07-28| 7301.960000000001| 423.51| 187.64|359.91333333333336|
| 61|Ноутбук| 187.64|2024-09-08| 7489.600000000001| 608.42| 151.83|406.52333333333326|
| 45|Ноутбук| 151.83|2024-09-19| 7641.430000000001| 187.64| 595.88| 315.9633333333333|
| 91|Ноутбук| 595.88|2024-09-28| 8237.310000000001| 151.83| 171.45|311.78333333333336|
| 75|Ноутбук| 171.45|2024-10-01| 8408.760000000002| 595.88| 2214.79| 306.3866666666667|
| 72|Ноутбук|2214.79|2024-10-04|10623.550000000003| 171.45| 1672.17| 994.04|
+--------+-------+-------+----------+------------------+-----------+-----------+------------------+
only showing top 20 rows
| Функція | Опис | SQL |
|---|---|---|
rank() |
Ранг з пропусками | RANK() |
row_number() |
Порядковий номер | ROW_NUMBER() |
dense_rank() |
Ранг без пропусків | DENSE_RANK() |
lag(col, n) |
Значення n рядків тому | LAG(col, n) |
lead(col, n) |
Значення через n рядків | LEAD(col, n) |
sum().over() |
Кумулятивна сума | SUM() OVER() |
uv add pyspark