# DataFrame Operations

### Transformations

Transformations are lazy: they build a query plan without executing it. Execution is triggered only when an action is called.

### Selection and Filtering

```python
from pyspark.sql.functions import col

# Select columns
df.select("name", "age", "city")
df.select(col("name"), col("age") + 1)

# Select with SQL expressions
df.selectExpr("name", "age + 1 as age_plus_one", "UPPER(city) as city_upper")

# Filter rows
df.filter(col("age") > 21)
df.where(col("status") == "active")
df.filter("age > 21 AND status = 'active'")
```

### Joins

```python
# Inner join
df1.join(df2, df1["id"] == df2["id"], "inner")

# Left outer join
df1.join(df2, "id", "left")

# Cross join
df1.crossJoin(df2)
```

Supported join types: `inner`, `left`, `right`, `full`, `cross`, `left_semi`, `left_anti`.

### Grouping and Aggregation

```python
from pyspark.sql.functions import avg, count, max

# Group by with aggregation
df.groupBy("department").agg(
    count("*").alias("headcount"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
)

# Shorthand aggregations
df.groupBy("department").count()
df.groupBy("department").sum("salary")
df.groupBy("department").avg("salary")

# Pivot
df.groupBy("year").pivot("quarter").sum("revenue")
```

### Sorting

```python
from pyspark.sql.functions import col

# Ascending (default)
df.orderBy("name")

# Descending
df.orderBy(col("salary").desc())

# Multiple columns
df.sort(col("department").asc(), col("salary").desc())
```

### Set Operations

```python
df1.union(df2)           # Union all
df1.unionByName(df2)     # Union matching by column name
df1.intersect(df2)       # Intersection
df1.exceptAll(df2)       # Difference
```

### Column Operations

```python
from pyspark.sql.functions import col, concat, lit

# Add or replace a column
df.withColumn("full_name", concat(col("first"), lit(" "), col("last")))

# Rename a column
df.withColumnRenamed("old_name", "new_name")

# Drop columns
df.drop("temp_col", "debug_col")

# Distinct rows
df.distinct()

# Limit
df.limit(100)
```

### All Transformations

| Method                          | Description                  |
| ------------------------------- | ---------------------------- |
| `select(*cols)`                 | Select columns / expressions |
| `selectExpr(*exprs)`            | Select using SQL expressions |
| `filter(condition)` / `where()` | Filter rows                  |
| `join(other, on, how)`          | Join DataFrames              |
| `crossJoin(other)`              | Cross join                   |
| `groupBy(*cols)`                | Group for aggregation        |
| `orderBy(*cols)` / `sort()`     | Sort rows                    |
| `limit(n)`                      | Limit to first n rows        |
| `distinct()`                    | Remove duplicate rows        |
| `union(other)`                  | Union (all)                  |
| `unionByName(other)`            | Union matching by name       |
| `intersect(other)`              | Set intersection             |
| `exceptAll(other)`              | Set difference               |
| `withColumn(name, col)`         | Add / replace a column       |
| `withColumnRenamed(old, new)`   | Rename a column              |
| `drop(*cols)`                   | Drop columns                 |
| `cache()` / `persist()`         | Cache hint (pass-through)    |
| `coalesce(n)`                   | Repartition hint             |

### Actions

Actions trigger query execution on e6data and return results.

| Method            | Description                           |
| ----------------- | ------------------------------------- |
| `collect()`       | Return all rows as a list of Rows     |
| `count()`         | Return the total row count            |
| `show(n)`         | Print the first n rows (default 20)   |
| `first()`         | Return the first row                  |
| `head(n)`         | Return the first n rows               |
| `take(n)`         | Return the first n rows as a list     |
| `toPandas()`      | Convert results to a Pandas DataFrame |
| `explain()`       | Print the generated SQL query         |
| `describe(*cols)` | Compute summary statistics            |

#### Examples

```python
# Collect all rows
rows = df.collect()

# Show first 10 rows
df.show(10)

# Count
total = df.count()

# Convert to Pandas
pdf = df.toPandas()

# View generated SQL
df.explain()
```

### Temporary Views

Register a DataFrame as a temporary view to query it with SQL.

```python
df.createOrReplaceTempView("employees")
result = spark.sql("SELECT department, COUNT(*) FROM employees GROUP BY department")

# Global temp view (accessible across sessions)
df.createGlobalTempView("global_employees")
```

### Read Operations

Use `spark.read` to load data from various file formats.

```python
# Parquet
df = spark.read.parquet("s3://bucket/data.parquet")

# CSV with options
df = (spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load("s3://bucket/data.csv"))

# ORC
df = spark.read.orc("s3://bucket/data.orc")

# JSON
df = spark.read.json("s3://bucket/data.json")

# GeoParquet (requires spatial extras)
df = spark.read.format("geoparquet").load("s3://bucket/geo.parquet")

# Delta
df = spark.read.format("delta").load("s3://bucket/delta-table/")
```

### Supported Read Formats

| Format     | Method                                       |
| ---------- | -------------------------------------------- |
| Parquet    | `spark.read.parquet(path)`                   |
| ORC        | `spark.read.orc(path)`                       |
| CSV        | `spark.read.csv(path)`                       |
| JSON       | `spark.read.json(path)`                      |
| GeoParquet | `spark.read.format("geoparquet").load(path)` |
| GeoJSON    | `spark.read.format("geojson").load(path)`    |
| Delta      | `spark.read.format("delta").load(path)`      |
| Text       | `spark.read.text(path)`                      |

### Write Operations

Use `df.write` to save results.

```python
# Parquet with overwrite
df.write.mode("overwrite").parquet("s3://bucket/output/")

# CSV with options
(df.write.mode("append")
    .option("header", True)
    .csv("s3://bucket/output/"))

# Partitioned output
df.write.partitionBy("year", "month").parquet("s3://bucket/output/")

# Insert into existing table
df.write.insertInto("target_table")

# Save as new table
df.write.saveAsTable("new_table")
```

### Write Modes

| Mode        | Behavior                                        |
| ----------- | ----------------------------------------------- |
| `error`     | Throw an error if data already exists (default) |
| `append`    | Append to existing data                         |
| `overwrite` | Overwrite existing data                         |
| `ignore`    | Silently skip if data already exists            |
