# Code samples

End-to-end code samples demonstrating common PySpark workflows on e6data. The first sample creates the `SparkSession`; the later samples reuse that same `spark` session (and, in some cases, its DataFrames) rather than creating a new one.

### Basic Analytics Pipeline

```python
from e6_spark_compat import SparkSession
from e6_spark_compat.sql.functions import col, count, sum, avg

spark = (SparkSession.builder
    .appName("SalesAnalytics")
    .config("spark.e6data.host", "<cluster-host>")
    .config("spark.e6data.username", "<username>")
    .config("spark.e6data.password", "<access-token>")
    .config("spark.e6data.database", "sales_db")
    .config("spark.e6data.catalog", "main")
    .config("spark.e6data.cluster", "<cluster-name>")
    .config("spark.e6data.secure", True)
    .getOrCreate())

# Read and analyze
orders = spark.read.parquet("s3://data-lake/orders/")

summary = (orders
    .filter(col("status") == "completed")
    .groupBy("region")
    .agg(
        count("*").alias("order_count"),
        sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value")
    )
    .orderBy(col("total_revenue").desc()))

summary.show()
spark.stop()
```
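
The same filter/groupBy/agg pattern composes with additional predicates. A minimal sketch of a variation; the `order_date` column and the literal date range are assumptions, not part of the sample schema above:

```python
from e6_spark_compat.sql.functions import col, sum

# Variation: completed-order revenue per region for a single quarter.
# `order_date` and the date literals are assumed columns/values.
q1_revenue = (orders
    .filter(col("status") == "completed")
    .filter((col("order_date") >= "2024-01-01") & (col("order_date") < "2024-04-01"))
    .groupBy("region")
    .agg(sum("amount").alias("q1_revenue")))

q1_revenue.show()
```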

### Window Functions: Running Totals and Rankings

```python
from e6_spark_compat.sql.functions import col, row_number, sum, lag
from e6_spark_compat.sql.window import Window

employees = spark.read.parquet("s3://data-lake/employees/")

# Rank employees by salary within each department
rank_window = Window.partitionBy("department").orderBy(col("salary").desc())

# Running total of salaries
running_window = (Window.partitionBy("department")
    .orderBy("hire_date")
    .rowsBetween(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW))

result = employees.select(
    "name", "department", "salary", "hire_date",
    row_number().over(rank_window).alias("salary_rank"),
    lag("salary", 1).over(rank_window).alias("prev_salary"),
    sum("salary").over(running_window).alias("cumulative_salary")
)

result.show()
```
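
The same window definitions extend to other ranking and sliding-frame functions. A sketch reusing `rank_window` and `employees` from above, assuming `dense_rank` and a bounded `rowsBetween` frame are supported by the compat layer:

```python
from e6_spark_compat.sql.functions import dense_rank, avg
from e6_spark_compat.sql.window import Window

# Sliding frame: the current row plus the two preceding rows, by hire date
moving_window = (Window.partitionBy("department")
    .orderBy("hire_date")
    .rowsBetween(-2, Window.CURRENT_ROW))

variants = employees.select(
    "name", "department", "salary",
    dense_rank().over(rank_window).alias("salary_dense_rank"),  # no gaps after ties
    avg("salary").over(moving_window).alias("salary_moving_avg")
)

variants.show()
```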

### Multi-Table Join

```python
from e6_spark_compat.sql.functions import col, count, sum

orders = spark.read.parquet("s3://data-lake/orders/")
customers = spark.read.parquet("s3://data-lake/customers/")
products = spark.read.parquet("s3://data-lake/products/")

# Three-way join (alias each DataFrame so joined columns can be referenced unambiguously)
result = (orders.alias("o")
    .join(customers.alias("c"), col("o.customer_id") == col("c.id"), "inner")
    .join(products.alias("p"), col("o.product_id") == col("p.id"), "inner")
    .groupBy("c.name", "p.category")
    .agg(
        count("*").alias("order_count"),
        sum("o.amount").alias("total_spent")
    )
    .orderBy(col("total_spent").desc()))

result.show(20)
```
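
A variation on the same aliasing pattern: joining from the customer side with a `"left"` join keeps customers that have no orders. A sketch:

```python
from e6_spark_compat.sql.functions import col, count

# Left join from the customer side: customers with no orders survive with NULLs
customer_activity = (customers.alias("c")
    .join(orders.alias("o"), col("c.id") == col("o.customer_id"), "left")
    .groupBy("c.name")
    .agg(count("o.id").alias("order_count")))  # count(col) skips NULLs, so inactive customers show 0

customer_activity.show()
```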

### CASE WHEN Logic

```python
from e6_spark_compat.sql.functions import col, when, count

transactions = spark.read.parquet("s3://data-lake/transactions/")

categorized = transactions.select(
    "*",
    when(col("amount") >= 1000, "high")
        .when(col("amount") >= 100, "medium")
        .otherwise("low")
        .alias("value_tier")
)

tier_summary = (categorized.groupBy("value_tier")
    .agg(count("*").alias("count"))
    .orderBy("value_tier"))

tier_summary.show()
```
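
The same tiering logic can also be written as a SQL `CASE` expression, which is sometimes easier to read for long chains. A sketch, assuming `expr` is available in the compat layer as it is in PySpark:

```python
from e6_spark_compat.sql.functions import expr

# The same tiering, expressed as a SQL CASE expression
categorized_sql = transactions.select(
    "*",
    expr("""
        CASE WHEN amount >= 1000 THEN 'high'
             WHEN amount >= 100  THEN 'medium'
             ELSE 'low'
        END
    """).alias("value_tier")
)
```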

### Spatial Analysis: Points in Polygons

```python
from e6_spark_compat.sedona import SedonaRegistrator
from e6_spark_compat.sql.functions import expr, col, count

SedonaRegistrator.registerAll(spark)

stores = spark.read.parquet("s3://data-lake/stores/")
regions = spark.read.parquet("s3://data-lake/regions/")

# Find which region each store belongs to
store_regions = stores.alias("s").join(
    regions.alias("r"),
    expr("ST_Contains(r.boundary, ST_Point(s.longitude, s.latitude))"),
    "inner"
)

# Count stores per region
(store_regions.groupBy("region_name")
    .agg(count("*").alias("store_count"))
    .orderBy(col("store_count").desc())
    .show())
```
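
A related spatial pattern is a radius search. A sketch, assuming Sedona's `ST_DistanceSphere` (which returns meters for lon/lat points) is part of the registered function set; the reference coordinates and radius are made up:

```python
from e6_spark_compat.sql.functions import expr

# Stores within 10 km of a hypothetical reference point (distance in meters)
nearby = stores.filter(expr(
    "ST_DistanceSphere(ST_Point(longitude, latitude), ST_Point(-122.42, 37.77)) <= 10000"
))

nearby.show()
```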

### Working with Temp Views and SQL

```python
orders = spark.read.parquet("s3://data-lake/orders/")
customers = spark.read.parquet("s3://data-lake/customers/")

orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

result = spark.sql("""
    SELECT c.name, COUNT(o.id) as order_count, SUM(o.amount) as total
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    WHERE o.status = 'completed'
    GROUP BY c.name
    ORDER BY total DESC
    LIMIT 10
""")

result.show()
```
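
Since `spark.sql` returns an ordinary DataFrame, SQL and DataFrame styles can be mixed freely. A sketch; the `top_customers` view name and the threshold are illustrative:

```python
# SQL results are DataFrames, so they can be re-registered and queried again
result.createOrReplaceTempView("top_customers")

big_spenders = spark.sql("SELECT * FROM top_customers WHERE total > 10000")
big_spenders.show()
```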

### Export to Pandas

```python
from e6_spark_compat.sql.functions import avg

# Query and convert to Pandas for local analysis or visualization
monthly_avg = (orders
    .groupBy("month")
    .agg(avg("amount").alias("avg_amount"))
    .orderBy("month"))

pdf = monthly_avg.toPandas()

# Use with matplotlib, seaborn, etc.
print(pdf)
```
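
From here, any Pandas-based tool works on the local DataFrame. A minimal plotting sketch with matplotlib, assuming it is installed in the client environment:

```python
import matplotlib.pyplot as plt

# Simple bar chart of the monthly averages; runs entirely client-side
pdf.plot(x="month", y="avg_amount", kind="bar", legend=False)
plt.ylabel("Average order amount")
plt.tight_layout()
plt.show()
```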

### Writing Results

```python
from e6_spark_compat.sql.functions import sum

# Aggregate and write back
daily_totals = (orders
    .groupBy("order_date", "region")
    .agg(sum("amount").alias("daily_total")))

# Write as partitioned Parquet
(daily_totals.write
    .mode("overwrite")
    .partitionBy("region")
    .parquet("s3://data-lake/daily-totals/"))
```
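
Because the output is partitioned by `region`, readers that filter on that column only scan the matching partition directories. A sketch; the `"EMEA"` value is a placeholder:

```python
from e6_spark_compat.sql.functions import col

# Partition pruning: only the region=EMEA directory needs to be scanned
emea = (spark.read
    .parquet("s3://data-lake/daily-totals/")
    .filter(col("region") == "EMEA"))

emea.show()
```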
