● Reading a CSV File: spark.read.csv("path/to/file.csv")
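A slightly fuller sketch of the CSV reader, assuming the file has a header row; the path is a placeholder:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# Read a CSV with a header row and let Spark infer column types
df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv("path/to/file.csv"))
df.printSchema()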
● Reading a JSON File: spark.read.json("path/to/file.json")
● Reading a Parquet File: spark.read.parquet("path/to/file.parquet")
● Writing DataFrame to CSV: df.write.csv("path/to/output")
● Writing DataFrame to JSON: df.write.json("path/to/output")
● Writing DataFrame to Parquet: df.write.parquet("path/to/output")
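The writers above also accept a save mode and optional partitioning; a minimal sketch, assuming a "year" column exists to partition by:

# Overwrite any existing output and split the files by a column value
(df.write
   .mode("overwrite")
   .partitionBy("year")
   .parquet("path/to/output"))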
● Show DataFrame: df.show()
● Print Schema: df.printSchema()
● Column Selection: df.select("column1", "column2")
● Filtering Rows: df.filter(df["column"] > value)
● Adding a New Column: df.withColumn("newColumn", expression)
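To illustrate the expression argument, a sketch using built-in column functions; "price" and "qty" are assumed column names:

from pyspark.sql.functions import col, lit

# Derive a new column from existing ones and add a constant column
df2 = (df.withColumn("total", col("price") * col("qty"))
         .withColumn("source", lit("batch")))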
● Renaming a Column: df.withColumnRenamed("oldName", "newName")
● Dropping a Column: df.drop("column")
● Describe Data: df.describe().show()
● Counting Rows: df.count()
● Aggregating Data: df.groupBy("column").agg({"column": "sum"})
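A fuller aggregation sketch using named aggregate functions and aliases instead of the dict form; the column names are placeholders:

from pyspark.sql.functions import sum as sum_, avg, countDistinct

agg_df = (df.groupBy("column")
            .agg(sum_("amount").alias("total_amount"),
                 avg("amount").alias("avg_amount"),
                 countDistinct("user_id").alias("users")))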
● Sorting Data: df.sort("column")
● Distinct Values: df.select("column").distinct()
● Sample Data: df.sample(withReplacement, fraction, seed)
● Union DataFrames: df1.union(df2)
● Pivot Table: from pyspark.sql.functions import count; df.groupBy("column").pivot("pivot_column").agg(count("another_column"))
● Window Functions: from pyspark.sql import Window; windowSpec = Window.partitionBy("column")
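A complete window sketch combining partitioning, ordering, and a ranking function; "group_col" and "ts" are assumed column names:

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# Number the rows within each group, newest first
windowSpec = Window.partitionBy("group_col").orderBy(df["ts"].desc())
ranked = df.withColumn("rn", row_number().over(windowSpec))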
● DataFrame to RDD Conversion: df.rdd
● RDD to DataFrame Conversion: rdd.toDF()
● Caching a DataFrame: df.cache()
● Uncaching a DataFrame: df.unpersist()
● Collecting Data to Driver: df.collect()
● Broadcast Join: from pyspark.sql.functions import broadcast; df1.join(broadcast(df2), "column")
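For context, a broadcast join sketch, assuming df2 is the small dimension table and both frames share the join key "column":

from pyspark.sql.functions import broadcast

# Ship the small table to every executor to avoid shuffling the large one
joined = df1.join(broadcast(df2), on="column", how="left")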
● Filling Null Values: df.na.fill(value)
● Dropping Duplicate Rows: df.dropDuplicates()
● Replacing Values: df.na.replace(["old_value"], ["new_value"])
● String Operations: from pyspark.sql.functions import lower, upper; df.select(upper(df["column"]))
● Date and Time Functions: from pyspark.sql.functions import current_date, current_timestamp; df.select(current_date())
● Numeric Functions: from pyspark.sql.functions import abs, sqrt; df.select(abs(df["column"]))
● Conditional Expressions: from pyspark.sql.functions import when; df.select(when(df["column"] > value, "true").otherwise("false"))
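A sketch of chained conditional logic with when/otherwise; the thresholds and labels are made up for illustration:

from pyspark.sql.functions import when, col

labeled = df.withColumn(
    "bucket",
    when(col("column") > 100, "high")
    .when(col("column") > 10, "medium")
    .otherwise("low"))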
● Type Casting: df.withColumn("column", df["column"].cast("new_type"))
● Explode Function (Array to Rows): from pyspark.sql.functions import explode; df.withColumn("exploded_column", explode(df["array_column"]))
● Pandas UDF: from pyspark.sql.functions import pandas_udf; @pandas_udf("return_type") def pandas_function(col1, col2): return operation
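The pandas UDF entry above is condensed; a runnable sketch of a scalar pandas UDF on a single column (the function name and body are illustrative):

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def add_one(values: pd.Series) -> pd.Series:
    # Vectorised: receives a whole pandas Series per batch
    return values + 1.0

df.select(add_one(df["column"]).alias("column_plus_one"))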
● Aggregating with Custom Functions: df.groupBy("column").agg(custom_agg_function(df["another_column"]))
● Window Functions (Rank, Lead, Lag): from pyspark.sql.functions import rank, lead, lag; windowSpec = Window.orderBy("column"); df.withColumn("rank", rank().over(windowSpec))
● Handling JSON Columns: from pyspark.sql.functions import from_json; df.withColumn("parsed_json", from_json(df["json_column"], schema))
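from_json needs a schema for the JSON payload; a sketch that spells one out explicitly (the field names are assumptions):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

json_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType())])

parsed = df.withColumn("parsed_json", from_json(col("json_column"), json_schema))
parsed.select("parsed_json.id", "parsed_json.name")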
● Summary Statistics for Numeric Columns: df.describe()
● Correlation Between Columns: df.stat.corr("column1", "column2")
● Crosstabulation and Contingency Tables: df.stat.crosstab("column1", "column2")
● Frequent Items in Columns: df.stat.freqItems(["column1", "column2"])
● Approximate Quantile Calculation: df.approxQuantile("column", [0.25, 0.5, 0.75], relativeError)
● Histograms using Matplotlib: df.toPandas()["column"].hist()
● Box Plots using Seaborn: import seaborn as sns; sns.boxplot(x=df.toPandas()["column"])
● Scatter Plots using Matplotlib: df.toPandas().plot.scatter(x='col1', y='col2')
● Reading Data from HDFS: spark.read.text("hdfs://path/to/file")
● Writing Data to HDFS: df.write.save("hdfs://path/to/output")
● Creating DataFrames from Hive Tables: spark.table("hive_table_name")
● Coalesce Partitions: df.coalesce(numPartitions)
● Reading Data in Chunks (streaming source): spark.readStream.option("maxFilesPerTrigger", 1).csv("path/to/directory")
● Optimizing Data for Skewed Joins: df.repartition("skewed_column")
● Handling Data Skew in Joins: df1.join(df2.hint("broadcast"), "column")
● Registering UDF for SQL Queries: spark.udf.register("udf_name", lambda x: custom_function(x))
● Using SQL Functions in DataFrames: from pyspark.sql.functions import expr; df.withColumn("new_column", expr("SQL_expression"))
● StandardScaler for Feature Scaling: from pyspark.ml.feature import StandardScaler; scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
● Building a Machine Learning Pipeline: from pyspark.ml import Pipeline; pipeline = Pipeline(stages=[assembler, scaler, ml_model])
● Train-Test Split: train, test = df.randomSplit([0.7, 0.3])
● Model Fitting and Predictions: model = pipeline.fit(train); predictions = model.transform(test)
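Putting the last few entries together, a minimal end-to-end sketch; it assumes numeric feature columns "f1"/"f2", a binary "label" column, and logistic regression as the model:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="label")

pipeline = Pipeline(stages=[assembler, scaler, lr])
train, test = df.randomSplit([0.7, 0.3], seed=42)
model = pipeline.fit(train)
predictions = model.transform(test)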
● Cross-Validation for Model Tuning: from pyspark.ml.tuning import CrossValidator; crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid)
● Hyperparameter Tuning: from pyspark.ml.tuning import ParamGridBuilder; paramGrid = ParamGridBuilder().addGrid(model.param, [value1, value2]).build()
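A sketch tying the grid and the cross-validator together; it assumes the pipeline, lr, and train variables from the sketch above and uses a binary-classification evaluator:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1])
             .build())
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="label"),
                          numFolds=3)
cvModel = crossval.fit(train)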
● Running Graph Algorithms: results = g.pageRank(resetProbability=0.15, maxIter=10)
● Subgraphs and Motif Finding: g.find("(a)-[e]->(b); (b)-[e2]->(a)")
● Writing to a Stream: query = df.writeStream.outputMode("append").format("console").start()
● Window Operations on Streaming Data: from pyspark.sql.functions import window; df.groupBy(window(df["timestamp"], "1 hour")).count()
● Handling Late Data and Watermarks: df.withWatermark("timestamp", "2 hours")
● Triggering Streaming: df.writeStream.trigger(processingTime='1 hour').start()
● Streaming Aggregations: df.groupBy("group_column").agg({"value": "sum"})
● Reading from Kafka: spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "topic").load()
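A fuller structured-streaming sketch from Kafka to the console; the broker address, topic name, and value schema are placeholders:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType

value_schema = StructType([StructField("event", StringType())])

stream = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "host:port")
          .option("subscribe", "topic")
          .load()
          .select(from_json(col("value").cast("string"), value_schema).alias("data"))
          .select("data.*"))

query = (stream.writeStream
         .outputMode("append")
         .format("console")
         .start())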
● Flattening Nested Structures: df.selectExpr("struct_col.*")
● Schema Validation: df.schema == expected_schema
● Data Completeness Validation: df.count() == expected_count
● Column Value Range Validation: df.filter((df["column"] >= lower_bound) & (df["column"] <= upper_bound))
● Writing to Hive Tables: df.write.saveAsTable("hive_table")
● Connecting to External Databases: df.write.jdbc(url, table, properties=properties)
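A JDBC sketch with the url and properties spelled out; the connection details and table name are placeholders for your own database:

jdbc_url = "jdbc:postgresql://host:5432/dbname"
props = {"user": "username", "password": "password", "driver": "org.postgresql.Driver"}

# Write a DataFrame to a table, then read it back
df.write.jdbc(url=jdbc_url, table="schema.table", mode="append", properties=props)
db_df = spark.read.jdbc(url=jdbc_url, table="schema.table", properties=props)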
● Using Hadoop File System (HDFS): df.write.save("hdfs://path/to/save")
● Caching Intermediate Data: df.cache()
● Repartitioning for Parallel Processing: df.repartition(num_partitions)
● Avoiding Shuffle and Spill to Disk: df.coalesce(num_partitions)
● Quantile and Percentile Analysis: df.approxQuantile("column", [0.25, 0.5, 0.75], 0.0)
● Exploring Data with Spark SQL: spark.sql("SELECT * FROM df_table").show()
● Calculating Covariance and Correlation: df.stat.cov("col1", "col2"), df.stat.corr("col1", "col2")
● Dealing with ORC Files: df.write.orc("path"), spark.read.orc("path")
● Handling XML Data: (Using spark-xml library)
● Dealing with Binary Files: spark.read.format("binaryFile").load("path")
● Geospatial Joins and Calculations: (Using location-based joins and UDFs for distance calculations)
● Time Series Window Functions: from pyspark.sql.functions import window; df.groupBy(window("timestamp", "1 hour")).mean()
● Text Analysis with MLlib: from pyspark.ml.feature import Tokenizer; tokenizer = Tokenizer(inputCol="text", outputCol="words")
● Model Evaluation and Metrics: from pyspark.ml.evaluation import BinaryClassificationEvaluator; evaluator = BinaryClassificationEvaluator()
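A sketch of scoring predictions with the evaluator; it assumes the predictions DataFrame from a fitted binary classifier with a "label" column:

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="label",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)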
● Model Persistence and Loading: model.save("path"), ModelType.load("path")
● Vector Operations for ML Features: from pyspark.ml.feature import VectorAssembler; df_vec = VectorAssembler(inputCols=["column"], outputCol="vector_col").transform(df)
● Efficient Data Serialization: (Using Kryo serialization for performance)
● Optimizing Data Locality: (Ensuring data is close to computation resources)
● Error Handling and Recovery Strategies: (Implementing try-catch logic and checkpointing)
● GDPR Compliance and Data Anonymization: (Implementing data masking and anonymization)
● Complex Event Processing in Streams: (Using structured streaming for event pattern detection)
● Integrating with Cloud Storage Services: (Reading and writing data to cloud storage like S3, ADLS, GCS)